Reactive Streams – Java 9 Flow API, RxJava, Akka and Reactor Examples
January 14th, 2018 by Micha KopsReactive Streams is an initiative trying to standardize asynchronous stream processing with non-blocking back-pressure. With Java 9, new classes in the java.util.concurrent.flow package offer a semantically equivalent counterpart to this standard that may be adopted by other frameworks.
In the following short tutorial we’re implementing examples for reactive streams with Java 9 and the Flow API, with RxJava2, with Akka, with Reactor and finally there is an example in RxJava1, too though it does not follow the standard.
Contents
Setup
For the following examples with the exception of the Java9 example, we’ll be using Maven to manage our dependencies.
In addition the following POJO will be used in the following examples as a payload object that is used for publishing news.
public class News { private final String headline; private final LocalDate date; public static News create(String headline) { return new News(headline, LocalDate.now()); } // getter, setter, constructor omitted }
Java 9 Flow API
Our first examples uses the new Flow API in Java 9 that follows the Reactive Streams Specification and is aligned with the paradigms of the Reactive Manifesto: Responsive, Resilient, Elastic, Message Driven.
For more details there is the Java 9 JavaDoc and an excellent blog article from Oracle: “Reactive Programming with JDK 9 Flow API“.
The basic communication flow is depicted in the following sequence diagram:
This is our sample application:
package com.hascode.tutorial; import java.time.LocalDate; import java.util.List; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; public class Main { static class NewsSubscriber implements Flow.Subscriber<News> { private Subscription subscription; private static final int MAX_NEWS = 3; private int newsReceived = 0; @Override public void onSubscribe(Subscription subscription) { System.out.printf("new subscription %s\n", subscription); this.subscription = subscription; subscription.request(1); } @Override public void onNext(News item) { System.out.printf("news received: %s (%s)\n", item.getHeadline(), item.getDate()); newsReceived++; if (newsReceived >= MAX_NEWS) { System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived, MAX_NEWS); subscription.cancel(); return; } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.printf("error occurred fetching news: %s\n", throwable.getMessage()); throwable.printStackTrace(System.err); } @Override public void onComplete() { System.out.println("fetching news completed"); } } public static void main(String[] args) { try (SubmissionPublisher<News> newsPublisher = new SubmissionPublisher()) { NewsSubscriber newsSubscriber = new NewsSubscriber(); newsPublisher.subscribe(newsSubscriber); List.of(News.create("Important news"), News.create("Some other news"), News.create("And news, news, news")).forEach(newsPublisher::submit); while (newsPublisher.hasSubscribers()) { // wait } System.out.println("no more news subscribers left, closing publisher.."); } } }
Running the code above should produce a similar result:
new subscription java.util.concurrent.SubmissionPublisher$BufferedSubscription@27109f81 news received: Important news (2018-01-08) news received: Some other news (2018-01-08) news received: And news, news, news (2018-01-08) 3 news received (max: 3), cancelling subscription no more news subscribers left, closing publisher..
RxJava 2
RxJava is a a Java implementation of Reactive Extensions, RxJava2 also implements the Reactive Streams Specification mentioned above.
To use RxJava2 we just need to add the following dependency to our project’s pom.xml:
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.8</version> </dependency>
This is our exemplary implementation using RxJava2:
package com.hascode.tutorial; import io.reactivex.Flowable; import io.reactivex.FlowableSubscriber; import java.time.LocalDate; public class Main { public static void main(String[] args) { Flowable.just(News.create("Important news"), News.create("Some other news"), News.create("And news, news, news")).subscribe(new FlowableSubscriber<>() { private org.reactivestreams.Subscription subscription; private static final int MAX_NEWS = 3; private int newsReceived = 0; @Override public void onSubscribe(org.reactivestreams.Subscription subscription) { System.out.printf("new subscription %s\n", subscription); this.subscription = subscription; subscription.request(1); } @Override public void onNext(News news) { System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate()); newsReceived++; if (newsReceived >= MAX_NEWS) { System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived, MAX_NEWS); subscription.cancel(); return; } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.printf("error occurred fetching news: %s\n", throwable.getMessage()); throwable.printStackTrace(System.err); } @Override public void onComplete() { System.out.println("fetching news completed"); } }); } }
Running the code above should produce a similar output:
new subscription 0 news received: Important news (2018-01-08) news received: Some other news (2018-01-08) news received: And news, news, news (2018-01-08) 3 news received (max: 3), cancelling subscription
For an example using RxJava1, please feel free to skip to this section in the appendix: “Appendix A: RxJava 1“.
Akka
Akka offers its own implementation of reactive streams but additionally offers compatibility with the reactive streams contract e.g. using Producers and Subscribers.
Using Akka’s Java DSL, we need to add the following dependencies to our project’s pom.xml:
<dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.12</artifactId> <version>2.5.3</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.12</artifactId> <version>2.5.3</version> </dependency> </dependencies>
Out application looks like this one:
package com.hascode.tutorial; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.AsPublisher; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import java.time.LocalDate; import java.util.List; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; public class Main { public static void main(String[] args) { final ActorSystem system = ActorSystem.create("sample-system"); final Materializer materializer = ActorMaterializer.create(system); final Publisher<News> publisher = Source.from(List.of(News.create("Important news"), News.create("Some other news"), News.create("And news, news, news"))) .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer); Subscriber<News> newsSubscriber = new Subscriber<>() { private org.reactivestreams.Subscription subscription; private static final int MAX_NEWS = 3; private int newsReceived = 0; @Override public void onSubscribe(org.reactivestreams.Subscription subscription) { System.out.printf("new subscription %s\n", subscription); this.subscription = subscription; subscription.request(1); } @Override public void onNext(News news) { System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate()); newsReceived++; if (newsReceived >= MAX_NEWS) { System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived, MAX_NEWS); subscription.cancel(); return; } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.printf("error occurred fetching news: %s\n", throwable.getMessage()); throwable.printStackTrace(System.err); } @Override public void onComplete() { System.out.println("fetching news completed"); } }; publisher.subscribe(newsSubscriber); } }
Running the application should produce something similar to this:
new subscription akka.stream.impl.ActorSubscriptionWithCursor@79f555e4 news received: Important news (2018-01-14) news received: Some other news (2018-01-14) news received: And news, news, news (2018-01-14) 3 news received (max: 3), cancelling subscription fetching news completed
Reactor
Reactor is another library for building non-blocking applications also implementing the Reactive Streams Specification.
To use Reactor we need to add the following dependency to our project’s pom.xml:
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.1.2.RELEASE</version> </dependency>
This is our implementation with Reactor:
package com.hascode.tutorial; import java.time.LocalDate; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; public class Main { public static void main(String[] args) { Flux.just(News.create("Important news"), News.create("Some other news"), News.create("And news, news, news")).subscribe(new Subscriber<News>() { private static final int MAX_NEWS = 3; private int newsReceived = 0; private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.printf("new subscription %s\n", subscription); this.subscription = subscription; subscription.request(1); } @Override public void onNext(News news) { System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate()); newsReceived++; if (newsReceived >= MAX_NEWS) { System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived, MAX_NEWS); subscription.cancel(); return; } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.printf("error occurred fetching news: %s\n", throwable.getMessage()); throwable.printStackTrace(System.err); } @Override public void onComplete() { System.out.println("fetching news completed"); } }); } }
Running the code above should produce a similar result:
[DEBUG] (main) Using Console logging new subscription reactor.core.publisher.StrictSubscriber@704a52ec news received: Important news (2018-01-08) news received: Some other news (2018-01-08) news received: And news, news, news (2018-01-08) 3 news received (max: 3), cancelling subscription
Appendix A: RxJava1
RxJava1 does not implement the Reactive Streams Specification but nevertheless here is an example similar to the ones above.
To use RxJava1 we just need to add the following dependency to our pom.xml:
<dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <version>1.3.4</version> </dependency>
This is our sample application:
package com.hascode.tutorial; import java.time.LocalDate; import rx.Observable; import rx.Subscriber; public class Main { public static void main(String[] args) { Observable.just(News.create("Important news"), News.create("Some other news"), News.create("And news, news, news")).subscribe(new Subscriber<News>() { private static final int MAX_NEWS = 3; private int newsReceived = 0; @Override public void onStart() { System.out.println("new subscription"); request(1); } @Override public void onCompleted() { System.out.println("fetching news completed"); } @Override public void onError(Throwable throwable) { System.err.printf("error occurred fetching news: %s\n", throwable.getMessage()); throwable.printStackTrace(System.err); } @Override public void onNext(News news) { System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate()); newsReceived++; if (newsReceived >= MAX_NEWS) { System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived, MAX_NEWS); unsubscribe(); return; } request(1); } }); } }
Running the application should produce a similar result:
new subscription news received: Important news (2018-01-08) news received: Some other news (2018-01-08) news received: And news, news, news (2018-01-08) 3 news received (max: 3), cancelling subscription
Appendix B: Modeling with PlantUML
To create diagrams in no time I’m using PlantUML. It’s like Markdown for diagrams and offers a super-easy ASCII-art like syntax.
The diagram created for this tutorial is generated from the following markup (the first ~10 lines are just for styling):
@startuml title Reactive Streams Specification\nSimplified Communication Flow skinparam handwritten true skinparam sequence { ArrowColor black ActorBorderColor black LifeLineBorderColor black LifeLineBackgroundColor black ParticipantBorderColor black ParticipantBackgroundColor white ParticipantFontColor black } Subscriber -> Publisher : subscribe() Publisher -> Subscription : new() Publisher -> Subscriber : onSubscribe(Subscription) loop until completed Subscriber -> Subscription : request(int) Subscription -> Publisher alt no error Publisher -> Subscriber : onNext(T) else error Publisher -> Subscriber : onError(Throwable) end alt cancel subscription Subscriber -> Subscription : cancel() Subscription -> Publisher end end Publisher -> Subscriber : onComplete() @enduml
Tutorial Sources
Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Git:
git clone https://bitbucket.org/hascode/java9-reactive-flow.git
Resources
- Java 9 Flow API
- Reactive Streams Specification
- ReactiveX/RxJava 2 on GitHub
- ReactiveX/RxJava 1 (Branch) on GitHub
- Reactor Project on GitHub
- Akka Streams Documentation
- Akka Project Website
- Oracle.com: Reactive Programming with JDK 9 Flow API
- The Reactive Manifesto
- Reactive Extensions
- Java 9 TCK for Flow
Other Java 9 Articles
Please feel free to read other Java 9 articles of mine:
Article Updates
- 2018-07-29: Missing texts added.
- 2018-01-19: Typos in the PlantUML section fixed.
- 2018-01-15: Section about PlantUML diagramming added.
- 2018-01-15: Typo in Akka Maven dependencies fixed (thanks Jochen for mentioning).
Tags: akka, backpressure, consumer, diagramming, flow, flow-api, java9, jdk9, manifesto, maven, plantuml, producer, publisher, reactive, reactivex, reactor, rxjava, rxjava1, rxjava2, streams, subscriber, uml
January 19th, 2018 at 12:09 pm
Thank you for the article.
The ULM tool is exactly what I was looking for for a log time)
January 19th, 2018 at 2:28 pm
Hi Yuri,
thanks, you’re welcome!
I really love PlantUML for its simplicity and the feature to use a handwritten style.
You can do even crazy things like reverting the colors (with skinparam monochrome reverse).
Greets,
Micha
February 17th, 2019 at 12:09 pm
Thanks for the blog. Do we have a comparison of these implementations ? which framework is recommended
?