Acquaintance with jet streams - for Java-developers

Hello, Habr!
Today we will return to one of the topics covered in our remarkable book " .Reactive design patterns ". It's about Akka Streams and streaming data in general - in the book by Roland Kuhn, chapters 10 and 15-17 deal with these issues.
Reactive flows Is a standard way of asynchronous processing of data in a stream style. They were are included in Java 9 as interfaces java.util.concurrent.Flow , and now they are becoming a real wand-wand to create streaming components in various applications - and this arrangement will continue for the next few years. It should be noted that reactive flows are "just" standard, and by themselves they are not good for anything. In practice, this or that specific implementation of this standard is used, and today we will talk about Akka Streams - one of the leading realizations of jet streams since their inception.
A typical stream processing pipeline consists of several steps, on each of which information is transferred to the next step (that is, downstream). So, if we take two adjacent steps and consider the higher one as the supplier, and the next following it as the consumer of the data, it turns out that the supplier can work either slower than the consumer, or faster than it. When the supplier is slower, everything is fine, but the situation becomes more complicated if the consumer does not keep up with the supplier. In this case, the consumer can overflow with the data, which he has (to the best of his ability) to carefully process.
The simplest way to deal with the excess of data is to take and drop everything that can not be processed. This is exactly what happens, for example, when working with network equipment. But what if we do not want to drop anything at all? Then we will need the backpressure
The idea of ​​back pressure is very important in the context of Reactive Streams and it boils down to the fact that we limit the amount of data transferred between neighboring links of the pipeline, so no link is overflowing. Since the most important aspect of the reactive approach is to prevent blockages, except when absolutely necessary, the implementation of back pressure in the reactive flow should also be non-blocking.
How it is done
The Reactive Streams standard defines a number of interfaces, but not an implementation as such. This means that simply adding addiction to org.reactivestreams: reactive-streams, we are just trampling on the spot - we still need a specific implementation. There are many implementations of Reactive Streams, and in this article we will use Akka Streams and the corresponding DSL based on Java . Other implementations include RxJava 2.x or Reactor
Example of using
Suppose we have a directory where we want to track new CSV files, then process each file on a stream basis, perform some aggregation on the fly, and send the results so collected to the web socket (in real time). In addition, we want to set a certain threshold for the accumulation of aggregated data, upon which the notification will be initiated by e-mail.
In our example, the CSV lines will be pairs ( ? id , Value ), And, id will change every two lines, for example:

