Implementing Reactive Client-Server Communication over TCP or Websockets with RSocket and Java

November 25th, 2018

Reactive design and reactive architecture influence how modern software systems are implemented. RSocket is a project that aims to bring the benefits of the patterns described in the Reactive Manifesto, and of resulting tools like Reactive Streams or Reactive Extensions, to a formal new communication protocol.

RSocket works with TCP, WebSockets and Aeron transport layers and offers additional features like session resumption.

In the following tutorial I’m going to demonstrate how to implement simple client-server communication over TCP and Websockets for the interaction models request-response, request-stream, fire-and-forget and event subscription (request-channel).

RStreams Tutorial


Prerequisites

Using Maven, we only need to add the following two dependencies to our project’s pom.xml:

<dependency>
  <groupId>io.rsocket</groupId>
  <artifactId>rsocket-core</artifactId>
  <version>0.11.5</version>
</dependency>
<dependency>
  <groupId>io.rsocket</groupId>
  <artifactId>rsocket-transport-netty</artifactId>
  <version>0.11.5</version>
</dependency>

Request – Response Communication

Request – Response communication is modeled as a stream of one.
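To make "a stream of one" concrete: in Reactive Streams terms, the responder's publisher signals exactly one item and then completion. The following is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces rather than RSocket or Reactor; the class StreamOfOneSketch and its helpers just and collectSignals are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class StreamOfOneSketch {

  /** Hypothetical helper: a publisher that emits one value and then completes. */
  static Flow.Publisher<String> just(String value) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private boolean emitted;
      private boolean cancelled;

      @Override
      public void request(long n) {
        if (emitted || cancelled || n <= 0) {
          return; // already delivered, cancelled, or no valid demand
        }
        emitted = true;
        subscriber.onNext(value);      // the single response
        if (!cancelled) {
          subscriber.onComplete();     // nothing else will follow
        }
      }

      @Override
      public void cancel() {
        cancelled = true;
      }
    });
  }

  /** Subscribes, requests one item and records every signal it sees. */
  static List<String> collectSignals(Flow.Publisher<String> publisher) {
    List<String> signals = new ArrayList<>();
    publisher.subscribe(new Flow.Subscriber<String>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(1);
      }

      @Override
      public void onNext(String item) {
        signals.add("onNext:" + item);
      }

      @Override
      public void onError(Throwable error) {
        signals.add("onError:" + error);
      }

      @Override
      public void onComplete() {
        signals.add("onComplete");
      }
    });
    return signals;
  }

  public static void main(String[] args) {
    System.out.println(collectSignals(just("request-response: request-response message")));
  }
}
```

In the RSocket examples that follow, Reactor's Mono plays the role of this single-value publisher.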

TCP

This request-response communication example is using a TCP connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
 
public class TcpRequestResponseExample {
 
  public static void main(String[] args) {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Mono<Payload> requestResponse(Payload payload) {
        return Mono
            .just(DefaultPayload
                .create(String.format("request-response: %s", payload.getDataUtf8())));
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(TcpServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("tcp server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(TcpClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("tcp client initialized, connecting to port %d%n", port);
 
    socket
        .requestResponse(DefaultPayload.create("request-response message"))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .block();
 
    socket.dispose();
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

tcp server started on port 7777
tcp client initialized, connecting to port 7777
request-response: request-response message

Websockets

This request-response communication example is using a Websocket connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
 
public class WebsocketRequestResponseExample {
 
  public static void main(String[] args) {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Mono<Payload> requestResponse(Payload payload) {
        return Mono
            .just(DefaultPayload
                .create(String.format("request-response: %s", payload.getDataUtf8())));
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(WebsocketServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("websocket server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(WebsocketClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("websocket client initialized, connecting to port %d%n", port);
 
    socket
        .requestResponse(DefaultPayload.create("request-response message"))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .block();
 
    socket.dispose();
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

websocket server started on port 7777
websocket client initialized, connecting to port 7777
request-response: request-response message

Request – Stream Communication

Request – Stream communication is modeled as a finite stream of many.
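A stream of many also raises the question of pacing: under Reactive Streams, a publisher may only emit as many items as the subscriber has requested. The sketch below illustrates that idea with the JDK's java.util.concurrent.Flow API and SubmissionPublisher, not with RSocket itself; FiniteStreamSketch, BatchSubscriber and the batch size of 5 are assumptions chosen purely for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FiniteStreamSketch {

  /** Hypothetical subscriber that requests items in batches and records them. */
  static final class BatchSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    private final int batchSize;
    private Flow.Subscription subscription;
    private int outstanding;

    BatchSubscriber(int batchSize) {
      this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      outstanding = batchSize;
      subscription.request(batchSize); // initial demand
    }

    @Override
    public void onNext(String item) {
      received.add(item);
      if (--outstanding == 0) {        // batch consumed: ask for the next one
        outstanding = batchSize;
        subscription.request(batchSize);
      }
    }

    @Override
    public void onError(Throwable error) {
      finished.countDown();
    }

    @Override
    public void onComplete() {
      finished.countDown();
    }
  }

  /** Publishes part-1 .. part-count and returns what the subscriber received. */
  static List<String> stream(int count, int batchSize) {
    BatchSubscriber subscriber = new BatchSubscriber(batchSize);
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = 1; i <= count; i++) {
        publisher.submit("part-" + i);
      }
    } // close() signals onComplete once the buffered items are delivered
    try {
      if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("stream did not complete in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return subscriber.received;
  }

  public static void main(String[] args) {
    stream(20, 5).forEach(System.out::println);
  }
}
```

The stream is finite because the publisher eventually signals completion; the subscriber only decides how quickly the items arrive.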

TCP

This request-stream communication example is using a TCP connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
public class TcpRequestStreamExample {
 
  public static void main(String[] args) throws Exception {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Flux<Payload> requestStream(Payload payload) {
        return Flux.range(1, 20)
            .map(i -> DefaultPayload.create("part-" + i));
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(TcpServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("tcp server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(TcpClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("tcp client initialized, connecting to port %d%n", port);
 
    // Block until the stream completes instead of sleeping for a fixed time.
    socket
        .requestStream(DefaultPayload.create("request-stream message"))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .blockLast();

    socket.dispose();
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

tcp server started on port 7777
tcp client initialized, connecting to port 7777
part-1
part-2
part-3
part-4
part-5
part-6
part-7
part-8
part-9
part-10
part-11
part-12
part-13
part-14
part-15
part-16
part-17
part-18
part-19
part-20

Websockets

This request-stream communication example is using a Websocket connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
public class WebsocketRequestStreamExample {
 
  public static void main(String[] args) throws Exception {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Flux<Payload> requestStream(Payload payload) {
        return Flux.range(1, 20)
            .map(i -> DefaultPayload.create("part-" + i));
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(WebsocketServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("websocket server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(WebsocketClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("websocket client initialized, connecting to port %d%n", port);
 
    // Block until the stream completes instead of sleeping for a fixed time.
    socket
        .requestStream(DefaultPayload.create("request-stream message"))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .blockLast();

    socket.dispose();
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

websocket server started on port 7777
websocket client initialized, connecting to port 7777
part-1
part-2
part-3
part-4
part-5
part-6
part-7
part-8
part-9
part-10
part-11
part-12
part-13
part-14
part-15
part-16
part-17
part-18
part-19
part-20

Fire and Forget Communication

Fire and forget is a communication model where the caller receives no response.
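Because the caller receives no response, the publisher returned to it can only signal completion (or an error); it never emits an item. Here is a small sketch of that shape using only the JDK's java.util.concurrent.Flow interfaces, not RSocket; FireAndForgetSketch, send and the serverLog list standing in for the receiving side are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class FireAndForgetSketch {

  /** Hypothetical publisher: performs a side effect, then only signals completion. */
  static Flow.Publisher<Void> fireAndForget(String message, List<String> serverLog) {
    return subscriber -> {
      subscriber.onSubscribe(new Flow.Subscription() {
        @Override
        public void request(long n) {
          // there will never be an item to deliver
        }

        @Override
        public void cancel() {
          // nothing to cancel in this sketch
        }
      });
      serverLog.add("fire-forget: " + message); // stands in for the receiving side
      subscriber.onComplete();                  // completion needs no demand
    };
  }

  /** Sends a message and returns the signals the caller observed. */
  static List<String> send(String message, List<String> serverLog) {
    List<String> signals = new ArrayList<>();
    fireAndForget(message, serverLog).subscribe(new Flow.Subscriber<Void>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(1);
      }

      @Override
      public void onNext(Void item) {
        signals.add("onNext");
      }

      @Override
      public void onError(Throwable error) {
        signals.add("onError:" + error);
      }

      @Override
      public void onComplete() {
        signals.add("onComplete");
      }
    });
    return signals;
  }

  public static void main(String[] args) {
    List<String> serverLog = new ArrayList<>();
    System.out.println("caller saw: " + send("message send as fire and forget", serverLog));
    System.out.println("receiver saw: " + serverLog);
  }
}
```

The caller only ever observes the completion signal, which is why any output in the examples below comes from the server-side handler.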

TCP

This fire and forget communication example is using a TCP connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
 
public class TcpFireForgetExample {
 
  public static void main(String[] args) throws Exception {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Mono<Void> fireAndForget(Payload payload) {
        System.out.printf("fire-forget: %s%n", payload.getDataUtf8());
        return Mono.empty();
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(TcpServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("tcp server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(TcpClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("tcp client initialized, connecting to port %d%n", port);
 
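    // fireAndForget returns a Mono<Void>: it completes without emitting an item,
    // so there is nothing to print on the client side.
    socket
        .fireAndForget(DefaultPayload.create("message send as fire and forget"))
        .block();

    // Give the server a moment to process the message before shutting down.
    Thread.sleep(2_000);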
    socket.dispose();
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

tcp server started on port 7777
tcp client initialized, connecting to port 7777
fire-forget: message send as fire and forget

Websockets

This fire and forget communication example is using a Websocket connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
 
public class WebsocketFireForgetExample {
 
  public static void main(String[] args) throws Exception {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Mono<Void> fireAndForget(Payload payload) {
        System.out.printf("fire-forget: %s%n", payload.getDataUtf8());
        return Mono.empty();
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(WebsocketServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("websocket server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(WebsocketClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("websocket client initialized, connecting to port %d%n", port);
 
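    // fireAndForget returns a Mono<Void>: it completes without emitting an item,
    // so there is nothing to print on the client side.
    socket
        .fireAndForget(DefaultPayload.create("message send as fire and forget"))
        .block();

    // Give the server a moment to process the message before shutting down.
    Thread.sleep(1_000);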
    socket.dispose();
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

websocket server started on port 7777
websocket client initialized, connecting to port 7777
fire-forget: message send as fire and forget

Event Subscription

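Event subscriptions are modeled as an infinite stream of many. The examples below use RSocket's request-channel interaction, in which both sides stream payloads to each other; the client ends the exchange after ten responses with take(10).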
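Since an infinite stream never completes on its own, it is the subscriber that ends it by cancelling its subscription, which is what an operator such as take(10) does. The following is a minimal, single-threaded sketch of that mechanism with the JDK's java.util.concurrent.Flow interfaces rather than RSocket or Reactor; InfiniteStreamSketch, CounterPublisher and takeFirst are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class InfiniteStreamSketch {

  /** Hypothetical publisher that emits 0, 1, 2, ... for as long as there is demand. */
  static final class CounterPublisher implements Flow.Publisher<Long> {
    @Override
    public void subscribe(Flow.Subscriber<? super Long> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private long next;
        private long demand;
        private boolean emitting;
        private boolean cancelled;

        @Override
        public void request(long n) {
          if (cancelled || n <= 0) {
            return;
          }
          demand += n;
          if (emitting) {
            return; // re-entrant call from onNext: the loop below picks up the new demand
          }
          emitting = true;
          while (demand > 0 && !cancelled) {
            demand--;
            subscriber.onNext(next++);
          }
          emitting = false;
        }

        @Override
        public void cancel() {
          cancelled = true; // stops the otherwise endless stream
        }
      });
    }
  }

  /** Collects the first {@code limit} items, then cancels the subscription. */
  static List<Long> takeFirst(int limit) {
    List<Long> received = new ArrayList<>();
    new CounterPublisher().subscribe(new Flow.Subscriber<Long>() {
      private Flow.Subscription subscription;

      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        if (limit <= 0) {
          subscription.cancel();
        } else {
          subscription.request(1);
        }
      }

      @Override
      public void onNext(Long item) {
        received.add(item);
        if (received.size() >= limit) {
          subscription.cancel();   // enough items: end the stream from the subscriber side
        } else {
          subscription.request(1); // ask for one more
        }
      }

      @Override
      public void onError(Throwable error) {
        // not expected in this sketch
      }

      @Override
      public void onComplete() {
        // never called: the publisher has no natural end
      }
    });
    return received;
  }

  public static void main(String[] args) {
    System.out.println(takeFirst(10)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```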

TCP

This event subscription example is using a TCP connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.time.Instant;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
public class TcpRequestChannelExample {
 
  public static void main(String[] args) {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.from(payloads)
            .map(Payload::getDataUtf8)
            .map(str -> String.format("channel message received: '%s'", str))
            .map(DefaultPayload::create);
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(TcpServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("tcp server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(TcpClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("tcp client initialized, connecting to port %d%n", port);
 
    socket
        .requestChannel(
            Flux.interval(Duration.ofMillis(1_000))
                .map(i -> DefaultPayload.create("channel message " + Instant.now())))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .take(10)
        .doFinally(signalType -> socket.dispose())
        .then().block();
 
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

tcp server started on port 7777
tcp client initialized, connecting to port 7777
channel message received: 'channel message 2018-11-25T16:20:02.347Z'
channel message received: 'channel message 2018-11-25T16:20:03.345Z'
channel message received: 'channel message 2018-11-25T16:20:04.345Z'
channel message received: 'channel message 2018-11-25T16:20:05.345Z'
channel message received: 'channel message 2018-11-25T16:20:06.345Z'
channel message received: 'channel message 2018-11-25T16:20:07.345Z'
channel message received: 'channel message 2018-11-25T16:20:08.345Z'
channel message received: 'channel message 2018-11-25T16:20:09.345Z'
channel message received: 'channel message 2018-11-25T16:20:10.345Z'
channel message received: 'channel message 2018-11-25T16:20:11.345Z'

Websockets

This event subscription example is using a Websocket connection as transport.

package com.hascode.tutorial;
 
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.time.Instant;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
public class WebsocketRequestChannelExample {
 
  public static void main(String[] args) {
    final int port = 7777;
 
    RSocket responseHandler = new AbstractRSocket() {
      @Override
      public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.from(payloads)
            .map(Payload::getDataUtf8)
            .map(str -> String.format("channel message received: '%s'", str))
            .map(DefaultPayload::create);
      }
    };
 
    Disposable server = RSocketFactory.receive()
        .acceptor(
            (setupPayload, rsocket) ->
                Mono.just(responseHandler))
        .transport(WebsocketServerTransport.create("localhost", port))
        .start()
        .subscribe();
 
    System.out.printf("websocket server started on port %d%n", port);
 
    RSocket socket =
        RSocketFactory.connect()
            .transport(WebsocketClientTransport.create("localhost", port))
            .start()
            .block();
 
    System.out.printf("websocket client initialized, connecting to port %d%n", port);
 
    socket
        .requestChannel(
            Flux.interval(Duration.ofMillis(1_000))
                .map(i -> DefaultPayload.create("channel message " + Instant.now())))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .take(10)
        .doFinally(signalType -> socket.dispose())
        .then().block();
 
    server.dispose();
  }
}

Output

Running the code above should produce output similar to the following:

websocket server started on port 7777
websocket client initialized, connecting to port 7777
channel message received: 'channel message 2018-11-25T16:20:50.299Z'
channel message received: 'channel message 2018-11-25T16:20:51.296Z'
channel message received: 'channel message 2018-11-25T16:20:52.296Z'
channel message received: 'channel message 2018-11-25T16:20:53.296Z'
channel message received: 'channel message 2018-11-25T16:20:54.296Z'
channel message received: 'channel message 2018-11-25T16:20:55.296Z'
channel message received: 'channel message 2018-11-25T16:20:56.296Z'
channel message received: 'channel message 2018-11-25T16:20:57.296Z'
channel message received: 'channel message 2018-11-25T16:20:58.296Z'
channel message received: 'channel message 2018-11-25T16:20:59.296Z'

Tutorial Sources

Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Git:

git clone https://bitbucket.org/hascode/rsocket-tutorial.git
