- 4. DESIGN
- 4.1 Motivation
- 4.2 Persistence
- 4.3 Efficiency
- 4.4 The Producer
- 4.5 The Consumer
- 4.6 Message Delivery Semantics
- 4.7 Replication
- 4.8 Log Compaction
- 4.9 Quotas
4. DESIGN
4.1 Motivation
We designed Kafka to be able to act as a unified platform for handling all the real-time data feeds a large company might have. To do this we had to think through a fairly broad set of use cases.
It would need high throughput to support high-volume event streams such as real-time log aggregation.
It would need to deal gracefully with large data backlogs to be able to support periodic data loads from offline systems.
It would also need to provide low-latency delivery to handle more traditional messaging use cases.
We wanted to support partitioned, distributed, real-time processing of these feeds to create new, derived feeds. This motivated our partitioning and consumer model.
Finally in cases where the stream is fed into other data systems for serving, we knew the system would have to be able to guarantee fault-tolerance in the presence of machine failures.
Supporting these uses led us to a design with a number of unique elements, more akin to a database log than a traditional messaging system. We will outline some elements of the design in the following sections.
4.2 Persistence
Don’t fear the filesystem!
Kafka relies heavily on the filesystem for storing and caching messages. There is a general perception that “disks are slow” which makes people skeptical that a persistent structure can offer competitive performance. In fact disks are both much slower and much faster than people expect depending on how they are used; and a properly designed disk structure can often be as fast as the network.
The key fact about disk performance is that the throughput of hard drives has been diverging from the latency of a disk seek for the last decade. As a result, the performance of linear writes on a JBOD configuration with six 7200rpm SATA RAID-5 array is about 600MB/sec, but the performance of random writes is only about 100k/sec, a difference of over 6000X. These linear reads and writes are the most predictable of all usage patterns, and are heavily optimized by the operating system. A modern operating system provides read-ahead and write-behind techniques that prefetch data in large block multiples and group smaller logical writes into large physical writes. A further discussion of this issue can be found in this ACM Queue article; they actually find that sequential disk access can in some cases be faster than random memory access!
To compensate for this performance divergence, modern operating systems have become increasingly aggressive in their use of main memory for disk caching. A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache. This feature cannot easily be turned off without using direct I/O, so even if a process maintains an in-process cache of the data, this data will likely be duplicated in OS pagecache, effectively storing everything twice.
Furthermore, we are building on top of the JVM, and anyone who has spent any time with Java memory usage knows two things:
- The memory overhead of objects is very high, often doubling the size of the data stored (or worse).
- Java garbage collection becomes increasingly fiddly and slow as the in-heap data increases.
As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure—we at least double the available cache by having automatic access to all free memory, and likely double again by storing a compact byte structure rather than individual objects. Doing so will result in a cache of up to 28-30GB on a 32GB machine without GC penalties. Furthermore, this cache will stay warm even if the service is restarted, whereas the in-process cache will need to be rebuilt in memory (which for a 10GB cache may take 10 minutes) or else it will need to start with a completely cold cache (which likely means terrible initial performance). This also greatly simplifies the code as all logic for maintaining coherency between the cache and filesystem is now in the OS, which tends to do so more efficiently and more correctly than one-off in-process attempts. If your disk usage favors linear reads then read-ahead is effectively pre-populating this cache with useful data on each disk read.
This suggests a design which is very simple: rather than maintain as much as possible in-memory and flush it all out to the filesystem in a panic when we run out of space, we invert that. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel’s pagecache.
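The difference between writing to the log and flushing to disk can be sketched as follows. This is a minimal illustration, assuming a POSIX-like system and Python's standard `os` module; the `append` helper and the `segment.log` file name are hypothetical and are not taken from Kafka's code.

```python
import os
import tempfile

def append(fd: int, payload: bytes, flush: bool = False) -> int:
    """Append payload to the log file; return the number of bytes written."""
    written = os.write(fd, payload)   # hands the bytes to the kernel (pagecache)
    if flush:
        os.fsync(fd)                  # separate, optional step: force data to the device
    return written

log_path = os.path.join(tempfile.mkdtemp(), "segment.log")  # hypothetical name
fd = os.open(log_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
n = append(fd, b"hello")              # no fsync requested
append(fd, b" world", flush=True)     # explicit flush when durability is required
os.close(fd)

with open(log_path, "rb") as f:
    contents = f.read()
```

Both appends are visible to readers right away; only the second one asks the operating system to push the data to the physical device before returning.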
This style of pagecache-centric design is described in an article on the design of Varnish here (along with a healthy dose of arrogance).
Constant Time Suffices
The persistent data structure used in messaging systems is often a per-consumer queue with an associated BTree or other general-purpose random access data structure to maintain metadata about messages. BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system. They do come with a fairly high cost, though: BTree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time, so parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix very fast cached operations with very slow physical disk operations, the observed performance of tree structures is often superlinear as data increases with a fixed cache, i.e. doubling your data makes things much worse than twice as slow.
Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. This structure has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages since the performance is completely decoupled from the data size—one server can now take full advantage of a number of cheap, low-rotational speed 1+TB SATA drives. Though they have poor seek performance, these drives have acceptable performance for large reads and writes and come at 1/3 the price and 3x the capacity.
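A queue built on simple appends and positional reads might look like the sketch below. It is only an illustration under stated assumptions: the `AppendOnlyLog` class, the 4-byte length prefix, and the use of the byte position as the record offset are all hypothetical choices, not Kafka's on-disk format.

```python
import os
import struct
import tempfile
from typing import Tuple

class AppendOnlyLog:
    """Length-prefixed records in one file; a record's offset is its byte position."""

    def __init__(self, path: str) -> None:
        self._file = open(path, "a+b")   # append mode: writes always go to the end

    def append(self, payload: bytes) -> int:
        """Write one record at the end of the file and return its offset."""
        self._file.seek(0, os.SEEK_END)
        offset = self._file.tell()
        self._file.write(struct.pack(">I", len(payload)) + payload)
        self._file.flush()               # hand the bytes to the OS; no fsync
        return offset

    def read(self, offset: int) -> Tuple[bytes, int]:
        """Return (payload, offset of the next record) without scanning the log."""
        self._file.seek(offset)
        (size,) = struct.unpack(">I", self._file.read(4))
        payload = self._file.read(size)
        return payload, offset + 4 + size

log = AppendOnlyLog(os.path.join(tempfile.mkdtemp(), "queue.log"))
first = log.append(b"a")
second = log.append(b"bcd")
payload, next_offset = log.read(first)
```

Neither operation consults an index or depends on how much data the file already holds, which is the property the paragraph above relies on.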
Having access to virtually unlimited disk space without any performance penalty means that we can provide some features not usually found in a messaging system. For example, in Kafka, instead of attempting to delete messages as soon as they are consumed, we can retain messages for a relatively long period (say a week). This leads to a great deal of flexibility for consumers, as we will describe.
4.3 Efficiency
We have put significant effort into efficiency. One of our primary use cases is handling web activity data, which is very high volume: each page view may generate dozens of writes. Furthermore, we assume each message published is read by at least one consumer (often many), hence we strive to make consumption as cheap as possible.
We have also found, from experience building and running a number of similar systems, that efficiency is a key to effective multi-tenant operations. If the downstream infrastructure service can easily become a bottleneck due to a small bump in usage by the application, such small changes will often create problems. By being very fast we help ensure that the application will tip-over under load before the infrastructure. This is particularly important when trying to run a centralized service that supports dozens or hundreds of applications on a centralized cluster as changes in usage patterns are a near-daily occurrence.
We discussed disk efficiency in the previous section. Once poor disk access patterns have been eliminated, there are two common causes of inefficiency in this type of system: too many small I/O operations, and excessive byte copying.
The small I/O problem happens both between the client and the server and in the server’s own persistent operations.
To avoid this, our protocol is built around a “message set” abstraction that naturally groups messages together. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.
This simple optimization produces orders of magnitude speed up. Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.
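The idea of grouping messages into one contiguous buffer can be sketched as below. The framing used here (a 4-byte big-endian length before each message) and the function names are assumptions made for illustration; this is not Kafka's actual wire format.

```python
import struct
from typing import Iterator, List

def encode_message_set(messages: List[bytes]) -> bytes:
    """Concatenate messages, each prefixed by a 4-byte big-endian length."""
    return b"".join(struct.pack(">I", len(m)) + m for m in messages)

def decode_message_set(buffer: bytes) -> Iterator[bytes]:
    """Walk the buffer linearly and yield each message in order."""
    pos = 0
    while pos < len(buffer):
        (size,) = struct.unpack_from(">I", buffer, pos)
        pos += 4
        yield buffer[pos:pos + size]
        pos += size

# Three small messages travel as one buffer: one request, one log append.
batch = encode_message_set([b"view:/home", b"view:/cart", b"click:buy"])
decoded = list(decode_message_set(batch))
```

Because the same bytes can be sent, appended, and fetched as a unit, the per-message overhead of a network round trip or a disk write is paid once per batch.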
The other inefficiency is in byte copying. At low message rates this is not an issue, but under load the impact is significant. To avoid this we employ a standardized binary message format that is shared by the producer, the broker, and the consumer (so data chunks can be transferred without modification between them).
The message log maintained by the broker is itself just a directory of files, each populated by a sequence of message sets that have been written to disk in the same format used by the producer and consumer. Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket; in Linux this is done with the sendfile system call.
To understand the impact of sendfile, it is important to understand the common data path for transfer of data from file to socket:
- The operating system reads data from the disk into pagecache in kernel space
- The application reads the data from kernel space into a user-space buffer
- The application writes the data back into kernel space into a socket buffer
- The operating system copies the data from the socket buffer to the NIC buffer where it is sent over the network
This is clearly inefficient: there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from pagecache to the network directly. So in this optimized path, only the final copy to the NIC buffer is needed.
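The optimized path can be tried from Python, whose standard library exposes it as `socket.sendfile`. The sketch below is an illustration only: a local `socket.socketpair()` stands in for the broker-to-consumer connection, and the temporary file stands in for a log segment.

```python
import os
import socket
import tempfile

payload = b"x" * 4096

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    log_path = tmp.name

# A connected socket pair stands in for the network connection.
server_side, client_side = socket.socketpair()

with open(log_path, "rb") as log_file:
    # socket.sendfile uses os.sendfile where the platform supports it
    # (avoiding the user-space copy) and falls back to read/send otherwise.
    sent = server_side.sendfile(log_file, offset=0, count=len(payload))
server_side.close()

received = bytearray()
while True:
    chunk = client_side.recv(65536)
    if not chunk:          # empty read: the sender closed its end
        break
    received.extend(chunk)
client_side.close()
os.unlink(log_path)
```

The application never touches the file contents on the sending side; it only names the file, the offset, and the byte count.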
We expect a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into pagecache exactly once and reused on each consumption instead of being stored in memory and copied out to kernel space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.
This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache.
For more background on the sendfile and zero-copy support in Java, see this article.
End-to-end Batch Compression
In some cases the bottleneck is actually not CPU or disk but network bandwidth. This is particularly true for a data pipeline that needs to send messages between data centers over a wide-area network. Of course, the user can always compress its messages one at a time without any support needed from Kafka, but this can lead to very poor compression ratios as much of the redundancy is due to repetition between messages of the same type (e.g. field names in JSON or user agents in web logs or common string values). Efficient compression requires compressing multiple messages together rather than compressing each message individually.
Kafka supports this by allowing recursive message sets. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will remain compressed in the log and will only be decompressed by the consumer.
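The effect of compressing a batch rather than each message can be checked with a small experiment. The message contents below are made up for illustration, and joining messages with a newline is an assumption of this sketch, not how Kafka frames a compressed message set.

```python
import gzip
import json

# 100 small JSON messages that share field names and most values.
messages = [
    json.dumps({"user_id": i, "page": "/products", "user_agent": "Mozilla/5.0"}).encode()
    for i in range(100)
]

# Compress each message on its own, then compress the whole batch at once.
individually = sum(len(gzip.compress(m)) for m in messages)
batch_blob = gzip.compress(b"\n".join(messages))
as_batch = len(batch_blob)

# Only the consumer needs to decompress and split the batch again.
restored = gzip.decompress(batch_blob).split(b"\n")
```

Compressed one at a time, each message carries its own gzip header and cannot share redundancy with its neighbours, so `individually` comes out larger than `as_batch`.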
Kafka supports GZIP, Snappy and LZ4 compression protocols. More details on compression can be found here.
4.4 The Producer
Load balancing
The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier. To help the producer do this, all Kafka nodes can answer a request for metadata about which servers are alive and where the leaders for the partitions of a topic are at any given time, allowing the producer to direct its requests appropriately.
The client controls which partition it publishes messages to. This can be done at random, implementing a kind of random load balancing, or it can be done by some semantic partitioning function. We expose the interface for semantic partitioning by allowing the user to specify a key to partition by and using this to hash to a partition (there is also an option to override the partition function if need be). For example if the key chosen was a user id then all data for a given user would be sent to the same partition. This in turn will allow consumers to make locality assumptions about their consumption. This style of partitioning is explicitly designed to allow locality-sensitive processing in consumers.
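Key-based partitioning can be sketched in a few lines. The `choose_partition` function is hypothetical, and CRC32 is used only as a convenient stand-in hash from the standard library; it is not presented as the hash a Kafka client uses.

```python
import random
import zlib
from typing import Optional

def choose_partition(key: Optional[bytes], num_partitions: int) -> int:
    """Pick a partition: random when there is no key, hash-based otherwise."""
    if key is None:
        return random.randrange(num_partitions)   # random load balancing
    return zlib.crc32(key) % num_partitions       # stand-in hash of the key

# Every message keyed by the same user id lands in the same partition.
p1 = choose_partition(b"user-42", 8)
p2 = choose_partition(b"user-42", 8)
```

A consumer of that partition can then assume it sees all data for the users mapped to it, which is the locality property described above.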
Asynchronous send
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and fewer, larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.
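The two bounds can be sketched as a small accumulator. This is a hypothetical illustration, not the producer's implementation: the `BatchAccumulator` class, the choice of a byte-size bound, and the injectable `clock` (used so the example does not need to sleep) are all assumptions.

```python
import time
from typing import Callable, List, Optional

class BatchAccumulator:
    """Buffers messages until a size bound or a latency bound is reached."""

    def __init__(self, max_bytes: int, linger_ms: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_bytes = max_bytes
        self.linger_s = linger_ms / 1000.0
        self.clock = clock
        self.pending: List[bytes] = []
        self.pending_bytes = 0
        self.first_arrival: Optional[float] = None

    def add(self, message: bytes) -> Optional[List[bytes]]:
        """Buffer a message; return a batch if a bound has been reached."""
        if self.first_arrival is None:
            self.first_arrival = self.clock()
        self.pending.append(message)
        self.pending_bytes += len(message)
        return self.poll()

    def poll(self) -> Optional[List[bytes]]:
        """Return the batch if it is full or has waited long enough, else None."""
        if not self.pending:
            return None
        full = self.pending_bytes >= self.max_bytes
        expired = self.clock() - self.first_arrival >= self.linger_s
        if not (full or expired):
            return None
        batch, self.pending = self.pending, []
        self.pending_bytes = 0
        self.first_arrival = None
        return batch

now = [0.0]                                   # fake clock, in seconds
acc = BatchAccumulator(max_bytes=64 * 1024, linger_ms=10, clock=lambda: now[0])
r1 = acc.add(b"a" * 100)                      # under both bounds: stays buffered
now[0] = 0.011                                # 11 ms later the latency bound has passed
r2 = acc.poll()
r3 = acc.add(b"b" * 64 * 1024)                # size bound reached immediately
```

Raising either bound trades a little more latency for larger requests; lowering them does the opposite.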
4.5 The Consumer
Push vs. pull
An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as Scribe and Apache Flume, follow a very different push-based path where data is pushed downstream. There are pros and cons to both approaches. However, a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately, in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. In a push system the overload can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model.
Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must choose to either send a request immediately or accumulate more data and then send it later without knowledge of whether the downstream consumer will be able to immediately process it. If tuned for low latency, this will result in sending a single message at a time only for the transfer to end up being buffered anyway, which is wasteful. A pull-based design fixes this as the consumer always pulls all available messages after its current position in the log (or up to some configurable max size). So one gets optimal batching without introducing unnecessary latency.
The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a “long poll” waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).
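A long poll can be sketched with a condition variable: the fetch blocks until data exists past the requested offset or a timeout expires. The `Partition` class and its `max_wait_s` parameter are hypothetical names for this illustration and do not describe the broker's actual request handling.

```python
import threading
from typing import List

class Partition:
    """In-memory stand-in for one partition's log."""

    def __init__(self) -> None:
        self._messages: List[bytes] = []
        self._cond = threading.Condition()

    def append(self, message: bytes) -> None:
        with self._cond:
            self._messages.append(message)
            self._cond.notify_all()          # wake any waiting fetches

    def fetch(self, offset: int, max_wait_s: float) -> List[bytes]:
        """Return messages from offset onward, waiting up to max_wait_s if none exist."""
        with self._cond:
            self._cond.wait_for(lambda: len(self._messages) > offset,
                                timeout=max_wait_s)
            return self._messages[offset:]

partition = Partition()
empty = partition.fetch(0, max_wait_s=0.05)          # no data: returns after the timeout
threading.Timer(0.05, partition.append, args=(b"m0",)).start()
got = partition.fetch(0, max_wait_s=5.0)             # returns once m0 has been appended
```

The consumer issues one request and sleeps inside it, instead of polling in a tight loop while the log is empty.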
You could imagine other possible designs which would be only pull, end-to-end. The producer would locally write to a local log, and brokers would pull from that with consumers pulling from them. A similar type of “store-and-forward” producer is often proposed. This is intriguing but we felt not very suitable for our target use cases which have thousands of producers. Our experience running persistent data systems at scale led us to feel that involving thousands of disks in the system across many applications would not actually make things more reliable and would be a nightmare to operate. And in practice we have found that we can run a pipeline with strong SLAs at large scale without a need for producer persistence.
Consumer Position
Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structures used for storage in many messaging systems scale poorly, this is also a pragmatic choice–since the broker knows what is consumed it can immediately delete it, keeping the data size small.
What is perhaps not obvious is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature, which means that messages are only marked as sent, not consumed, when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement, then the message will be consumed twice. The second problem is around performance: now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.
Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer within each subscribing consumer group at any given time. This means that the position of a consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.
There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
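Keeping the consumer's state as a single integer per partition, and rewinding it, can be sketched as follows. The `PartitionConsumer` class and its method names are hypothetical and chosen for illustration; they are not the Kafka consumer API.

```python
from typing import List

class PartitionConsumer:
    """Tracks one consumer's position in one partition as a single integer."""

    def __init__(self, log: List[str]) -> None:
        self.log = log
        self.position = 0          # offset of the next message to consume

    def poll(self, max_messages: int) -> List[str]:
        """Return up to max_messages from the current position and advance it."""
        batch = self.log[self.position:self.position + max_messages]
        self.position += len(batch)
        return batch

    def seek(self, offset: int) -> None:
        """Rewind (or skip ahead) by overwriting the position."""
        self.position = offset

consumer = PartitionConsumer(["m0", "m1", "m2", "m3"])
first_pass = consumer.poll(3)      # ['m0', 'm1', 'm2']
checkpoint = consumer.position     # 3: the only state that needs to be saved
consumer.seek(1)                   # e.g. after fixing a bug in the processing code
replayed = consumer.poll(10)       # ['m1', 'm2', 'm3']
```

Checkpointing is just storing `position`, and re-consuming is just setting it back, because the log itself is retained rather than deleted on consumption.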
Offline Data Load
In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data—they simply restart from their original position.
4.6 Message Delivery Semantics
Now that we understand a little about how producers and consumers work, let’s discuss the semantic guarantees Kafka provides between producer and consumer. Clearly there are multiple possible message delivery guarantees that could be provided:
- At most once: messages may be lost but are never redelivered.
- At least once: messages are never lost but may be redelivered.
- Exactly once: this is what people actually want; each message is delivered once and only once.
Many systems claim to provide “exactly once” delivery semantics, but it is important to read the fine print: most of these claims are misleading (i.e. they don’t translate to the case where consumers or producers can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost).
Kafka’s semantics are straightforward. When publishing a message we have a notion of the message being “committed” to the log. Once a published message is committed, it will not be lost as long as one broker that replicates the partition to which this message was written remains “alive”. The definition of alive, as well as a description of which types of failures we attempt to handle, will be described in more detail in the next section. For now let’s assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and experiences a network error, it cannot be sure whether this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key.
These are not the strongest possible semantics for publishers. Although we cannot be sure of what happened in the case of a network error, it is possible to allow the producer to generate a sort of “primary key” that makes retrying the produce request idempotent. This feature is not trivial for a replicated system because of course it must work even (or especially) in the case of a server failure. With this feature it would suffice for the producer to retry until it receives acknowledgement of a successfully committed message at which point we would guarantee the message had been published exactly once. We hope to add this in a future Kafka version.
Not all use cases require such strong guarantees. For uses which are latency sensitive we allow the producer to specify the durability level it desires. If the producer specifies that it wants to wait on the message being committed this can take on the order of 10 ms. However the producer can also specify that it wants to perform the send completely asynchronously or that it wants to wait only until the leader (but not necessarily the followers) have the message.
Now let’s describe the semantics from the point-of-view of the consumer. All replicas have the exact same log with the same offsets. The consumer controls its position in this log. If the consumer never crashed it could just store this position in memory, but if the consumer fails and we want this topic partition to be taken over by another process the new process will need to choose an appropriate position from which to start processing. Let’s say the consumer reads some messages — it has several options for processing the messages and updating its position.
- It can read the messages, then save its position in the log, and finally process the messages. In this case there is a possibility that the consumer process crashes after saving its position but before saving the output of its message processing. In this case the process that took over processing would start at the saved position even though a few messages prior to that position had not been processed. This corresponds to “at-most-once” semantics as in the case of a consumer failure messages may not be processed.
- 它可以先读取消息, 然后把位置保存到日志中, 然后处理消息, 在这种情况下, 消费者有可能在保存了日志点后, 在处理消息输出数据是奔溃, 这时进程可能从日志点读取位移位置接下去处理数据, 尽管之前有一些数据处理失败了. 这种对于在消费者失败时最多被处理一次的语义
- It can read the messages, process the messages, and finally save its position. In this case there is a possibility that the consumer process crashes after processing messages but before saving its position. In this case when the new process takes over the first few messages it receives will already have been processed. This corresponds to the “at-least-once” semantics in the case of consumer failure. In many cases messages have a primary key and so the updates are idempotent (receiving the same message twice just overwrites a record with another copy of itself).
- 它可以先读取, 然后处理消息, 最后保存位移. 在这种情况下, 消费者有可能在保存位移时奔溃, 新进程重启是可能就会接收到一些之前处理过的数据, 这对应于至少被消费一次的语义, 很多情况下, 消息可能有自己的主键, 所以在更新上是等效的(收到一份相同的消息两次, 只不过对同一份记录覆盖两次)
- So what about exactly once semantics (i.e. the thing you actually want)? The limitation here is not actually a feature of the messaging system but rather the need to co-ordinate the consumer’s position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase commit between the storage for the consumer position and the storage of the consumers output. But this can be handled more simply and generally by simply letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As an example of this, our Hadoop ETL that populates data in HDFS stores its offsets in HDFS with the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a primary key to allow for deduplication.
- 那什么是仅此一次的语义(这个是不是你真的想要的), 这个限制其实并不是消息系统的特性, 而是要协调消费者的位置和它实际输出的储存方式. 经典的解决方式是在数据储存和消费者位移储存间引入来两次提交的方式, 但是可以使用更简单的方式把消费者的位移位置和数据输出保存到同一个位置上, 因为有很多储存系统并不支持两相提交. 例如, 我们的hadoop ETL工具从保存数据到dhfs上的同时也把位移位置也保存到hdfs中了, 这样可以保证数据和位移位置同时被更新或者都没更新.我们在很多系统上使用类似的模式, 用于解决那些需要这种强语义但是却没有主键用于区分重复的储存系统中.
- So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at-most-once delivery by disabling retries on the producer and committing the consumer’s offset prior to processing a batch of messages. Exactly-once delivery requires cooperation with the destination storage system, but Kafka provides the offset, which makes implementing this straightforward.
- 因此kafka默认保障最少一次的分发语义, 并允许用户禁止重试和在处理数据之前提及它的位移位置来实现最多一次的语义, 有且仅有一次, 这种语义需要和输出的目的储存系统相结合, 但是kafka提供的位移语义使得实现这些功能非常简单.
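The three orderings described in the list above can be illustrated with a small simulation. This is a minimal sketch, not Kafka client code: the function names, the single simulated crash, and the in-memory "store" are assumptions made for illustration only.

```python
def run_consumer(messages, commit_first, crash_at):
    """Simulate a consumer that crashes once while handling index `crash_at`.

    commit_first=True  -> save position, then process (at-most-once)
    commit_first=False -> process, then save position (at-least-once)
    Returns the list of messages that were actually processed.
    """
    processed = []
    committed = 0      # next offset to read after a restart
    crashed = False
    while committed < len(messages):
        offset = committed
        if commit_first:
            committed = offset + 1            # position saved before processing
            if offset == crash_at and not crashed:
                crashed = True                # crash between commit and processing
                continue                      # restart from the saved position
            processed.append(messages[offset])
        else:
            processed.append(messages[offset])
            if offset == crash_at and not crashed:
                crashed = True                # crash between processing and commit
                continue                      # restart; the position was never saved
            committed = offset + 1
    return processed


def run_exactly_once(messages, crash_at):
    """Keep output and offset in the same (hypothetical) store, updated together."""
    store = {"output": [], "offset": 0}
    crashed = False
    while store["offset"] < len(messages):
        offset = store["offset"]
        if offset == crash_at and not crashed:
            crashed = True                    # crash before the combined write
            continue
        # One atomic update: output and offset change together or not at all.
        store = {"output": store["output"] + [messages[offset]],
                 "offset": offset + 1}
    return store["output"]


msgs = ["a", "b", "c"]
at_most_once = run_consumer(msgs, commit_first=True, crash_at=1)
at_least_once = run_consumer(msgs, commit_first=False, crash_at=1)
exactly_once = run_exactly_once(msgs, crash_at=1)
print(at_most_once, at_least_once, exactly_once)
```

With a crash while handling "b", the first ordering skips it, the second processes it twice, and the combined store sees it exactly once.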
Kafka replicates the log for each topic’s partitions across a configurable number of servers (you can set this replication factor on a topic-by-topic basis). This allows automatic failover to these replicas when a server in the cluster fails so messages remain available in the presence of failures.
kafka为每个主题的分区日志复制到一个可配置的数据的服务器集群上(你可以对每个主题设置副本数). 这保证了如果集群中有服务器宕机时能够自动恢复, 消息可以从剩余的服务器中读取.
Other messaging systems provide some replication-related features, but, in our (totally biased) opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides: slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration, etc. Kafka is meant to be used with replication by default—in fact we implement un-replicated topics as replicated topics where the replication factor is one.
其他消息系统也提供复制特性, 但是, 在我们(有点偏见地)看来, 这视乎是一个附加的特性, 不能大量使用, 并且伴随大量的缺点, 备机不是活跃的, 吞吐量严重受到影响. 还需要繁琐的人工配置等等. kafka默认开启复制功能, 实际上我们把没有实现复制的主题当作副本只有一个的复制主题来看待.
The unit of replication is the topic partition. Under non-failure conditions, each partition in Kafka has a single leader and zero or more followers. The total number of replicas including the leader constitute the replication factor. All reads and writes go to the leader of the partition. Typically, there are many more partitions than brokers and the leaders are evenly distributed among brokers. The logs on the followers are identical to the leader’s log—all have the same offsets and messages in the same order (though, of course, at any given time the leader may have a few as-yet unreplicated messages at the end of its log).
复制是基于主题分区. 在没有失败的情况下, 每个分区在kafka中有一个主分区和零个或多个备份分区, 所有的这些副本包括主分区构成了复制因子.所有读写都使用主分区. 正常情况下, 分区数量一般比服务器多的多, 所有的主分区最终分布到所有的服务器上. 在备份服务器上的日志一般和主服务器的日志一致, 拥有相同的偏移量和消息顺序(当然, 在特定的时间内, 主分区日志的尾部可能有一些消息没有复制到主服务器上)
Followers consume messages from the leader just as a normal Kafka consumer would and apply them to their own log. Having the followers pull from the leader has the nice property of allowing the follower to naturally batch together log entries they are applying to their log.
备份服务器获从主服务器获取消息就像kafka的消费着读取并记录到自己的日志中. 这些从服务器有个很好的特性, 就是能自然地获取批量数据并应用到他们自己的日志中.
As with most distributed systems, automatically handling failures requires a precise definition of what it means for a node to be “alive”. For Kafka, node liveness has two conditions:
和大部分分布式系统一样, 自动处理容错需要对节点”存活”有一个准确的定义, 比如kafka节点存活有两个条件
- A node must be able to maintain its session with ZooKeeper (via ZooKeeper’s heartbeat mechanism) 节点必须能够和zookeeper机器建立心跳信号
- If it is a follower it must replicate the writes happening on the leader and not fall “too far” behind 如果是个备份节点, 必须在主节点写的时候进行复制, 不能落下太远.
In distributed systems terminology we only attempt to handle a “fail/recover” model of failures where nodes suddenly cease working and then later recover (perhaps without knowing that they have died). Kafka does not handle so-called “Byzantine” failures in which nodes produce arbitrary or malicious responses (perhaps due to bugs or foul play).
在一个分布式的术语里, 我们尝试处理”失败/恢复”模型, 像节点突然停止工作, 然后又恢复的(可能不知道他们是否宕机了). kafka不处理所谓的“拜占庭”故障，比如节点产生任意或恶意的反馈(比如bug或不规范行为)
A message is considered “committed” when all in-sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preferred tradeoff between latency and durability. This preference is controlled by the acks setting that the producer uses.
一条消息只有在它在所有的同步副本集的日志分区都已经提交了, 才被当作是”已提交”. 只有已经提交的消息才会分发给消费者. 这说明消费者不需要担心会看到主节点宕机时消息会丢失. 生产者可以在延迟和持久性中, 决定是否等待消息提交. 这个在生产者中的反馈配置项中可以设置.
The guarantee that Kafka offers is that a committed message will not be lost as long as at least one in-sync replica remains alive at all times.
Kafka will remain available in the presence of node failures after a short fail-over period, but may not remain available in the presence of network partitions.
kafka可以保证节点在一个短暂的宕机时, 仍然可用. 但是无法保证网络出现脑裂时仍然可用.
A replicated log models the process of coming into consensus on the order of a series of values (generally numbering the log entries 0, 1, 2, …). There are many ways to implement this, but the simplest and fastest is with a leader who chooses the ordering of values provided to it. As long as the leader remains alive, all followers need to only copy the values and ordering the leader chooses.
复制日志模型用于处理连续输入的, 有序的记录值(像有编号的日志1, 2, 3). 这里有很多实现的方式, 但是最简单和最有效的方式是, 主节点选择和提供这个顺序值. 只要主节点存活, 备份节点只要按主几点选择的顺序拷贝这些值就可以了.
Of course if leaders didn’t fail we wouldn’t need followers! When the leader does die we need to choose a new leader from among the followers. But followers themselves may fall behind or crash so we must ensure we choose an up-to-date follower. The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message. This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
当然如果主节点不宕机, 我们也不需要备份节点! 如果主节点宕机了, 我们需要从备份节点中选择一个新的主节点. 但是备份节点本身也有可能宕机或者延迟, 所以我们必须选择最新的备份节点. 最基本的保证是, 一个复制算法必须提供,”如果我们告诉客户端消息已经提交了, 这个时候主节点宕机, 新的主节点被选举出来时必须保证也有拥有这条消息”, 这里有一个权衡, 主节点必须等待多个从节点反馈消息已经提交, 这样才能有更多备节点能用来做为主节点的候选节点.
If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.
如果你选择那些需要反馈的数量和可以用于选举为主节点的日志数可以保证重叠, 这个叫做 Quorum
A common approach to this tradeoff is to use a majority vote for both the commit decision and the leader election. This is not what Kafka does, but let’s explore it anyway to understand the tradeoffs. Let’s say we have 2f+1 replicas. If f+1 replicas must receive a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least f+1 replicas, then, with no more than f failures, the leader is guaranteed to have all committed messages. This is because among any f+1 replicas, there must be at least one replica that contains all committed messages. That replica’s log will be the most complete and therefore will be selected as the new leader. There are many remaining details that each algorithm must handle (such as precisely defining what makes a log more complete, ensuring log consistency during leader failure, or changing the set of servers in the replica set) but we will ignore these for now.
一种达到这种目标的最常用的方法是, 在提交决策和选举决策上都使用最多投票的方式, 这不是kafka的实现, 但是我们为了明白这个原理还是解释下.如果说我们有2f+1个副本, 那么f+1的副本必须在主节点提交日志前接受到消息, 这样我们就可以从拥有最完全的日志的f+1个副本集中选择出主服务器. 因为在任何f+1个副本中, 肯定有一个副本是包含全部的日志的, 这个副本的日志是最新的, 因此会被选择为主节点. 这里有很多关于这个算法的细节需要处理(像如果定义使日志更全些, 再主节点宕机时保证日志的一致性, 修改复制集中日志的副本数 ), 但是我们现在先忽略
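The overlap argument can be checked by brute force for small clusters. The sketch below is only an illustration of the counting argument, not part of any replication protocol; the function name `always_overlap` is made up for this example.

```python
from itertools import combinations


def always_overlap(replica_count, quorum_size):
    """Return True if every pair of quorums of the given size shares a replica."""
    replicas = range(replica_count)
    quorums = list(combinations(replicas, quorum_size))
    return all(set(a) & set(b) for a in quorums for b in quorums)


# With 2f+1 replicas and quorums of size f+1, a commit quorum and an
# election quorum always intersect, so the election sees every committed message.
f1_ok = always_overlap(3, 2)    # f = 1
f2_ok = always_overlap(5, 3)    # f = 2
# Quorums of only f replicas can be disjoint, so the guarantee is lost.
too_small = always_overlap(5, 2)
print(f1_ok, f2_ok, too_small)
```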
This majority vote approach has a very nice property: the latency is dependent on only the fastest servers. That is, if the replication factor is three, the latency is determined by the faster follower, not the slower one.
There is a rich variety of algorithms in this family including ZooKeeper’s Zab, Raft, and Viewstamped Replication. The most similar academic publication we are aware of to Kafka’s actual implementation is PacificA from Microsoft.
主选举机制有一个相当好的属性, 延迟只依赖于最快的服务器. 因此, 如果复制集是3, 延迟取决于最快的备份节点而不是最慢的那个.
The downside of majority vote is that it doesn’t take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration such as ZooKeeper but are less common for primary data storage. For example in HDFS the namenode’s high-availability feature is built on a majority-vote-based journal, but this more expensive approach is not used for the data itself.
多数投票的缺点是，它并不容忍很多的失败, 导致你没有可被选择为主的备节点.为了容忍1个错误需要3份的副本数据, 要容忍3个失败需要5份副本数据. 在我们的经验中, 用足够多的冗余来来容忍单一的错误在现实中的系统是不够的, 这样每次写5次, 使用5被的硬盘空间和5份之一的带宽, 在大体量的数据储存上不是特别实践, 所以quorum的算法机制在共享的集群配置中好像更为常见写, 但是在主储存结构上比较少用, 例如, hdfs的namenode节点使用基于主副本的选举机制建立高可用性能, 但是由于代价太高没有用在数据方面
Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught-up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until all in-sync replicas have received the write. This ISR set is persisted to ZooKeeper whenever it changes. Because of this, any replica in the ISR is eligible to be elected leader. This is an important factor for Kafka’s usage model where there are many partitions and ensuring leadership balance is important. With this ISR model and f+1 replicas, a Kafka topic can tolerate f failures without losing committed messages.
kafka使用有点儿不太一样的策略来选择他的quorum集合. 不像多数投票一样, kafka动态维护能跟得上主节点的复制集合(ISR), 只有在这个集合里面的成员才有资格被选举为主节点, 这个对于kafka这种拥有很多分区并且需要保证主节点的负载均衡的模型来说非常重要. 使用ISR这样的模型和f+1个副本, kafka的主题可以容忍f个节点宕机后已经提交的消息不会丢失.
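The commit rule for the ISR can be sketched as follows: a write counts as committed once every in-sync replica has it, so the committed position is the minimum log end across the ISR. This is a simplified model; the replica names and the `committed_offset` helper are hypothetical, and real brokers track considerably more state.

```python
def committed_offset(log_end_offsets, isr):
    """Offset up to which (exclusive) every in-sync replica has the data.

    log_end_offsets: dict mapping replica name -> next offset it would write
    isr: set of replica names currently considered in sync
    """
    return min(log_end_offsets[replica] for replica in isr)


log_end_offsets = {"leader": 10, "follower-1": 10, "follower-2": 8, "follower-3": 3}

# follower-3 has fallen too far behind and is not in the ISR, so it does
# not hold back the commit point; follower-2 still does.
before = committed_offset(log_end_offsets, {"leader", "follower-1", "follower-2"})

# If follower-2 also drops out of the ISR, the remaining members are fully
# caught up and everything the leader has written is committed.
after = committed_offset(log_end_offsets, {"leader", "follower-1"})
print(before, after)
```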
For most use cases we hope to handle, we think this tradeoff is a reasonable one. In practice, to tolerate f failures, both the majority vote and the ISR approach will wait for the same number of replicas to acknowledge before committing a message (e.g. to survive one failure a majority quorum needs three replicas and one acknowledgement and the ISR approach requires two replicas and one acknowledgement). The ability to commit without the slowest servers is an advantage of the majority vote approach. However, we think it is ameliorated by allowing the client to choose whether they block on the message commit or not, and the additional throughput and disk space due to the lower required replication factor is worth it.
在大部分情况下我们希望能处理的, 认为这样的权衡是合理的. 在实践中, 为了对f几个节点宕机进行容错, 无论是多数投票还是ISR策略都需要等待一样数量的副本都得到通知后才能提交消息(为了避免一个节点宕机, 多数投票策略需要3个副本和一次反馈, isr策略需要2个副本和一次反馈). 不需要等待最慢的服务器就能提交消息是多数投票策略最大的优点.尽管这样, 我们进行改善,让客户端选择是否等待消息提交, 使用较小的副本因子会带来额外的吞吐量带来的价值可能不菲.
Another important design distinction is that Kafka does not require that crashed nodes recover with all their data intact. It is not uncommon for replication algorithms in this space to depend on the existence of “stable storage” that cannot be lost in any failure-recovery scenario without potential consistency violations. There are two primary problems with this assumption. First, disk errors are the most common problem we observe in real operation of persistent data systems and they often do not leave data intact. Secondly, even if this were not a problem, we do not want to require the use of fsync on every write for our consistency guarantees as this can reduce performance by two to three orders of magnitude. Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again even if it lost unflushed data in its crash.
一个重要的设计是kafka并没有要求宕机的节点需要完整无缺恢复数据. 要求复制算法在故障恢复是没有任何一致性冲突依赖于底层的储存介质. 这个假设有两个主要的问题, 第一磁盘错误是最通常的问题, 我们从一个现实的数据储存系统的操作可以观察到, 这个可能导致数据不完整. 第二, 即使这些没有问题, 我们也不想每次写数据是调用fsync方法来保证数据的一致性, 因为这会降低2到3倍的性能. 我们允许一个复制节点从新加入到ISR集合前, 必须完全同步上, 即使它宕机是把未flush的数据丢失了
Unclean leader election: What if they all die? 不清楚主选举:如果全部宕机了?
However a practical system needs to do something reasonable when all the replicas die. If you are unlucky enough to have this occur, it is important to consider what will happen. There are two behaviors that could be implemented:
尽管这样, 在现实中的系统, 需要做一些事情在所有副本都宕机的情况下. 如果你不幸遇到了, 你需要仔细考虑下将碰到的问题. 有两种行为需要去做:
- Wait for a replica in the ISR to come back to life and choose this replica as the leader (hopefully it still has all its data). 等待ISR中一个副本起来, 然后选择这个副本作为主节点(期望数据不会丢失)
- Choose the first replica (not necessarily in the ISR) that comes back to life as the leader.选择第一个存活的副本(不一定在ISR副本 集中)直接作为主节点
This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. By default, Kafka chooses the second strategy and favors choosing a potentially inconsistent replica when all replicas in the ISR are dead. This behavior can be disabled using the configuration property unclean.leader.election.enable, to support use cases where downtime is preferable to inconsistency.
这个必须在可用性和一致性之间作权衡. 如果我们等待在ISR集合中的副本再次启动起来, 那么在所有副本及都宕机这段时间, 我们会维持不可用的状态.如果这些副本已经坏了, 或对应的数据已经丢失了, 则我们永久宕机了.如果, 换种方式, 从没有同步的副本中选择一个存活的变成主的, 那么这个副本的日志就变成当前主要的数据源, 但是保证当前所有已经提交的消息还存在. 默认情况下, kafka使用第二种策略, 在ISR中的所有副本集都宕机时, 使用一个潜在的非一致性的副本,如果我们更期望是不可用状态而不是不一致状态时, 这个特性可以通过配置unclean.leader.election.enable来禁用,
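The two strategies can be sketched as a single selection function. This is a simplified model under stated assumptions: `elect_leader` and its arguments are hypothetical names, and "the first replica that comes back to life" is approximated by the first live replica in list order.

```python
def elect_leader(replicas, isr, alive, unclean_enabled):
    """Pick a new leader, or return None if the partition must stay offline.

    replicas: ordered list of all replicas for the partition
    isr: set of replicas that were in sync
    alive: set of replicas currently running
    unclean_enabled: models unclean.leader.election.enable
    """
    for replica in replicas:
        if replica in alive and replica in isr:
            return replica              # consistent choice: an in-sync replica
    if unclean_enabled:
        for replica in replicas:
            if replica in alive:
                return replica          # available, but may miss committed messages
    return None                         # prefer downtime over inconsistency


replicas = ["b1", "b2", "b3"]
isr = {"b1", "b2"}

clean = elect_leader(replicas, isr, alive={"b2", "b3"}, unclean_enabled=False)
unclean = elect_leader(replicas, isr, alive={"b3"}, unclean_enabled=True)
offline = elect_leader(replicas, isr, alive={"b3"}, unclean_enabled=False)
print(clean, unclean, offline)
```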
This dilemma is not specific to Kafka. It exists in any quorum-based scheme. For example in a majority voting scheme, if a majority of servers suffer a permanent failure, then you must either choose to lose 100% of your data or violate consistency by taking what remains on an existing server as your new source of truth.
这种困境不是kafka特有的, 这存在于任何基于quorum方式的结构中. 例如, 多数投票算法, 如果大多数的服务器都永久性失效了, 你必须选择丢失全部的数据或者接受某一台可能数据不一致的服务器上的数据.
Availability and Durability Guarantees 可用性和可靠性保证
- Disable unclean leader election – if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss. See the previous section on Unclean Leader Election for clarification.
- 禁止无主选举, 如果所有的副本都不可用, 这个分区就要等到最近一个主分区可以用时才可用, 这比较可能导致不可用, 而不是数据丢失.
- Specify a minimum ISR size – the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses acks=all and guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency since the message is guaranteed to be written to more replicas which reduces the probability that it will be lost. However, it reduces availability since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
- 指定一个最小的ISR集合, 分区只有在ISR集合的个数大于指定值时, 才能进行读写, 这样可以阻止消息只写入到一个副本的, 随后这个副本宕机导致数据丢失.这个只有参数生产者开启了全反馈的时才能保证消息会在所有同步的副本集中至少有这么多个反馈. 这个参数提供了一致性和可用性之前的权衡, 较大最小ISR可以保证比较好的一致性, 因为消息被写入更多的副本, 减少丢失的可能性, 但是同时也减低了可用性, 因为分区的副本数如果达不到最小ISR集合时将不可用.
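The interaction between the minimum ISR size and the producer's acks setting can be sketched as a single check. This is an illustration only: `accepts_write` is a made-up helper, and it assumes the threshold is inclusive (a write is accepted when the ISR has at least the minimum number of members).

```python
def accepts_write(acks, isr_size, min_isr):
    """Return True if a produce request would be accepted under this model."""
    if acks != "all":
        # The minimum ISR size only takes effect for acks=all.
        return True
    # Assumption: "at least min_isr in-sync replicas" is the intended rule.
    return isr_size >= min_isr


healthy = accepts_write("all", isr_size=3, min_isr=2)
degraded = accepts_write("all", isr_size=1, min_isr=2)
not_enforced = accepts_write("1", isr_size=1, min_isr=2)
print(healthy, degraded, not_enforced)
```

Raising the minimum makes the `degraded` case more frequent, which is the availability cost described above.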
Replica Management 复制管理
It is also important to optimize the leadership election process as that is the critical window of unavailability. A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed. Instead, we elect one of the brokers as the “controller”. This controller detects failures at the broker level and is responsible for changing the leader of all affected partitions in a failed broker. The result is that we are able to batch together many of the required leadership change notifications which makes the election process far cheaper and faster for a large number of partitions. If the controller fails, one of the surviving brokers will become the new controller.
对不可用的时间端, 优化主节点的选举也很重要. 一个直观的选举实现是如果一个节点宕机了, 那么这个节点上的每个分区都独立选举. 但是, 我们选举一个节点作为控制器. 这个控制或在节点级别上进行容错管理, 和负责修改所有的被影响的分区的选举. 这样好处是,我们可以批量处理选举, 减少很多独立选举时大量通知, 这使得在大量分区时选举代价更小, 更快. 如果这个控制器失败了, 其中一个还存活的节点会变成主控制器.
4.8 Log Compaction 日志压缩
So far we have described only the simpler approach to data retention where old log data is discarded after a fixed period of time or when the log reaches some predetermined size. This works well for temporal event data such as logging where each record stands alone. However, an important class of data streams is the log of changes to keyed, mutable data (for example, the changes to a database table).
到目前为止, 我们只讨论了简单的日志维护方式, 如定期清理过期的数据, 或者清除超过指定大小的数据. 这种方式对于每条数据记录临时日志信息非常有效. 但是有一种重要的数据流类型, 是根据主键, 变化的数据(例如: 数据库表的变化)
Let’s discuss a concrete example of such a stream. Say we have a topic containing user email addresses; every time a user updates their email address we send a message to this topic using their user id as the primary key. Now say we send the following messages over some time period for a user with id 123, each message corresponding to a change in email address (messages for other ids are omitted):
让我们使用具体实例来讨论这个问题. 比如说我们有个关于用户邮件的主题, 每次有用户的邮件变化时, 我们发送一条消息到这个主题, 这个消息使用用户的ID作为主键. 现在我们过段时间发送一些消息给用户(id:123), 每条消息包含着邮件变更信息(其他人的消息暂时忽略)
123 => firstname.lastname@example.org
        .
        .
        .
123 => email@example.com
        .
        .
        .
123 => firstname.lastname@example.org
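Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key (e.g. email@example.com). By doing this we guarantee that the log contains a full snapshot of the final value for every key, not just keys that changed recently. This means downstream consumers can restore their own state off this topic without us having to retain a complete log of all changes.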
Let’s start by looking at a few use cases where this is useful, then we’ll see how it can be used.
- Database change subscription. It is often necessary to have a data set in multiple data systems, and often one of these systems is a database of some kind (either an RDBMS or perhaps a new-fangled key-value store). For example you might have a database, a cache, a search cluster, and a Hadoop cluster. Each change to the database will need to be reflected in the cache, the search cluster, and eventually in Hadoop. If you are only handling real-time updates, you only need the recent log. But if you want to be able to reload the cache or restore a failed search node you may need a complete data set.
- 数据库变更订阅. 很常有这样的情况, 一份数据会出现在多个系统里面, 而且很常其中的一个系统是数据库类型的(如RDBMS或者新的键值系统), 比如, 你有一个数据库, 一个缓存系统, 一个检索集群, 一个hadoop集群. 每次对数据库的变更需要同步到在缓存, 检索集群, 并最终保存到hadoop中. 在这种情况下, 你只需要当前实时系统最新的更新日志. 但是, 如果你要重新加载缓存, 或恢复宕机的检索节点, 你需要完整的数据
- Event sourcing. This is a style of application design which co-locates query processing with application design and uses a log of changes as the primary store for the application.
- 事件源. 这是一种应用程序设计风格，它将查询处理与应用程序设计结合在一起，并使用日志的更改作为应用程序的主存储.
- Journaling for high-availability. A process that does local computation can be made fault-tolerant by logging out changes that it makes to its local state so another process can reload these changes and carry on if it should fail. A concrete example of this is handling counts, aggregations, and other “group by”-like processing in a stream query system. Samza, a real-time stream-processing framework, uses this feature for exactly this purpose.
- 高可用日志. 一个本地计算进程, 可以把变更日志输进行, 达到容错的目的, 这样另外一个进程就能够在当前那个进程宕机的时接受继续处理. 例如, 像流数据查询例子, 如计数, 汇总或其他的分组操作. 实时系统框架如Samza, 就是为了达到这个目的使用这个特性的
The general idea is quite simple. If we had infinite log retention, and we logged each change in the above cases, then we would have captured the state of the system at each time from when it first began. Using this complete log, we could restore to any point in time by replaying the first N records in the log. This hypothetical complete log is not very practical for systems that update a single record many times as the log will grow without bound even for a stable dataset. The simple log retention mechanism which throws away old updates will bound space but the log is no longer a way to restore the current state—now restoring from the beginning of the log no longer recreates the current state as old updates may not be captured at all.
想法很简单, 我们有无限的日志, 在每种情况下记录变更日志, 我们从一开始就开始记录系统的每一次变更情况. 着用这种日志, 我们可以通过回放N个记录, 把状态恢复到任何一个时间点. 在这种假设前提下, 完全日志对于那些对同一条记录会更新很多次, 即使数据集是规定大小的, 日志也会迅速增长. 这种简单的日志维护方式除了浪费空间外, 但是这些日志也不能恢复当前的状态, 从当前的日志不能恢复当前状态, 是因为旧的日志可能没有全部的更新记录.
Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
日志压缩提供对每条记录的保存方式提供细粒度的机制, 而不是基于时间范围的粗款的方式. 我们可以在对具有相同主键的记录更新时, 选择性删除记录.这样日志可以保证拥有每个键值的最后状态
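The core idea can be sketched in a few lines: keep only the most recent record for each key, without changing offsets or order. This is a toy model over an in-memory list, not the log cleaner itself; the record layout and the sample addresses are invented for illustration.

```python
def compact(log):
    """Keep the latest record per key.

    log: list of (offset, key, value) tuples in offset order.
    Offsets and relative order of the surviving records are unchanged.
    """
    last_offset = {key: offset for offset, key, _ in log}
    return [(offset, key, value)
            for offset, key, value in log
            if last_offset[key] == offset]


log = [
    (0, "123", "first@example.com"),
    (1, "456", "other@example.com"),
    (2, "123", "second@example.com"),
    (3, "123", "third@example.com"),
]
compacted = compact(log)
print(compacted)
```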
This retention policy can be set per-topic, so a single cluster can have some topics where retention is enforced by size or time and other topics where retention is enforced by compaction.
这种保存策略可以针对主题基本设置, 这样一个集群的一些主题可以按大小和时间进行保存, 一些可以按压缩策略进行保存
This functionality is inspired by one of LinkedIn’s oldest and most successful pieces of infrastructure—a database changelog caching service called Databus. Unlike most log-structured storage systems Kafka is built for subscription and organizes data for fast linear reads and writes. Unlike Databus, Kafka acts as a source-of-truth store so it is useful even in situations where the upstream data source would not otherwise be replayable.
这项功能被linkedin的一个最为古老和成功的基础设施所使用, 数据库日志缓存服务叫做 Databus.
Log Compaction Basics 日志压缩基础
The head of the log is identical to a traditional Kafka log. It has dense, sequential offsets and retains all messages. Log compaction adds an option for handling the tail of the log. The picture above shows a log with a compacted tail. Note that the messages in the tail of the log retain the original offset assigned when they were first written—that never changes. Note also that all offsets remain valid positions in the log, even if the message with that offset has been compacted away; in this case this position is indistinguishable from the next highest offset that does appear in the log. For example, in the picture above the offsets 36, 37, and 38 are all equivalent positions and a read beginning at any of these offsets would return a message set beginning with 38.
日志的同步和传统的kafka日志一样, 拥有密集的, 顺序的位移, 并保存所有的消息. 日志压缩对尾部添加而外的压缩选项.这张图展示了已经压缩的尾部. 注意, 在尾部的消息保存它们第一次写入时的原始位移位置, 也要注意, 这些消息的位移位置即使在压缩过后也是合法的, 在这种情况下, 这个位置和下次出现在日志中的最高位移位置是很难区分的. 比如这图上, 36, 37和38位移位置都是同等的, 在开始读取这些位移位置时, 将会从38开始读取.
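The rule that a compacted-away offset behaves like the next offset still present can be sketched with a binary search. The retained offsets below are hypothetical values chosen to match the 36, 37, 38 example in the text; `first_offset_at_or_after` is an illustrative helper, not a Kafka API.

```python
from bisect import bisect_left


def first_offset_at_or_after(retained_offsets, requested):
    """Return the first retained offset >= requested, or None past the end.

    retained_offsets: sorted list of offsets that survived compaction.
    """
    index = bisect_left(retained_offsets, requested)
    return retained_offsets[index] if index < len(retained_offsets) else None


# Offsets 36 and 37 were compacted away; 38 is still in the log.
retained = [34, 35, 38, 40]
starts = [first_offset_at_or_after(retained, o) for o in (36, 37, 38)]
print(starts)
```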
Compaction also allows for deletes. A message with a key and a null payload will be treated as a delete from the log. This delete marker will cause any prior message with that key to be removed (as would any new message with that key), but delete markers are special in that they will themselves be cleaned out of the log after a period of time to free up space. The point in time at which deletes are no longer retained is marked as the “delete retention point” in the above diagram.
压缩同时也允许删除, 如果一个带键值的消息没有任何负载数据会被认为是要从日志中删除记录, 这个删除标志会导致先前带有这个键值的消息都被删除. 但是删除标志比较特殊, 在过一段时期后会被清除后释放空间.这个执行删除的时间点, 标记为”删除保留点”
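Delete markers can be added to a toy compaction model as records whose value is empty. The sketch below assumes each record carries a timestamp and that a marker is dropped once it is older than a retention interval; the actual cleaner's bookkeeping for the delete retention point may differ, and all names here are illustrative.

```python
def compact_with_deletes(log, now, delete_retention):
    """Compact a log that may contain delete markers (value is None).

    log: list of (offset, timestamp, key, value) tuples in offset order.
    A marker removes all earlier records for its key and is itself
    removed once it is older than `delete_retention`.
    """
    latest = {key: offset for offset, _, key, _ in log}
    kept = []
    for offset, timestamp, key, value in log:
        if latest[key] != offset:
            continue                    # superseded by a later record or marker
        if value is None and now - timestamp > delete_retention:
            continue                    # marker has passed its retention point
        kept.append((offset, timestamp, key, value))
    return kept


log = [
    (0, 100, "123", "a@example.com"),
    (1, 110, "456", "b@example.com"),
    (2, 120, "123", None),              # delete marker for key 123
]
recent = compact_with_deletes(log, now=130, delete_retention=60)
later = compact_with_deletes(log, now=500, delete_retention=60)
print(recent, later)
```

A consumer that reads within the retention interval still sees the marker for key 123; a later reader sees no trace of the key at all.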
The compaction is done in the background by periodically recopying log segments. Cleaning does not block reads and can be throttled to use no more than a configurable amount of I/O throughput to avoid impacting producers and consumers. The actual process of compacting a log segment looks something like this:
日志压缩在后台定时拷贝日志段的方式进行. 清除操作可以通过配置读写I/O的限额避免对消费额和生产者产生影响. 实际的日志段压缩过程有点像如下:
What guarantees does log compaction provide? 日志压缩提供了什么保障?
Log compaction guarantees the following: 日志压缩提供了如下保障:
- Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets. The topic’s min.compaction.lag.ms can be used to guarantee the minimum length of time that must pass after a message is written before it could be compacted. I.e. it provides a lower bound on how long each message will remain in the (uncompacted) head.
- 任何追上头部的消费者, 都会接受到任何写入的消息, 这些消息都有顺序的位移值. 参数
- Ordering of messages is always maintained. Compaction will never re-order messages, just remove some.
- 消息顺序永远被保证, 压缩不会重新排序, 只会删除一些
- The offset for a message never changes. It is the permanent identifier for a position in the log.
- 消息的位移永远不会变, 这是消息在日志中的永久性标志
- Any consumer progressing from the start of the log will see at least the final state of all records in the order they were written. All delete markers for deleted records will be seen provided the consumer reaches the head of the log in a time period less than the topic’s delete.retention.ms setting (the default is 24 hours). This is important as delete marker removal happens concurrently with reads, and thus it is important that we do not remove any delete marker prior to the consumer seeing it.
- 任何从头开始消费记录的消费者, 都会按顺序得到最终的状态. 所有的删除标志的记录会在消费者到达头部之前, 小于主题设置的
Log Compaction Details 日志压缩细节
- It chooses the log that has the highest ratio of log head to log tail
- It creates a succinct summary of the last offset for each key in the head of the log
- It recopies the log from beginning to end, removing keys which have a later occurrence in the log. New, clean segments are swapped into the log immediately so the additional disk space required is just one additional log segment (not a full copy of the log).
- 它从头开始拷贝日志, 删除在日志最后出现的键值.新的干净日志段会被立刻交换到日志里面, 所以只需要额外的一个日志分段.
- The summary of the log head is essentially just a space-compact hash table. It uses exactly 24 bytes per entry. As a result with 8GB of cleaner buffer one cleaner iteration can clean around 366GB of log head (assuming 1k messages).
- 日志头部汇总实际上是一个空间紧凑的hash表, 使用24个字节一个条目的形式, 所以如果有8G的整理缓冲区, 则能迭代处理大约366G的日志头部(假设消息大小为1k)
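The 366GB figure can be reproduced with simple arithmetic. The sketch assumes that "8GB" means 8 GiB, that "1k" means 1024-byte messages, and that the result is quoted in decimal gigabytes; these readings are assumptions that happen to match the stated number.

```python
def cleanable_log_bytes(buffer_bytes, bytes_per_entry=24, message_bytes=1024):
    """Bytes of log head one cleaner pass can cover with the given buffer."""
    entries = buffer_bytes // bytes_per_entry   # one hash-table entry per key
    return entries * message_bytes              # one message per entry


buffer_bytes = 8 * 1024**3                      # 8 GiB of cleaner buffer
entries = buffer_bytes // 24
log_bytes = cleanable_log_bytes(buffer_bytes)
print(entries, log_bytes // 10**9)              # number of entries, whole GB
```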
Configuring The Log Cleaner 配置日志整理器
The log cleaner can be configured to retain a minimum amount of the uncompacted “head” of the log. This is enabled by setting the compaction time lag.
This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.这个可以用于防止消息比当前正在压缩的最小消息时间更新, 如果没有设置, 所有的日志都会压缩, 除了最后一个正在被读写的段. 当前段甚至在消息大于最小压缩延迟时间也不会被压缩.
Further cleaner configurations are described here.
4.9 Quotas 配额
Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per group of clients sharing a quota.
从0.9版本开始, kafka集群对生产和消费请求进行限额配置, 配置主要是根据客户端分组按字节速率进行限定的.
Why are quotas necessary? 配额有必要么?
It is possible for producers and consumers to produce/consume very high volumes of data and thus monopolize broker resources, cause network saturation, and generally DoS other clients and the brokers themselves. Having quotas protects against these issues and is all the more important in large multi-tenant clusters where a small set of badly behaved clients can degrade user experience for the well behaved ones. In fact, when running Kafka as a service this even makes it possible to enforce API limits according to an agreed upon contract.
生产者/消费者可能在生产/消费大量的数据, 因此会对服务器资源的大量独占, 导致网络达到饱和, 对其它客户端造成影响. 如果项目配置了限额就可以降低这些问题, 特别是在多租户的集群中, 一小部分低质量的客户端用户会降低这个用户集群的体验, 当使用kafka作为一项服务时, 甚至可以通过上层的协议来使用api进行强制限制
Client groups 客户端分组
PrincipalBuilder. Client-id is a logical grouping of clients with a meaningful name chosen by the client application. The tuple (user, client-id) defines a secure logical group of clients that share both user principal and client-id.
Quotas can be applied to (user, client-id), user or client-id groups. For a given connection, the most specific quota matching the connection is applied. All connections of a quota group share the quota configured for the group. For example, if (user=”test-user”, client-id=”test-client”) has a produce quota of 10MB/sec, this is shared across all producer instances of user “test-user” with the client-id “test-client”.
配额可以用于(user, client-id)组合, 或user, client-id分组. 对一个给定的连接, 最符合这个连接的配额被使用到, 一个限额组的所有连接共享这个限额配置, 例如: 如果(user=”test-user”, client-id=”test-client”) 10MB/s的配额, 这个配置会被所有的具有”test-user”用户 和客户端ID是 “test-client”的所有生产者所共享.
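Choosing the most specific matching quota can be sketched as an ordered lookup. The order used here, (user, client-id) first, then user, then client-id, is an assumption about what "most specific" means, and the dictionary layout and `resolve_quota` helper are invented for illustration.

```python
def resolve_quota(quotas, user, client_id):
    """Return the quota (bytes/sec) for a connection, or None if none matches.

    quotas: dict keyed by (user, client_id); None in a position means
    "any", so (user, None) is a user-level quota and (None, client_id)
    is a client-id-level quota.
    """
    for key in ((user, client_id), (user, None), (None, client_id)):
        if key in quotas:
            return quotas[key]
    return None


quotas = {
    ("test-user", "test-client"): 10_000_000,   # shared by all matching producers
    ("test-user", None): 5_000_000,
    (None, "test-client"): 2_000_000,
}
exact = resolve_quota(quotas, "test-user", "test-client")
user_only = resolve_quota(quotas, "test-user", "other-client")
client_only = resolve_quota(quotas, "other-user", "test-client")
unmatched = resolve_quota(quotas, "other-user", "other-client")
print(exact, user_only, client_only, unmatched)
```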
Quota Configuration 限额配置
Quota configuration may be defined for (user, client-id), user and client-id groups. It is possible to override the default quota at any of the quota levels that needs a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides. User and (user, client-id) quota overrides are written to ZooKeeper under /config/users and client-id quota overrides are written under /config/clients. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster. See here for details. Default quotas for each group may also be updated dynamically using the same mechanism.
配额可以按照(user, client-id)或者, user或client-id进行分组, 如果需要更高或更低的配额, 可以覆盖默配额, 这个机制类似于对日志主题配置的覆盖, user 或者 (user, client-id)配额可以覆盖写入到zookeeper下的 /config/users ,client-id配置, 可以写入到 /config/clients. 这些覆盖写入会被服务器很快的读取到, 这让我们修改配置不需要重新启动服务器. 每个分组的默认配置也可以同样的方式动态修改.
The order of precedence for quota configuration is:
Broker properties (quota.producer.default, quota.consumer.default) can also be used to set defaults for client-id groups. These properties are being deprecated and will be removed in a later release. Default quotas for client-id can be set in ZooKeeper similar to the other quota overrides and defaults. 服务器属性(quota.producer.default, quota.consumer.default)也可以用来配置默认client-id分组的默认值, 这些属性配置已经不鼓励使用, 会在后期删除掉. 默认client-id限额配置可以和其它默认配置一样, 在Zookeeper直接设置.
By default, each unique client group receives a fixed quota in bytes/sec as configured by the cluster. This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!
默认情况下, 每个唯一的客户端分组在集群上配置一个固定的限额, 这个限额是基于每台服务器的, 每个客户端能发布或获取在每台服务器都的最大速率, 我们按服务器定义配置, 而不是按整个集群定义, 是因为如果是集群范围的需要额外的机制来共享配额的使用情况, 这会导致配额机制的实现比较难.
How does a broker react when it detects a quota violation? In our solution, the broker does not return an error; rather, it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under its quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also keeps them from having to implement any special backoff and retry behavior, which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.
那么如果服务器检测到超出配额要怎么办? 在我们的解决方案中, 服务器不是返回错误, 而是尝试使客户端减低它的速率. 需要先计算有问题的客户端在它配额下需要的延迟时间, 然后延迟这段时间后响应.这使配置限制对客户端透明, 也可以防止它们做一些补偿策略或尝试行为, 会导致行为更为奇怪. 非法的客户端信息(没有补偿的重试行为)会导致比使用配置要解决的问题更恶劣.
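One way to compute such a delay is to ask how much idle time would bring the average rate back down to the quota. The formula below is a sketch derived from that reasoning, not a statement of the broker's exact calculation; the function and parameter names are hypothetical.

```python
def throttle_delay_seconds(observed_rate, quota, window_seconds):
    """Delay that brings the average rate over the window down to the quota.

    If a client sent observed_rate * window_seconds bytes, stretching the
    window by `delay` gives quota = observed_rate * window / (window + delay),
    which solves to delay = (observed_rate - quota) / quota * window.
    """
    if observed_rate <= quota:
        return 0.0                      # within quota: respond immediately
    return (observed_rate - quota) / quota * window_seconds


# A client producing 15 MB/s against a 10 MB/s quota over a 10 second window.
delay = throttle_delay_seconds(15e6, 10e6, 10)
effective_rate = 15e6 * 10 / (10 + delay)
print(delay, effective_rate)
```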
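Client byte rate is measured over multiple small windows (for example, 30 windows of 1 second each) in order to detect and correct quota violations quickly. Using large measurement windows (for example, 10 windows of 30 seconds each) tends to allow large bursts of traffic followed by long delays, which makes for a poor user experience.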