About streams and tables in Kafka and Stream Processing, part 1

* Michael G. Noll is an active contributor to Open Source projects, including Apache Kafka and Apache Storm.
This article will be most useful to readers who are just getting acquainted with Apache Kafka and/or stream processing.

In this article, perhaps the first in a mini-series, I want to explain the concepts of streams and tables in stream processing and, in particular, in Apache Kafka. I hope you come away with a better theoretical understanding and with ideas that help you solve your current and future tasks better and/or faster.
* Motivation
* Streams and Tables in plain language
* Illustrated examples
* Streams and Tables in Kafka in plain language
* A closer look at Kafka Streams, KSQL and analogs in Scala
* Tables stand on the shoulders of giants (on streams)
* Turning the Database Inside-Out
* Conclusion

Motivation

Users come to stream processing with Kafka, through Kafka Streams and KSQL (streaming SQL for Kafka), from different backgrounds. Some already have experience with stream processing or Kafka, some have experience with an RDBMS such as Oracle or MySQL, and some have neither.
A frequently asked question is: "What is the difference between streams and tables?" In this article I want to give both answers: a short one (TL;DR) and a long one that gives you a deeper understanding. Some of the explanations below are slightly simplified, because that makes them easier to understand and remember. (By analogy, Newton's simpler model of gravity is quite sufficient for most everyday situations and saves us from going straight to Einstein's relativistic model. Fortunately, stream processing is not that complex.)
Another common question: "Fine, but why should I care? How will this help me in my daily work?" In short, for many reasons! Once you start using stream processing, you will soon realize that in practice most use cases require both streams and tables. Tables, as I will explain later, represent state. Whenever you perform any stateful processing, such as joins (for example, real-time data enrichment by joining a stream of facts with dimension tables) or aggregations (for example, computing a five-minute average of key business metrics in real time), tables enter the streaming picture. If your technology does not support them, you will have to build this yourself, with a lot of DIY pain.
Even the notorious WordCount example, probably your first "Hello World" in this area, falls into the stateful category: it aggregates a stream of text lines into a continuously updated table/map of word counts. So, regardless of whether you are implementing a simple streaming WordCount or something more complicated such as fraud detection, you want an easy-to-use stream processing solution that has the core data structures and everything you need built in (hint: streams and tables). You certainly do not want to build a complex and unnecessary architecture in which you must connect a stream-processing-only technology to a remote data store such as Cassandra or MySQL, and possibly add Hadoop/HDFS for fault-tolerant processing (three things are too many).

Streams and Tables in plain language

Here is the best analogy I could come up with:
A stream in Kafka is the complete history of all events that have happened in the world (or just all business events) from the beginning of time to the present day. It represents the past and the present. As we move from today to tomorrow, new events are constantly appended to the history of the world.
A table in Kafka is the state of the world today. It represents the present. It is an aggregation of all events in the world, and it changes constantly as we move from today to tomorrow.
And as an appetizer for a future post: if you have access to the entire history of events in the world (the stream), you can reconstruct the state of the world at any point in time, that is, the table at an arbitrary time t in the stream, where t is not limited to t = now. In other words, we can create snapshots of the state of the world (the table) for any time t, for example 2560 BC, when the Great Pyramid of Giza was built, or 1993, when the European Union was founded.
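The idea of restoring a table at any time t can be sketched in Scala. This is only a minimal illustration and not anything from Kafka itself: the Event case class, its integer timestamps and the tableAt helper are all assumptions made up for the example.

```scala
// Hypothetical event: which user was seen where, and when.
// The timestamp is an assumed logical clock, not a Kafka concept.
final case class Event(timestamp: Long, user: String, location: String)

// The stream: the full, ordered history of events.
val worldHistory: Seq[Event] = Seq(
  Event(1L, "alice", "Paris"),
  Event(2L, "bob", "Sydney"),
  Event(3L, "alice", "Rome"),
  Event(4L, "bob", "Lima"),
  Event(5L, "alice", "Berlin")
)

// The table at time t: replay all events up to and including t,
// keeping only the latest location per user.
def tableAt(events: Seq[Event], t: Long): Map[String, String] =
  events
    .filter(_.timestamp <= t)
    .foldLeft(Map.empty[String, String]) { (table, e) =>
      table.updated(e.user, e.location)
    }

val tableAtT3  = tableAt(worldHistory, 3L)            // Map(alice -> Rome, bob -> Sydney)
val tableAtNow = tableAt(worldHistory, Long.MaxValue) // Map(alice -> Berlin, bob -> Lima)
```

The same history yields a different table for every t, which is why the stream can always reproduce the table, while the table alone cannot reproduce the stream.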

Illustrated examples

The first example shows a stream of user geo-location updates that is aggregated into a table that tracks the current (latest) position of each user. As I will explain later, this also happens to be the default semantics for tables when you read a Kafka topic directly into a table.
The second example shows the same stream of user geo-location updates, but now the stream is aggregated into a table that tracks the number of places visited by each user. Because the aggregation function is different (here: counting), the contents of the table are different too. More precisely, the values per key are different.


Streams and Tables in Kafka in plain language

Before we go into details, let's start simple.
A topic in Kafka is an unbounded sequence of key-value pairs. Keys and values are plain byte arrays, i.e. <byte[], byte[]>.
A stream is a topic with a schema. Keys and values are no longer byte arrays but have specific types.
Example: A topic is read as a stream of user geo-location updates.

A table is a table in the ordinary sense of the word (I can feel the joy of those of you who already know RDBMSs and are just getting to know Kafka). But viewed through the lens of stream processing, a table is also an aggregated stream (you did not really expect us to stop at the definition "a table is a table", did you?).
Example: A stream of geo-location updates is aggregated into a table that tracks the latest position of each user. The aggregation step continuously UPSERTs values into the table, by key, from the input stream. We saw this in the first illustrated example above.
Example: A stream is aggregated into a table that tracks the number of visited locations for each user. The aggregation step continuously computes (and updates) the counts for the keys in the table. We saw this in the second illustrated example above.


Topics, streams and tables have the following properties in Kafka:

Type   | Partitioned | Unbounded | Ordered | Mutable | Unique key constraint | Schema
Topic  | Yes         | Yes       | Yes     | No      | No                    | No
Stream | Yes         | Yes       | Yes     | No      | No                    | Yes
Table  | Yes         | Yes       | No      | Yes     | Yes                   | Yes

Let's see how topics, streams and tables relate to the Kafka Streams API and KSQL, and draw analogies with programming languages (the analogies ignore, for example, that topics/streams/tables can be partitioned):

Type   | Kafka Streams | KSQL   | Java        | Scala                                   | Python
Topic  | -             | -      | List/Stream | List/Stream[(Array[Byte], Array[Byte])] | []
Stream | KStream       | STREAM | List/Stream | List/Stream[(K, V)]                     | []
Table  | KTable        | TABLE  | HashMap     | mutable.Map[K, V]                       | {}

A summary at this level may be of limited use to you, though. So let's take a closer look.

A closer look at Kafka Streams, KSQL and analogs in Scala

I'll start each of the following sections with an analogy in Scala (imagine that the stream processing runs on a single machine) in the Scala REPL, so that you can copy the code and play with it yourself. Then I'll explain how to do the same in Kafka Streams and KSQL (elastic, scalable and fault-tolerant stream processing on distributed machines). As I mentioned at the beginning, I'm simplifying the explanations below a bit. For example, I will not cover the impact of partitioning in Kafka.
If you do not know Scala: don't be put off! You do not need to understand the Scala analogies in every detail. It is enough to pay attention to which operations (for example, map()) are chained together, what they represent (for example, reduceLeft() represents an aggregation), and how the "chain" of streams compares to the "chain" of tables.



A topic in Kafka consists of key-value messages. The topic is agnostic to the serialization format or "type" of its messages: keys and values are treated as plain byte arrays, byte[]. In other words, at this point we have no idea what's inside the data.
Kafka Streams and KSQL have no concept of a "topic". They only know about streams and tables. So here I will show only the Scala analogy of a topic.
    // Scala analogy
    scala> val topic: Seq[(Array[Byte], Array[Byte])] = Seq(
         |   (Array(97, 108, 105, 99, 101), Array(80, 97, 114, 105, 115)), (Array(98, 111, 98), Array(83, 121, 100, 110, 101, 121)),
         |   (Array(97, 108, 105, 99, 101), Array(82, 111, 109, 101)), (Array(98, 111, 98), Array(76, 105, 109, 97)),
         |   (Array(97, 108, 105, 99, 101), Array(66, 101, 114, 108, 105, 110)))



Now we read the topic into a stream by adding schema information (schema-on-read). In other words, we turn the raw, untyped topic into a "typed topic", that is, a stream.
Schema-on-read vs. schema-on-write: Kafka and its topics are agnostic to the serialization format of your data. Therefore you must specify a schema when you want to read the data into a stream or table. This is called schema-on-read. Schema-on-read has both upsides and downsides. Fortunately, you can choose a middle ground between schema-on-read and schema-on-write by defining a contract for your data, just as you probably define API contracts in your applications and services. This can be achieved by choosing a structured yet extensible data format such as Apache Avro and deploying a registry for your Avro schemas, for example Confluent Schema Registry. And yes, both Kafka Streams and KSQL support Avro, in case you are wondering.
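As a rough single-machine sketch of schema-on-read, the snippet below stores a record as opaque bytes and applies a schema only when reading. The GeoLocation case class, the "latitude,longitude" text encoding and the readWithSchema helper are assumptions invented for this illustration. A real setup would more likely use a format such as Avro.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// What the topic stores: opaque bytes, no type information.
val rawKey: Array[Byte]   = "alice".getBytes(UTF_8)
val rawValue: Array[Byte] = "48.86,2.35".getBytes(UTF_8)

// Hypothetical reader-side schema: the key is a UTF-8 user name,
// the value is UTF-8 text of the form "latitude,longitude".
final case class GeoLocation(latitude: Double, longitude: Double)

def readWithSchema(key: Array[Byte], value: Array[Byte]): (String, GeoLocation) = {
  val parts = new String(value, UTF_8).split(",")
  new String(key, UTF_8) -> GeoLocation(parts(0).toDouble, parts(1).toDouble)
}

// The schema is applied only now, at read time.
val typedRecord = readWithSchema(rawKey, rawValue)
```

Nothing stops a second reader from interpreting the same bytes differently, which is exactly the downside that a shared data contract is meant to address.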

In Scala, reading the topic into a stream is achieved with the map() operation below. In this example we get a stream of (String, String) pairs. Notice how we can now look inside the data.
    // Scala analogy
    scala> val stream = topic
         |   .map { case (k: Array[Byte], v: Array[Byte]) => new String(k) -> new String(v) }
    // => stream: Seq[(String, String)] =
    //      List((alice,Paris), (bob,Sydney), (alice,Rome), (bob,Lima), (alice,Berlin))

In Kafka Streams you read the topic into a KStream via StreamsBuilder#stream(). Here you must define the desired schema via the Consumed.with() parameter when reading the data from the topic:
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

In KSQL, you would do something like the following to read the topic as a STREAM. Here you define the desired schema by specifying the names and types of the columns when reading the data from the topic:
    CREATE STREAM myStream (username VARCHAR, location VARCHAR)
      WITH (KAFKA_TOPIC = 'input-topic', VALUE_FORMAT = '...');



Now we read the same topic into a table. First, we need to add schema information (schema-on-read). Second, we must convert the stream into a table. The table semantics in Kafka say that the resulting table must map every message key in the topic to the latest value for that key.
Let's start with the first example, in which the resulting table tracks the latest location of each user:

In Scala:
    // Scala analogy
    scala> val table = topic
         |   .map { case (k: Array[Byte], v: Array[Byte]) => new String(k) -> new String(v) }
         |   .groupBy(_._1)
         |   .map { case (k, v) => (k, v.reduceLeft((aggV, newV) => newV)._2) }
    // => table: scala.collection.immutable.Map[String,String] =
    //      Map(alice -> Berlin, bob -> Lima)

Adding schema information is achieved with the first map() operation, just like in the stream example above. The stream-to-table conversion is achieved with an aggregation step (more on this later), which in this case is a (stateless) UPSERT operation on the table: this is the groupBy().map() step, which contains a reduceLeft() operation for each key. Aggregation means that, for each key, we squash many values into one. Note that this particular reduceLeft() aggregation is stateless: the previous value aggV is not used when computing the new value for a given key.
What is interesting about the relationship between streams and tables is that the command above creates a table equivalent to the short version below (think: referential transparency), where we build the table directly from the stream. This lets us skip the schema/type step, because the stream is already typed. We can now see that a table is a derivation, an aggregation, of a stream:
    // Scala analogy, simplified
    scala> val table = stream
         |   .groupBy(_._1)
         |   .map { case (k, v) => (k, v.reduceLeft((aggV, newV) => newV)._2) }
    // => table: scala.collection.immutable.Map[String,String] =
    //      Map(alice -> Berlin, bob -> Lima)

In Kafka Streams, you would normally use StreamsBuilder#table() to read a Kafka topic into a KTable with a simple one-liner:
    KTable<String, String> table = builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

But for the sake of illustration, you can also read the topic into a KStream first and then perform the same aggregation step as shown above to turn the KStream into a KTable.
    KStream<String, String> stream = ...;
    KTable<String, String> table = stream
        .groupByKey()
        .reduce((aggV, newV) -> newV);

In KSQL, you would do something like the following to read the topic as a TABLE. Here you must define the desired schema by specifying the names and types of the columns when reading from the topic:
    CREATE TABLE myTable (username VARCHAR, location VARCHAR)
      WITH (KAFKA_TOPIC = 'input-topic', KEY = 'username', VALUE_FORMAT = '...');

What does this mean? It means that a table is actually an aggregated stream, as we said at the very beginning. We saw this directly in the special case above, where a table is created straight from a topic. However, this is in fact the general case.

Tables stand on the shoulders of giants (on streams)

Conceptually, only the stream is a first-class data structure in Kafka. A table, on the other hand, is either (1) derived from an existing stream through per-key aggregation, or (2) derived from an existing table, which can always be traced back to an aggregated stream (we could call that stream the table's "ur-stream").
Tables are also often described as a materialized view of a stream. A view of a stream is nothing more than an aggregation in this context.

Of the two cases, (1) is the more interesting one to discuss, so let's focus on it. And that probably means I first need to explain how aggregations work in Kafka.

Aggregations in Kafka

Aggregation is one type of stream processing. Other types include, for example, filters and joins.
As we found out earlier, data in Kafka is represented as key-value pairs. Accordingly, the first property of aggregations in Kafka is that they are always computed per key. That is why we must group a KStream before the aggregation step in Kafka Streams, via groupBy() or groupByKey(). For the same reason we had to use groupBy() in the Scala examples above.
Partitioning and message keys: An equally important aspect of Kafka, which I ignore in this article, is that topics, streams and tables are partitioned. In fact, data is processed and aggregated per key per partition. By default, messages/records are assigned to partitions based on their keys, so in practice the simplification "aggregated per key" instead of the technically more complex and more correct "aggregated per key per partition" is entirely acceptable. But if you use custom partitioning assigners, you must account for this in your processing logic.
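Why "per key" and "per key per partition" give the same result under key-based partitioning can be sketched in Scala. The partition count and the partitionFor helper are assumptions for this illustration. In particular, String.hashCode stands in for the hash of the serialized key that Kafka's default partitioner uses.

```scala
// Assumed number of partitions for this illustration.
val numPartitions = 3

// Simplified stand-in for a partitioner: hash the key, take it modulo the partition count.
// (Kafka's default partitioner hashes the serialized key; String.hashCode is used
// here only to keep the sketch short.)
def partitionFor(key: String): Int = Math.floorMod(key.hashCode, numPartitions)

val geoUpdates = Seq(
  "alice" -> "Paris", "bob" -> "Sydney", "alice" -> "Rome", "bob" -> "Lima", "alice" -> "Berlin")

// Records with the same key always land in the same partition.
val byPartition: Map[Int, Seq[(String, String)]] =
  geoUpdates.groupBy { case (user, _) => partitionFor(user) }

// Count per key inside each partition ("per key per partition") ...
val countsPerPartition: Map[Int, Map[String, Int]] =
  byPartition.map { case (partition, recs) =>
    partition -> recs.groupBy(_._1).map { case (user, visits) => user -> visits.length }
  }

// ... and merge. No key appears in two partitions, so nothing has to be added up.
val mergedCounts: Map[String, Int] = countsPerPartition.values.flatten.toMap
```

With a custom assignment that spreads one key over several partitions, the final merge would no longer be a simple union, which is the caveat mentioned above.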

The second property of aggregations in Kafka is that they are continuously updated as soon as new data arrives in the input streams. Together with the per-key property, this requires a table, or more precisely a mutable table, as the output and therefore as the return type of aggregations. Previous values (aggregation results) for a key are continuously overwritten with new values. In both Kafka Streams and KSQL, aggregations always return a table.
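A minimal Scala sketch of such a continuously updated, mutable table might look as follows. The onRecord handler and the running count it maintains are assumptions chosen for illustration, not Kafka Streams API.

```scala
import scala.collection.mutable

// The aggregation result: a mutable table from user to number of updates seen.
val visitCounts = mutable.Map.empty[String, Int]

// Hypothetical handler, called once for every record arriving on the input stream.
// The location itself is not needed for a count, only the key is.
def onRecord(user: String, location: String): Unit =
  visitCounts.update(user, visitCounts.getOrElse(user, 0) + 1)

onRecord("alice", "Paris")  // table is now: alice -> 1
onRecord("bob", "Sydney")   // alice -> 1, bob -> 1
onRecord("alice", "Rome")   // alice -> 2, bob -> 1 (alice's previous value is overwritten)
```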
Let's return to our second example, in which we want to count, from our stream, the number of places visited by each user:

Counting is a type of aggregation. To count values, we only need to replace the aggregation step from the previous section, .reduce((aggV, newV) -> newV), with .map { case (k, v) => (k, v.length) }. Note that the return type is a table/map (and please ignore that in the Scala code the map is immutable, because Scala uses immutable maps by default).
    // Scala analogy
    scala> val visitedLocationsPerUser = stream
         |   .groupBy(_._1)
         |   .map { case (k, v) => (k, v.length) }
    // => visitedLocationsPerUser: scala.collection.immutable.Map[String,Int] =
    //      Map(alice -> 3, bob -> 2)

The Kafka Streams code equivalent to the Scala example above:
    KTable<String, Long> visitedLocationsPerUser = stream
        .groupByKey()
        .count();

In KSQL:
    CREATE TABLE visitedLocationsPerUser AS
      SELECT username, COUNT(*)
      FROM myStream
      GROUP BY username;


Tables are aggregated streams (input stream → table)

As we saw above, tables are aggregations of their input streams or, in short, tables are aggregated streams. Whenever you perform an aggregation in Kafka Streams or KSQL, the result is always a table.
The specifics of the aggregation step determine whether the table is derived trivially from a stream via stateless UPSERT semantics (the table maps keys to their latest value in the stream, which is the aggregation used when reading a Kafka topic directly into a table), via stateful counting of the number of values seen per key (see our last example), or via more sophisticated aggregations such as summing, averaging, and so on. With Kafka Streams and KSQL you have many aggregation options, including windowed aggregations with tumbling windows, hopping windows and session windows.
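To give a feel for windowed aggregation, here is a rough Scala analogy of a count over tumbling windows. The timestamps, the one-minute window size and the windowStart helper are assumptions made for the sketch. Kafka Streams and KSQL provide their own windowing constructs, which are not shown here.

```scala
// Hypothetical timestamped records: (timestamp in ms, user, location).
val timedUpdates: Seq[(Long, String, String)] = Seq(
  (1000L, "alice", "Paris"),
  (2000L, "bob", "Sydney"),
  (61000L, "alice", "Rome"),
  (62000L, "bob", "Lima"),
  (63000L, "alice", "Berlin")
)

// Assumed window size: one minute.
val windowSizeMs = 60000L

// A tumbling window is identified by its start time:
// round the timestamp down to a multiple of the window size.
def windowStart(timestampMs: Long): Long = timestampMs - (timestampMs % windowSizeMs)

// Count per (window start, user) instead of per user.
val countsPerWindow: Map[(Long, String), Int] =
  timedUpdates
    .groupBy { case (ts, user, _) => (windowStart(ts), user) }
    .map { case (windowAndUser, recs) => windowAndUser -> recs.length }
```

The result is still a table. Only the key has changed: it now combines the window with the original key.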

Tables have changelog streams (table → output stream)

Although a table is an aggregation of its input stream, it also has its own output stream! Similar to change data capture (CDC) in databases, every change to a table in Kafka is captured in an internal stream of changes called the table's changelog stream. Many computations in Kafka Streams and KSQL are actually performed on the changelog stream. This allows Kafka Streams and KSQL, for example, to correctly process historical data according to event-time processing semantics. Remember that a stream represents both the present and the past, whereas a table can only represent the present (or, more precisely, a snapshot in time).
Note: In Kafka Streams, you can explicitly convert a table into its changelog stream via KTable#toStream().
Here is the first example again, this time with the changelog stream:

Note that the table's changelog stream is a copy of the table's input stream. This is due to the nature of the corresponding aggregation function (UPSERT). And if you're wondering, "Wait, isn't this a 1:1 copy that wastes disk space?": under the hood, Kafka Streams and KSQL perform optimizations to minimize unnecessary data copying and local/network IO. I ignore these optimizations in the diagram above to better illustrate what is happening in principle.
And, finally, the second example, including its changelog stream. Here the table's changelog stream is different, because the aggregation function is different: it performs a per-key count.
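In the spirit of the Scala analogies, the changelog of the counting table can be sketched as follows. This is an illustration on a single machine with made-up names (visits, countTable, countChangelog), not how Kafka Streams implements changelogs internally.

```scala
val visits = Seq(
  "alice" -> "Paris", "bob" -> "Sydney", "alice" -> "Rome", "bob" -> "Lima", "alice" -> "Berlin")

// Replay the input stream record by record. Each record updates one entry of the
// counting table, and every update is appended to the changelog as (key, new value).
val (countTable, countChangelog) =
  visits.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
    case ((table, log), (user, _)) =>
      val newCount = table.getOrElse(user, 0) + 1
      (table.updated(user, newCount), log :+ (user -> newCount))
  }

// countTable:     Map(alice -> 3, bob -> 2)
// countChangelog: Vector((alice,1), (bob,1), (alice,2), (bob,2), (alice,3))
```

Unlike in the UPSERT case, the changelog here is not a copy of the input stream: it carries counts rather than locations.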

But these internal changelog streams also have architectural and operational impact. Changelog streams are continuously backed up and stored as topics in Kafka, and as such they are part of the magic that provides elasticity and fault tolerance in Kafka Streams and KSQL. This is because they allow processing tasks to be moved between machines/VMs/containers without data loss and while operations continue, regardless of whether the processing is stateful or stateless. A table is part of the state of your application (Kafka Streams) or query (KSQL), so Kafka must be able to move not only the processing code (which is easy) but also the processing state, including tables, between machines in a fast and reliable manner (which is much harder). Whenever a table needs to be moved from client machine A to machine B, then on its new destination B the table is reconstructed from its changelog stream in Kafka (on the server side) to exactly the same state it had on machine A. We can see this in the last diagram above, where the "counting table" can be readily restored from its changelog stream without reprocessing the input stream.
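Restoring a table from its changelog can also be sketched in Scala. The changelog contents and the restoreTable helper below are assumptions for illustration. The point is only that replaying (key, new value) entries with UPSERT semantics is enough, and the original input stream is never touched.

```scala
// A changelog as it might be stored in a Kafka topic: one (key, new value) per table update.
val savedChangelog: Seq[(String, Int)] =
  Seq("alice" -> 1, "bob" -> 1, "alice" -> 2, "bob" -> 2, "alice" -> 3)

// Restoring the table is a plain UPSERT replay: later entries overwrite earlier ones.
def restoreTable(changelog: Seq[(String, Int)]): Map[String, Int] =
  changelog.foldLeft(Map.empty[String, Int]) { case (table, (user, count)) =>
    table.updated(user, count)
  }

val restoredOnMachineB = restoreTable(savedChangelog) // Map(alice -> 3, bob -> 2)
```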

Stream-table duality

The term stream-table duality refers to the relationship between streams and tables described above. It means, for example, that you can turn a stream into a table, that table into another stream, the resulting stream into yet another table, and so on. For more information, see the Confluent blog post "Introduction to Kafka Streams: Stream Processing Made Simple".

Turning the Database Inside-Out

In addition to what we covered in the previous sections, you may have come across the article "Turning the Database Inside-Out", and now you might be wondering what the bigger picture is. Since I do not want to go into details right now, let me briefly compare the world of Kafka and stream processing with the world of databases. Be warned: serious black-and-white simplifications ahead.
In databases, the table is the first-class construct. It is what you work with. Streams also exist in databases, for example as the binlog in MySQL or GoldenGate in Oracle, but they are typically hidden from you in the sense that you cannot interact with them directly. A database knows about the present but not about the past (if you need the past, restore the data from your backup tapes, which, ha ha, are just hardware streams).
In Kafka and stream processing, the stream is the first-class construct. Tables are derivations of streams, as we saw earlier. A stream knows about the present and about the past. As an example, The New York Times stores all of its published articles, 160 years of journalism going back to the 1850s, in Kafka as the source of truth.
In short: a database thinks table first, stream second. Kafka thinks stream first, table second. Nevertheless, the Kafka community has realized that most practical streaming use cases require both streams and tables, even the notorious yet simple WordCount, which aggregates a stream of text lines into a table of word counts, much like our second example above. Kafka therefore helps us bridge the worlds of stream processing and databases by providing native support for streams and tables through Kafka Streams and KSQL, saving us from a lot of problems (and pager alerts). We could call Kafka, and the kind of streaming platform it is, stream-relational rather than stream-only.


Conclusion

I hope you find these explanations useful for better understanding streams and tables in Kafka and in stream processing in general. Now that we have worked through the details, you may want to go back to the beginning of the article and re-read the sections "Streams and Tables in plain language" and "Streams and Tables in Kafka in plain language".
If this article made you curious to try stream-relational processing with Kafka, Kafka Streams and KSQL, you can continue by:
* Learning how to use KSQL, the streaming SQL engine for Kafka, to process data in Kafka without writing any code. This is what I would recommend as a starting point, especially if you are new to Kafka or stream processing, since you should be up and running in a matter of minutes. There is also a great KSQL clickstream demo (including a Docker variant) in which you can play with Kafka, KSQL, Elasticsearch and Grafana to build and customize a real-time dashboard.
* Learning how to build Java or Scala applications for stream processing with the Kafka Streams API.
And yes, you can of course combine the two: for example, you can start processing your data with KSQL, continue with Kafka Streams, and then return to KSQL again.
Regardless of whether you use Kafka Streams or KSQL, thanks to Kafka you get elastic, scalable and fault-tolerant distributed stream processing that runs everywhere (containers, VMs, bare metal, locally, on premises, in the cloud, you name it). Just saying, in case it's not obvious. :-)
Finally, I called this article "Streams and Tables, Part 1". And although I already have ideas for part 2, I would be grateful for questions and suggestions on what I should cover next time. What would you like to learn more about? Let me know in the comments or email me!