ImageVerifierCode 换一换
格式:DOC , 页数:26 ,大小:831.50KB ,
资源ID:11224175      下载积分:10 金币
快捷注册下载
登录下载
邮箱/手机:
温馨提示:
快捷下载时,用户名和密码都是您填写的邮箱或者手机号,方便查询和重复下载(系统自动生成)。 如填写123,账号就是123,密码也是123。
特别说明:
请自助下载,系统不会自动发送文件的哦; 如果您已付费,想二次下载,请登录后访问:我的下载记录
支付方式: 支付宝    微信支付   
验证码:   换一换

开通VIP
 

温馨提示:由于个人手机设置不同,如果发现不能下载,请复制以下地址【https://www.zixin.com.cn/docdown/11224175.html】到电脑端继续下载(重复下载【60天内】不扣币)。

已注册用户请登录:
账号:
密码:
验证码:   换一换
  忘记密码?
三方登录: 微信登录   QQ登录  

开通VIP折扣优惠下载文档

            查看会员权益                  [ 下载后找不到文档?]

填表反馈(24小时):  下载求助     关注领币    退款申请

开具发票请登录PC端进行申请

   平台协调中心        【在线客服】        免费申请共赢上传

权利声明

1、咨信平台为文档C2C交易模式,即用户上传的文档直接被用户下载,收益归上传人(含作者)所有;本站仅是提供信息存储空间和展示预览,仅对用户上传内容的表现方式做保护处理,对上载内容不做任何修改或编辑。所展示的作品文档包括内容和图片全部来源于网络用户和作者上传投稿,我们不确定上传用户享有完全著作权,根据《信息网络传播权保护条例》,如果侵犯了您的版权、权益或隐私,请联系我们,核实后会尽快下架及时删除,并可随时和客服了解处理情况,尊重保护知识产权我们共同努力。
2、文档的总页数、文档格式和文档大小以系统显示为准(内容中显示的页数不一定正确),网站客服只以系统显示的页数、文件格式、文档大小作为仲裁依据,个别因单元格分列造成显示页码不一将协商解决,平台无法对文档的真实性、完整性、权威性、准确性、专业性及其观点立场做任何保证或承诺,下载前须认真查看,确认无误后再购买,务必慎重购买;若有违法违纪将进行移交司法处理,若涉侵权平台将进行基本处罚并下架。
3、本站所有内容均由用户上传,付费前请自行鉴别,如您付费,意味着您已接受本站规则且自行承担风险,本站不进行额外附加服务,虚拟产品一经售出概不退款(未进行购买下载可退充值款),文档一经付费(服务费)、不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
4、如你看到网页展示的文档有www.zixin.com.cn水印,是因预览和防盗链等技术需要对页面进行转换压缩成图而已,我们并不对上传的文档进行任何编辑或修改,文档下载后都不会有水印标识(原文档上传前个别存留的除外),下载后原文更清晰;试题试卷类文档,如果标题没有明确说明有答案则都视为没有答案,请知晓;PPT和DOC文档可被视为“模板”,允许上传人保留章节、目录结构的情况下删减部份的内容;PDF文档不管是原文档转换或图片扫描而得,本站不作要求视为允许,下载前可先查看【教您几个在下载文档中可以更好的避免被坑】。
5、本文档所展示的图片、画像、字体、音乐的版权可能需版权方额外授权,请谨慎使用;网站提供的党政主题相关内容(国旗、国徽、党徽--等)目的在于配合国家政策宣传,仅限个人学习分享使用,禁止用于任何广告和商用目的。
6、文档遇到问题,请及时联系平台进行协调解决,联系【微信客服】、【QQ客服】,若有其他问题请点击或扫码反馈【服务填表】;文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“【版权申诉】”,意见反馈和侵权处理邮箱:1219186828@qq.com;也可以拔打客服电话:0574-28810668;投诉电话:18658249818。

注意事项

