The book "Apache Kafka. Stream Processing and Data Analysis"
Any running enterprise application generates data: log files, metrics, information about user activity, outgoing messages, and so on. Handling all this data correctly is just as important as the data itself. If you are an architect, developer or production engineer who wants to solve such problems but is not yet familiar with Apache Kafka, this book will teach you how to work with this free streaming platform, which lets you process data streams in real time.
Who this book is for
"Apache Kafka. Stream Processing and Data Analysis" is written for developers who use the Kafka API in their work, as well as for production engineers (also called SREs, DevOps engineers or system administrators) who install, configure, tune and monitor Kafka in production. We have also not forgotten data architects and analysts, who are responsible for designing and building a company's entire data infrastructure. Some chapters, in particular 3, 4 and 11, are aimed at Java developers. To follow them, the reader should be familiar with the basics of the Java programming language, including topics such as exception handling and concurrency.
The following example installs Kafka in the /usr/local/kafka directory, configures it to use the ZooKeeper server started earlier, and stores the message log segments in the /tmp/kafka-logs directory:
# tar -zxf kafka_???-???.1.tgz
# mv kafka_???-???.1 /usr/local/kafka
# mkdir /tmp/kafka-logs
# export JAVA_HOME=/usr/java/jdk???_51
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
After the launch of the Kafka broker, you can test its functioning by performing some simple operations with the cluster, including creating a test topic, generating messages and consuming them.
Creating and verifying a topic:
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
Created topic "test".
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 \
--describe --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Producing messages to the test topic:
# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list \
localhost:9092 --topic test
Test Message 1
Test Message 2
Consuming messages from the test topic:
# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper \
localhost:2181 --topic test --from-beginning
Test Message 1
Test Message 2
Consumed 2 messages
Configuration of the broker
The sample broker configuration supplied with the Kafka distribution is quite suitable for a trial run of a standalone server, but it will not be enough for most installations. Kafka has many configuration options that govern every aspect of setup and tuning. Many of them can be left at their default values, because they concern finer points of broker tuning that do not apply until you have a specific use case that requires them.
Basic settings of the broker
There are several Kafka broker settings that you should review when deploying the platform in any environment other than a standalone broker on a single server. These parameters cover the basic configuration of the broker, and most of them must be changed for the broker to work in a cluster with other brokers.
Each Kafka broker must have an integer identifier, set with the broker.id parameter. By default this value is 0, but it can be any number. What matters is that it is unique within a single Kafka cluster. The number can be chosen arbitrarily and, if maintenance requires it, moved from one broker to another. It is advisable to tie this number to the host in some way, so that the mapping of broker IDs to hosts is more transparent during maintenance. For example, if your host names contain unique numbers (host1.example.com, host2.example.com, and so on), those numbers are a good choice for broker.id values.
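As a minimal sketch of the host-name convention described above, the following shell function derives a broker.id from the digits in the short host name. The function name broker_id_from_host is a hypothetical helper, not part of Kafka, and it assumes the short host name contains exactly one numeric part.

```shell
# Hypothetical helper: derive broker.id from the digits in the short host name.
broker_id_from_host() {
    # Keep only the part before the first dot, then drop every non-digit.
    short=${1%%.*}
    id=$(printf '%s' "$short" | tr -cd '0-9')
    if [ -z "$id" ]; then
        echo "no digits in host name: $1" >&2
        return 1
    fi
    # Strip leading zeros so that "host007" yields 7; an all-zero id becomes 0.
    id=$(printf '%s' "$id" | sed 's/^0*//')
    printf '%s\n' "${id:-0}"
}

broker_id_from_host host1.example.com   # prints 1
broker_id_from_host host2.example.com   # prints 2
```

The printed value could then be written into the broker.id line of each broker's configuration file.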
The sample configuration file starts Kafka with a listener on TCP port 9092. This port can be changed to any other available port with the port configuration parameter. Keep in mind that if you choose a port below 1024, Kafka must be started as root, and running Kafka as root is not recommended.
The ZooKeeper location used to store broker metadata is set with the zookeeper.connect configuration parameter. In the sample configuration, ZooKeeper runs on port 2181 on the local host, which is written as localhost:2181. The format of this parameter is a comma-separated list of strings of the form hostname:port/path, where:
- hostname - the hostname or IP address of the ZooKeeper server;
- port - the client port number of the server;
- /path is the optional ZooKeeper path used as the new root (chroot) path for the Kafka cluster. If it is not specified, the root path is used.
If the specified chroot path does not exist, it will be created when the broker starts.
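The format above can be sketched with a small shell function that assembles a zookeeper.connect line. The function name zk_connect, the zoo*.example.com hosts and the /kafka-cluster chroot are made-up examples. The sketch assumes the common convention of writing the chroot path once, after the last host:port pair.

```shell
# Hypothetical helper: build a zookeeper.connect line.
# $1 = chroot path (may be empty), remaining arguments = host:port pairs.
zk_connect() {
    chroot=$1
    shift
    hosts=""
    for hp in "$@"; do
        # Append with a comma separator, except before the first entry.
        hosts="${hosts:+$hosts,}$hp"
    done
    printf 'zookeeper.connect=%s%s\n' "$hosts" "$chroot"
}

zk_connect /kafka-cluster zoo1.example.com:2181 zoo2.example.com:2181
# prints zookeeper.connect=zoo1.example.com:2181,zoo2.example.com:2181/kafka-cluster
```

Using a chroot path keeps the Kafka cluster's data separate from other applications that share the same ZooKeeper ensemble.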
Kafka stores all messages on disk, keeping the log segments in the directories listed in the log.dirs parameter. It is a comma-separated list of paths on the local system. If several paths are specified, the broker places partitions in them on a least-used basis, keeping all log segments of a given partition within a single path. Note that the broker puts a new partition in the directory that currently holds the fewest partitions, not the one with the most free space, so an even distribution of data across directories is not guaranteed.
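The placement rule can be illustrated with a short shell sketch. It is not Kafka's own code: pick_log_dir is a hypothetical function that takes "directory:partition-count" pairs and returns the directory with the fewest partitions, ignoring free space just as the rule describes. The /data paths are invented.

```shell
# Illustration only: choose the directory holding the fewest partitions.
# Each argument has the form dir:count (count = partitions already stored there).
pick_log_dir() {
    best_dir=""
    best_count=""
    for entry in "$@"; do
        dir=${entry%:*}
        count=${entry##*:}
        if [ -z "$best_count" ] || [ "$count" -lt "$best_count" ]; then
            best_dir=$dir
            best_count=$count
        fi
    done
    printf '%s\n' "$best_dir"
}

# /data/b holds the fewest partitions, so it is chosen regardless of free space.
pick_log_dir /data/a:12 /data/b:7 /data/c:9   # prints /data/b
```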
To process log segments, Kafka uses a configurable thread pool. It is currently used:
- during normal startup - to open the log segments of each partition;
- during startup after a failure - to check and truncate the log segments of each partition;
- during shutdown - to close the log segments cleanly.
By default, only one thread per log directory is used. Since these threads run only at startup and shutdown, it makes sense to configure more of them to parallelize the work. When recovering from an unclean shutdown, this can save several hours when restarting a broker with a large number of partitions. Remember that the value of this parameter applies to each log directory listed in log.dirs. That is, if num.recovery.threads.per.data.dir is set to 8 and log.dirs lists three paths, the total number of threads is 24.
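The thread arithmetic can be checked with a few lines of shell. Both function names and the /data/kafka* paths are hypothetical. The sketch simply multiplies the per-directory thread count by the number of comma-separated paths in a log.dirs value.

```shell
# Count the comma-separated paths in a log.dirs value.
count_log_dirs() {
    printf '%s\n' "$1" | tr ',' '\n' | grep -c .
}

# Total threads = num.recovery.threads.per.data.dir x number of log directories.
# $1 = threads per directory, $2 = log.dirs value
total_recovery_threads() {
    echo $(( $1 * $(count_log_dirs "$2") ))
}

total_recovery_threads 8 /data/kafka1,/data/kafka2,/data/kafka3   # prints 24
```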
With the default Kafka configuration, the broker automatically creates a topic when:
- a producer starts writing messages to the topic;
- a consumer starts reading messages from the topic;
- any client requests metadata for the topic.
In many cases this behavior is undesirable, especially because the Kafka protocol offers no way to check whether a topic exists without causing it to be created. If you manage topic creation explicitly, whether manually or through a provisioning system, you can set auto.create.topics.enable to false.
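To pull the basic broker settings together, here is a sketch that writes them into a properties file. The parameter names are the ones discussed in this section. The values are illustrative only, and write_broker_config is a hypothetical helper rather than a Kafka tool.

```shell
# Hypothetical helper: write an illustrative set of basic broker settings.
# $1 = output file chosen by the caller
write_broker_config() {
    cat > "$1" <<'EOF'
broker.id=1
port=9092
zookeeper.connect=localhost:2181
log.dirs=/tmp/kafka-logs
num.recovery.threads.per.data.dir=8
auto.create.topics.enable=false
EOF
}

# Write to a temporary file and show the result.
cfg=$(mktemp)
write_broker_config "$cfg"
cat "$cfg"
rm -f "$cfg"
```

In a real cluster each broker would get its own broker.id, while zookeeper.connect would be identical on all of them.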
Default topic settings
The Kafka server configuration sets a number of defaults for the topics that are created. Some of these parameters, including the number of partitions and message retention, can be set per topic with the administrative tools (discussed in Chapter 9). The defaults in the server configuration should be set to baseline values appropriate for most topics in the cluster.
The num.partitions parameter determines how many partitions a new topic is created with, primarily when automatic topic creation is enabled (the default behavior). The default value of this parameter is 1. Keep in mind that the number of partitions for a topic can only be increased, never decreased. This means that if a topic needs fewer partitions than num.partitions specifies, you will have to create it manually (this is discussed in Chapter 9).
As discussed in Chapter 1, partitions are the way topics are scaled in a Kafka cluster, so it is important to have as many of them as are needed to balance the load across the whole cluster as brokers are added. Many users prefer the number of partitions to equal the number of brokers in the cluster or a multiple of it. This lets partitions be distributed evenly across brokers, which in turn spreads the message load evenly. This is not a requirement, however, because having several topics also lets you even out the load.
More often than not, Kafka limits message retention by time. The default is set in the configuration file with the log.retention.hours parameter and equals 168 hours, or 1 week. Two other parameters are also available: log.retention.minutes and log.retention.ms. All three control the same thing, the period after which messages may be deleted. It is recommended to use log.retention.ms, because if more than one of these parameters is specified, the smallest unit takes precedence, so the value of log.retention.ms is always the one used.
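The precedence rule can be sketched as a shell function that resolves the effective retention in milliseconds. This only illustrates the rule as stated here and is not Kafka's implementation. The function name effective_retention_ms is hypothetical, and an empty string stands for a parameter that is not set.

```shell
# Resolve the effective retention in milliseconds.
# $1 = log.retention.hours, $2 = log.retention.minutes, $3 = log.retention.ms
# Pass "" for minutes or ms when that parameter is not set; hours is assumed set.
effective_retention_ms() {
    hours=$1
    minutes=$2
    ms=$3
    if [ -n "$ms" ]; then
        echo "$ms"                              # smallest unit wins
    elif [ -n "$minutes" ]; then
        echo $(( minutes * 60 * 1000 ))
    else
        echo $(( hours * 60 * 60 * 1000 ))
    fi
}

effective_retention_ms 168 "" ""         # prints 604800000 (the 1-week default)
effective_retention_ms 168 30 ""         # prints 1800000
effective_retention_ms 168 30 86400000   # prints 86400000
```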
Another way to expire messages is based on the total size (in bytes) of the messages retained. The value is set with the log.retention.bytes parameter and is applied per partition. This means that for a topic with eight partitions and log.retention.bytes set to 1 GB, the maximum amount of data retained for the topic is 8 GB. Note that retention depends on the individual partitions, not on the topic. So if the number of partitions for a topic is increased, the maximum amount of data retained under log.retention.bytes grows as well.
The retention settings above apply to log segments, not to individual messages. As messages are produced to the Kafka broker, they are appended to the current log segment of the corresponding partition. When the log segment reaches the size set by the log.segment.bytes parameter, which defaults to 1 GB, the segment is closed and a new one is opened. Only after a segment is closed can it be considered for expiration. The smaller the log segments, the more often files must be closed and created, which reduces the overall efficiency of disk writes.
Log segment size matters when topics have a low produce rate. For example, if a topic receives only 100 MB of messages per day and log.segment.bytes is left at its default, it takes 10 days to fill one segment. Because messages cannot expire until their log segment is closed, with log.retention.ms set to 604800000 (1 week) up to 17 days of messages may accumulate before the closed segment expires. This is because once a segment holding 10 days of accumulated messages is closed, it must be kept for another 7 days before it can expire under the time-based policy, since the segment cannot be deleted until the last message in it has expired.
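The 17-day figure follows from simple arithmetic, sketched below in shell. The function name is hypothetical, and the sketch assumes 1 GB is treated as 1000 MB so that the fill time comes out at the 10 days used in the example.

```shell
# Worst-case age (in days) of the oldest message before its segment can expire.
# $1 = segment size in MB, $2 = produce rate in MB per day, $3 = retention in days
max_message_age_days() {
    fill_days=$(( ($1 + $2 - 1) / $2 ))   # days to fill the segment, rounded up
    echo $(( fill_days + $3 ))            # plus the retention period after closing
}

max_message_age_days 1000 100 7   # prints 17
```

Lowering the segment size shortens the fill time and therefore the worst-case age, at the cost of closing and creating files more often.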
Another way to control when log segments are closed is the log.segment.ms parameter, which sets the amount of time after which a log segment is closed. As with log.retention.bytes and log.retention.ms, the log.segment.bytes and log.segment.ms parameters are not mutually exclusive. Kafka closes a log segment when either the time limit or the size limit is reached, whichever comes first. By default, log.segment.ms is not set, so log segments are closed based on size alone.
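The "whichever comes first" rule can be expressed as a small predicate. This is a sketch of the rule as described, not Kafka's code. The function name should_roll_segment is hypothetical, and an empty fourth argument stands for log.segment.ms being unset.

```shell
# Succeed (exit status 0) when the active segment should be closed.
# $1 = current segment size in bytes, $2 = log.segment.bytes
# $3 = segment age in ms,             $4 = log.segment.ms ("" = not set)
should_roll_segment() {
    if [ "$1" -ge "$2" ]; then
        return 0                      # size limit reached
    fi
    if [ -n "$4" ] && [ "$3" -ge "$4" ]; then
        return 0                      # time limit reached
    fi
    return 1
}

should_roll_segment 500 1000 9000 ""   && echo roll || echo keep   # prints keep
should_roll_segment 500 1000 9000 5000 && echo roll || echo keep   # prints roll
should_roll_segment 1000 1000 10 5000  && echo roll || echo keep   # prints roll
```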
The Kafka broker lets you limit the maximum size of produced messages with the message.max.bytes parameter. The default value of this parameter is 1000000 (1 MB). A producer that tries to send a larger message receives an error from the broker, and the message is not accepted. As with all other byte sizes in the broker settings, this is the size of the compressed message, so producers can send messages that are much larger uncompressed, as long as they compress to within the limit set by message.max.bytes.
Increasing the message size can seriously affect performance. Larger messages mean that the broker threads handling network connections and requests spend longer on each request. Larger messages also increase the amount of data written to disk, which affects I/O throughput.
Choice of hardware
Choosing the right hardware for a Kafka broker is more an art than a science. Kafka itself has no strict hardware requirements and will run without problems on any system. Performance, however, is affected by several factors: disk throughput and capacity, memory, networking and CPU.
First you need to decide what types of performance are most important for your system, after which you can choose the optimal hardware configuration that fits into the budget.
Disk throughput
The throughput of the broker disks used to store log segments has the most direct effect on the performance of producer clients. Kafka messages must be written to local storage, which confirms that they have been committed. Only then can the send operation be considered successful. This means that the faster the disk writes, the lower the produce latency.
The obvious decision when it comes to disk throughput is whether to use traditional spinning hard disk drives (HDDs) or solid-state drives (SSDs). SSDs have orders of magnitude lower seek and access times and offer higher performance. HDDs are more economical and provide more capacity per unit. HDD performance can be improved by using more of them in a broker, either as multiple data directories or by combining the drives into a redundant array of independent disks (RAID). Throughput is also influenced by other factors, such as the drive technology (for example, SAS or SATA) and the characteristics of the drive controller.
Disk capacity
Capacity is the other aspect of storage. The amount of disk space required is determined by how many messages must be retained at any one time. If the broker is expected to receive 1 TB of traffic per day, then with 7-day retention it needs at least 7 TB of usable storage for log segments. You should also allow at least 10% overhead for other files, on top of any buffer you want for traffic fluctuations or growth over time.
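The capacity estimate can be written down as a short calculation. The function name is hypothetical, and the sketch assumes 1 TB is counted as 1000 GB, with the 10% allowance for other files applied on top of the retained data.

```shell
# Estimate usable storage needed for log segments, in GB.
# $1 = daily traffic in GB, $2 = retention in days, $3 = overhead in percent
required_storage_gb() {
    base=$(( $1 * $2 ))                  # data retained at any one time
    echo $(( base + base * $3 / 100 ))   # plus overhead for other files
}

required_storage_gb 1000 7 10   # prints 7700
```

Any buffer for traffic growth or fluctuations would be added to this figure separately.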
Storage capacity is one of the factors to consider when sizing a Kafka cluster and deciding when to expand it. The cluster's total traffic can be balanced by having multiple partitions per topic, which lets additional brokers add capacity when a single broker cannot hold enough data. How much disk space is needed also depends on the replication strategy chosen for the cluster (discussed in more detail in Chapter 6).
In normal operation, a Kafka consumer reads from the end of the partition, staying caught up and lagging only slightly behind the producers, if at all. The messages the consumer reads are then served from the system page cache, which makes reads faster than if the broker had to reread them from disk. Consequently, the more memory available for the page cache, the better the performance of consumer clients.
Kafka itself does not need a large JVM heap. Even a broker that handles X messages per second at a data rate of X megabits per second can run with a 5 GB heap. The rest of the system memory is used by the page cache and benefits Kafka by letting the system cache the log segments in use. This is why it is not recommended to run Kafka on a system that already hosts other significant applications: they would have to share the page cache, which reduces the performance of Kafka consumers.
Data transmission over the network
The maximum amount of traffic Kafka can handle is determined by the available network throughput. Along with disk storage, this is often the key factor in sizing a cluster. The choice is complicated by Kafka's inherent imbalance between inbound and outbound network traffic, a consequence of its support for multiple consumers. A producer may write 1 MB of messages per second to a given topic, but there can be any number of consumers, which multiplies the outbound traffic accordingly. Other operations, such as cluster replication (see Chapter 6) and mirroring (discussed in Chapter 8), increase network requirements further. If the network interface becomes saturated, cluster replication can easily fall behind, leaving the cluster in an unstable state.
Processing power is not as important as disk and memory, but it also affects overall broker performance to some extent. Ideally, clients should compress messages to optimize network and disk usage. The Kafka broker, however, must decompress every message batch to validate the checksums of the individual messages and assign offsets. It then has to recompress the batch to store it on disk. This is where most of Kafka's processing power goes. It should not, however, be the main factor in choosing hardware.
Kafka in the cloud
Kafka is often installed in a cloud computing environment such as Amazon Web Services (AWS). AWS offers many virtual compute instances, each with a different combination of CPU, memory and disk. To choose a suitable instance configuration, start from the factors that determine Kafka performance. You can begin with the amount of data retention required and then consider the performance the producers need. If very low latency is required, you may need I/O-optimized instances with local SSD storage. Otherwise, remote storage (for example, AWS Elastic Block Store) may be enough. Once these decisions are made, you can choose among the available CPU and memory options.
In practice, this means that on AWS you might choose instances of the m4 or r3 types. An m4 instance allows longer retention but with lower disk write throughput, because it relies on elastic block storage. An r3 instance has much higher throughput thanks to local SSDs, but those limit the amount of data that can be retained. The advantages of both options are combined in the considerably more expensive i2 and d2 instance types.
A single Kafka server works well for local development or prototyping, but configuring multiple brokers to work together as a cluster has significant benefits (Figure 2.2). The main one is the ability to scale the load across multiple servers. The second is the ability to use replication to guard against data loss caused by failures of individual systems. Replication also makes it possible to perform maintenance on Kafka or the underlying system while keeping the service available to clients. This section covers only the configuration of a Kafka cluster. For more information on data replication, see Chapter 6.
How many brokers should there be?
The size of a Kafka cluster is determined by several factors. The first is how much disk space is needed to retain messages and how much storage is available on a single broker. If the cluster must retain 10 TB of data and a single broker can store 2 TB, the minimum cluster size is five brokers. In addition, using replication increases storage requirements by at least 100%, depending on the replication factor (see Chapter 6). This means that with replication the same cluster must contain at least ten brokers.
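The storage-driven sizing rule can be sketched in shell as follows. The function name min_brokers is hypothetical. A replication factor of 1 means no extra copies and 2 means each message is stored twice, which corresponds to the 100% increase mentioned above.

```shell
# Minimum number of brokers needed for storage alone.
# $1 = data to retain in TB, $2 = usable capacity per broker in TB,
# $3 = replication factor (1 = no replication)
min_brokers() {
    total=$(( $1 * $3 ))
    echo $(( (total + $2 - 1) / $2 ))   # round up to whole brokers
}

min_brokers 10 2 1   # prints 5
min_brokers 10 2 2   # prints 10
```

This covers storage only. Request-handling capacity, discussed next, may call for more brokers than the storage estimate does.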
Another factor to consider is the cluster's capacity to handle requests. For example, what is the capacity of the network interfaces, and can they cope with client traffic when there are multiple consumers of the data or when traffic is not steady over the retention period (that is, when there are bursts of traffic at peak times)? If the network interface of a single broker is 80% utilized at peak and there are two consumers of the data, the consumers will not be able to keep up with peak traffic unless there are at least two brokers. If the cluster uses replication, it acts as an additional consumer of the data, which must be taken into account. It can also be worthwhile to add brokers to a cluster to address performance problems caused by limited disk throughput or available system memory.
Configuration of brokers
There are only two configuration requirements for brokers that operate as part of a single Kafka cluster. First, all brokers must have the same value for the zookeeper.connect parameter. It specifies the ZooKeeper ensemble and the path where the cluster stores its metadata. Second, every broker in the cluster must have a unique value for the broker.id parameter. If two brokers with the same broker.id try to join the cluster, the second broker logs an error and fails to start. There are other broker configuration parameters that matter when running a cluster, namely the parameters that control replication, which are covered in later chapters.
Tuning the operating system
Although most Linux distributions ship with kernel tuning parameters that suit most applications well, a few changes can improve the performance of a Kafka broker. They mainly concern the virtual memory and networking subsystems, along with specifics of the disk mount point used to store log segments. These parameters are usually configured in the /etc/sysctl.conf file, but it is best to consult the documentation for your Linux distribution for the details of adjusting kernel settings.
Typically, the Linux virtual memory system adjusts itself to the system load. You can, however, adjust how it handles both swap space and dirty memory pages to suit the Kafka workload better.
As with most applications, especially those where throughput matters, it is best to avoid swapping at (almost) any cost. The cost of swapping memory pages to disk shows up noticeably in every aspect of Kafka performance. In addition, Kafka makes heavy use of the system page cache, and if the virtual memory subsystem is swapping to disk, not enough memory is being allocated to the page cache.
One way to avoid swapping is simply not to configure any swap space at all. Swap is not a requirement, but rather insurance in case something catastrophic happens on the system. It can keep the system from unexpectedly killing a process because memory has run out. For this reason, it is recommended to set the vm.swappiness parameter to a very low value, such as 1. This parameter is the likelihood (as a percentage) that the virtual memory subsystem will swap rather than drop pages from the page cache. It is better to shrink the page cache than to swap.
It also makes sense to adjust how the kernel handles dirty pages that must be flushed to disk. How quickly Kafka responds to producers depends on disk I/O performance. That is why log segments are usually placed on fast disks: either individual disks with fast response times (for example, SSDs) or disk subsystems with a large amount of NVRAM for caching (for example, RAID). This makes it possible to lower the number of dirty pages allowed before the background flush to disk starts. To do so, set the vm.dirty_background_ratio parameter lower than its default of 10. The value is a percentage of total system memory, and in many cases it can be set to 5. It should not be set to 0, however, because the kernel would then flush pages continuously and lose the ability to buffer disk writes against temporary fluctuations in the performance of the underlying hardware.
The total number of dirty pages allowed before the kernel forces synchronous flushes to disk can also be raised by increasing the vm.dirty_ratio parameter above its default of 20 (also a percentage of total system memory). A wide range of values is possible for this parameter, but the most reasonable lie between 60 and 80. Changing this parameter carries some risk, both in the amount of activity not yet flushed to disk and in the likelihood of long I/O pauses when synchronous flushes are forced. If you choose higher values for vm.dirty_ratio, it is strongly recommended to use replication in the Kafka cluster to guard against system failures.
When choosing values for these parameters, it makes sense to monitor the number of dirty pages while the Kafka cluster runs under load, either in production or in simulation. You can find it in the /proc/vmstat file:
# cat /proc/vmstat | egrep "dirty|writeback"
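The virtual memory settings discussed above can be collected into a sysctl fragment. The sketch below only prints the fragment so that it can be reviewed first. kafka_vm_sysctl is a hypothetical helper, and the value 60 for vm.dirty_ratio is one pick from the 60 to 80 range mentioned in the text, not a universal recommendation.

```shell
# Print a sysctl fragment with the virtual memory values discussed above.
kafka_vm_sysctl() {
    cat <<'EOF'
vm.swappiness=1
vm.dirty_background_ratio=5
vm.dirty_ratio=60
EOF
}

# Review the output, then append it to /etc/sysctl.conf as root if it suits the system.
kafka_vm_sysctl

# Show the current dirty and writeback page counters (Linux only).
if [ -r /proc/vmstat ]; then
    grep -E 'dirty|writeback' /proc/vmstat
fi
```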
Apart from the choice of disk hardware and the RAID configuration, if one is used, the filesystem chosen for these disks has the next largest effect on performance. There are many filesystems, but EXT4 (fourth extended file system) or XFS (Extents File System) is most commonly used for local storage. EXT4 performs reasonably well, but it requires tuning options that are potentially unsafe. These include setting the commit interval longer than the default of 5 to reduce the frequency of flushes to disk. EXT4 also uses delayed block allocation, which increases the chance of data loss and filesystem corruption if the system fails. XFS also uses a delayed allocation algorithm, but one that is safer than the one in EXT4. XFS also performs better for a typical Kafka workload and needs no tuning beyond what the filesystem does automatically. It is also more efficient at batching disk writes, which are combined to improve I/O throughput.
Whichever filesystem is chosen for the mount that holds the log segments, it is recommended to set the noatime mount option. File metadata contains three timestamps: the time of the last status change (ctime), the time of the last modification (mtime) and the time of the last access (atime). By default, atime is updated every time a file is read, which generates a large number of disk writes. The atime attribute is generally of little use unless an application needs to know whether a file has been accessed since it was last modified (in which case the relatime option can be used). Kafka does not use atime at all, so it is safe to disable it. Setting noatime on the mount prevents these timestamp updates but does not affect the correct handling of ctime and mtime.
Data transmission over the network
Adjusting the default settings of the Linux networking stack is common for any application that generates a lot of network traffic, because the kernel's defaults are not tuned for high-speed transfer of large amounts of data. In fact, the changes recommended for Kafka are the same as those recommended for most web servers and other networking applications. The first step is to change the default and maximum amounts of memory allocated to the send and receive buffers of each socket. This significantly improves performance for large transfers. The default send and receive buffer sizes per socket are set by net.core.wmem_default and net.core.rmem_default, and the maximum sizes by net.core.wmem_max and net.core.rmem_max, for which a reasonable value is 2097152 (2 MB). Keep in mind that the maximum size does not mean every buffer is allocated that much space; it only allows that much if needed.
In addition to the general socket settings, the send and receive buffer sizes for TCP sockets must be set separately with the net.ipv4.tcp_wmem and net.ipv4.tcp_rmem parameters. Each takes three space-separated integers that specify the minimum, default and maximum sizes, respectively. An example value for these parameters is 4096 65536 2048000, which means a minimum buffer size of 4 KB, a default of 64 KB and a maximum of 2 MB. The maximum cannot exceed the values set for all sockets by net.core.wmem_max and net.core.rmem_max. Depending on the actual workload of your Kafka brokers, you may need to raise the maximum values to allow more buffering of network connections.
There are several other useful network parameters. Enabling TCP window scaling by setting net.ipv4.tcp_window_scaling to 1 lets clients transfer data more efficiently and allows that data to be buffered on the broker side. Raising net.ipv4.tcp_max_syn_backlog above its default of 1024 allows more simultaneous connections to be accepted. Raising net.core.netdev_max_backlog above its default of 1000 can help with bursts of network traffic, especially at gigabit speeds, by letting more packets be queued for the kernel to process.
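As with the virtual memory settings, the network parameters can be gathered into a sysctl fragment. The sketch below prints one. kafka_net_sysctl is a hypothetical helper. The TCP buffer triple corresponds to the 4 KB, 64 KB and roughly 2 MB sizes named above, while the two backlog values (4096 and 5000) are arbitrary illustrative numbers chosen only because they exceed the defaults.

```shell
# Print a sysctl fragment with illustrative network settings.
kafka_net_sysctl() {
    cat <<'EOF'
net.core.wmem_max=2097152
net.core.rmem_max=2097152
net.ipv4.tcp_wmem=4096 65536 2048000
net.ipv4.tcp_rmem=4096 65536 2048000
net.ipv4.tcp_window_scaling=1
net.ipv4.tcp_max_syn_backlog=4096
net.core.netdev_max_backlog=5000
EOF
}

# Review the output before applying any of it to a real system.
kafka_net_sysctl
```

The right values depend on the workload, so these are best treated as a starting point for measurement rather than as final settings.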
When the time comes to move the Kafka environment from testing into production, only a few more things need attention to set up a reliable messaging service.
Garbage collection parameters
Tuning Java garbage collection for an application has always been something of an art, requiring detailed knowledge of how the application uses memory and a good deal of observation and trial and error. Fortunately, this changed with Java 7 and the introduction of the Garbage First (G1) garbage collector. G1 adapts automatically to different workloads and provides consistent garbage collection pause times over the lifetime of the application. It also handles large heaps with ease, because it divides the heap into small regions rather than collecting across the entire heap at every pause.
In normal operation, G1 needs very little configuration. Two parameters are used to tune its performance.
- MaxGCPauseMillis. Specifies the preferred pause time for each garbage collection cycle. It is not a fixed maximum: G1 can exceed this time if necessary. The default value is 200 ms. This means that G1 will try to schedule the frequency of garbage collection cycles, as well as the number of zones collected in each cycle, so that each cycle takes about 200 ms.
- InitiatingHeapOccupancyPercent. Specifies the percentage of the total heap that may be in use before G1 starts a collection cycle. The default value is 45. This means that G1 will not start a collection cycle until 45% of the heap is in use, counting both the new (Eden) and old zones.
The Kafka broker uses heap memory and creates objects very efficiently, so both parameters can be set lower. The garbage collection values shown in this section have been found suitable for a server with 64 GB of RAM running Kafka with a 5 GB heap. Such a broker can run with MaxGCPauseMillis set to 20 ms. InitiatingHeapOccupancyPercent is set to 35, so garbage collection starts slightly earlier than it would with the default value.
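To make the effect of the lower occupancy percentage concrete, the arithmetic for the 5 GB heap can be sketched as follows. This is a rough approximation that treats the threshold as a simple percentage of the heap; the function name is hypothetical.

```shell
#!/bin/sh
# Heap usage (in MB) at which G1 starts a collection cycle, given the
# heap size in MB and the InitiatingHeapOccupancyPercent value.
g1_start_threshold_mb() {
    heap_mb=$1
    ihop_percent=$2
    echo $(( heap_mb * ihop_percent / 100 ))
}

HEAP_MB=$(( 5 * 1024 ))   # the 5 GB heap from the example above

echo "default (45%): $(g1_start_threshold_mb "$HEAP_MB" 45) MB"
echo "tuned   (35%): $(g1_start_threshold_mb "$HEAP_MB" 35) MB"
```

With these inputs the script reports 2304 MB for the default and 1792 MB for the tuned value, so collection begins about 512 MB of heap usage earlier.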
The Kafka start script does not use the G1 collector by default; instead it uses the parallel new and concurrent mark and sweep (CMS) collectors. This is easy to change through environment variables. Modify the earlier start command as follows:
# export JAVA_HOME=/usr/java/jdk???_51
# export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC -Djava.awt.headless=true"
# /usr/local/kafka/bin/kafka-server-start.sh -daemon
Datacenter layout
In development systems, the physical location of Kafka brokers within the datacenter matters little, because partial or complete unavailability of the cluster for short periods has little impact. In production, however, downtime in serving data means lost money, whether through the inability to serve users or through lost telemetry on what they are doing. This makes it more important to configure replication in the Kafka cluster (see Chapter 6) and to consider the physical placement of brokers in racks in the datacenter. If this is not addressed before Kafka is deployed, costly work to move servers may be needed later.
The Kafka broker has no rack awareness when assigning new partitions to brokers. It cannot take into account that two brokers may be located in the same physical rack or in the same availability zone (when running in a cloud service such as AWS), so it can easily assign all replicas of a partition to brokers that share the same power and network connections in the same rack. If that rack fails, those partitions become unavailable to clients. In addition, an "unclean" leader election during recovery can then cause additional data loss (see Chapter 6 for details).
The recommended practice is to install each Kafka broker in a cluster in a separate rack, or at the very least to make sure brokers do not share single points of failure for infrastructure services such as power and network. This usually means at least deploying the servers with dual power connections (to two different circuits) and dual network switches, with a bonded interface on the servers themselves so that they can fail over to the other interface without interruption. Separate racks remain useful even then, because from time to time a rack or cabinet has to be taken offline for hardware maintenance, for example to move servers or replace wiring.
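Since the broker does not check rack placement itself, a replica assignment can be checked by hand against your own record of which broker sits in which rack. The sketch below assumes a hypothetical broker-to-rack map and a comma-separated replica list; neither format comes from Kafka, and the function names are illustrative.

```shell
#!/bin/sh
# Hypothetical broker-to-rack map, one "broker_id:rack" pair per word.
RACK_MAP="1:rackA 2:rackA 3:rackB"

# rack_of BROKER_ID: print the rack recorded for a broker in RACK_MAP.
rack_of() {
    for pair in $RACK_MAP; do
        if [ "${pair%%:*}" = "$1" ]; then
            echo "${pair#*:}"
            return 0
        fi
    done
    return 1
}

# distinct_racks "1,2,3": print how many different racks hold the replicas.
distinct_racks() {
    for broker in $(echo "$1" | tr ',' ' '); do
        rack_of "$broker"
    done | sort -u | wc -l | tr -d ' '
}

# A partition whose replicas all sit in one rack is lost if that rack fails.
for replicas in "1,2" "1,3"; do
    echo "replicas $replicas span $(distinct_racks "$replicas") rack(s)"
done
```

With the map above, replicas 1,2 span a single rack and would all be lost together, while replicas 1,3 span two racks.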
Colocating applications on ZooKeeper
Kafka uses ZooKeeper to store metadata about brokers, topics, and partitions. Writes to ZooKeeper occur only when the membership of consumer groups changes or when changes are made to the Kafka cluster itself. This amount of traffic is minimal and does not justify a dedicated ZooKeeper ensemble for a single Kafka cluster. In fact, one ZooKeeper ensemble is often shared by several Kafka clusters (using a separate ZooKeeper root path for each cluster, as described earlier in this chapter).
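Sharing one ensemble between clusters comes down to giving each cluster's brokers a zookeeper.connect value that ends in a different root path. A minimal sketch follows; the host names, cluster names, and helper function are hypothetical.

```shell
#!/bin/sh
# Build a zookeeper.connect line with a per-cluster root (chroot) path.
# Host names and cluster names here are hypothetical.
zk_connect() {
    hosts=$1      # comma-separated host:port list of the shared ensemble
    chroot=$2     # root path reserved for one Kafka cluster
    echo "zookeeper.connect=${hosts}${chroot}"
}

ZK_HOSTS="zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181"
zk_connect "$ZK_HOSTS" /kafka-cluster-a
zk_connect "$ZK_HOSTS" /kafka-cluster-b
```

Each printed line would go into the broker configuration of the corresponding cluster, so the two clusters keep their metadata under separate paths in the same ensemble.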
There is, however, a concern with consumers and ZooKeeper under certain configurations. Consumers can commit offsets to either ZooKeeper or Kafka, and the interval between commits is configurable. If consumers use ZooKeeper for offsets, each consumer performs a ZooKeeper write at every interval for every partition it consumes. A typical offset commit interval is 1 minute, because that is the period over which a consumer group will reread duplicate messages if a consumer fails. These commits can make up a significant share of ZooKeeper traffic, especially in a cluster with many consumers, so they need to be taken into account. If the ZooKeeper ensemble cannot handle this traffic, it may be necessary to use a longer commit interval. However, consumers using the current Kafka libraries are advised to commit offsets to Kafka and not depend on ZooKeeper.
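The load described here is simple to estimate: one ZooKeeper write per consumed partition per commit interval. The sketch below uses hypothetical numbers (2,000 consumed partitions) and a hypothetical function name to compare a 1-minute interval with a 5-minute one.

```shell
#!/bin/sh
# Estimated ZooKeeper offset-commit writes per minute:
# one write per consumed partition per commit interval.
zk_commit_writes_per_minute() {
    partitions=$1        # total partitions consumed across all consumers
    interval_seconds=$2  # offset commit interval
    echo $(( partitions * 60 / interval_seconds ))
}

zk_commit_writes_per_minute 2000 60    # 1-minute commit interval
zk_commit_writes_per_minute 2000 300   # 5-minute commit interval
```

For these inputs the estimate drops from 2000 to 400 writes per minute, at the cost of a longer window of duplicate messages after a consumer failure.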
Apart from sharing one ensemble between several Kafka clusters, sharing the ensemble with other applications is not recommended if it can be avoided. Kafka is very sensitive to ZooKeeper latency and timeouts, and an interruption in communication with the ensemble can cause brokers to behave unpredictably. Several brokers can easily go offline at the same time if they lose their ZooKeeper connections, which takes partitions offline. It also puts additional load on the cluster controller, which can surface as subtle errors long after the interruption has passed, for example when attempting a controlled shutdown of a broker. Other applications that put load on the ZooKeeper ensemble, whether through heavy use or through malfunction, should be moved to separate ensembles.
In this chapter, we discussed how to install and run Apache Kafka. We looked at how to choose the right hardware for brokers and covered specific configuration concerns for production use. Now that we have a Kafka cluster, we can move on to the basics of Kafka client applications. The next two chapters focus on creating clients, both for producing messages to Kafka (Chapter 3) and for consuming them (Chapter 4).
For more details on the book, see the publisher's site.