Java and Project Reactor

Hello! My name is Lyokha, and I work as a backend developer at FunCorp. Today we will talk about reactive programming, the Reactor library, and a little about the web.
Reactive programming is talked about a lot, but if you (like the author of this article) still don't know what it is, sit back and let's try to figure it out together.
There are manifestos that spell out the details; they take some reading, but it's worth it.
So where does the web come in?
Rumor has it that if you build your system reactively, following all the canons of the Reactive Manifesto, from the HTTP server all the way down to the database driver, you can summon the Second Coming. Or at least build a really high-quality backend.
That is, of course, a bit tongue-in-cheek. But if your use case is handling many requests that are not always fast, and the servlet container can no longer cope, welcome to the wonderful world of reactive!
If you have 128 continuous parallel requests, a servlet container is probably the right tool for the job.
And what should you write reactive code on, if not Netty? It's worth noting that writing a backend on bare Netty is tedious, so it's nice to have abstractions to work with.
There aren't many good server abstractions for Netty, so the folks at Pivotal added support for it to Spring Boot 2. On March ? 201? all of this was even released. To make us truly happy, they created the WebFlux module, an alternative to Spring MVC that offers a reactive approach to writing web services.
WebFlux positions itself as a microframework (a microframework and Spring, ha-ha), promises to fit into your (our) trendy microservices, offers a functional-style API, and has already been mentioned on Habr. You can read more about it (including how it differs from Spring MVC) elsewhere; today is about something else. WebFlux is built on the Reactor library, so that is what we will talk about.
Reactor is a reactive (surprise!) open-source platform developed by Pivotal. I decided to loosely retell, with comments, the introduction to this wonderful library.
Blocking code (for the youngest)
Java code is usually blocking. For example, HTTP calls or database queries hang the current thread until the third-party service responds. That is normal practice as long as the service responds within an acceptable time. Otherwise it turns into a bottleneck. We have to parallelize: run more threads that execute the same blocking code. Along the way we have to solve the resulting problems with contention and concurrency.
Frequent blocking, especially on I/O (and if you have a lot of mobile clients, that I/O is not fast at all), leaves our many threads sitting idle waiting for data, wasting precious resources on context switching and the like.
Parallelization is not a magic wand that solves every problem. It is a complex tool that carries its own overhead.
Async && non-blocking
These terms are easy to find, hard to understand, and impossible to forget. But they come up all the time when people talk about reactivity, so let's try to make sense of them.
From the text above one might conclude that blocking code is to blame. OK, let's write non-blocking code. What does that mean? If we are not ready to return a result yet, then instead of waiting we return some kind of error, for example asking the caller to retry later. Cool, of course, but what do we do with that error? That is where asynchronous processing comes in, so that we can react later: everything is ready!
So we just need to write asynchronous, non-blocking code and everything will be fine? No, it won't. But it can make life easier. To that end, kind and clever people came up with all sorts of specifications (including reactive ones) and wrote libraries that respect those specifications.
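The difference between "non-blocking" and "asynchronous" can be sketched with plain JDK classes. The helpers `tryFetch` and `fetchAsync` below are hypothetical names invented for this illustration: the first returns immediately even when nothing is ready, the second hands back a handle to which a reaction can be attached.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

class NonBlockingVsAsync {

    // Non-blocking: never waits. If nothing is ready yet, the caller gets
    // an empty Optional and has to decide when to try again.
    static Optional<String> tryFetch(Queue<String> source) {
        return Optional.ofNullable(source.poll());
    }

    // Asynchronous: the work runs elsewhere, and the caller gets a handle
    // to which it can attach a reaction for when the value is ready.
    static CompletableFuture<String> fetchAsync(Supplier<String> slowCall) {
        return CompletableFuture.supplyAsync(slowCall);
    }

    public static void main(String[] args) {
        Queue<String> inbox = new ConcurrentLinkedQueue<>();
        System.out.println(tryFetch(inbox).isPresent()); // false
        inbox.add("meme");
        System.out.println(tryFetch(inbox).get()); // meme

        String shouted = fetchAsync(() -> "data")
                .thenApply(String::toUpperCase) // the reaction, executed once the data arrives
                .join();                        // blocking here only to print the result
        System.out.println(shouted); // DATA
    }
}
```

Neither style removes the need to decide what happens in the "not ready yet" case; they only move that decision to a more convenient place.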
So, Reactor. In a nutshell
In essence, Reactor (at least its core part) is an implementation of the Reactive Streams specification and of some of the ReactiveX operators. But more on that later.
If you know or have heard of RxJava, Reactor shares RxJava's approach and philosophy but has a number of semantic differences (which stem from RxJava's backward compatibility and the specifics of Android development).
What is Reactive Streams in Java?
Roughly speaking, it is 4 interfaces provided by the reactive-streams-jvm library: Publisher, Subscriber, Subscription, and Processor.
Exact copies of them are present in the Flow class of JDK 9.
Put even more roughly, all of them are subject to the following requirements:
- asynchrony;
- non-blocking I/O;
- the ability to handle situations where data arrives faster than it can be consumed (this never happens in synchronous, imperative code, but it is common in reactive systems).
Let's take a look at the code of the Flow class from JDK 9 (Javadoc comments removed for brevity):
    public final class Flow {
        public static interface Publisher<T> {
            public void subscribe(Subscriber<? super T> subscriber);
        }
        public static interface Subscriber<T> {
            public void onSubscribe(Subscription subscription);
            public void onNext(T item);
            public void onError(Throwable throwable);
            public void onComplete();
        }
        public static interface Subscription {
            public void request(long n);
            public void cancel();
        }
        public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
        }
    }


For now, that is all the support for reactivity at the JDK level. Somewhere in an incubator module, an HTTP/2 client is maturing that makes heavy use of Flow. I did not find any other uses inside JDK 9.
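To see how the Flow interfaces fit together, here is a minimal sketch that uses only the JDK: `SubmissionPublisher` (a `Publisher` implementation shipped alongside `Flow`) and a hand-written `Subscriber`. The names `CollectingSubscriber` and `publishAndCollect` are made up for this example. The subscriber asks for one element at a time via `request(1)`, which is how the consumer tells the producer how much it is ready to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowDemo {

    // Pulls one item at a time and completes a future with everything it has seen.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> items = new CopyOnWriteArrayList<>();
        final CompletableFuture<List<T>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ready for the first element
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ready for the next one
        }

        @Override
        public void onError(Throwable throwable) {
            done.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            done.complete(new ArrayList<>(items));
        }
    }

    // Publishes the given values and returns what the subscriber received.
    static List<Integer> publishAndCollect(List<Integer> values) {
        CollectingSubscriber<Integer> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            values.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        return subscriber.done.join();
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of(1, 2, 3))); // [1, 2, 3]
    }
}
```

Reactor's Flux and Mono play the Publisher role in the same protocol, so the sequence of calls (onSubscribe, then request, then onNext, then onComplete or onError) is the same.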


Integrations


Reactor integrates with our favorite Java goodies, including CompletableFuture, Stream, and Duration. It supports IPC modules. It has adapters for Akka and RxJava, a test module (obviously, for writing tests), and an extra module (utility classes).


For Redis fans, the lettuce and redisson clients have a reactive API with Reactor support.
For MongoDB fans, there is an official reactive driver that implements Reactive Streams, so Reactor picks it up easily.


Great, but how do I get all this running?


All of this runs on JDK 8 and higher. However, if you develop for Android and your minSdk < 26, you are better off looking at RxJava 2.

If you have Maven, add Reactor to the project's dependencies; some build tools also need a plugin for this.

Reactor supports Kotlin .




So, we need to write asynchronous, non-blocking code. In other words, we let the current thread of execution switch to something useful instead of blocking and waiting, and return to the current task once the asynchronous processing has completed.


On the sunny island called Java there are two main ways to do this:


Callbacks. The method has no return value (void) but takes an extra parameter (a lambda, an anonymous class, etc.) that is invoked when a certain event occurs. An example is EventListener from the Swing library.

Futures. A Future is an object that promises to return something in the future. It holds a reference to an object whose value will be computed later (asynchronously). A Future can block while waiting for the result. For example, ExecutorService.submit() returns a Future for a Callable.
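Both styles can be sketched side by side with the JDK alone. The methods `squareWithCallback`, `squareWithFuture`, and `runBoth` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class CallbackVsFuture {

    // Callback style: returns void, the result is handed to onSuccess later.
    static void squareWithCallback(int x, ExecutorService pool, Consumer<Integer> onSuccess) {
        pool.execute(() -> onSuccess.accept(x * x));
    }

    // Future style: returns a handle immediately; get() blocks until the value is ready.
    static Future<Integer> squareWithFuture(int x, ExecutorService pool) {
        return pool.submit(() -> x * x);
    }

    // Runs both variants and returns {callback result, future result}.
    static int[] runBoth() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // A CompletableFuture is used here only to capture what the callback received.
            CompletableFuture<Integer> seenByCallback = new CompletableFuture<>();
            squareWithCallback(6, pool, seenByCallback::complete);
            Future<Integer> future = squareWithFuture(7, pool);
            return new int[] { seenByCallback.get(), future.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = runBoth();
        System.out.println(results[0] + " " + results[1]); // 36 49
    }
}
```

Note that the callback version has nowhere to report a failure unless a second callback is added, and the Future version has to block in get() to obtain the value.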


These are well-known tools, but at some point they are no longer enough.


Problems with callbacks


Callbacks compose poorly and quickly turn into a mess known as "callback hell".


Let's look at an example.


We need to show the user their top 5 memes, and if there are none, go to the suggestion service and take 5 memes from there.


In total, 3 services are involved: the first returns the IDs of the user's favorite memes, the second fetches the memes themselves, and the third provides suggestions if there are no favorites.

    // the user's favorite memes
    userService.getFavoriteMemes(userId, new Callback<>() {
        // got the data
        public void onSuccess(List<String> userFavoriteMemes) {
            if (userFavoriteMemes.isEmpty()) {
                // no favorites, so offer suggested content instead
                suggestionService.getSuggestedMemes(new Callback<>() {
                    public void onSuccess(List<Meme> suggestedMemes) {
                        uiUtils.submitOnUiThread(() -> {
                            suggestedMemes.stream().limit(5).forEach(meme -> {
                                // render the data in the UI
                            });
                        });
                    }

                    public void onError(Throwable error) {
                        uiUtils.errorPopup(error); // render the error in the UI
                    }
                });
            } else {
                // favorites found
                userFavoriteMemes.stream()
                    .limit(5) // limit the output to 5 elements
                    .forEach(favId -> memeService.getMemes(favId, new Callback<Meme>() {
                        // fetched a meme
                        public void onSuccess(Meme loadedMeme) {
                            uiUtils.submitOnUiThread(() -> {
                                // render the data in the UI
                            });
                        }

                        public void onError(Throwable error) {
                            uiUtils.errorPopup(error);
                        }
                    }));
            }
        }

        public void onError(Throwable error) {
            uiUtils.errorPopup(error);
        }
    });


That doesn't look great.


Now let's see how we would do it with Reactor:

    // get the sequence of IDs
    userService.getFavoriteMemes(userId)
        .flatMap(memeService::getMemes) // load the memes by ID
        // if empty, take memes from the suggestion service
        .switchIfEmpty(suggestionService.getSuggestedMemes())
        .take(5) // we need no more than 5 elements
        .publishOn(UiUtils.uiThreadScheduler()) // hand the data over to the UI thread
        .subscribe(favorites -> {
            // render favorites; this runs on the UI thread
        }, UiUtils::errorPopup); // callback in case of an error



And what if we suddenly want to give up after an 800 ms timeout and load cached data instead?

    userService.getFavoriteMemes(userId)
        .timeout(Duration.ofMillis(800)) // timeout duration
        // alternative data source
        .onErrorResume(error -> cacheService.cachedFavoritesFor(userId))
        .flatMap(memeService::getMemes) // load the memes by ID
        .switchIfEmpty(suggestionService.getSuggestedMemes())
        .take(5) // take the first 5 elements
        .publishOn(UiUtils.uiThreadScheduler())
        .subscribe(favorites -> {
            // render favorites
        }, UiUtils::errorPopup);


In Reactor we simply add the timeout operator to the call chain. When the time runs out, timeout signals an error. With the onErrorResume operator we specify an alternative (fallback) source from which to fetch the data in case of an error.
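The same "time out, then fall back" idea can be tried without Reactor. The sketch below is a plain-JDK analogue, not the Reactor operators themselves: it uses `CompletableFuture.orTimeout` (available since JDK 9) and `exceptionally`. The helper name `withTimeout` and the sample strings are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class TimeoutFallback {

    // Waits for `primary` for at most `millis` milliseconds; on timeout
    // (or any other failure) returns the `cached` value instead.
    static String withTimeout(CompletableFuture<String> primary, long millis, String cached) {
        return primary
                .orTimeout(millis, TimeUnit.MILLISECONDS) // fails with TimeoutException when too slow
                .exceptionally(error -> cached)           // fallback on any error
                .join();
    }

    public static void main(String[] args) {
        // A future that nobody ever completes plays the role of a hung service.
        System.out.println(withTimeout(new CompletableFuture<>(), 100, "cached memes")); // cached memes
        // A future that is already complete plays the role of a fast service.
        System.out.println(withTimeout(
                CompletableFuture.completedFuture("fresh memes"), 100, "cached memes")); // fresh memes
    }
}
```

Unlike onErrorResume, which switches to another publisher, this analogue falls back to a single ready-made value.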


Callbacks, in this day and age? We have CompletableFuture, after all


We have a list of IDs for which we want to request a name and statistics and then combine them as key-value pairs, all of it asynchronously.

    CompletableFuture<List<String>> ids = ifhIds();
    CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
        Stream<CompletableFuture<String>> zip = l.stream().map(i -> {
            // get the name (asynchronously)
            CompletableFuture<String> nameTask = ifhName(i);
            // get the statistics (asynchronously)
            CompletableFuture<Integer> statTask = ifhStat(i);
            // combine the results
            return nameTask.thenCombineAsync(statTask,
                (name, stat) -> "Name " + name + " has stats " + stat);
        });
        // collect the CompletableFutures into an array
        List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
        CompletableFuture<String>[] combinationArray = combinationList.toArray(
            new CompletableFuture[combinationList.size()]);
        // wait for all the futures to complete with allOf
        CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
        // a hack needed because allOf returns CompletableFuture<Void>
        return allDone.thenApply(v -> combinationList.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
    });
    List<String> results = result.join();
    assertThat(results).contains(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
    );


How can we do this with Reactor?

    Flux<String> ids = ifhrIds();
    Flux<String> combinations = ids.flatMap(id -> {
        Mono<String> nameTask = ifhrName(id);
        Mono<Integer> statTask = ifhrStat(id);
        // the zipWith operator combines the two
        return nameTask.zipWith(
            statTask, (name, stat) -> "Name " + name + " has stats " + stat);
    });
    Mono<List<String>> result = combinations.collectList();
    List<String> results = result.block(); // we are in a test, so we simply block
    assertThat(results).containsExactly(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
    );


As a result, we get a high-level API that is composable and readable (in fact, we originally started using Reactor precisely because we needed a way to write asynchronous code in a uniform style), plus other goodies: lazy execution, backpressure management, various schedulers (Schedulers), and integrations.

OK, so what are Flux and Mono?


Flux and Mono are the two main data structures of Reactor.





Flux is an implementation of the Publisher interface. It represents a sequence of 0..N elements that may (but does not have to) terminate, possibly with an error.


A Flux sequence can emit 3 kinds of signals: an element of the sequence, a completion signal, or an error signal (calls to the onNext, onComplete, and onError methods, respectively).


Each of the 3 is optional. For example, a Flux can be an infinite empty sequence (no method is ever called), a finite empty sequence (only onComplete is called), or an infinite sequence of values (only onNext is called), and so on.


For example, Flux.interval() produces an infinite sequence of ticks of type Flux<Long>.


A construct like this:

    Flux
        .interval(Duration.ofSeconds(1))
        // logger stands for any SLF4J-style logger
        .doOnEach(signal -> logger.info("{}", signal.get()))
        .blockLast();


will print the following:

    12:24:??? [parallel-1] INFO - 0
    12:24:??? [parallel-1] INFO - 1
    12:24:??? [parallel-1] INFO - 2
    12:24:??? [parallel-1] INFO - 3
    12:24:??? [parallel-1] INFO - 4
    12:24:??? [parallel-1] INFO - 5
    12:24:??? [parallel-1] INFO - 6
    12:24:??? [parallel-1] INFO - 7
    12:24:??? [parallel-1] INFO - 8
    12:24:??? [parallel-1] INFO - 9
    12:24:??? [parallel-1] INFO - 10


The doOnEach(Consumer<? super Signal<T>>) method applies a side effect to every signal in the sequence, which is handy for logging.


Note the blockLast() call: since the sequence is infinite, the thread that makes the call will wait for the end forever.


If you are familiar with RxJava, Flux is very similar to Observable




Mono is an implementation of the Publisher interface that represents a single asynchronous element or its absence (Mono.empty()).



Unlike Flux, Mono can emit at most 1 element. As with Flux, the onComplete() and onError() calls are optional.


Mono can also be used as a fire-and-forget asynchronous task with no return value (similar to Runnable). To do that, declare it as Mono<Void> and use the empty operator.

    Mono<Void> asyncCall = Mono.fromRunnable(() -> {
        // some logic goes here
        // the Mono completes empty once the logic has run
    });
    asyncCall.subscribe();


If you are familiar with RxJava, think of Mono as a cocktail of Single + Maybe.


Why this division?


Separation into Flux and Mono helps improve the semantics of the reactive API, making it expressive enough, but not redundant.


Ideally, just by looking at the return type we can tell what a method does: a fire-and-forget call (Mono<Void>), a request-response (Mono<T>), or a stream of data (Flux<T>).


Flux and Mono make use of these semantics and convert into each other. Flux has the single() method, which returns a Mono<T>, while Mono has the concatWith(Mono<T>) method, which returns a Flux<T>.


They also have operators of their own. Some make sense only for N elements in a sequence (Flux) or, conversely, only for a single value. For example, Mono has or(Mono<T>), and Flux has the limit/take operators.


More examples


The easiest way to create a Flux/Mono is to use one of the many factory methods these classes provide.

Initialize a Flux from ready-made values:
    Flux<String> sequence = Flux.just("foo", "bar", "foobar");

You can initialize it from an Iterable:
    List<String> iterable = Arrays.asList("foo", "bar", "foobar");
    Flux<String> sequence = Flux.fromIterable(iterable);

Or from a third-party Publisher:
    Publisher<String> publisher = redisson.getKeys().getKeys();
    Flux<String> from = Flux.from(publisher);

And you can also do this:
    Mono<String> noData = Mono.empty(); // empty Mono
    Mono<String> data = Mono.just("foo"); // the string "foo"
    // the sequence of numbers 5, 6, 7
    Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);


Flux and Mono are lazy. To kick off any processing and make use of the data sitting in our Mono and Flux, you need to subscribe to them with .subscribe().


Subscribing is a way to keep the behavior lazy while also specifying what to do with our data. The subscribe methods take Java 8 lambda expressions as parameters.
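If "nothing happens until you subscribe" sounds unfamiliar, JDK streams offer a close analogy (an analogy only, not Reactor code): a pipeline is merely a description until a terminal operation is invoked. The class and method names below are invented for the example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipeline {

    // Returns {map calls before the terminal operation, map calls after it}.
    static int[] countMapCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Describing the pipeline does not run it.
        Stream<String> pipeline = Stream.of("foo", "bar", "foobar")
                .map(s -> {
                    calls.incrementAndGet();
                    return s.toUpperCase();
                });
        int before = calls.get();

        // The terminal operation plays the role of subscribe(): now the work happens.
        List<String> result = pipeline.collect(Collectors.toList());
        int after = calls.get();

        System.out.println(result); // [FOO, BAR, FOOBAR]
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = countMapCalls();
        System.out.println(counts[0] + " -> " + counts[1]); // 0 -> 3
    }
}
```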

Ways to subscribe
    subscribe(); // start execution
    // and do something with each resulting value
    subscribe(Consumer<? super T> consumer);
    // and do something in case of an exception
    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
    // and do something on completion
    subscribe(
        Consumer<? super T> consumer,
        Consumer<? super Throwable> errorConsumer,
        Runnable completeConsumer
    );

Print 1, 2, 3
    Flux<Integer> ints = Flux.range(1, 3);
    ints.subscribe(i -> System.out.println(i));


will output the following:

    1
    2
    3

Print 1, 2, 3 and an error
    Flux<Integer> ints = Flux.range(1, 4)
        .map(i -> {
            if (i <= 3) return i;
            throw new RuntimeException("Got to 4");
        });
    ints.subscribe(
        i -> System.out.println(i),
        error -> System.err.println("Error: " + error)
    );

will output the following:

    1
    2
    3
    Error: java.lang.RuntimeException: Got to 4

Print 1, 2, 3, 4 and Done
    Flux<Integer> ints = Flux.range(1, 4);
    ints.subscribe(i -> System.out.println(i),
        error -> System.err.println("Error " + error),
        () -> System.out.println("Done"));


will output the following:

    1
    2
    3
    4
    Done

By default, all of this runs on the current thread. The thread of execution can be changed, for example, with the .publishOn() operator, passing it the Scheduler we are interested in (a Scheduler is a kind of wrapper around ExecutorService).
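Since a Scheduler is described here as a wrapper around ExecutorService, the thread hop itself can be illustrated with a bare executor. This is a plain-JDK sketch, not Reactor's publishOn; the class name, the method name, and the thread name "single-1" (chosen to resemble the output further down) are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHop {

    // Takes a value that is already available on the calling thread, handles it
    // on a dedicated single-thread executor, and reports which thread did the handling.
    static String handlerThreadName() {
        ExecutorService single = Executors.newSingleThreadExecutor(
                task -> new Thread(task, "single-1"));
        try {
            return CompletableFuture.completedFuture(42)
                    // the *Async variant with an executor always runs the function on that executor
                    .thenApplyAsync(n -> Thread.currentThread().getName(), single)
                    .join();
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handlerThreadName()); // single-1
    }
}
```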

Change the execution thread
    Flux<Integer> sequence = Flux.range(0, 100).publishOn(Schedulers.single());
    // onNext, onComplete, and onError will be called on the single thread
    sequence.subscribe(n -> {
        System.out.println("n = " + n);
        System.out.println("Thread.currentThread() = " + Thread.currentThread());
    });
    sequence.blockLast();


will output the following (100 times):

    n = 0
    Thread.currentThread() = Thread[single-1,5,main]

What conclusions can be drawn?
With all due respect to CompletableFuture, its API, composability, and readability leave much to be desired.
With Reactor, we get a way to manipulate asynchronous data streams without blocking and without suffering.
On the backend side, unfortunately, there are still a number of obstacles to building fully reactive systems (for example, blocking drivers).
Reactivity does not make your code faster, but it does improve scalability.
You can start using Reactor right now to manage data inside your application.
That's the kind of entertaining overview we ended up with (not really). If you found it interesting, let us know, and we will dig deeper into what is going on. And don't hesitate to comment!
Thank you for your attention!
Based on Reactor documentation
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies.
I'm not there myself, but there are more deserving people there, including contributors and maintainers.