Reactive Streams – Java 9 Flow API, RxJava, Akka and Reactor Examples

January 14th, 2018 by

Reactive Streams is an initiative trying to standardize asynchronous stream processing with non-blocking  back-pressure. With Java 9, new classes in the java.util.concurrent.flow package offer a semantically equivalent counterpart to this standard that may be adopted by other frameworks.

In the following short tutorial we’re implementing examples for reactive streams with Java 9 and the Flow API, with RxJava2, with Akka, with Reactor and finally there is an example in RxJava1, too though it does not follow the standard.

Reactive Streams - Simplified Communication Flow

Reactive Streams - Simplified Communication Flow

 

Setup

For the following examples with the exception of the Java9 example, we’ll be using Maven to manage our dependencies.

In addition the following POJO will be used in the following examples as a payload object that is used for publishing news.

public class News {
 
	private final String headline;
	private final LocalDate date;
 
	public static News create(String headline) {
	  return new News(headline, LocalDate.now());
	}
 
	// getter, setter, constructor omitted
}

Java 9 Flow API

Our first examples uses the new Flow API in Java 9 that follows the Reactive Streams Specification and is aligned with the paradigms of the Reactive Manifesto: Responsive, Resilient, Elastic, Message Driven.

For more details there is the Java 9 JavaDoc and an excellent blog article from Oracle: “Reactive Programming with JDK 9 Flow API“.

The basic communication flow is depicted in the following sequence diagram:

Reactive Streams - Simplified Communication Flow

Reactive Streams - Simplified Communication Flow

This is our sample application:

package com.hascode.tutorial;
 
import java.time.LocalDate;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
 
public class Main {
  static class NewsSubscriber implements Flow.Subscriber<News> {
 
    private Subscription subscription;
    private static final int MAX_NEWS = 3;
    private int newsReceived = 0;
 
    @Override
    public void onSubscribe(Subscription subscription) {
      System.out.printf("new subscription %s\n", subscription);
      this.subscription = subscription;
      subscription.request(1);
    }
 
    @Override
    public void onNext(News item) {
      System.out.printf("news received: %s (%s)\n", item.getHeadline(), item.getDate());
      newsReceived++;
      if (newsReceived >= MAX_NEWS) {
        System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived,
            MAX_NEWS);
        subscription.cancel();
        return;
      }
 
      subscription.request(1);
 
    }
 
    @Override
    public void onError(Throwable throwable) {
      System.err.printf("error occurred fetching news: %s\n", throwable.getMessage());
      throwable.printStackTrace(System.err);
 
    }
 
    @Override
    public void onComplete() {
      System.out.println("fetching news completed");
    }
  }
 
  public static void main(String[] args) {
    try (SubmissionPublisher<News> newsPublisher = new SubmissionPublisher()) {
 
      NewsSubscriber newsSubscriber = new NewsSubscriber();
      newsPublisher.subscribe(newsSubscriber);
 
      List.of(News.create("Important news"), News.create("Some other news"),
          News.create("And news, news, news")).forEach(newsPublisher::submit);
 
      while (newsPublisher.hasSubscribers()) {
        // wait
      }
      System.out.println("no more news subscribers left, closing publisher..");
    }
  }
}

Running the code above should produce a similar result:

new subscription java.util.concurrent.SubmissionPublisher$BufferedSubscription@27109f81
news received: Important news (2018-01-08)
news received: Some other news (2018-01-08)
news received: And news, news, news (2018-01-08)
3 news received (max: 3), cancelling subscription
no more news subscribers left, closing publisher..

RxJava 2

RxJava is a a Java implementation of Reactive Extensions, RxJava2 also implements the Reactive Streams Specification mentioned above.

To use RxJava2 we just need to add the following dependency to our project’s pom.xml:

<dependency>
	<groupId>io.reactivex.rxjava2</groupId>
	<artifactId>rxjava</artifactId>
	<version>2.1.8</version>
</dependency>

This is our exemplary implementation using RxJava2:

package com.hascode.tutorial;
 
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import java.time.LocalDate;
 
public class Main {
 
