Java and Project Reactor

Java and Project Reactor
Hello! My name is Lyokha, and I work as a backend developer in FunCorp. Today we will talk about reactive programming, the Reactor library and a little about the web.
Reactive programming is often "exposed", but if you (like the author of the article) still do not know what it is - sit back, try to figure it out together.
these are your manifestos with details of , but worth it.
And here is the web?
There are rumors that if you build your system reactively, according to all the canons of Reactive Manifesto, starting with the HTTP server and ending with the database driver, you can call the second coming. Well, or at least build a really quality backend.
This, of course, is light slyness. But if your yuzkeys - processing of multiple and not always fast requests, and the servlet container ceases to cope - welcome to the wonderful world of reactive!
If you have 128 continuous parallel requests, a servlet container is probably the right tool for the job.
And on what to write reactively, if not on
? It's worth noting that writing a backend on Netty's bare is tedious, and it's nice to have abstractions for work.
There are not many annual server abstractions for Netty, so the guys from Pivotal added to
Spring Boot 2
his support. On March ? 201? all this is even it was found out . To make us very pleased, they created the module
, which is an alternative to
Spring MVC
and represents a reactive approach for writing web services.
WebFlux positions itself as a microframe (microframework and Spring, haha), promises to fit into your (our) fashionable micro services, presents the API in a functional style and already mentioned on Habré . More details (including about the differences from Spring MVC) can be read here . But today about the other. The basis of WebFlux is the Reactor library. About it and talk.
Is a reactive (suddenly!) Open-source platform developed by Pivotal. I decided to free paraphrase (with comments) introduction to this wonderful library .
Blocking code (for the youngest)
The Java code is usually blocking. For example, calls over HTTP or requests to the database hang our current thread until the third-party service answers us. It is normal practice if the service is responsible for an acceptable time. Otherwise, this business turns into bottleneck. We have to parallelize it, run more threads that will execute the same blocking code. In passing, it is necessary to solve the emerging problems with contention and competitiveness.
Frequent blocking, especially due to I /O (and if you have a lot of mobile clients,
? it's not at all fast I /O
), Causes our numerous threads to sit on pants waiting for data, wasting precious resources to switch context and all that.
Parallelization is not a magic wand that solves all problems. It is a complex instrument carrying its overhead.
Async && non-blocking
These terms are easy to find, difficult to understand and impossible to forget. But they often figure when it comes to reactivity, so let's try to understand them.
From the text above it can be concluded that the blocking code is at fault. Okay, let's start writing nonblocking. What does this mean? If we are not yet ready to give the result, then instead of waiting for it we give some error, for example, with a request to repeat the request later. Cool, of course, but what should we do with this mistake? So we have asynchronous processing to react later: everything is ready!
It turns out, it is necessary to write the code asynchronous and non-blocking, and all at us becomes well? No, it will not. But it can make life easier. To do this, kind and intelligent people came up with all sorts of specifications (including reactive ones) and piled libraries, which respect these specifications.
So, Reactor. If very shortly
In fact, Reactor (at least its core part) is the implementation of the specification. Reactive Streams and parts ReactiveX operators . But more on that later.
If you are familiar or heard about RxJava, then Reactor shares the approach and philosophy of RxJav, but has a number of semantic differences (which grow due to backward compatibility from RxJava and Android development features).
What is Reactive Streams in Java?
Roughly, there are 4 interfaces, which are represented in the library reactive-streams-jvm :
Their exact copies are present in class Flow from the ninth of .
If even more rude, then all of them are put forward with the following requirements:
- Asynchrony;
- "non-blocking" of input /output;
- the ability to handle situations where data appears faster than consumed (in a synchronous, imperative code, this situation does not occur, but this is often found in jet systems).
Let's take a look at the Flow class code from JDK 9 (Javadoc comments are removed for brevity):
public final class Flow {
public static interface Publisher {
public void subscribe (Subscriber <? super T> subscriber);
public static interface Subscriber {
public void onSubscribe (Subscription subscription);
public void onNext (T item);
public void onerror (Throwable throwable);
public void onComplete ();
public static interface Subscription {
public void request (long n);
public void cancel ();
public static interface Processor extends Subscriber , Publisher {


For now, this is all the support for reactivity at the JDK level. Somewhere in the incubator module, an HTTP /2-client is maturing, in which Flow is actively used. I did not find other uses inside JDK 9.


Integration of


Reactor is integrated into our favorite pribluda Java ? among which CompletableFuture, Stream, Duration. Supports IPC modules . He has adapters for Akka and RxJava , modules test (obviously, for writing tests) and extra (utility classes).


For fans of Redis, customers have lettuce /redisson There is a reactive API with Reactor support.
For fans of MongoDB There is an official reactive driver, which is implemented by Reactive Streams, in connection with which it is easily picked up by Reactor.


Great, but how to start it all?


All this can be run on JDK8 and higher. However, if you use Android and your (minSdk < 26), то лучше обратите свой взор на RxJava 2.

If you have Maven [/b]
so you need the plugin .

Reactor supports Kotlin .




So, we need to write asynchronous and nonblocking code. In other words, to allow the current execution thread not to block and wait, but to switch to something useful, returning to the current process when the asynchronous processing is completed.


On a sunny island called Java for it there are two main ways:


Callbacks. In the case of callbacks, the method does not have a return value (void), but it takes an additional parameter (lambda, anonymous class, etc.) that is called after a certain event. An example is the EventListener from the Swing library.

Futures. This is an object that promises to return something in the future. In Future, there is a reference to the object , the value of which will be calculated later (asynchronously). Future can block to wait for the result. Suppose the ExecutorService is submit () returns you Future from Callable .  


These are well-known tools, but at some point they are not enough.


Problems with callbacks


Kolbacks do not succumb to the composition and quickly turn into a hodge-podge called "callback hell".


Let's take a look at the example


You need to show the user 5 top memes, and if they are not, then go to the service suggestions and take 5 memes from there.


Total involved 3 services: the first gives the ID of the user's favorite memes, the second one fetches the memes, and the third gives suggestions, if there are no favorite memes.

