Implementing Reactive Client-Server Communication over TCP or Websockets with RSocket and Java
November 25th, 2018 by Micha KopsReactive design or reactive architecture has an impact on how modern software systems are implemented. RSocket is a project that aims to adapt the benefits of the patterns described in the Reactive Manifesto and resulting tools like Reactive Streams or Reactive Extensions to a formal new communication protocol.
RSocket works with TCP, WebSockets and Aeron transport layers and offers additional features like session resumption.
In the following tutorial I’m going to demonstrate how to implement simple client-server communication over TCP and Websockets for different interaction models like request-response, request-stream, fire-and-forget and event subscription.
Contents
Prerequisites
Using Maven, we only need to add the following two dependencies to our project’s pom.xml:
<dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-core</artifactId> <version>0.11.5</version> </dependency> <dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-transport-netty</artifactId> <version>0.11.5</version> </dependency>
Request – Response Communication
Request – Response communication is modeled as a stream of one.
TCP
This request-response communication example is using a TCP connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.Disposable; import reactor.core.publisher.Mono; public class TcpRequestResponseExample { public static void main(String[] args) { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono .just(DefaultPayload .create(String.format("request-response: %s", payload.getDataUtf8()))); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(TcpServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("tcp server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", port)) .start() .block(); System.out.printf("tcp client initialized, connecting to port %d%n", port); socket .requestResponse(DefaultPayload.create("request-response message")) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .block(); socket.dispose(); server.dispose(); } }
Output
Running the code above should produce a similar output:
tcp server started on port 7777 tcp client initialized, connecting to port 7777 request-response: request-response message
Websockets
This request-response communication example is using a Websocket connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.Disposable; import reactor.core.publisher.Mono; public class WebsocketRequestResponseExample { public static void main(String[] args) { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono .just(DefaultPayload .create(String.format("request-response: %s", payload.getDataUtf8()))); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(WebsocketServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("websocket server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(WebsocketClientTransport.create("localhost", port)) .start() .block(); System.out.printf("websocket client initialized, connecting to port %d%n", port); socket .requestResponse(DefaultPayload.create("request-response message")) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .block(); socket.dispose(); server.dispose(); } }
Output
Running the code above should produce a similar output:
websocket server started on port 7777 websocket client initialized, connecting to port 7777 request-response: request-response message
Request – Stream Communication
Request – Stream communication is modeled as a finite Stream of Many..
TCP
This request-stream communication example is using a TCP connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class TcpRequestStreamExample { public static void main(String[] args) throws Exception { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { return Flux.range(1, 20) .map(i -> DefaultPayload.create("part-" + i)); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(TcpServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("tcp server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", port)) .start() .block(); System.out.printf("tcp client initialized, connecting to port %d%n", port); socket .requestStream(DefaultPayload.create("request-stream message")) .subscribe(payload -> System.out.println(payload.getDataUtf8())); Thread.sleep(3000); socket.dispose(); server.dispose(); } }
Output
Running the code above should produce a similar output:
tcp server started on port 7777 tcp client initialized, connecting to port 7777 part-1 part-2 part-3 part-4 part-5 part-6 part-7 part-8 part-9 part-10 part-11 part-12 part-13 part-14 part-15 part-16 part-17 part-18 part-19 part-20
Websockets
This request-stream communication example is using a Websocket connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class WebsocketRequestStreamExample { public static void main(String[] args) throws Exception { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { return Flux.range(1, 20) .map(i -> DefaultPayload.create("part-" + i)); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(WebsocketServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("websocket server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(WebsocketClientTransport.create("localhost", port)) .start() .block(); System.out.printf("websocket client initialized, connecting to port %d%n", port); socket .requestStream(DefaultPayload.create("request-stream message")) .subscribe(payload -> System.out.println(payload.getDataUtf8())); Thread.sleep(3000); socket.dispose(); server.dispose(); } }
Output
Running the code above should produce a similar output:
websocket server started on port 7777 websocket client initialized, connecting to port 7777 part-1 part-2 part-3 part-4 part-5 part-6 part-7 part-8 part-9 part-10 part-11 part-12 part-13 part-14 part-15 part-16 part-17 part-18 part-19 part-20
Fire and Forget Communication
Fire and forget is a communication model where the caller receives no response.
TCP
This fire and forget communication example is using a TCP connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.Disposable; import reactor.core.publisher.Mono; public class TcpFireForgetExample { public static void main(String[] args) throws Exception { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Mono<Void> fireAndForget(Payload payload) { System.out.printf("fire-forget: %s%n", payload.getDataUtf8()); return Mono.empty(); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(TcpServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("tcp server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", port)) .start() .block(); System.out.printf("tcp client initialized, connecting to port %d%n", port); socket .fireAndForget(DefaultPayload.create("message send as fire and forget")) .doOnNext(System.out::println) .block(); Thread.sleep(2_000); socket.dispose(); server.dispose(); } }
Output
Running the code above should produce a similar output:
tcp server started on port 7777 tcp client initialized, connecting to port 7777 fire-forget: message send as fire and forget
Websockets
This fire and forget communication example is using a Websocket connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.DefaultPayload; import reactor.core.Disposable; import reactor.core.publisher.Mono; public class WebsocketFireForgetExample { public static void main(String[] args) throws Exception { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Mono<Void> fireAndForget(Payload payload) { System.out.printf("fire-forget: %s%n", payload.getDataUtf8()); return Mono.empty(); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(WebsocketServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("websocket server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(WebsocketClientTransport.create("localhost", port)) .start() .block(); System.out.printf("websocket client initialized, connecting to port %d%n", port); socket .fireAndForget(DefaultPayload.create("message send as fire and forget")) .doOnNext(System.out::println) .block(); Thread.sleep(1_000); socket.dispose(); server.dispose(); } }
Output
Running the code above should produce a similar output:
websocket server started on port 7777 websocket client initialized, connecting to port 7777 fire-forget: message send as fire and forget
Event Subscription
Event subscriptions are modeled as infinite stream of many.
TCP
This event subscription example is using a TCP connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.util.DefaultPayload; import java.time.Duration; import java.time.Instant; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class TcpRequestChannelExample { public static void main(String[] args) { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { return Flux.from(payloads) .map(Payload::getDataUtf8) .map(str -> String.format("channel message received: '%s'", str)) .map(DefaultPayload::create); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(TcpServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("tcp server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", port)) .start() .block(); System.out.printf("tcp client initialized, connecting to port %d%n", port); socket .requestChannel( Flux.interval(Duration.ofMillis(1_000)) .map(i -> DefaultPayload.create("channel message " + Instant.now()))) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) .doFinally(signalType -> socket.dispose()) .then().block(); server.dispose(); } }
Output
Running the code above should produce a similar output:
tcp server started on port 7777 tcp client initialized, connecting to port 7777 channel message received: 'channel message 2018-11-25T16:20:02.347Z' channel message received: 'channel message 2018-11-25T16:20:03.345Z' channel message received: 'channel message 2018-11-25T16:20:04.345Z' channel message received: 'channel message 2018-11-25T16:20:05.345Z' channel message received: 'channel message 2018-11-25T16:20:06.345Z' channel message received: 'channel message 2018-11-25T16:20:07.345Z' channel message received: 'channel message 2018-11-25T16:20:08.345Z' channel message received: 'channel message 2018-11-25T16:20:09.345Z' channel message received: 'channel message 2018-11-25T16:20:10.345Z' channel message received: 'channel message 2018-11-25T16:20:11.345Z'
Websockets
This event subscription example is using a Websocket connection as transport.
package com.hascode.tutorial; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.DefaultPayload; import java.time.Duration; import java.time.Instant; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class WebsocketRequestChannelExample { public static void main(String[] args) { final int port = 7777; RSocket responseHandler = new AbstractRSocket() { @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { return Flux.from(payloads) .map(Payload::getDataUtf8) .map(str -> String.format("channel message received: '%s'", str)) .map(DefaultPayload::create); } }; Disposable server = RSocketFactory.receive() .acceptor( (setupPayload, rsocket) -> Mono.just(responseHandler)) .transport(WebsocketServerTransport.create("localhost", port)) .start() .subscribe(); System.out.printf("websocket server started on port %d%n", port); RSocket socket = RSocketFactory.connect() .transport(WebsocketClientTransport.create("localhost", port)) .start() .block(); System.out.printf("websocket client initialized, connecting to port %d%n", port); socket .requestChannel( Flux.interval(Duration.ofMillis(1_000)) .map(i -> DefaultPayload.create("channel message " + Instant.now()))) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) .doFinally(signalType -> socket.dispose()) .then().block(); server.dispose(); } }
Output
Running the code above should produce a similar output:
websocket server started on port 7777 websocket client initialized, connecting to port 7777 channel message received: 'channel message 2018-11-25T16:20:50.299Z' channel message received: 'channel message 2018-11-25T16:20:51.296Z' channel message received: 'channel message 2018-11-25T16:20:52.296Z' channel message received: 'channel message 2018-11-25T16:20:53.296Z' channel message received: 'channel message 2018-11-25T16:20:54.296Z' channel message received: 'channel message 2018-11-25T16:20:55.296Z' channel message received: 'channel message 2018-11-25T16:20:56.296Z' channel message received: 'channel message 2018-11-25T16:20:57.296Z' channel message received: 'channel message 2018-11-25T16:20:58.296Z' channel message received: 'channel message 2018-11-25T16:20:59.296Z'
Tutorial Sources
Please feel free to download the tutorial sources from my Bitbucket repository, fork it there or clone it using Git:
git clone https://bitbucket.org/hascode/rsocket-tutorial.git
Resources
Tags: asynchronous, channel, client, event, fire-and-forget, reactive, request, response, rsocket, server, stream, subscribe, subscription, tcp, websocket
April 9th, 2019 at 10:24 am
Toll :) Erste Google-Ergebnisseite für “Java RSocket example” und da biste :) Geiler Beitrag, Merci!
April 11th, 2019 at 2:54 pm
You’re welcome! нравится случаться :)