  public static void main(String[] args) {
    Flowable.just(News.create("Important news"), News.create("Some other news"),
        News.create("And news, news, news")).subscribe(new FlowableSubscriber<>() {
      private org.reactivestreams.Subscription subscription;
      private static final int MAX_NEWS = 3;
      private int newsReceived = 0;
 
      @Override
      public void onSubscribe(org.reactivestreams.Subscription subscription) {
        System.out.printf("new subscription %s\n", subscription);
        this.subscription = subscription;
        subscription.request(1);
      }
 
      @Override
      public void onNext(News news) {
        System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate());
        newsReceived++;
        if (newsReceived >= MAX_NEWS) {
          System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived,
              MAX_NEWS);
          subscription.cancel();
          return;
        }
 
        subscription.request(1);
      }
 
      @Override
      public void onError(Throwable throwable) {
        System.err.printf("error occurred fetching news: %s\n", throwable.getMessage());
        throwable.printStackTrace(System.err);
      }
 
      @Override
      public void onComplete() {
        System.out.println("fetching news completed");
      }
    });
  }
}

Running the code above should produce a similar output:

new subscription 0
news received: Important news (2018-01-08)
news received: Some other news (2018-01-08)
news received: And news, news, news (2018-01-08)
3 news received (max: 3), cancelling subscription

For an example using RxJava1, please feel free to skip to this section in the appendix: “Appendix A: RxJava 1“.

Akka

Akka offers its own implementation of reactive streams but additionally offers compatibility with the reactive streams contract e.g. using Producers and Subscribers.

Using Akka’s Java DSL, we need to add the following dependencies to our project’s pom.xml:

<dependencies>
	<dependency>
		<groupId>com.typesafe.akka</groupId>
		<artifactId>akka-actor_2.12</artifactId>
		<version>2.5.3</version>
	</dependency>
	<dependency>
		<groupId>com.typesafe.akka</groupId>
		<artifactId>akka-stream_2.12</artifactId>
		<version>2.5.3</version>
	</dependency>
</dependencies>

Out application looks like this one:

package com.hascode.tutorial;
 
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.LocalDate;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
 
public class Main {
 
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("sample-system");
    final Materializer materializer = ActorMaterializer.create(system);
 
    final Publisher<News> publisher =
        Source.from(List.of(News.create("Important news"), News.create("Some other news"),
            News.create("And news, news, news")))
            .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
 
    Subscriber<News> newsSubscriber = new Subscriber<>() {
      private org.reactivestreams.Subscription subscription;
      private static final int MAX_NEWS = 3;
      private int newsReceived = 0;
 
      @Override
      public void onSubscribe(org.reactivestreams.Subscription subscription) {
        System.out.printf("new subscription %s\n", subscription);
        this.subscription = subscription;
        subscription.request(1);
      }
 
      @Override
      public void onNext(News news) {
        System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate());
        newsReceived++;
        if (newsReceived >= MAX_NEWS) {
          System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived,
              MAX_NEWS);
          subscription.cancel();
          return;
        }
 
        subscription.request(1);
      }
 
      @Override
      public void onError(Throwable throwable) {
        System.err.printf("error occurred fetching news: %s\n", throwable.getMessage());
        throwable.printStackTrace(System.err);
      }
 
      @Override
      public void onComplete() {
        System.out.println("fetching news completed");
      }
    };
 
    publisher.subscribe(newsSubscriber);
  }
}

Running the application should produce something similar to this:

new subscription akka.stream.impl.ActorSubscriptionWithCursor@79f555e4
news received: Important news (2018-01-14)
news received: Some other news (2018-01-14)
news received: And news, news, news (2018-01-14)
3 news received (max: 3), cancelling subscription
fetching news completed

Reactor

Reactor is another library for building non-blocking applications also implementing the Reactive Streams Specification.

maven

<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core</artifactId>
	<version>3.1.2.RELEASE</version>
</dependency>

java

package com.hascode.tutorial;
 
import java.time.LocalDate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
 
public class Main {
 
  public static void main(String[] args) {
    Flux.just(News.create("Important news"), News.create("Some other news"),
        News.create("And news, news, news")).subscribe(new Subscriber<News>() {
      private static final int MAX_NEWS = 3;
      private int newsReceived = 0;
      private Subscription subscription;
 
      @Override
      public void onSubscribe(Subscription subscription) {
        System.out.printf("new subscription %s\n", subscription);
        this.subscription = subscription;
        subscription.request(1);
      }
 
      @Override
      public void onNext(News news) {
        System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate());
        newsReceived++;
        if (newsReceived >= MAX_NEWS) {
          System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived,
              MAX_NEWS);
          subscription.cancel();
          return;
        }
 
        subscription.request(1);
      }
 
      @Override
      public void onError(Throwable throwable) {
        System.err.printf("error occurred fetching news: %s\n", throwable.getMessage());
        throwable.printStackTrace(System.err);
 
      }
 
      @Override
      public void onComplete() {
        System.out.println("fetching news completed");
      }
    });
  }
}