本文(kafka-技术分享.doc)为本站上传会员【仙人****88】主动上传,咨信网仅是提供信息存储空间和展示预览,仅对用户上传内容的表现方式做保护处理,对上载内容不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知咨信网(发送邮件至1219186828@qq.com、拔打电话4009-655-100或【 微信客服】、【 QQ客服】),核实后会尽快下架及时删除,并可随时和客服了解处理情况,尊重保护知识产权我们共同努力。
温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载【60天内】不扣币。 服务填表

kafka-技术分享.doc

1、大数据组件-KAFKA 技术分享 1. KAFKA 介绍 1 1.1 背景 1 1.2 组件 2 1.3 特性 3 2. 设计思想理念 3 3. 配置集群 5 4. 开发应用 7 5. 性能优化 10 6. 监控 11 1.下载Kafka Web Console 11 2.安装sbt 11 3.配置Kafka Web Console 11 4.配置mysql的jdbc驱动 11 5.执行sql语句(如下绿色选框所示) 12 6.编译 12 7.运行 12 8.浏览访问 13 7. 常见问题摘要 13 8. 参数设置表 14 9. 待续 23

2、 1. KAFKA 介绍 Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。 1.1 背景      当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它        如何及时做到如上两点      以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(con

3、sume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统。      从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息。 1.2 组件 Topic:12 消息存放的目录即主题。消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成 Producer: 消息生产者,就是向kafka broker发消息的客户端。Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。producer端,

4、可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker。 小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。 producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,由producer客户端决定。partition leader的位置(host:port)注册在zookeeper中,produ

5、cer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件。 Consumer:2 1 消息消费者,向kafka broker取消息的客户端 . consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息。 每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer

6、消费.如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者. kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.同样,Consumer可以批量fetch多条消息。消息量的大小可以通过配置文件来指定. 在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个grou

7、p中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的. Message: 一个消息单位 Broker: 一台kafka服务器就是一个broker。一个集群由多个broker组成。 Consumer Group (CG):q 这是kafka用来实现一个t

8、opic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。 Partition: 为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序 分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发

9、到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。 一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性. replicated 基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负

10、责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定. Offset: kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first

11、offset就是00000000000.kafka 具体流程: 1. Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面 2. kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。 3. Consumer从kafka集群pull数据,并控制获取消息的offset 1.3 特性 A. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。Kafka的设计初衷便是要能处理TB级的数据,其更强调的是吞

12、吐率。 B. 持久化:通过将数据持久化到硬盘以及replication防止数据丢失。 C. 系统扩展性: kafka使用zookeeper来实现动态的集群扩展,broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。 D. 客户端负载均衡: 所有 broker 节点和生产者、消费者注册到 Zookeeper 。节点变化触发相关客户端的平衡操作。节点都尝试自平衡直到达成一致。 E. 服务器负载均衡:每个服务器充当它自身分区的leader并且充当其他服务器的分区的follower 2. 设计思想理念 消息状态:在Kafka

13、中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。 消息持久化:Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。 消息有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。 批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。 push-and-pull:Kafka中的Producer和consum

14、er采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。 Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。 负载均衡方面:Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。 复制备份:kafka将每个partition数据复制到多个server上,任何一个partitio

15、n有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.Follower和consumer一样,消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,

16、只要zookeeper集群存活即可.(不同于其他分布式存储,比如hbase需要"多数派"存活才行) 当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower.选择follower时需要兼顾一个问题,就是新leaderserver上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡". 3. 数据结构: 我们以几张图来总结

17、一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。 Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为: partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件: 可以看到,这个partition有4个LogSegment。 借用博主@lizhitao博客上的一张图来展示是如何查找Message的。 比如:要查找绝对offset为7的

18、Message: 1 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。 2 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。 3 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。 这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。 一句话,Kafka的

19、Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。 4. 配置集群 A. 下载KAFKA (这里滤过了集群zookeeper的部署) http://kafka.apache.org/downloads.html KAFKA 目录如下 目录 说明 bin 操作kafka的可执行脚本,还包含windows下脚本 config 配置文件所在目录 libs 依赖库目录 logs 日志数据目录,目录kafka把server端日志分为5种类型,分为:server,request,state,log

20、cleaner,controller B. 解压 # tar   zxvf    kafka_2.10-0.8.1.tgz  -kafka C. 进入kafka目录,配置conf/server.properties 文件,,其他默认不变 broker.id=3 (每台机器有自己的ID) host.name=192.168.11.160(每台机器有自己的IP) work.threads=15(处理网络请求线程数) num.io.threads=15(磁盘IO线程数) log.dirs=/opt/kafka/logs(kafka 存放目录的路径) num.partit

21、ions=3(topic分区的数量设置) zookeeper.connect=192.168.11.160:2181,192.168.11.161:2181,192.168.11.207:2181(Kafka级联) 具体详见 文档末尾的参数设置表 D. kafka的启动:(启动文件是kafka-server-start.sh,所有的启动参数放在server.properties文件中) # /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties & E. 创建topic命令

22、/opt/kafka/bin/kafka-topics.sh --create --topic topic_js --replication-factor 3 --partitions 3 --zookeeper 192.168.11.161:2181; 192.168.11.162:2181; 192.168.11.163:2181 F. 查看topic 命令 /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.16.9:2181 G. 测试客户端消费,终端命令 /opt/kafka/bin/kafka-

23、console-consumer.sh --zookeeper 192.168.11.162:2181 --from-beginning --topic topic_js H. Kafka topic的删除命令: /opt/kafka/bin/kafka-topics.sh --zookeeper 192.168.11.160:2181 --delete --topic topic_a 5. 开发应用 A. 开发一个Producer应用 public class SimpleProducerThread extends Thread { String brokerIp

24、 = null; String topic = null; public SimpleProducerThread(String brokerIp, String topic) { this.brokerIp = brokerIp; this.topic = topic; } @Override public void run() { super.run(); Properties props = new Properties(); // 设置zookeeperip以及端口,传递topic以及partition信息 // prop

25、s.put("zk.connect", this.brokerIp + ":2181"); props.put("serializer.class", "kafka.serializer.StringEncoder"); // 消息确认 props.put("request.required.acks", "1"); // 异步发送 props.put("producer.type", "async"); // 每次发送多少条 props.put("batch.num.messages", "100"); // 设置指定分区 // prop

26、s.put("partitioner.class", // "com.cmcc.dmp.fetch.filemonitor.consumer.RunPartition"); // #broker用于接收producer消息的hostname以及监听端口 props.put("metadata.broker.list", this.brokerIp + ":9092"); ProducerConfig config = new ProducerConfig(props); Producer producer = new Produ

27、cer(config); for (int t = 0; t < 1000; t++) { String line = "460004900270029;19867101111;杭州;6363;53601;1111-1111;20141114065900;0;2323;0;0"; //如果是异步,单位为batch,kafka内部随机分区号,分发到各个分区,负载均衡 producer.send(new KeyedMessage(this.topic, line)); //指定分区 //

28、 producer.send(new KeyedMessage(this.topic, "1",line)); //根据内容指定分区 // producer.send(new KeyedMessage(this.topic, line.substring(0, line.indexOf(";")),line)); } producer.close(); } B. 开发一个Consumer 应用 public class ConsumerThread extends Thread {

29、 // Kafka的消费者链接,连接到不同的zookeeper上消费信息 private final ConsumerConnector consumer; private final String topic; private String hostIp; private static int lineNum = 0; public ConsumerThread(String topic, String hostIp) { this.topic = topic; this.hostIp = hostIp; this.consumer =

30、kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig()); } /** * * @author lining * * @描述 创建消费这配置参数
* * @return */ private ConsumerConfig createConsumerConfig() { Properties props = new Properties(); // 设置zookeeper的链接地址,监听来自这方面

31、的信息。即发送到这三个zookeeper上的topic都能被这个consumer消费 props.put("zookeeper.connect", this.hostIp + ":2181"); // props.put("zookeeper.connect", "192.168.11.210:2181,192.168.11.160:2181,192.168.11.161:2181"); // 设置group id props.put("group.id", "test_group"); // kafka的group 消费记录是保存在zookeeper上的,

32、 // 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新 // 此参数就是配置消息被读取之后标记为已消费的关键字 props.put("mit.interval.ms", "1000"); props.put("zookeeper.session.timeout.ms", "100000"); return new ConsumerConfig(props); } public void run() { // 设置Topic=>Thread Num映射关系, 构建具体的流 Map

33、ger> topickMap = new HashMap(); topickMap.put(topic, 1); Map>> streamMap = consumer .createMessageStreams(topickMap); KafkaStream stream = streamMap.get(topic).get(0); ConsumerIterator it = str

34、eam.iterator(); System.out.println("*********Results1********"); while (it.hasNext()) { System.out.println("ThreadId:" + Thread.currentThread().getId() +":" +"readLines:" + (lineNum++) + "----"+ new String(it.next().message())); } } } 6. 性能优化 配置文件优化: 1.网络和io操作线程配置优化

35、 # broker处理消息的最大线程数 work.threads=xxx # broker处理磁盘IO的线程数 num.io.threads=xxx 建议配置: 一般work.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1. num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍. 2.log数据文件刷新策略 为了大幅度提高producer写入吞吐量,需要定期批量写文件。 建议配置: # 每当producer写入10000条消息时,刷数据到磁

36、盘 log.flush.interval.messages=10000 # 每间隔1秒钟时间,刷数据到磁盘 log.flush.interval.ms=1000 3.日志保留策略配置 当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。 建议配置: # 保留三天,也可以更短  log.retention.hours=72 # 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多, # kafka启动时是单线程扫描目录(log.d

37、ir)下所有数据文件) log.segment.bytes=1073741824 7. 监控 1.下载Kafka Web Console 2.安装sbt a. centos  : yum install sbt b. ubuntu : apt-get install sbt    3.配置Kafka Web Console a.增加数据库依赖包(mysql),解压kafka-web-console.tar.gz,进入目录cd kafka-web-console 编辑文件vim build.sbt  增加mysql配置: ...... libraryD

