1、大数据组件-KAFKA 技术分享 1. KAFKA 介绍 1 1.1 背景 1 1.2 组件 2 1.3 特性 3 2. 设计思想理念 3 3. 配置集群 5 4. 开发应用 7 5. 性能优化 10 6. 监控 11 1.下载Kafka Web Console 11 2.安装sbt 11 3.配置Kafka Web Console 11 4.配置mysql的jdbc驱动 11 5.执行sql语句(如下绿色选框所示) 12 6.编译 12 7.运行 12 8.浏览访问 13 7. 常见问题摘要 13 8. 参数设置表 14 9. 待续 23
2、 1. KAFKA 介绍 Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。 1.1 背景 当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点 以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(con
3、sume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统。 从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息。 1.2 组件 Topic:12 消息存放的目录即主题。消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成 Producer: 消息生产者,就是向kafka broker发消息的客户端。Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。producer端,
4、可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker。 小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。 producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,由producer客户端决定。partition leader的位置(host:port)注册在zookeeper中,produ
5、cer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件。 Consumer:2 1 消息消费者,向kafka broker取消息的客户端 . consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息。 每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer
6、消费.如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者. kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.同样,Consumer可以批量fetch多条消息。消息量的大小可以通过配置文件来指定. 在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个grou
7、p中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的. Message: 一个消息单位 Broker: 一台kafka服务器就是一个broker。一个集群由多个broker组成。 Consumer Group (CG):q 这是kafka用来实现一个t
8、opic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。 Partition: 为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序 分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发
9、到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。 一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性. replicated 基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负
10、责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定. Offset: kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first
11、offset就是00000000000.kafka 具体流程: 1. Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面 2. kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。 3. Consumer从kafka集群pull数据,并控制获取消息的offset 1.3 特性 A. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。Kafka的设计初衷便是要能处理TB级的数据,其更强调的是吞
12、吐率。 B. 持久化:通过将数据持久化到硬盘以及replication防止数据丢失。 C. 系统扩展性: kafka使用zookeeper来实现动态的集群扩展,broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。 D. 客户端负载均衡: 所有 broker 节点和生产者、消费者注册到 Zookeeper 。节点变化触发相关客户端的平衡操作。节点都尝试自平衡直到达成一致。 E. 服务器负载均衡:每个服务器充当它自身分区的leader并且充当其他服务器的分区的follower 2. 设计思想理念 消息状态:在Kafka
13、中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。 消息持久化:Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。 消息有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。 批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。 push-and-pull:Kafka中的Producer和consum
14、er采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。 Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。 负载均衡方面:Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。 复制备份:kafka将每个partition数据复制到多个server上,任何一个partitio
15、n有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.Follower和consumer一样,消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,
16、只要zookeeper集群存活即可.(不同于其他分布式存储,比如hbase需要"多数派"存活才行) 当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower.选择follower时需要兼顾一个问题,就是新leaderserver上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡". 3. 数据结构: 我们以几张图来总结
17、一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。 Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为: partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件: 可以看到,这个partition有4个LogSegment。 借用博主@lizhitao博客上的一张图来展示是如何查找Message的。 比如:要查找绝对offset为7的
18、Message: 1 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。 2 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。 3 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。 这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。 一句话,Kafka的
19、Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。 4. 配置集群 A. 下载KAFKA (这里滤过了集群zookeeper的部署) http://kafka.apache.org/downloads.html KAFKA 目录如下 目录 说明 bin 操作kafka的可执行脚本,还包含windows下脚本 config 配置文件所在目录 libs 依赖库目录 logs 日志数据目录,目录kafka把server端日志分为5种类型,分为:server,request,state,log
20、cleaner,controller B. 解压 # tar zxvf kafka_2.10-0.8.1.tgz -kafka C. 进入kafka目录,配置conf/server.properties 文件,,其他默认不变 broker.id=3 (每台机器有自己的ID) host.name=192.168.11.160(每台机器有自己的IP) work.threads=15(处理网络请求线程数) num.io.threads=15(磁盘IO线程数) log.dirs=/opt/kafka/logs(kafka 存放目录的路径) num.partit
21、ions=3(topic分区的数量设置) zookeeper.connect=192.168.11.160:2181,192.168.11.161:2181,192.168.11.207:2181(Kafka级联) 具体详见 文档末尾的参数设置表 D. kafka的启动:(启动文件是kafka-server-start.sh,所有的启动参数放在server.properties文件中) # /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties & E. 创建topic命令
22、/opt/kafka/bin/kafka-topics.sh --create --topic topic_js --replication-factor 3 --partitions 3 --zookeeper 192.168.11.161:2181; 192.168.11.162:2181; 192.168.11.163:2181 F. 查看topic 命令 /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.16.9:2181 G. 测试客户端消费,终端命令 /opt/kafka/bin/kafka-
23、console-consumer.sh --zookeeper 192.168.11.162:2181 --from-beginning --topic topic_js H. Kafka topic的删除命令: /opt/kafka/bin/kafka-topics.sh --zookeeper 192.168.11.160:2181 --delete --topic topic_a 5. 开发应用 A. 开发一个Producer应用 public class SimpleProducerThread extends Thread { String brokerIp
24、 = null; String topic = null; public SimpleProducerThread(String brokerIp, String topic) { this.brokerIp = brokerIp; this.topic = topic; } @Override public void run() { super.run(); Properties props = new Properties(); // 设置zookeeperip以及端口,传递topic以及partition信息 // prop
25、s.put("zk.connect", this.brokerIp + ":2181"); props.put("serializer.class", "kafka.serializer.StringEncoder"); // 消息确认 props.put("request.required.acks", "1"); // 异步发送 props.put("producer.type", "async"); // 每次发送多少条 props.put("batch.num.messages", "100"); // 设置指定分区 // prop
26、s.put("partitioner.class",
// "com.cmcc.dmp.fetch.filemonitor.consumer.RunPartition");
// #broker用于接收producer消息的hostname以及监听端口
props.put("metadata.broker.list", this.brokerIp + ":9092");
ProducerConfig config = new ProducerConfig(props);
Producer
27、cer
28、 producer.send(new KeyedMessage
29、 // Kafka的消费者链接,连接到不同的zookeeper上消费信息 private final ConsumerConnector consumer; private final String topic; private String hostIp; private static int lineNum = 0; public ConsumerThread(String topic, String hostIp) { this.topic = topic; this.hostIp = hostIp; this.consumer =
30、kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig()); } /** * * @author lining * * @描述 创建消费这配置参数 * * @return */ private ConsumerConfig createConsumerConfig() { Properties props = new Properties(); // 设置zookeeper的链接地址,监听来自这方面
31、的信息。即发送到这三个zookeeper上的topic都能被这个consumer消费 props.put("zookeeper.connect", this.hostIp + ":2181"); // props.put("zookeeper.connect", "192.168.11.210:2181,192.168.11.160:2181,192.168.11.161:2181"); // 设置group id props.put("group.id", "test_group"); // kafka的group 消费记录是保存在zookeeper上的,
32、 // 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
// 此参数就是配置消息被读取之后标记为已消费的关键字
props.put("mit.interval.ms", "1000");
props.put("zookeeper.session.timeout.ms", "100000");
return new ConsumerConfig(props);
}
public void run()
{
// 设置Topic=>Thread Num映射关系, 构建具体的流
Map 33、ger> topickMap = new HashMap 34、eam.iterator();
System.out.println("*********Results1********");
while (it.hasNext())
{
System.out.println("ThreadId:" + Thread.currentThread().getId() +":"
+"readLines:" + (lineNum++) + "----"+ new String(it.next().message()));
}
}
}
6. 性能优化
配置文件优化:
1.网络和io操作线程配置优化
35、
# broker处理消息的最大线程数
work.threads=xxx
# broker处理磁盘IO的线程数
num.io.threads=xxx
建议配置:
一般work.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.
num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.
2.log数据文件刷新策略
为了大幅度提高producer写入吞吐量,需要定期批量写文件。
建议配置:
# 每当producer写入10000条消息时,刷数据到磁 36、盘
log.flush.interval.messages=10000
# 每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
3.日志保留策略配置
当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。
建议配置:
# 保留三天,也可以更短
log.retention.hours=72
# 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,
# kafka启动时是单线程扫描目录(log.d 37、ir)下所有数据文件)
log.segment.bytes=1073741824
7. 监控
1.下载Kafka Web Console
2.安装sbt
a. centos : yum install sbt
b. ubuntu : apt-get install sbt
3.配置Kafka Web Console
a.增加数据库依赖包(mysql),解压kafka-web-console.tar.gz,进入目录cd kafka-web-console
编辑文件vim build.sbt
增加mysql配置:
......
libraryD 38、ependencies ++= Seq(
jdbc,
cache,
"org.squeryl" % "squeryl_2.10" % "0.9.5-6",
"com.twitter" % "util-zk_2.10" % "6.11.0",
"com.twitter" % "finagle-core_2.10" % "6.15.0",
"org.apache.kafka" % "kafka_2.10" % "0.8.1",
"org.quartz-scheduler" % "quartz" % "2.2.1",
"mysql" % "mysql- 39、connector-java" % "5.1.9"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
)
.......
4.配置mysql的jdbc驱动
vim application.conf
增加代码如下:
[plain] view plaincopy
1. .......
2. db.default.driver=com.mysql.jdbc.Driver
3. db.default.u 40、rl="jdbc:mysql://192.168.2.105:3306/mafka?useUnicode=true&characterEncoding=UTF8&connectTimeout=5000&socketTimeout=10000"
4. db.default.user=xxx
5. db.default.password=xxx
6. .......
5.执行sql语句(如下绿色选框所示)
6.编译
lizhitao@localhost:~$ sbt package
打包编译时会从官网上下载很多jar,由于网络原因,所以很慢,需要耐心等待。
注意 41、下载的jar是隐藏的,在cd ~/.ivy2 目录(相应子目录)下可以看到所有jar.
ivy2所有jar包百度云下载
ivy2所有jar包下载
7.运行
lizhitao@localhost:~$ sbt run
8.浏览访问
访问地址: http://ip:9000/
8. 常见问题摘要
A. LeaderNotAvailableException
1. 其中该分区所在的broker挂了,如果是多副本,该分区所在broker恰好为leader
B. connection reset by peer
检查防火墙和kafka 配 42、置文件中的 hostname 设置成ip
C.
9. 参数设置表
Property
Default
Description
broker.id
每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,并且它的存在使得broker无须混淆consumers就可以迁移到不同的host/port上。你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。
log.dirs
/tmp/kafka-logs
kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新parti 43、tion时,都会选择在包含最少partitions的路径下进行。
port
6667
server接受客户端连接的端口
zookeeper.connect
null
ZooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;为了当某个host宕掉之后你能通过其他ZooKeeper节点进行连接,你可以按照一下方式制定多个hosts:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper 允许你增加一个“chroo 44、t”路径,将集群中所有kafka数据存放在特定的路径下。当多个Kafka集群或者其他应用使用相同ZooKeeper集群时,可以使用这个方式设置数据存放路径。这种方式的实现可以通过这样设置连接字符串格式,如下所示:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
这样设置就将所有kafka集群数据存放在/chroot/path路径下。注意,在你启动broker之前,你必须创建这个路径,并且consumers必须使用相同的连接格式。
message.max.bytes
1000000
server可以接收的消息最大尺 45、寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大。
work.threads
3
server用来处理网络请求的网络线程数目;一般你不需要更改这个属性。
num.io.threads
8
server用来处理请求的I/O线程的数目;这个线程数目至少要等于硬盘的个数。
background.threads
4
用于后台处理的线程数目,例如文件删除;你不需要更改这个属性。
queued.max.requests
500
在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数。 46、
host.name
null
broker的hostname;如果hostname已经设置的话,broker将只会绑定到这个地址上;如果没有设置,它将绑定到所有接口,并发布一份到ZK
advertised.host.name
null
如果设置,则就作为broker 的hostname发往producer、consumers以及其他brokers
advertised.port
null
此端口将给与producers、consumers、以及其他brokers,它会在建立连接时用到; 它仅在实际端口和server需要绑定的端口不一样时才需要设置。
socket.send 47、buffer.bytes
100 * 1024
SO_SNDBUFF 缓存大小,server进行socket 连接所用
socket.receive.buffer.bytes
100 * 1024
SO_RCVBUFF缓存大小,server进行socket连接时所用
socket.request.max.bytes
100 * 1024 * 1024
server允许的最大请求尺寸; 这将避免server溢出,它应该小于Java heap size
num.partitions
1
如果创建topic时没有给出划分partitions个数,这个数字将是topic下p 48、artitions数目的默认数值。
log.segment.bytes
1014*1024*1024
topic partition的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的;这个属性就是每个文件的最大尺寸;当尺寸达到这个数值时,就会创建新文件。此设置可以由每个topic基础设置时进行覆盖。
查看 the per-topic configuration section
log.roll.hours
24 * 7
即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。这个设置也可以有topic 49、层面的设置进行覆盖;
查看the per-topic configuration section
log.cleanup.policy
delete
log.retention.minutes和log.retention.hours
7 days
每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。
log.retention.minutes 和 log.retention.bytes 都是用来设置删除日志文件的,无论哪个属性已经溢出。
这个属性设置可以在topic基本设置时进行覆盖。
查看the per-topic configuration section
log.retention.bytes
-1
每个topic下每个partition保存数据的总量;注意,这是每个partitions的上限,因此这个数值乘以partitions的个数就是每个topic保存的数据总量。同时注意:如果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。
注意,






