1.创建可观察对象Observable
下面都是创建可观察对象的方式:
enum MyError: Error {
case errorA
case errorB
}
//创建可观察对象
let _ = Observable<String>.just("liuxingxing")
let _ = Observable<Int>.of(1,2,3,4)
let _ = Observable<Bool>.from([true, false, true])
let _ = Observable<Double>.empty()
let _ = Observable<Float>.never() //never表示该观察对象永远不会发出event事件, 也不会终止
let _ = Observable<Int>.error(MyError.errorA)
let _ = Observable<Int>.range(start: 1, count: 6) // 1,2,3,4,5,6
let _ = Observable<Int>.repeatElement(333) //重复发出元素的event 不会停止
let _ = Observable<Int>.generate(initialState: 0, condition: { (num) -> Bool in
return num <= 10
}) { (num) -> Int in
return num + 2
} //0, 2,4,6,8,10
let _ = Observable.generate(initialState: 10, condition: {$0 >= 0}, iterate: {$0 - 2}) // 10,8,6,4,2,0
let _ = Observable<String>.create { (observer) -> Disposable in
observer.onNext("hahahaha")
observer.onCompleted()
//每一个订阅行为都会有一个Disposables类型的返回值
return Disposables.create {
}
}
var isOdd = true
let factory = Observable<String>.deferred { () -> Observable<String> in
isOdd = !isOdd
if isOdd {
return Observable<String>.of("1", "2")
}
return Observable<String>.of("1a", "2b")
}
factory.subscribe {event in
print(event)
}
factory.subscribe {event in
print(event)
}
let ob = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
ob.subscribe {event in
print("每隔1秒产生一个元素:",event)
}
let ob1 = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
ob1.subscribe { event in
print("经过5s后产生的唯一元素:",event)
}
let ob2 = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
ob2.subscribe { event in
print("经过5s后,每隔1s产生一个元素:",event)
}
2.订阅可观察对象Observable
// 第一种写法
let ob = Observable<Int>.of(1,2,3,4,5)
ob.subscribe {event in
switch event {
case .next(let element):
print(element)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
// 第二种写法
ob.subscribe(onNext: { (num) in
print(num)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("onCompleted")
}, onDisposed: {
print("disposed")
})
Disposable
每当Observeable被订阅时,都会返回一个Disposable实例,当调用Disposable的dispose,就相当于取消订阅。在不需要再接收事件时,建议取消订阅,释放资源。有3种常见方式取消订阅。
let observable = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(123)
return Disposables.create()
}
let disposable = observable.subscribe { (event) in
switch event {
case .next(let element):
print(element)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
// 如果在某个时刻,你不再需要订阅了,可以调用以下方法来取消订阅
disposable.dispose()
//如果订阅后立即取消订阅,可以这样实现:
observable.subscribe { (event) in
switch event {
case .next(let element):
print(element)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}.dispose()
// 通过DisposeBag来销毁,当bag销毁(deinit)时,会自动
// 调用Disposable实例的dispose
let disposeBag = DisposeBag()
observable.subscribe { (event) in
}.disposed(by: disposeBag)
//第三种,监听自身销毁takeUntil
let _ = observable.takeUntil(self.rx.deallocated).map{"\($0)"}
.bind(to: timerLabel.rx.text)
do方法会在每一次subscribe函数调用之前调用,是监听事件生命周期的函数:
let ob = Observable<Int>.of(1,2,3,4,5)
// do方法会在每一次subscribe函数调用z之前调用,是监听事件生命周期的函数
ob.do(onNext: { (num) in
print("---->\(num)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("----->onCompleted")
}, onSubscribe: {
print("---->onSubscribe")
}, onSubscribed: {
print("---->onSubscribed")
}, onDispose: {
print("---->onDispose")
}).subscribe(onNext: { (num) in
print(num)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("onCompleted")
}, onDisposed: {
print("disposed")
})
}
示例1:
//创建可观察对象
_ = Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
let evenObservable = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map { Int($0) }
.filter { $0 % 2 == 0 }
//订阅这个可观察对象
_ = evenObservable.subscribe { (event) in
print("this event: \(event)")
}
this event: next(2)
this event: next(4)
this event: next(6)
this event: next(8)
this event: next(10)
this event: completed
使用skip
忽略前两个事件:
//创建可观察对象
_ = Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
let evenObservable = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map { Int($0) }
.filter { $0 % 2 == 0 }
//订阅这个可观察对象
_ = evenObservable.skip(2).subscribe { (event) in
print("this event: \(event)")
}
this event: next(6)
this event: next(8)
this event: next(10)
this event: completed
3.事件的状态与绑定
事件状态的改变:
let evenObservable = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map { Int($0) }
.filter { $0 % 2 == 0 }
//订阅这个可观察对象
_ = evenObservable.subscribe(
onNext: {event in print(event )},
onError: { print("error: \($0)") },
onCompleted: { print("completed" ) },
onDisposed: {print("disposed" ) }
)
2
4
6
8
10
completed
disposed
通过bind方式来进行事件绑定:
func startTimer() {
let observer: AnyObserver<String> = AnyObserver { (event) in
switch event {
case .next(let text):
print(text)
break
default:
break
}
}
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.map {"当前值为\($0)"}.skip(1).bind(to: observer).disposed(by: disposeBag)
}
// 也可以绑定一个UI控件
通过Binder进行事件绑定:
func startTimer() {
let observer: Binder<String> = Binder(self) { (_, text) in
print(text)
}
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.map {"当前值为\($0)"}.skip(1).bind(to: observer).disposed(by: disposeBag)
}
let binder = Binder<String>.init(timerLabel) { (label, value) in
label.text = value
}
Observable.just("123").bind(to: binder).dispose()
//也可以通过下面的方式
Observable.just("abc").subscribe(binder).dispose()
实现一个label的循环展示与隐藏
//我们可以通过以下代码实现一个label的循环展示与隐藏
let observerable = Observable<Int>.timer(.seconds(2), period: .seconds(1), scheduler: MainScheduler.instance)
let binder = Binder<Bool>.init(timerLabel) { (label, value) in
label.isHidden = value
}
observerable.map{ $0 % 2 == 0 }.bind(to: binder).disposed(by: bag)
我们也可以通过扩展来更简单的实现这个功能:
// 添加扩展
extension Reactive where Base: UILabel {
var hidden: Binder<Bool> {
return Binder<Bool>(base) { label, value in
label.isHidden = value
}
}
}
// 使用
let observerable = Observable<Int>.timer(.seconds(2), period: .seconds(1), scheduler: MainScheduler.instance)
observerable.map{ $0 % 2 == 0 }.bind(to: timerLabel.rx.hidden).disposed(by: bag)
实现一个简单的tableview(此tableview不需要设置代理):
let bag = DisposeBag()
let persons = Observable.just([Person(name: "lxx", age: 234),Person(name: "lxx32", age: 234),Person(name: "lddxx", age: 234),Person(name: "lxggx", age: 234)])
persons.bind(to: tableView.rx.items(cellIdentifier: "cell")) { row, person, cell in
cell.textLabel?.text = person.name
cell.detailTextLabel?.text = "\(person.age)"
}.disposed(by: bag)
// 第一种点击事件
// tableView.rx.itemSelected.subscribe({ path in
// print(path)
// }).disposed(by: bag)
// 第二种点击事件
tableView.rx.modelSelected(Person.self).subscribe(onNext: { (person) in
print(person.name)
}).disposed(by: bag)
监听一个按钮的点击
// 方式1:
button.rx.controlEvent(.touchUpInside).subscribe(onNext: {
print("被点击了")
}).disposed(by: bag)
// 方式2:
button.rx.tap.subscribe(onNext: {
print("被点击了")
}).disposed(by: bag)
创建Observer
let observer = AnyObserver<Int>.init { (event) in
}
Observable.just(1).subscribe(observer).dispose()
4、针对无限发送的事件做销毁
//5秒钟后销毁
let disposable = Observable<Int>.interval(1, scheduler: MainScheduler.instance).subscribe(
onNext: {event in print(event )},
onDisposed: {print("disposed" ) }
)
delay(5) {
disposable.dispose()
}
func delay(_ delay: Double, closure: @escaping () -> ()) {
DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
closure()
}
}
0
1
2
3
4
disposed
5、DisposeBag来销毁对象
//5秒钟后销毁
var bag = DisposeBag()
Observable<Int>.interval(1, scheduler: MainScheduler.instance).subscribe(
onNext: {event in print(event )},
onDisposed: {print("disposed" ) }
).disposed(by: bag)
delay(5) {
//通过创建新的bag,使原来bag绑定的对象便会销毁
bag = DisposeBag()
}
6、手动创建可观察对象
enum CustomError: Error {
case somethingError
}
func test4() {
let customObservable = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(10)
observer.onNext(11)
observer.onNext(12)
observer.onError(CustomError.somethingError)
observer.onCompleted()
return Disposables.create()
}
let bag = DisposeBag()
customObservable.subscribe(
onNext: { print($0)},
onCompleted: { print("completed" ) },
onDisposed: {print("disposed" ) }
).disposed(by: bag)
}
- 成功打印:
10
11
12
completed
disposed
- 失败打印(不会再打印completed了):
10
11
12
Unhandled error happened: somethingError
subscription called from:
disposed
7、使用do(注意:do使用的是onDispose)
func test4() {
let customObservable = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(10)
observer.onNext(11)
observer.onNext(12)
observer.onCompleted()
return Disposables.create()
}
let bag = DisposeBag()
customObservable.do(
onNext: { print("intercepted:\($0)") },
onCompleted: { print("intercepted:completed") },
onDispose: {print("intercepted: disposed" ) }
)
.subscribe(
onNext: { print($0)},
onCompleted: { print("completed" ) },
onDisposed: {print("disposed" ) }
).disposed(by: bag)
}
intercepted:10
10
intercepted:11
11
intercepted:12
12
intercepted:completed
completed
disposed
intercepted: disposed
8、debug
func test4() {
let customObservable = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(10)
observer.onNext(11)
observer.onNext(12)
observer.onCompleted()
return Disposables.create()
}
let bag = DisposeBag()
customObservable.debug().subscribe(
onNext: { print($0)},
onCompleted: { print("completed" ) },
onDisposed: {print("disposed" ) }
).disposed(by: bag)
}
2019-03-01 10:25:29.252: ViewController.swift:187 (test4()) -> subscribed
2019-03-01 10:25:29.261: ViewController.swift:187 (test4()) -> Event next(10)
10
2019-03-01 10:25:29.261: ViewController.swift:187 (test4()) -> Event next(11)
11
2019-03-01 10:25:29.261: ViewController.swift:187 (test4()) -> Event next(12)
12
2019-03-01 10:25:29.262: ViewController.swift:187 (test4()) -> Event completed
completed
disposed
9. PublishSubject
PublishSubject即是订阅者,又是观察者:
//初始化发送者
let subject = PublishSubject<String>()
//订阅者需要在发送消息前订阅,不然是接收不到订阅前发送的消息的
let scribe1 = subject.subscribe({str in
print("scribe1 - what happend: \(str)")
})
//发送一个消息
subject.onNext("episode1 updated ")
//订阅者被销毁了
scribe1.dispose()
let scribe2 = subject.subscribe({str in
print("scribe2 - what happend: \(str)")
})
//发送一个消息
subject.onNext("episode2 updated ")
subject.onNext("episode3 updated ")
//订阅者被销毁了
scribe2.dispose()
打印结果:
scribe1 - what happend: next(episode1 updated )
scribe2 - what happend: next(episode2 updated )
scribe2 - what happend: next(episode3 updated )
如果还是不很清楚,那么我们结合示意图,来完整演示一下:
func test5() {
//初始化
let subject = PublishSubject<String>()
//没有订阅,发送事件是接收不到的
subject.onNext("我是一个事件,但是我不能被接收到。。。")
//先订阅
subject.subscribe(onNext: { (str) in
print("我是一号订阅者:",str)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("我是一号订阅者:onCompleted")
}, onDisposed: {
print("我是一号订阅者:onDisposed")
}).disposed(by: disposeBag)
//然后发送事件
subject.onNext("我是一个事件")
//可以多次订阅
subject.subscribe(onNext: { (str) in
print("我是二号订阅者:",str)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("我是二号订阅者:onCompleted")
}, onDisposed: {
print("我是二号订阅者:onDisposed")
}).disposed(by: disposeBag)
//再次发送一个事件
subject.onNext("哈哈哈哈,我也是一个事件")
//更改为完成状态
subject.onCompleted()
//完成后就没法再接收事件了
subject.onNext("完成后,我不能被接受到")
//这时候订阅,只能接收到完成的状态
subject.subscribe(onNext: { (str) in
print("我是三号订阅者:",str)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("我是三号订阅者:onCompleted")
}, onDisposed: {
print("我是三号订阅者:onDisposed")
}).disposed(by: disposeBag)
}
打印结果:
我是一号订阅者: 我是一个事件
我是一号订阅者: 哈哈哈哈,我也是一个事件
我是二号订阅者: 哈哈哈哈,我也是一个事件
我是一号订阅者:onCompleted
我是一号订阅者:onDisposed
我是二号订阅者:onCompleted
我是二号订阅者:onDisposed
我是三号订阅者:onCompleted
我是三号订阅者:onDisposed
10. BehaviorSubject
BehaviorSubject需要一个初始化值。与PublishSubject不同的是,BehaviorSubject可以接受到最后一条历史值(也就是订阅之前发送的事件)。
//初始化发送者
let subject = BehaviorSubject<String>(value: "RxSwift step by step")
//当没有事件的话,订阅者可以接收到发送者发送的默认值
let scribe1 = subject.subscribe({str in
print("scribe1 - what happend: \(str)")
})
//发送一个消息
subject.onNext("episode1 updated ")
//订阅者被销毁了
scribe1.dispose()
let scribe2 = subject.subscribe({str in
print("scribe2 - what happend: \(str)")
})
//发送一个消息
subject.onNext("episode2 updated ")
subject.onNext("episode3 updated ")
//订阅者被销毁了
scribe2.dispose()
打印结果:
scribe1 - what happend: next(RxSwift step by step)
scribe1 - what happend: next(episode1 updated )
scribe2 - what happend: next(episode1 updated )
scribe2 - what happend: next(episode2 updated )
scribe2 - what happend: next(episode3 updated )
注意第三条的打印: scribe2 - what happend: next(episode1 updated ) 当scribe2订阅事件的时候,发送者已经有历史事件了,所以会被scribe2接收到最后一条 (最后一条 最后一条)历史事件
如果产生错误事件:
func test6() {
let subject = BehaviorSubject<String>(value: "我是默认事件")
//如果没有事件,则会接收默认事件
subject.subscribe(onNext: { (str) in
print("我是一号订阅者:",str)
}, onError: { (error) in
print("我是一号订阅者:",error)
}, onCompleted: {
print("我是一号订阅者:onCompleted")
}, onDisposed: {
print("我是一号订阅者:onDisposed")
}).disposed(by: disposeBag)
subject.onNext("我是一个事件")
subject.onError(NSError(domain: "---", code: 0, userInfo: ["msg": "哈哈哈我发送了错误"]))
subject.subscribe(onNext: { (str) in
print("我是二号订阅者:",str)
}, onError: { (error) in
print("我是二号订阅者:",error)
}, onCompleted: {
print("我是二号订阅者:onCompleted")
}, onDisposed: {
print("我是二号订阅者:onDisposed")
}).disposed(by: disposeBag)
}
打印结果:
我是一号订阅者: 我是默认事件
我是一号订阅者: 我是一个事件
我是一号订阅者: Error Domain=--- Code=0 "(null)" UserInfo={msg=哈哈哈我发送了错误}
我是一号订阅者:onDisposed
我是二号订阅者: Error Domain=--- Code=0 "(null)" UserInfo={msg=哈哈哈我发送了错误}
我是二号订阅者:onDisposed
11. ReplaySubject(可以指定历史事件的个数)
func test() {
//初始化发送者,并指定l可以接收到历史事件的个数
let subject = ReplaySubject<String>.create(bufferSize: 1)
//当没有事件的话,订阅者可以接收到发送者发送的默认值
let scribe1 = subject.subscribe({str in
print("scribe1 - what happend: \(str)")
})
//发送一个消息
subject.onNext("episode1 updated ")
//发送一个消息
subject.onNext("episode1 updated------- ")
//订阅者被销毁了
scribe1.dispose()
let scribe2 = subject.subscribe({str in
print("scribe2 - what happend: \(str)")
})
//发送一个消息
subject.onNext("episode2 updated ")
subject.onNext("episode3 updated ")
//订阅者被销毁了
scribe2.dispose()
}
如果是1,打印结果如下:
scribe1 - what happend: next(episode1 updated )
scribe1 - what happend: next(episode1 updated------- )
scribe2 - what happend: next(episode1 updated------- )
scribe2 - what happend: next(episode2 updated )
scribe2 - what happend: next(episode3 updated )
如果是2,打印结果如下:
scribe1 - what happend: next(episode1 updated )
scribe1 - what happend: next(episode1 updated------- )
scribe2 - what happend: next(episode1 updated )
scribe2 - what happend: next(episode1 updated------- )
scribe2 - what happend: next(episode2 updated )
scribe2 - what happend: next(episode3 updated )
缓冲区是2时,scribe2可以接受到订阅前的2条历史数据(第3、4条的打印信息)
如果不太清楚,那么就再简单点:
func test7() {
let subject = ReplaySubject<String>.create(bufferSize: 3)
subject.onNext("我是第一个事件")
subject.onNext("我是第二个事件")
subject.onNext("我是第三个事件")
subject.onNext("我是第四个事件")
//如果没有事件,则会接收默认事件
subject.subscribe(onNext: { (str) in
print("我是一号订阅者:",str)
}, onError: { (error) in
print("我是一号订阅者:",error)
}, onCompleted: {
print("我是一号订阅者:onCompleted")
}, onDisposed: {
print("我是一号订阅者:onDisposed")
}).disposed(by: disposeBag)
subject.onNext("我是五个事件")
subject.subscribe(onNext: { (str) in
print("我是二号订阅者:",str)
}, onError: { (error) in
print("我是二号订阅者:",error)
}, onCompleted: {
print("我是二号订阅者:onCompleted")
}, onDisposed: {
print("我是二号订阅者:onDisposed")
}).disposed(by: disposeBag)
}
打印结果:
我是一号订阅者: 我是第二个事件
我是一号订阅者: 我是第三个事件
我是一号订阅者: 我是第四个事件
我是一号订阅者: 我是五个事件
我是二号订阅者: 我是第三个事件
我是二号订阅者: 我是第四个事件
我是二号订阅者: 我是五个事件
打印结果说明:我们缓存了3个历史事件,所以只能缓存“最近”的三个事件。一号订阅者订阅前已经产生了4条事件,那么它只能接收到第二三四个事件。此时,在发送一个事件五,一号订阅者正常接收;添加二号订阅者,对于二号订阅者,历史事件是三四五,所以只能接收到三四五事件。
12.Variable
func test() {
let stringVariable = Variable("Episode1")
let sub1 = stringVariable.asObservable().subscribe({
print("sub1: \($0)")
})
//简单写法
print("easy print:" + stringVariable.value)
//我们还可以修改值,相当于重新发送事件(onNext)
stringVariable.value = "new data"
print("easy print----:" + stringVariable.value)
}
- 打印结果:
ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
sub1: next(Episode1)
easy print:Episode1
sub1: next(new data)
easy print----:new data
sub1: completed
简单实现:
func test8() {
let subject = Variable<String>("我是默认值")
subject.asObservable().subscribe(onNext: { (str) in
print("我是一号订阅者:",str)
}, onError: { (error) in
print("我是一号订阅者:",error)
}, onCompleted: {
print("我是一号订阅者:onCompleted")
}, onDisposed: {
print("我是一号订阅者:onDisposed")
}).disposed(by: disposeBag)
subject.value = "我是一个新的值111"
subject.value = "我是一个新的值222"
subject.value = "我是一个新的值333"
subject.asObservable().subscribe(onNext: { (str) in
print("我是二号订阅者:",str)
}, onError: { (error) in
print("我是二号订阅者:",error)
}, onCompleted: {
print("我是二号订阅者:onCompleted")
}, onDisposed: {
print("我是二号订阅者:onDisposed")
}).disposed(by: disposeBag)
}
打印结果:
ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
我是一号订阅者: 我是默认值
我是一号订阅者: 我是一个新的值111
我是一号订阅者: 我是一个新的值222
我是一号订阅者: 我是一个新的值333
我是二号订阅者: 我是一个新的值333
我是一号订阅者:onCompleted
我是一号订阅者:onDisposed
我是二号订阅者:onCompleted
我是二号订阅者:onDisposed
打印说明:一号订阅者会接收到默认事件,当更改subject的值时,相当于发送了一个next事件,所以一号订阅者可以接收到;二号订阅者可以接受到最后一条历史事件,所以可以接收到“我是一个新的值333”
UI扩展
我们可以使用2种方式的扩展,一种常规的,一种rx风格的:
extension UILabel {
//方式1的扩展
public var fontSize: Binder<CGFloat> {
return Binder(self) { (label, fontSize) in
label.font = UIFont.systemFont(ofSize: fontSize)
}
}
}
extension Reactive where Base: UILabel {
//方式2的扩展
public var rxFontSize: Binder<CGFloat> {
return Binder(self.base) { (label, fontSize) in
label.font = UIFont.systemFont(ofSize: fontSize)
}
}
}
使用的风格:
func test4() {
//方式1的使用
let observable = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
observable.map{CGFloat($0)}.bind(to: myLabel.fontSize).disposed(by: disposeBag)
//方式2的使用
let observable2 = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable2.map{CGFloat($0) * 10}.bind(to: myLabel.rx.rxFontSize).disposed(by: disposeBag)
}