    //list of your favorite user memories
userService.getFavoriteMemes (userId, new Callback <>() {
//got the data
public void onSuccess (List specificFavoriteMemes) {
.if (userFavoriteMemes.isEmpty ()) {
//if there are no favorite entries, offer him the content
.subjectMemes (new Callback <>() {
public void onSuccess (List suggestedMemes) {
uiUtils.submitOnUiThread (() -> { (). limit (5) .forEach (meme -> {
//rendering data in UI
public void onerror (Throwable error) {
uiUtils.errorPopup (error); //rendering errors in UI
} else {
//if they found records ()
.limit (5) //restrict the output of 5 elements to
.forEach (favId -> memeService.getMemes (favId,
new Callback () {
//fetchime memes
public void onSuccess (Meme loadedMeme) {
uiUtils.submitOnUiThread (() -> {
//renders data in UI
Public void onerror (Throwable error) {
.uiUtils.errorPopup (error);
public void onerror (Throwable error) {
uiUtils.errorPopup (error);


Looks somehow not cool.


Now let's see how we would do it with Reactor

    //get the sequence of identifiers
userService.getFavoriteMemes (userId)
.flatMap (memeService.getMemes) //upload memes to ID
//if empty, we take memes from the service offer
.switchIfEmpty (suggestionService.getSuggestedMemes ())
.take (5) //we need no more than 5 elements
.publishOn (UiUtils.uiThreadScheduler ())) //Send the data to the UI stream
.subscribe (favorites -> { (favorites); //it is called in the UI-stream
}, UiUtils :: errorPopup); //callback in case of error

Reaction.jpeg [/b]


And what if we suddenly wanted to fall off on a time-out of 800 ms and load the cached data?

    userService.getFavoriteMemes (userId)
.timeout (Duration.ofMillis (800)) //duration of the timeout
//alternative data source
.onerrorResume (cacheService.cachedFavoritesFor (userId))
.flatMap (memeService.getMemes) //upload memes to ID
.switchIfEmpty (suggestionService.getSuggestedMemes ())
.take (5) //take the first 5 elements of
.publishOn (UiUtils.uiThreadScheduler ())
.subscribe (favorites -> { (favorites);
}, UiUtils :: errorPopup);


In Reactor, we simply add a timeout statement to the call chain. Timeout throws an exception. The operator is onerrorResume we specify an alternative (fallback) source from which to retrieve the data in case of an error.


Callbacks in 20! ? we also have CompletableFuture


We have a list of IDs by which we want to query the name and statistics, and then combine it as key-value pairs, and all this is asynchronous.

    CompletableFuture    {
Stream {
//get the name (asynchronously)
CompletableFuture nameTask = ifhName (i);
//get the statistics (asynchronously)
CompletableFuture statTask = ifhStat (i);
//combine the results
return nameTask.thenCombineAsync (statTask,
(name, stat) -> "Name" + name + "has stats" + stat);
//We collect the array CompletableFuture
List []combinationArray = combinationList.toArray (
new CompletableFuture[combinationList.size()]);
//wait for all the features to run with allOf
CompletableFuture allDone = CompletableFuture.allOf (combinationArray);
//here is the hack associated with the fact that allOf returns Feauture
return allDone.thenApply (v -> ()
.map (CompletableFuture :: join)
.collect (Collectors.toList ()));
List results = result.join ();
assertThat (results) .contains (
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
" Name NameABSLAJNFOAJNFOANFANSF has stats 121 "


How can we do this with Reactor?

    Flux    ids = ifhrIds ();
Flux combinations = ids.flatMap (id -> {
Mono nameTask = ifhrName (id);
Mono statTask = ifhrStat (id);
//zipWith operator for the combination
return nameTask.zipWith (
statTask, (name, stat) -> "Name" + name + "has stats" + stat
Mono results = result.block (); //since we are in the test, then we simply block
assertThat (results) .containsExactly (
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
" Name NameABSLAJNFOAJNFOANFANSF has stats 121 "


As a result, we are provided with a high-level API, which is compiled and readable (

? in fact, we originally used Reactor for this, because we needed a way to write asynchronous code in the same style ), And other goodies: lazy execution, BackPressure management , various schedulers (Schedulers) and integration.

Okay, what else is Flux and Mono?


Flux and Mono are the two main data structures of Reactor.





Flux Is the implementation of the interface Publisher , is a sequence of 0N elements that can (but not necessarily) end (including and with an error).


The Flux sequence has 3 valid values: a sequence object, a completion signal, or an error signal (calls to the methods onNext , OnComplete , And onerror
? respectively).


Each of the 3 values ​​is optional. For example, Flux can be an infinite empty sequence (no method is called). Or the final empty sequence (only onComplete is called). Or an infinite sequence of values ​​(only onNext is called). Etc.


For example, Flux.interval () gives an infinite sequence of ticks of type Flux .


Construction type:

.interval (Duration.ofSeconds (1))
.doOnEach (signal -> ("{}", signal.get ()))
.blockLast ();


Will print the following text:

    12: 24: ???[parallel-1]INFO = 0
12: 24: ???[parallel-1]INFO - 1
12: 24: ???[parallel-1]INFO = 2
12: 24: ???[parallel-1]INFO - 3
12: 24: ???[parallel-1]INFO - 4
12: 24: ???[parallel-1]INFO - 5
12: 24: ???[parallel-1]INFO = 6
12: 24: ???[parallel-1]INFO 7 7r3r31095. 12: 24: ???[parallel-1]INFO = 8
12: 24: ???[parallel-1]INFO - 9
12: 24: ???[parallel-1]INFO = 10


Method doOnEach (Consumer ) applies a side effect to each element in the sequence, which is convenient for logging.


Pay attention to blockLast () : since the sequence is infinite, the flow in which the call occurs will endlessly wait for the end.


If you are familiar with RxJava, Flux is very similar to Observable




Mono - this is the implementation of the Publisher interface, is an asynchronous element or its absence Mono.empty () .



Unlike Flux, Mono can return no more than 1 element. Calls onComplete () and onerror () , as in the case of Flux, are optional.


Mono can also be used as an asynchronous task in the style of "executed and forgotten", without the return result (similar to Runnable). To do this, you can declare it as Mono and use the empty statement.

    Mono    asyncCall = Mono.fromRunnable (() -> {
//here is some kind of logic
//return Mono.empty () after execution of
asyncCall.subscribe ();


If you are familiar with RxJava, take Mono as a cocktail from Single + Maybe


Why this division?


Separation into Flux and Mono helps improve the semantics of the reactive API, making it expressive enough, but not redundant.


Ideally, just looking at the return value, we can understand what the method does: some call (Mono ), A request-response (Mono ) Or returns a data stream (Flux ).


Flux and Mono use their semantics and flow into each other. Flux has the method single () , which returns Mono , while Mono has the method concatWith (Mono ) , which returns already Flux .


They also have unique operators. Some are meaningful only for N elements in the sequence (Flux) or, conversely, relevant only to one value. For example, Mono has or (Mono ) , and Flux has the operators limit /take .


More examples


The easiest way to create Flux /Mono is to useOne of the masses of factory methods that are represented in these classes.

Initialize Flux with the ready values ​​ [/b]
    Flux    sequence = Flux.just ("foo", "bar", "foobar");    

You can initialize from Iterable [/b]
    List    iterable = Arrays.asList ("foo", "bar", "foobar");
Flux sequence = Flux.fromIterable (iterable);

It is possible from third-party Publisher [/b]
    Publisher    publisher = redisson.getKeys (). getKeys ();
Flux from = Flux.from (publisher);

Well, you can also [/b]
    Mono    noData = Mono.empty (); //empty Mono
Mono data = Mono.just ("foo"); //the string "foo"
//the sequence of //numbers is ? b, 7
Flux numbersFromFiveToSeven = Flux.range (? 3);


Flux and Mono are lazy. In order to run some processing and use the data lying in our Mono and Flux, you need to subscribe to them using .subscribe () .


Subscription is a way to provide lazy behavior and at the same time indicate what you need to do with our data. The subscribe methods use "lambda expressions" from Java 8 as parameters.

Ways to subscribe [/b]
    subscribe (); //start execution
//and do something with each resulting value of
subscribe (Consumer <? super T> consumer);
//and do something in case of exception
subscribe (Consumer <? super T> consumer, Consumer <? super Throwable> errorConsumer);
//and do something at the end of
subscribe (
? Consumer <? super T> consumer,
? Consumer
? errorConsumer,
Runnable completeConsumer

Output ? ? 3 [/b]
    Flux    ints = Flux.range (? 3);
ints.subscribe (i -> System.out.println (i));


will output the following:


Output ? ? 3 and the error [/b]
    Flux    ints = Flux.range (? 4)
.map (i -> {
.if (i ? return i;
throw new RuntimeException ("Got to 4");
.ints.subscribe (
i -> System .out.println (i), error -> System.err.println ("Error:" + error)

will output the following:

Error: java.lang.RuntimeException: Got to 4

Output ? ? ? 4 and Done [/b]
    Flux    ints = Flux.range (? 4);
ints.subscribe (i -> System.out.println (i),
error -> System.err.println ("Error" + error),
() -> {System.out.println ("Done" );


will output the following:



By default, all this will work in the current thread. The execution stream can be changed, for example, using the operator .publishOn () , passing there the Scheduler that interests us (Scheduler is such a spin on ExecutorService).

Change the execution thread [/b]
    Flux    sequence = Flux.range (? 100) .publishOn (Schedulers.single ());
//calls onNext, onComplete, and onerror will occur in single.
sequence.subscribe (n -> {
System.out.println ("n =" + n);
System.out.println ("Thread.currentThread () =" + Thread.currentThread ());
sequence.blockLast ();


will output the following (100 times):

    n = 0-
Thread.currentThread () = Thread[single-1,5,main]

What conclusions can be drawn?
With all due respect to CompletableFuture, their API, composability and readability leaves much to be desired.
Using Reactor, we get a way to manipulate asynchronous data flows without blocking and suffering.
On the side of the backend, unfortunately, there are a number of reasons that still impede the construction of fully reactive systems (for example, blocking drivers).
Reactivity does not make your code more productive, but improves scalability.
You can use Reactor right now to manage the data inside the application.
Here such an entertaining review turned out (no). If you were interested, write, and we will go deeper into what is happening. And do not hesitate to comment!
Thank you for attention!
Based on Reactor documentation
Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies.
I'm not here, but there are more worthy men, incl. and contributors /maintainers.
+ 0 -

Comments 44

Megan Garcia
Megan Garcia 10 September 2018 09:17
No doubt this is an  excellent post I got a lot of knowledge after reading good luck. Theme of  blog is excellent there is almost everything to read, Brilliant post.AWS Advanced Networking
Jordan Clark
Jordan Clark 14 September 2018 11:09
I am very enjoyed for  this blog. Its an informative topic. It help me very much to solve some  problems. Its opportunity are so fantastic and working style so speedy. victron solar regulator mppt
Elizabeth Morgan
Elizabeth Morgan 17 September 2018 15:21
Positive site, where  did u come up with the information on this posting?I have read a few of the  articles on your website now, and I really like your style. Thanks a million  and please keep up the effective work. right here
Anna Sally
Anna Sally 22 September 2018 09:35
Nice to read your article! I am looking forward to sharing your adventures and guest posting service

David Torres
David Torres 23 September 2018 09:48
I’ve been searching  for some decent stuff on the subject and haven't had any luck up until this  point, You just got a new biggest fan!.. plus d'info
Howard Austin
Howard Austin 24 September 2018 11:12
Thanks so much for sharing this awesome info! I am looking forward to see more postsby you! cheap essays
Beverly Lucas
Beverly Lucas 27 September 2018 13:30
First You got a great blog .I will be interested in more similar topics. i see you got really very useful topics. commercial cleaning Calgary
Mary Porter
Mary Porter 1 October 2018 19:22
I really thank you  for the valuable info on this great subject and look forward to more great  posts. Thanks a lot for enjoying this beauty article with me. I am  appreciating it very much! Looking forward to another great article. Good  luck to the author! All the best!  short term insurance
Rachel Hernandez
Rachel Hernandez 3 October 2018 01:17
I think this is one of the most significant information for me. And i’m glad reading your article. But should remark on some general things, The web site style is perfect, the articles is really great : D. Good job, cheers  Best Nationwide moving companies
Denise Gardner
Denise Gardner 5 October 2018 13:57
Superior post, keep up with this exceptional work. It's nice to know that this topic is being also covered on this web site so cheers for taking the time to discuss this! Thanks again and again!  aws certification

Judy Reynolds
Judy Reynolds 6 October 2018 16:38
Pretty good post. I just stumbled upon your blog and wanted to say that I have really enjoyed reading your blog posts. Any way I'll be subscribing to your feed and I hope you post again soon. Big thanks for the useful info.  Railway Ticket Reservation

Jesse Gilbert
Jesse Gilbert 11 October 2018 17:30
สำหรับการแทงบอลออนไลน์ แบบ สูง-ต่ำ นี้ก็ถือได้ว่าเป็นอีกหนึ่งช่องทางที่จะให้คุณได้เข้ามาทำการศึกษา วิธีการแทงบอล แบบสูง หรือ ต่ำ ก่อนอื่นเลยก็คือให้เรา ทำการเลือกคู่บอล ที่มีราคาต่อที่ 2/2.5 ประมาณนี้ มา 1 คู่ก่อนครับ แล้วเมื่อท่านเลือกได้แล้วก็อย่าเพิ่งใจร้อนครับให้ท่านรอราคาลดลงมาอีกสักนิด สักประมาณ 1.5/2 ก่อนครับ ซึ่งช่วงเวลาที่ราคาลดลงมานั้น จะอยู่ในช่วงเวลาที่ ผ่านไปแล้วสักครึ่งชั่วโมงก่อน เมื่อท่านได้ราคาตามที่ท่านต้องการแล้วนั้น ท่านก็ให้แทงบอลไปที่ สูง ทันทีครับ หลังจากนั้นก็ท่านรอสักแปป ให้มีการทำประตูเกิดขึ้น แต่เมื่อหมดเวลาครึ่งแรกแล้วยังไม่มีการทำประตูเกิดขึ้น ให้แทงต่ำ สวน ขึ้นไปทันทีครับ รับรองได้ผล แต่ถ้ามีการทำประตูเกิดขึ้นไปก่อนที่จะหมดเวลาครึ่งแรก แน่นอนว่า ราคาจะพุ่งขึ้นไปที่ 2.5 ทันทีครับ ก็จะเข้าทางของเรา เมื่อมีการทำประตูกันเกิดขึ้นแล้วที่ ครึ่งแรก พอราคาบอลขึ้นถึง 2.5 ให้แทงต่ำ ไปเท่าๆกับจำนวนที่ แทงสูงสวนไปเลยครับ  FIFA55

Alice Kim
Alice Kim 17 October 2018 16:11
It is my first visit to your blog, and I am very impressed with the articles that you serve. Give adequate knowledge for me. Thank you for sharing useful material. I will be back for the more great post.  website

Jessica Greene
Jessica Greene 20 October 2018 11:48
Hey, this day is too much good for me, since this time I am reading this enormous informative article here at my home. Thanks a lot for massive hard work.  Refrigerator repair in Mission Viejo

Gloria Nelson
Gloria Nelson 22 October 2018 14:00
Wow, What a Excellent post. I really found this to much informatics. It is what i was searching for.I would like to suggest you that please keep sharing such type of info.Thanks  Best Movers
Craig Oliver
Craig Oliver 22 October 2018 14:59
I recently came across your blog and have been reading along. I thought I would leave my first comment. I don’t know what to say except that I have enjoyed reading.   웹툰 미리보기 다시보기

George Powell
George Powell 23 October 2018 15:57
When i ordered on your web page on the other hand setting concentration simply somewhat minor submits. Rewarding technique for near future, We are book-marking whenever get men and women down comes up available  dịch vụ seo từ khóa

Alexander Kelly
Alexander Kelly 26 October 2018 20:08
Excellent blog! I found it while surfing around on Google. Content of this page is unique as well as well researched. Appreciate it.

Farhan Malik
Farhan Malik 27 October 2018 19:56
Immediately this specific website may possibly irrefutably turn out celebrated involving every single producing a lot of people, as a consequence of careful content along with testimonials along with evaluations. Web Design Company Houston

Timothy Wheeler
Timothy Wheeler 30 October 2018 12:24
This is highly informatics, crisp and clear. I think that everything has been described in systematic manner so that reader could get maximum information and learn many things.  simple girls in Argentina

Anna Sally
Anna Sally 3 November 2018 11:51
เพื่อให้ท่านได้รับความสะดวกสบายสูงสุดทุกการเข้าใช้บริการ สมัคร  FIFA55
Farhan Malik
Farhan Malik 5 November 2018 15:22
I am interested in types write-up. It really is good for uncover individuals explain in words about the heart along with knowing in this substantial style is generally merely found. putlocker

Sunny Deol
Sunny Deol 6 November 2018 15:08
To look at gotten on your blog site nonetheless setting treatment plainly only a little little bit of submits. Agreeable technique for potential future, We are book-marking at a stretch secure varieties stop rises together. Camera for Home Assistant

Sunny Deol
Sunny Deol 7 November 2018 14:09
Quickly this particular site may undoubtedly gain popularity in between just about all running a blog as well as site-building people, because of its painstaking articles or maybe evaluations.
Anna Sally
Anna Sally 8 November 2018 11:33
This was really an  interesting topic and I kinda agree with what you have mentioned here!  internet  radio online
sad 8 November 2018 15:19
Most of the increased on your web page even though residing status simply just a lot of tid amount submits. Lightning charging cable

Douglas Coleman
Douglas Coleman 12 November 2018 16:39
Thanks for posting this info. I just want to let you know that I just check out your site and I find it very interesting and informative. I can't wait to read lots of your posts.   cheap makeup vanity

charlos john
charlos john 13 November 2018 19:14
Thank you for some  other informative blog. Where else could I get that type of information  written in such an ideal means? I have a mission that I’m just now working  on, and I have been at the look out for such information.  PatentReal  Corporation
Sunny Deol
Sunny Deol 14 November 2018 15:13
I enjoy each of the subject matter, I've got to suggest as i enjoyed, As i would love additional information involving it, for the reason that extremely very good., Thanks for your time in regard to enlightening. bitcoin wallet
Mildred Gonzales
Mildred Gonzales 14 November 2018 18:21
This is actually the kind of information I have been trying to find. Thank you for writing this information.  Party rental

Add comment