bash

[DEBUG] (main) Using Console logging
new subscription reactor.core.publisher.StrictSubscriber@704a52ec
news received: Important news (2018-01-08)
news received: Some other news (2018-01-08)
news received: And news, news, news (2018-01-08)
3 news received (max: 3), cancelling subscription

Appendix A: RxJava1

RxJava1 does not implement the Reactive Streams Specification but nevertheless here is an example similar to the ones above.

To use RxJava1 we just need to add the following dependency to our pom.xml:

<dependency>
	<groupId>io.reactivex</groupId>
	<artifactId>rxjava</artifactId>
	<version>1.3.4</version>
</dependency>

This is our sample application:

package com.hascode.tutorial;
 
import java.time.LocalDate;
import rx.Observable;
import rx.Subscriber;
 
public class Main {
 
  public static void main(String[] args) {
    Observable.just(News.create("Important news"), News.create("Some other news"),
        News.create("And news, news, news")).subscribe(new Subscriber<News>() {
 
      private static final int MAX_NEWS = 3;
      private int newsReceived = 0;
 
      @Override
      public void onStart() {
        System.out.println("new subscription");
        request(1);
      }
 
      @Override
      public void onCompleted() {
        System.out.println("fetching news completed");
      }
 
      @Override
      public void onError(Throwable throwable) {
        System.err.printf("error occurred fetching news: %s\n", throwable.getMessage());
        throwable.printStackTrace(System.err);
      }
 
      @Override
      public void onNext(News news) {
        System.out.printf("news received: %s (%s)\n", news.getHeadline(), news.getDate());
        newsReceived++;
        if (newsReceived >= MAX_NEWS) {
          System.out.printf("%d news received (max: %d), cancelling subscription\n", newsReceived,
              MAX_NEWS);
          unsubscribe();
          return;
        }
        request(1);
      }
    });
  }
}

Running the application should produce a similar result:

new subscription
news received: Important news (2018-01-08)
news received: Some other news (2018-01-08)
news received: And news, news, news (2018-01-08)
3 news received (max: 3), cancelling subscription

Appendix B: Modeling with PlantUML

To create diagrams in no time I’m using PlantUML. It’s like Markdown for diagrams and offers a super-easy ASCII-art like syntax.

The diagram created for this tutorial is generated from the following markup (the first  ~10 lines are just for styling):

@startuml
 
title Reactive Streams Specification\nSimplified Communication Flow
skinparam handwritten true
 
skinparam sequence {
ArrowColor black
ActorBorderColor black
LifeLineBorderColor black
LifeLineBackgroundColor black
 
ParticipantBorderColor black
ParticipantBackgroundColor white
ParticipantFontColor black
}
 
Subscriber -> Publisher : subscribe()
Publisher -> Subscription : new()
Publisher -> Subscriber : onSubscribe(Subscription)
loop until completed
Subscriber -> Subscription : request(int)
Subscription -> Publisher
alt no error
Publisher -> Subscriber : onNext(T)
else error
Publisher -> Subscriber : onError(Throwable)
end
 
alt cancel subscription
Subscriber -> Subscription : cancel()
Subscription -> Publisher
end
end
Publisher -> Subscriber : onComplete()
 
@enduml

Tutorial Sources

Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Git:

git clone https://bitbucket.org/hascode/java9-reactive-flow.git

Resources

Other Java 9 Articles

Please feel free to read other Java 9 articles of mine:

Article Updates

  • 2018-01-19: Typos in the PlantUML section fixed.
  • 2018-01-15: Section about PlantUML diagramming added.
  • 2018-01-15: Typo in Akka Maven dependencies fixed (thanks Jochen for mentioning).

Tags: , , , , , , , , , , , , , , , , , , , , ,

2 Responses to “Reactive Streams – Java 9 Flow API, RxJava, Akka and Reactor Examples”

  1. Yuri Says:

    Thank you for the article.
    The ULM tool is exactly what I was looking for for a log time)

  2. Micha Kops Says:

    Hi Yuri,
    thanks, you’re welcome!
    I really love PlantUML for its simplicity and the feature to use a handwritten style.
    You can do even crazy things like reverting the colors (with skinparam monochrome reverse).

    Greets,
    Micha

Leave a Reply

Please note, that no personal information like your IP address is stored and you're not required to enter you real name.

Comments must be approved before they are published, so please be patient after having posted a comment - it might take a short while.

Please leave these two fields as-is:
Search
Categories