38、ependencies ++= Seq(   jdbc,   cache,   "org.squeryl" % "squeryl_2.10" % "0.9.5-6",   "com.twitter" % "util-zk_2.10" % "6.11.0",   "com.twitter" % "finagle-core_2.10" % "6.15.0",   "org.apache.kafka" % "kafka_2.10" % "0.8.1",   "org.quartz-scheduler" % "quartz" % "2.2.1",   "mysql" % "mysql-

39、connector-java" % "5.1.9"     exclude("javax.jms", "jms")     exclude("com.sun.jdmk", "jmxtools")     exclude("com.sun.jmx", "jmxri") ) ....... 4.配置mysql的jdbc驱动 vim application.conf 增加代码如下: [plain] view plaincopy 1. .......   2. db.default.driver=com.mysql.jdbc.Driver   3. db.default.u

40、rl="jdbc:mysql://192.168.2.105:3306/mafka?useUnicode=true&characterEncoding=UTF8&connectTimeout=5000&socketTimeout=10000"   4. db.default.user=xxx   5. db.default.password=xxx   6. .......   5.执行sql语句(如下绿色选框所示) 6.编译 lizhitao@localhost:~$ sbt package 打包编译时会从官网上下载很多jar,由于网络原因,所以很慢,需要耐心等待。 注意

41、下载的jar是隐藏的,在cd  ~/.ivy2 目录(相应子目录)下可以看到所有jar. ivy2所有jar包百度云下载 ivy2所有jar包下载 7.运行 lizhitao@localhost:~$ sbt run 8.浏览访问 访问地址: http://ip:9000/ 8. 常见问题摘要 A. LeaderNotAvailableException 1. 其中该分区所在的broker挂了,如果是多副本,该分区所在broker恰好为leader B. connection reset by peer 检查防火墙和kafka 配

42、置文件中的 hostname 设置成ip C. 9. 参数设置表 Property Default Description broker.id   每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,并且它的存在使得broker无须混淆consumers就可以迁移到不同的host/port上。你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。 log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新parti

43、tion时,都会选择在包含最少partitions的路径下进行。 port 6667 server接受客户端连接的端口 zookeeper.connect null ZooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;为了当某个host宕掉之后你能通过其他ZooKeeper节点进行连接,你可以按照一下方式制定多个hosts: hostname1:port1, hostname2:port2, hostname3:port3. ZooKeeper 允许你增加一个“chroo

44、t”路径,将集群中所有kafka数据存放在特定的路径下。当多个Kafka集群或者其他应用使用相同ZooKeeper集群时,可以使用这个方式设置数据存放路径。这种方式的实现可以通过这样设置连接字符串格式,如下所示: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 这样设置就将所有kafka集群数据存放在/chroot/path路径下。注意,在你启动broker之前,你必须创建这个路径,并且consumers必须使用相同的连接格式。 message.max.bytes 1000000 server可以接收的消息最大尺

45、寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大。 work.threads 3 server用来处理网络请求的网络线程数目;一般你不需要更改这个属性。 num.io.threads 8 server用来处理请求的I/O线程的数目;这个线程数目至少要等于硬盘的个数。 background.threads 4 用于后台处理的线程数目,例如文件删除;你不需要更改这个属性。 queued.max.requests 500 在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数。

46、 host.name null broker的hostname;如果hostname已经设置的话,broker将只会绑定到这个地址上;如果没有设置,它将绑定到所有接口,并发布一份到ZK advertised.host.name null 如果设置,则就作为broker 的hostname发往producer、consumers以及其他brokers advertised.port null 此端口将给与producers、consumers、以及其他brokers,它会在建立连接时用到; 它仅在实际端口和server需要绑定的端口不一样时才需要设置。 socket.send

47、buffer.bytes 100 * 1024 SO_SNDBUFF 缓存大小,server进行socket 连接所用 socket.receive.buffer.bytes 100 * 1024 SO_RCVBUFF缓存大小,server进行socket连接时所用 socket.request.max.bytes 100 * 1024 * 1024 server允许的最大请求尺寸;  这将避免server溢出,它应该小于Java  heap size num.partitions 1 如果创建topic时没有给出划分partitions个数,这个数字将是topic下p

48、artitions数目的默认数值。 log.segment.bytes 1014*1024*1024 topic  partition的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的;这个属性就是每个文件的最大尺寸;当尺寸达到这个数值时,就会创建新文件。此设置可以由每个topic基础设置时进行覆盖。 查看  the per-topic  configuration section log.roll.hours 24 * 7 即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。这个设置也可以有topic

49、层面的设置进行覆盖; 查看the per-topic  configuration section log.cleanup.policy delete   log.retention.minutes和log.retention.hours 7 days 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。 log.retention.minutes 和 log.retention.bytes 都是用来设置删除日志文件的,无论哪个属性已经溢出。 这个属性设置可以在topic基本设置时进行覆盖。 查看the per-topic  configuration section log.retention.bytes -1 每个topic下每个partition保存数据的总量;注意,这是每个partitions的上限,因此这个数值乘以partitions的个数就是每个topic保存的数据总量。同时注意:如果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。 注意,

移动网页_全站_页脚广告1

关于我们      便捷服务       自信AI       AI导航        抽奖活动

©2010-2025 宁波自信网络信息技术有限公司  版权所有

客服电话:0574-28810668  投诉电话:18658249818

gongan.png浙公网安备33021202000488号   

icp.png浙ICP备2021020529号-1  |  浙B2-20240490  

关注我们 :微信公众号    抖音    微博    LOFTER 

客服