RxJava 2.0的基本使用

发布时间:2025-12-10 11:27:10 浏览次数:16

之前在android项目中使用的是RxJava 1.x和RxAndroid,结合Retrofit处理网络请求,比起Handler, AsyncTask确实开发起来方便很多,架构也比较清晰。

RxJava 2.0新版出来已经有段时间了,也计划着更新一下项目。


通过JetBrain IntelliJ IDEA创建一个Gradle工程。

加入RxJava的依赖

sourceCompatibility = 1.8repositories {//mavenLocal() //maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/'} jcenter()}dependencies {compile "io.reactivex.rxjava2:rxjava:2.0.7" testCompile group: 'junit', name: 'junit', version: '4.11'}

首先使用最普通的模式

//定义被观察者 //RxJava 2.0 保留了Observable, 不支持背压 Observable observable = Observable.create(new ObservableOnSubscribe<String>() {@Override public void subscribe(ObservableEmitter e) throws Exception {// emitter 发射器 e.onNext("hello");e.onNext("world");e.onComplete();}});//定义观察者 Observer<String> observer = new Observer<String>(){@Override public void onSubscribe(Disposable d) {// disposable 一次性的 System.out.println("onSubscribe");}@Override public void onNext(String s) {System.out.println("onNext: " + s);}@Override public void onError(Throwable e) {System.out.println("onError");}@Override public void onComplete() {System.out.println("onComplete");}};// 建立被观察者和观察者之间的订阅关系 observable.subscribe(observer);

接着使用支持背压的模式

//定义被观察者 Flowable支持背压Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {@Override public void subscribe(FlowableEmitter<String> e) throws Exception {e.onNext("hello");e.onNext("world");e.onComplete();}}, BackpressureStrategy.BUFFER);//定义观察者Subscriber subscriber = new Subscriber<String>(){@Override public void onSubscribe(Subscription s) {System.out.println("onSubscribe");//React Pull s.request(Long.MAX_VALUE);}@Override public void onNext(String s) {System.out.println("onNext: " + s);}@Override public void onError(Throwable t) {System.out.println("onError");}@Override public void onComplete() {System.out.println("onComplete");}};// 建立被观察者和观察者之间的订阅关系flowable.subscribe(subscriber);

一种简化写法

String[] s = {"Hello", "world"}; Flowable.fromArray(s).subscribe(new Consumer<String>() {@Override public void accept(String s) {System.out.println(s);}});

更外一种更简单的写法

Flowable.fromArray(s).subscribe(System.out::println);


参考资料

背压问题说明

https://github.com/ReactiveX/RxJava/wiki/Backpressure

官方网站

https://github.com/ReactiveX/RxJava

http://www.vogella.com/tutorials/RxJava/article.html#rxjava-observable-types


需要做网站?需要网络推广?欢迎咨询客户经理 13272073477