资源描述
Kafka深度分析
架构
kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的运行依赖于ZooKeeper,Producer推送消息给kafka,Consumer从kafka拉消息。
kafka关键技术点
(1) zero-copy
在Kafka上,有两个原因可能导致低效:1)太多的网络请求 2)过多的字节拷贝。为了提高效率,Kafka把message分成一组一组的,每次请求会把一组message发给相应的consumer。 此外, 为了减少字节拷贝,采用了sendfile系统调用。为了理解sendfile原理,先说一下传统的利用socket发送文件要进行拷贝:
Sendfile系统调用:
(2) Exactly once message transfer
怎样记录每个consumer处理的信息的状态?在Kafka中仅保存了每个consumer已经处理数据的offset。这样有两个好处:1)保存的数据量少 2)当consumer出错时,重新启动consumer处理数据时,只需从最近的offset开始处理数据即可。
(3)Push/pull
Producer 向Kafka(push)推数据,consumer 从kafka 拉(pull)数据。
(4)负载均衡和容错
Producer和broker之间没有负载均衡机制。
broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
kafka术语
Topic
Topic,是KAFKA对消息分类的依据;一条消息,必须有一个与之对应的Topic;
比如现在又两个Topic,分别是TopicA和TopicB,Producer向TopicA发送一个消息messageA,然后向TopicB发送一个消息messaeB;那么,订阅TopicA的Consumer就会收到消息messageA,订阅TopicB的Consumer就会收到消息messaeB;(每个Consumer可以同时订阅多个Topic,也即是说,同时订阅TopicA和TopicB的Consumer可以收到messageA和messaeB)。
同一个Group id的consumers在同一个Topic的同一条消息只能被一个consumer消费,实现了点对点模式,不同Group id的Consumers在同一个Topic上的同一条消息可以同时消费到,则实现了发布订阅模式。通过Consumer的Group id实现了JMS的消息模式
Message
Message就是消息,是KAfKA操作的对象,消息是按照Topic存储的;
KAFKA中按照一定的期限保存着所有发布过的Message,不管这些Message是否被消费过;例如这些Message的保存期限被这只为两天,那么一条Message从发布开始的两天时间内是可用的,超过保存期限的消息会被清空以释放存储空间。
消息都是以字节数组进行网络传递。
Partition
每一个Topic可以有多个Partition,这样做是为了提高KAFKA系统的并发能力,每个Partition中按照消息发送的顺序保存着Producer发来的消息,每个消息用ID标识,代表这个消息在改Partition中的偏移量,这样,知道了ID,就可以方便的定位一个消息了;每个新提交过来的消息,被追加到Partition的尾部;如果一个Partition被写满了,就不再追加;(注意,KAFKA不保证不同Partition之间的消息有序保存)
Leader
Partition中负责消息读写的节点;Leader是从Partition的节点中随机选取的。每个Partition都会在集中的其中一台服务器存在Leader。一个Topic如果有多个Partition,则会有多个Leader。
ReplicationFactor
一个Partition中复制数据的所有节点,包括已经挂了的;数量不会超过集群中broker的数量
isr
ReplicationFactor的子集,存活的且和Leader保持同步的节点;
Consumer Group
传统的消息系统提供两种使用方式:队列和发布-订阅;
队列:是一个池中有若干个Consumer,一条消息发出来以后,被其中的一个Consumer消费;
发布-订阅:是一个消息被广播出去,之后被所有订阅该主题的Consumer消费;
KAFKA提供的使用方式可以达到以上两种方式的效果:Consumer Group;
每一个Consumer用Consumer Group Name标识自己,当一条消息产生后,改消息被订阅了其Topic的Consumer Group收到,之后被这个Consumer Group中的一个Consumer消费;
如果所有的Consumer都在同一个Consumer Group中,那么这就和传统的队列形式的消息系统一样了;
如果每一个Consumer都在一个不同的Consumer Group中,那么就和传统的发布-订阅的形式一样了;
Offset
消费者自己维护当前读取数据的offser,或者同步到zookeeper。mit.interval.ms 是consumer同步offset到zookeeper的时间间隔。这个值设置问题会影响到多线程consumer,重复读取的问题。
安装启动配置环境
安装
下载kafka_2.11-0.8.2.1,并在linux上解压
> tar -xzf kafka_2.11-0.8.2.1.tgz
> cd kafka_2.11-0.8.2.1/bin
可用的命令如下:
启动命令
Kafka需要用到zookeeper,所有首先需要启动zookeeper。
> ./zookeeper-server-start.sh ../config/zookeeper.properties &
然后启动kafka服务
> ./kafka-server-start.sh ../config/server.properties &
创建Topic
创建一个名字是”p2p”的topic,使用一个单独的partition和和一个replica
> ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic p2p
使用命令查看topic
> ./kafka-topics.sh --list --zookeeper localhost:2181
p2p
除了使用命令创建Topic外,可以让kafka自动创建,在客户端使用的时候,指定一个不存在的topic,kafka会自动给创建topic,自动创建将不能自定义partition和relica。
集群多broker
将上述的单节点kafka扩展为3个节点的集群。
从原始配置文件拷贝配置文件。
> cp ../config/server.properties ../config/server-1.properties
> cp ../config/server.properties ../config/server-2.properties
修改配置文件。
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
注意在集群中broker.id是唯一的。
现在在前面单一节点和zookeeper的基础上,再启动两个kafka节点。
> ./kafka-server-start.sh ../config/server-1.properties &
> ./kafka-server-start.sh ../config/server-2.properties &
创建一个新的topic,带三个ReplicationFactor
> ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic p2p-replicated-topic
查看刚刚创建的topic。
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic p2p-replicated-topic
partiton: partion id,由于此处只有一个partition,因此partition id 为0
leader:当前负责读写的lead broker id
relicas:当前partition的所有replication broker list
isr:relicas的子集,只包含出于活动状态的broker
Topic-Partition-Leader-ReplicationFactor 之间的关系样图
以上创建了三个节点的kafka集群,在集群上又用命令创建三个topic,分别是:
l replicated3-partitions3-topic:三份复制三个partition的topic
l replicated2-partitions3-topic:二份复制三个partition的topic
l test:1份复制,一个partition的topic
以我做测试创建的三个topic说明他们之间的关系。
>./kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated3-partitions3-topic
>./kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated2-partitions3-topic
>./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
以kafka当前的描述画出以下关系图:
从图上可以看到test没有备份,当broke Id 0 宕机后,虽然集群还有两个节点可以使用,但test这个topic却不能正常转发消息了。所以为了系统的可靠性,创建的replicas尽量的多,但却不能超过broker的数量。
客户端使用API
Producer API
从0.8.2版本开始,apache提供了新的java版本的Producer的API。这个java版本在测试中表现比之前的scala客户端性能要好。Pom获取java客户端:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
Example
Consumer API
Kafka 0.8.2.1版本已经放出了java版的consumer,看javadoc文档和代码不太匹配,也没有样例来说明java版的consumer的使用样例,这里还是用scala版的consumer API来使用。
Kafka提供了两套API给Consumer:
The high-level Consumer API:高度抽象的Consumer API,封装了很多consumer需要的高级功能,使用起来简单、方便
The SimpleConsumer API:只有最基本的链接、读取功能,可以自己去读offset,并指定offset的读取方式。适合于各种自定义
High Level
class Consumer{
/**
* Create a ConsumerConnector:创建consumer connector
*
* @param config at the minimum, need to specify the groupid of the consumer and the zookeeper connection string zookeeper.connect.config参数作用:需要置顶consumer的groupid以及zookeeper连接字符串zookeeper.connect
*/
public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}
/**
* V: type of the message: 消息类型
* K: type of the optional key assciated with the message: 消息携带的可选关键字类型
*/
public interface kafka.javaapi.consumer.ConsumerConnector {
/**
* Create a list of message streams of type T for each topic.:为每个topic创建T类型的消息流的列表
*
* @param topicCountMap a map of (topic, #streams) pair : topic与streams的键值对
* @param decoder a decoder that converts from Message to T : 转换Message到T的解码器
* @return a map of (topic, list of KafakStream) pairs. : topic与KafkaStream列表的键值对
* The number of items in the list is #streams . Each stream supports
* an iterator over message/metadata pairs .:列表中项目的数量是#streams。每个stream都支持基于message/metadata 对的迭代器
*/
public <K,V> Map<String, List<KafkaStream<K,V> > >
createMessageStreams( Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams of type T for each topic, using the default decoder.为每个topic创建T类型的消息列表。使用默认解码器
*/
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
* Create a list of message streams for topics matching a wildcard.为匹配wildcard的topics创建消息流的列表
*
* @param topicFilter a TopicFilter that specifies which topics to
* subscribe to (encapsulates a whitelist or a blacklist).指定将要订阅的topics的TopicFilter(封装了whitelist或者黑名单)
* @param numStreams the number of message streams to return.将要返回的流的数量
* @param keyDecoder a decoder that decodes the message key 可以解码关键字key的解码器
* @param valueDecoder a decoder that decodes the message itself 可以解码消息本身的解码器
* @return a list of KafkaStream. Each stream supports an
* iterator over its MessageAndMetadata elements. 返回KafkaStream的列表。每个流都支持基于MessagesAndMetadata 元素的迭代器。
*/
public <K,V> List<KafkaStream<K,V>>
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder.使用默认解码器,为匹配wildcard的topics创建消息流列表
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.使用默认解码器,为匹配wildcard的topics创建消息流列表
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
/**
* Commit the offsets of all topic/partitions connected by this connector.通过connector提交所有topic/partitions的offsets
*/
public void commitOffsets();
/**
* Shut down the connector: 关闭connector
*/
public void shutdown();
}
对大多数应用来说, high level已经足够了,一些应用要求的一些特征还没有出现high level consumer接口(例如,当重启consumer时,设置初始offset)。他们可以使用Simple Api。逻辑可能会有些复杂。
Simple
使用Simple有以下缺点:
· 必须在程序中跟踪offset值
· 必须找出指定Topic Partition中的lead broker
· 必须处理broker的变动
class kafka.javaapi.consumer.SimpleConsumer {
/**
* Fetch a set of messages from a topic.从topis抓取消息序列
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.指定topic 名字,topic partition,开始的字节offset,抓取的最大字节数
* @return a set of fetched messages
*/
public FetchResponse fetch(kafka.javaapi.FetchRequest request);
/**
* Fetch metadata for a sequence of topics.抓取一系列topics的metadata
*
* @param request specifies the versionId, clientId, sequence of topics.指定versionId,clientId,topics
* @return metadata for each topic in the request.返回此要求中每个topic的元素据
*/
public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
/**
* Get a list of valid offsets (up to maxSize) before the given time.在给定的时间内返回正确偏移的列表
*
* @param request a [[kafka.javaapi.OffsetRequest]] object.
* @return a [[kafka.javaapi.OffsetResponse]] object.
*/
public kafak.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
/**
* Close the SimpleConsumer.关闭
*/
public void close();
}
配置
Broker Config
核心关键配置:broker.id、log.dirs、zookeeper.connect
参数
默认值
说明(解释)
broker.id
每一个broker在集群中的唯一表示,非负正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
log.dirs
/tmp/kafka-logs
kafka数据的存放地址,多个地址的话用逗号分割/data/kafka-logs-1,/data/kafka-logs-2
port
9092
broker server服务端口
message.max.bytes
1000000
表示消息体的最大大小,单位是字节
work.threads
3
broker处理消息的最大线程数,一般情况下不需要去修改
num.io.threads
8
broker处理磁盘IO的线程数,数值应该大于你的硬盘数
background.threads
10
一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
queued.max.requests
500
等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
host.name
null
broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
advertised.host.name
null
If this is set this is the hostname that will be given out to producers, consumers, and other brokers to connect to.
advertised.port
null
The port to give out to producers, consumers, and other brokers to use in establishing connections. This only needs to be set if this port is different from the port the server should bind to.
socket.send.buffer.bytes
100 * 1024
socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes
100 * 1024
socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.request.max.bytes
100 * 1024 * 1024
socket请求的最大数值,防止内存溢出,必须小于Java heap size.
log.segment.bytes
1024 * 1024 * 1024
topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.roll.hours
24 * 7 hours
这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.cleanup.policy
delete
日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.retention.minutes
7 days
数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-1
-1
topic每个分区的最大文件大小,一个topic的大小限制 =分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms
5 minutes
文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.cleaner.enable
false
是否开启日志压缩
log.cleaner.threads
1
日志压缩运行的线程数
log.cleaner.io.max.bytes.per.second
Double.MaxValue
日志压缩时候处理的最大大小
log.cleaner.dedupe.buffer.size
500*1024*1024
日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好
log.cleaner.io.buffer.size
512*1024
日志清理时候用到的IO块大小一般不需要修改
log.cleaner.io.buffer.load.factor
0.9
日志清理中hash表的扩大因子一般不需要修改
log.cleaner.backoff.ms
15000
检查是否清理日志清理的间隔
log.cleaner.min.cleanable.ratio
0.5
日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms
1 day
对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.index.size.max.bytes
10 * 1024 * 1024
对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.interval.bytes
4096
当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.flush.interval.messages
Long.MaxValue
log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
log.flush.scheduler.interval.ms
Long.MaxValue
检查是否需要固化到硬盘的时间间隔
log.flush.interval.ms = None
Long.MaxValue
仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
log.delete.delay.ms
60000
文件在索引中清除后保留的时间一般不需要去修改
log.flush.offset.checkpoint.interval.ms
60000
控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改
log.segment.delete.delay.ms
60000
the amount of time to wait before deleting a file from the filesystem.
auto.create.topics.enable
true
是否允许自动创建topic,若是false,就需要通过命令创建topic
default.replication.factor
1
自动创建的topic默认 replication factor
num.partitions
1
每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
以下是kafka中Leader,replicas配置参数
controller.socket.timeout.ms
30000
partition leader与replicas之间通讯时,socket的超时时间
controller.message.queue.size
Int.MaxValue
partition leader与replicas数据同步时,消息的队列尺寸
replica.lag.time.max.ms
10000
replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.max.messages
4000
如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
##到其他follower中.
##在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.socket.timeout.ms
30 * 1000
follower与leader之间的socket超时时间
replica.socket.receive.buffer.bytes
64 * 1024
leader复制时候的socket缓存大小
replica.fetch.max.bytes
1024*1024
replicas每次获取数据的最大大小
replica.fetch.wait.max.ms
500
replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.min.bytes
1
fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
num.replica.fetchers
1
leader进行复制的线程数,增大这个数值会增加follower的IO
replica.high.watermark.checkpoint.interval.ms
5000
每个replica检查是否将最高水位进行固化的频率
fetch.purgatory.purge.interval.requests
1000
The purge interval (in number of requests) of the fetch request purgatory.
producer.purgatory.purge.interval.requests
6000
The purge int
展开阅读全文