We want to calculate the average value for two lines with a common id and send it to the web socket only if it exceeds 0.9. Moreover, we want to send an e-mail notification after every fifth value arriving at the web socket. Finally, we want to read and display the data received from the web socket, and this will be done through a trivial frontend written in jаvascript.
We are going to use a number of tools from the Akka ecosystem (see Figure 1). Naturally, in the center of the entire system will be Akka Streams, which allows you to process data in real time on a streaming basis. We will use to read CSV files. Alpakka , this is a set of connectors for the integration of Akka Streams with various technologies, protocols or libraries. Interestingly, since Akka Streams are jet streams, the entire Alpakka ecosystem is also available to any other RS ​​implementation - this is the benefit of interoperability called to achieve is RS-interfaces. Finally, we will use Akka HTTP, which will provide the end point of the web socket. The most pleasant thing in this case - Akka HTTP seamlessly integrates with Akka Streams (which, in fact, it uses "under the hood"), so providing a stream as a web socket is not difficult.
Acquaintance with jet streams - for Java-developers  
Fig. 1. Architecture overview
If we compare this scheme with the classical architecture of Java EE, then, probably, it is noticeable that everything here is much simpler. No containers and bin, but just a simple stand-alone application. Moreover, the Java EE stack does not support the streaming approach at all.
Basics of Akka Streams
In Akka Streams, the processing pipeline (graph) consists of three types of elements Source (source), Sink (trap) and Flow (processing steps).
On the basis of these components, we define our graph, which, in effect, is simply a recipe for data processing. There is no computation there. In order for the pipeline to work, we need to materialize the graph, that is, bring it into the startup form. To do this, you need a so-called materializer, which optimizes the definition of the graph and, eventually, triggers it. However, the built-in ActorMaterializer is virtually non-alternative, so you are unlikely to use any other implementation.
If you look closely at the parameters of the component types, it is noticeable that each component (except for the corresponding input /output types) has a mysterious type of Mat. It refers to the so-called "materialized value" -a value that is accessible from outside the graph (as opposed to the types of input /output available only for internal communication between the graph's steps - see Figure 2). If you prefer to ignore a materialized value (and this often happens if we are only interested in transferring data between graph steps), then there is a special parameter of this type: NotUsed . It can be compared with Void from Java, however, it is semantically slightly more loaded: in the semantic sense "we do not use this value" is more informative than Void . Also note that some APIs use a similar type of Done, signaling that a particular task is completed. Perhaps other Java libraries in both cases would use Void , but in Akka Streams all types try to fill up with maximum useful semantics.
Fig. 2. Description of parameters of type Flow
Now let's move on to the concrete implementation of the CSV handler. First, we define the Akka Streams graph, and then, using the Akka HTTP protocol, connect the stream to the web socket.
Components of the flow conveyor
At the input point of our streaming pipeline, we want to monitor whether new CSV files have appeared in the catalog of interest to us. I would like to use for this. java.nio.file.WatchService , but since we have a streaming application, we need to get the source of events ( Source ) and work with it, and not organize everything through callbacks. Fortunately, such Source is already available in Alpakka in the form of one of the connectors DirectoryChangesSource , is a part of alpakka-file , where the "under the hood" is used WatchService :
    private final Source    , NotUsed> newFiles =
DirectoryChangesSource.create (DATA_DIR, DATA_DIR_POLL_INTERVAL, 128);

So we get a source that gives us elements of type Pair . We are going to filter them out so that only new CSV files are selected, and then transfer them "down". For this data conversion, as well as for all subsequent data, we will use small elements called Flow, from which a complete processing pipeline will form:
    private final Flow    , Path, NotUsed> csvPaths =
Flow. .p) {
return p.first (). toString (). endsWith (". csv") && p.second (). equals (DirectoryChange.Creation);

You can create Flow , for example, using the generalized method create () - it is useful when the input type itself is generalized. Here the resulting stream will generate (in the form of ? Path ) Every new CSV file that appears in DATA_DIR .
Now we are going to convert the Paths to strings that are selected from each file. To turn a source into another source, you can use one of the methods. flatMap * . In both cases, we create Source from each incoming element and somehow combine several resulting sources into a new, one-piece, linking or merging the source sources. In this case, we will stop at flatMapConcat because we want to keep the order of the rows, so that the rows with the same id remained next to each other. To convert Path into the byte stream, use the built-in utility FileIO :
    private final Flow    fileBytes =
Flow.of (Path.class) .flatMapConcat (FileIO :: fromPath);

This time we will use the method of () to create a new thread - it is convenient when the input type is not generalized.
Shown above is ByteString Is a representation of the byte sequence adopted in Akka Streams. In this case, we want to parse the byte stream as a CSV file - and for this we will use one of the Alpakka modules again, this time alpakka-csv :
    private final Flow    , NotUsed> csvFields =
Flow.of (ByteString.class) .via (CsvParsing.lineScanner ());

Note the combinator used here. via , which allows you to attach an arbitrary Flow to the conclusion obtained at the other step of the graph ( Source , or another , Flow ). The result is a stream of elements, each corresponding to a field in a single row of the CSV file. Then they can be transformed into the model of our subject area:
    class Reading {
private final int id;
private final double value;
private Reading (int id, double value) { = id;
this.value = value;
double getValue () {
return value;
public String toString () {
return String.format ("Reading (% d,% f)", id, value);
static Reading create (Collection fields) {
List fieldList = (). map (ByteString :: utf8String) .collect (toList ());
int id = Integer.parseInt (fieldList.get (0));
double value = Double.parseDouble (fieldList.get (1));
return new Reading (id, value);

For the transformation as such, we use the method map and pass the reference to the method Reading.create :
    private final Flow    , Reading, NotUsed> readings =

The next step is to add the readings to pairs, calculate the average value of for each group. value and transmit information only when a certain threshold is reached. Since we require that the average be calculated asynchronously, we use the method. mapAsyncUnordered , performing an asynchronous operation with a specified level of parallelism:
    private final Flow    averageReadings =
Flow.of (Reading.class)
.grouped (2)
.mapAsyncUnordered (1? readings ->
) CompletableFuture.supplyAsync (() -> ()
.map (Reading :: getValue)
.collect (averagingDouble (v -> v)))
.filter (v -> v> AVERAGE_THRESHOLD);

Having determined the above-mentioned components, we are ready to lay out of them a one-piece conveyor (using the combiner already known to you. Via ). This is not at allcomplex:
    private final Source    liveReadings =
.via (csvPaths)
.via (fileBytes)
.via (csvFields)
.via (readings)
.via (averageReadings);

When combining components as shown above, the compiler protects us, not allowing to randomly connect two blocks containing incompatible data types.
Stream as a web socket
Now we will use Akka HTTP to create a simple web server that will play such roles:
  • Provide a source of readings as a web socket,  
  • Issue a trivial web page that connects to the web socket and displays the received data.  

It is not worth it to create a web server with Akka HTTP: you just need to inherit HttpApp and provide the required maps on the DSL route:
    class Server extends HttpApp {
private final Source readings;
Server (Source Readings) {
this.readings = readings;
protected Route routes () {
return route (
path ("data", () -> {
.Stream messages = (String :: valueOf) .map (TextMessage :: create);
return handleWebSocketMessages (Flow.fromSinkAndSourceCoupled ( Sink.ignore (), messages));
.get (() ->
.pathSingleSlash (() ->
.getFromResource ("index.html")

Two routes are defined here: /data , that is, the end point of the web socket, and / on which a trivial frontend is issued. It is already clear how easy it is to provide Source from Akka Streams as the end point of the web socket: take handleWebSocketMessages , whose task is to improve the HTTP connection before connecting to the web socket and to organize there a stream in which incoming and outgoing data will be processed.
WebSocket It is modeled as a stream, that is, outgoing and incoming messages are sent to the client. In this case, we want to ignore incoming data and create such a stream, the "incoming" side of which is stored in Sink.ignore () . The "outgoing" side of the thread of the web socket handler is simply connected to our source, from which the mean values ​​come. All that has to be done with the numbers double , in the form of which the averages are represented - to transform each of them into TextMessage , this is the wrapper used in Akka HTTP for the web socket data. Everything is done elementary with the help of the already known method map .
To start the server, you just need to start the method startServer , specifying the host name and port:
    Server server = new Server (csvProcessor.liveReadings);
server.startServer (config.getString (""), config.getInt ("server.port"));

To receive data from a web socket and display them, we use a completely simple jаvascript code that simply attaches the received values ​​to the textarea. This code uses the syntax ES? which should be executed normally in any modern browser.
    let ws = new WebSocket ("ws: //localhost: 8080 /data");
ws.onopen = () => log ("WS connection opened");
ws.onclose = event => log ("WS connection closed with code:" + event.code);
ws.onmessage = event => log ("WS received:" +;

Method log Attaches a message to the textarea, and also puts a timestamp.
To run and test the application, you need:
  • Start the server ( .sbt run ),  
  • go to the browser on localhost : 8080 (or to the host /port you selected, if you changed the default),  
  • copy one or more files from src /main /resources /sample-data in the directory data in the root of the project (if you did not change the 3s3r3705 in the configuration),  
  • See how the data is displayed in the server logs and in the browser.  

We add the mail trigger
The final touch in our application is a side channel, in which we will simulate email notifications sent after every fifth element is received on the web socket. It should work "from the side", so as not to violate the transfer of the main elements.
To implement this behavior, we will use the more advanced feature of Akka Streams - the Graph DSL language - on which we write our own graph step, in which the thread branches into two parts. The first simply submits the values ​​to the web socket, and the second monitors when the next 5 seconds have elapsed, and sends an e-mail notification - see Fig. 3.
Fig. 3. Our own graph step for sending messages by e-mail
We will use the built-in step Broadcast , where our input is sent to a set of declared conclusions. Also write our own trap - Mailer :
    private final Graph    , NotUsed> notifier = GraphDSL.create (builder -> {
.Sink MailerSink = Flow.of (Double.class)
.to (Sink.foreach (ds ->
.logger .info ("Sending e-mail")
NoiformFanOutShape broadcast = builder.add (Broadcast.create (2));
.SinkShape mailer = builder.add (mailerSink);
builder.from (broadcast.out (1)) toInlet ( ());
return FlowShape.of ( (), broadcast.out (0));

We begin to create our own graph step from the method GraphDSL.create () , where a copy of the graph builder is provided, Builder - it is used to manipulate the graph structure.
Next, we define our own trap, where is used. grouped to merge incoming elements into groups of arbitrary size (default is 5), after which these groups are sent down. For each such group, we simulate the side effect: notification via e-mail.
Having determined our own trap, we can use instance builder to add it to the graph. Also add step Broadcast with two outputs.
Next, you need to specify the connection between the elements of the graph - one of the outputs of step Broadcast we want to connect with the e-mail trap, and the other - to make an exit for the step of the graph written by us. The input of the step we have written will be directly connected to the output of step Broadcast .
Note 1
The compiler can not determine whether all parts of the graph are connected correctly. However, this moment is checked by the materializer at run time, therefore there will be no hanging elements at the input or output.
Note 2
In this case, you can notice that all the steps we have written are of the form Graph , where S is the form defining the number and types of inputs and outputs, and M is the materialized value (if any). Here we are dealing with the form of Flow, that is, we have one input and one output.
At the last stage we connect the notifier as an additional step of the pipeline liveReadings , which will now take the following form:
    private final Source    liveReadings =
.via (csvPaths)
.via (fileBytes)
.via (csvFields)
.via (readings)
.via (averageReadings)
.via (notifier);

Running the updated code, you will see how the records for mail notifications appear in the log. The notification is sent whenever five more values ​​pass through the web socket.
The result is
In this article, we studied the general concepts of streaming data processing, learned how to build a lightweight data pipeline using Akka Streams. This is an alternative to the traditional approach used in Java EE.
We examined how to use some processing steps built into Akka Streams, how to write your own step in the Graph DSL language. It also showed how to use Alpakka to stream data from the file system and the Akka HTTP protocol, which allows you to create a simple web server with a web socket at the endpoint that seamlessly integrates with Akka Streams.
A full working example with the code from this article is on GitHub . It has several additional log -steps placed at different points. They help to more accurately imagine what is happening inside the conveyor. In the article, I specifically omitted them so that it would be shorter.
+ 0 -

Add comment