ImageVerifierCode 换一换
格式:DOC , 页数:56 ,大小:771.90KB ,
资源ID:7597428      下载积分:10 金币
验证码下载
登录下载
邮箱/手机:
验证码: 获取验证码
温馨提示:
支付成功后,系统会自动生成账号(用户名为邮箱或者手机号,密码是验证码),方便下次登录下载和查询订单;
特别说明:
请自助下载,系统不会自动发送文件的哦; 如果您已付费,想二次下载,请登录后访问:我的下载记录
支付方式: 支付宝    微信支付   
验证码:   换一换

开通VIP
 

温馨提示:由于个人手机设置不同,如果发现不能下载,请复制以下地址【https://www.zixin.com.cn/docdown/7597428.html】到电脑端继续下载(重复下载【60天内】不扣币)。

已注册用户请登录:
账号:
密码:
验证码:   换一换
  忘记密码?
三方登录: 微信登录   QQ登录  
声明  |  会员权益     获赠5币     写作写作

1、填表:    下载求助     留言反馈    退款申请
2、咨信平台为文档C2C交易模式,即用户上传的文档直接被用户下载,收益归上传人(含作者)所有;本站仅是提供信息存储空间和展示预览,仅对用户上传内容的表现方式做保护处理,对上载内容不做任何修改或编辑。所展示的作品文档包括内容和图片全部来源于网络用户和作者上传投稿,我们不确定上传用户享有完全著作权,根据《信息网络传播权保护条例》,如果侵犯了您的版权、权益或隐私,请联系我们,核实后会尽快下架及时删除,并可随时和客服了解处理情况,尊重保护知识产权我们共同努力。
3、文档的总页数、文档格式和文档大小以系统显示为准(内容中显示的页数不一定正确),网站客服只以系统显示的页数、文件格式、文档大小作为仲裁依据,个别因单元格分列造成显示页码不一将协商解决,平台无法对文档的真实性、完整性、权威性、准确性、专业性及其观点立场做任何保证或承诺,下载前须认真查看,确认无误后再购买,务必慎重购买;若有违法违纪将进行移交司法处理,若涉侵权平台将进行基本处罚并下架。
4、本站所有内容均由用户上传,付费前请自行鉴别,如您付费,意味着您已接受本站规则且自行承担风险,本站不进行额外附加服务,虚拟产品一经售出概不退款(未进行购买下载可退充值款),文档一经付费(服务费)、不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
5、如你看到网页展示的文档有www.zixin.com.cn水印,是因预览和防盗链等技术需要对页面进行转换压缩成图而已,我们并不对上传的文档进行任何编辑或修改,文档下载后都不会有水印标识(原文档上传前个别存留的除外),下载后原文更清晰;试题试卷类文档,如果标题没有明确说明有答案则都视为没有答案,请知晓;PPT和DOC文档可被视为“模板”,允许上传人保留章节、目录结构的情况下删减部份的内容;PDF文档不管是原文档转换或图片扫描而得,本站不作要求视为允许,下载前自行私信或留言给上传者【pc****0】。
6、本文档所展示的图片、画像、字体、音乐的版权可能需版权方额外授权,请谨慎使用;网站提供的党政主题相关内容(国旗、国徽、党徽--等)目的在于配合国家政策宣传,仅限个人学习分享使用,禁止用于任何广告和商用目的。
7、本文档遇到问题,请及时私信或留言给本站上传会员【pc****0】,需本站解决可联系【 微信客服】、【 QQ客服】,若有其他问题请点击或扫码反馈【 服务填表】;文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“【 版权申诉】”(推荐),意见反馈和侵权处理邮箱:1219186828@qq.com;也可以拔打客服电话:4008-655-100;投诉/维权电话:4009-655-100。

注意事项

本文(Kafka安装配置及使用说明.doc)为本站上传会员【pc****0】主动上传,咨信网仅是提供信息存储空间和展示预览,仅对用户上传内容的表现方式做保护处理,对上载内容不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知咨信网(发送邮件至1219186828@qq.com、拔打电话4008-655-100或【 微信客服】、【 QQ客服】),核实后会尽快下架及时删除,并可随时和客服了解处理情况,尊重保护知识产权我们共同努力。
温馨提示:如果因为网速或其他原因下载失败请重新下载,重复下载【60天内】不扣币。 服务填表

Kafka安装配置及使用说明.doc

1、Kafka安装配置及使用说明(铁树2018-08-08)(Windows平台,5个分布式节点,修改消息大小,调用程序范例)1 安装配置采用5台服务器作为集群节点,IP地址为:XX.XX.0.12-XX.XX.0.16.每台机器依次安装配置JDK、zookeeper、kafka,先安装完一台机器,然后拷贝到其他机器,再修改配置文件。1.1 JDK安装配置JDK版本:jdk1.7.0_51_x64解压版(jdk1.7.0_51_x64.rar)解压到C盘kafka目录下,如图所示。设置环境变量:JAVA_HOME:C:kafkajdk1.7.0_51_x64PATH:C:kafkajdk1.7.0

2、_51_x64bin1.2 zookeeper安装配置1.2.1 解压安装zookeeper版本:3.4.12 (zookeeper-3.4.12.tar.gz)解压到C盘kafka目录下,如图所示。1.2.2 创建zookeeper数据目录和日志目录zkdata #存放快照 C:kafkazookeeper-3.4.12zkdatazkdatalog#存放日志C:kafkazookeeper-3.4.12zkdatalog1.2.3 修改配置文件进入到“C:kafkazookeeper-3.4.12”目录下的conf目录中,复制zoo_sample.cfg(官方提供的zookeeper的样板

3、文件),重命名为zoo.cfg(官方指定的文件命名规则)。默认内容:修改后配置文件为:# The number of milliseconds of each ticktickTime=2000# The number of ticks that the initial # synchronization phase can takeinitLimit=10# The number of ticks that can pass between # sending a request and getting an acknowledgementsyncLimit=5# the directory

4、 where the snapshot is stored.# do not use /tmp for storage, /tmp here is just # example sakes.dataDir=C:/kafka/zookeeper-3.4.12/zkdatadataLogDir=C:/kafka/zookeeper-3.4.12/zkdatalog# the port at which the clients will connectclientPort=12181server.1=XX.XX.0.12:12888:13888server.2=XX.XX.0.13:12888:13

5、888server.3=XX.XX.0.14:12888:13888server.4=XX.XX.0.15:12888:13888server.5=XX.XX.0.16:12888:13888# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60# Be sure to read the maintenance section of the # administrator guide before turning on auto

6、purge.# http:/zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance# The number of snapshots to retain in dataDirautopurge.snapRetainCount=100# Purge task interval in hours# Set to 0 to disable auto purge featureautopurge.purgeInterval=24配置文件解释:#tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器

7、之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。#initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒#syncLimit:这个配置项标识 Leader 与Fol

8、lower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒#dataDir:快照日志的存储路径#dataLogDir:事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多#clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点通过配置 autopurge.snapRetainCount 和 autopurge.p

9、urgeInterval 这两个参数能够实现定时清理了。这两个参数都是在zoo.cfg中配置的:autopurge.purgeInterval这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自己清理功能。autopurge.snapRetainCount这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个。1.2.4 创建myid文件在“C:kafkazookeeper-3.4.12zkdata”目录下,创建myid文件(无后缀名),内容为对应IP地址的主机号。如server.1则内容为1。1.3 Kafka安装配置1.3.1 解压

10、安装kafka 版本:kafka1.1.1(kafka_2.11-1.1.1.tgz)解压到C盘kafka目录下,如图所示。1.3.2 创建消息目录kafkalogs :C:kafkakafka_2.11-1.1.1kafkalogs1.3.3 修改配置文件打开C:kafkakafka_2.11-1.1.1config server.properties实际的修改项为:broker.id=1listeners=PLAINTEXT:/:19092log.dirs= C:/kafka/kafka_2.11-1.1.1/kafkalogs#在log.retention.hours=168 下面新增下

11、面三项(消息大小最大1GB)message.max.byte=1073741824replica.fetch.max.bytes=1073741824log.segment.bytes=1073741824default.replication.factor=2#设置zookeeper的连接端口zookeeper.connect=XX.XX.0.12:12181,XX.XX.0.13:12181,XX.XX.0.14:12181,XX.XX.0.15:12181,XX.XX.0.16:12181配置说明:broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一

12、样port=19092 #当前kafka对外提供服务的端口默认是9092host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。work.threads=3 #这个是borker进行网络处理的线程数num.io.threads=8 #这个是borker进行I/O处理的线程数log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号

13、分割的目录中,那个分区数最少就放那一个socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小num.partitions=1 #默认的分区数,一个topic默认1个分区数log

14、.retention.hours=168 #默认消息的最大持久化时间,168小时,7天message.max.byte=5242880 #消息保存的最大值5Mdefault.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务replica.fetch.max.bytes=5242880 #取消息的最大字节数log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件log.retention.check.interval.ms=

15、300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口1.4 其他节点配置将安装以上配置好的目录c:kafka拷贝到其他节点的c盘目录,并修改如下配置。1、JAVA环境变量:JAVA_HOME:C:ka

16、fkajdk1.7.0_51_x64PATH:C:kafkajdk1.7.0_51_x64bin2、zookeeper的myidC:kafkazookeeper-3.4.12zkdatamyid,修改为对应的数值XX.XX.0.12:1XX.XX.0.13:2XX.XX.0.14:3XX.XX.0.15:4XX.XX.0.16:53、kafka配置C:kafkakafka_2.11-1.1.1config server.properties的broker.id,修改为对应的数值XX.XX.0.12:1XX.XX.0.13:2XX.XX.0.14:3XX.XX.0.15:4XX.XX.0.16:

17、51.5 服务启动1、 启动zookeeperC:kafkazookeeper-3.4.12binzkServer.cmdXX.XX.0.12-16,依次双击启动。2、 启动kafka运行cmd,cd C:kafkakafka_2.11-1.1.1目录,再执行命令:【cd C:kafkakafka_2.11-1.1.1】C:kafkakafka_2.11-1.1.1.binwindowskafka-server-start.bat .configserver.properties 1.6 服务状态测试1.6.1 创建Topics打开cmd 进入C:kafkakafka_2.11-1.1.1bi

18、nwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-topics.bat -create -zookeeper localhost:12181 -replication-factor 1 -partitions 1 -topic test0011.6.2 打开一个Producer打开cmd 进入C:kafkakafka_2.11-1.1.1binwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-console-producer.bat -broker-list localhost:19092 -topic te

19、st001等待输入消息内容。1.6.3 打开一个Consumer打开cmd 进入C:kafkakafka_2.11-1.1.1binwindowsC:kafkakafka_2.11-1.1.1binwindowskafka-console-consumer.bat -zookeeper localhost:12181 -topic test001然后就可以在Producer控制台窗口输入消息了,很快Consumer窗口就会显示出Producer发送的消息。1.6.4 查看所有主题C:UsersDevelopC:kafkakafka_2.11-1.1.1binwindowskafka-topic

20、s.bat -list -zookeeper localhost:121811.6.5 查看Topic分区和副本C:UsersDevelopC:kafkakafka_2.11-1.1.1binwindowskafka-topics.bat -describe -zookeeper localhost:121811.7 消息大小调整Kafka对于10KB大小的消息吞吐率最好,默认配置最大支持1MB的消息大小。对于大消息的传输,需要修改kafka的server.properties、consumer、producer的相关配置。server.properties修改:打开C:kafkakafka_

21、2.11-1.1.1config server.properties(按照最大1GB)message.max.bytes=1073741824replica.fetch.max.bytes=1073741824log.segment.bytes=1073741824consumer配置:max.partition.fetch.bytes=1073741824Producer配置:max.request.size =1073741824#33554432,默认32Mbuffer.memory= 1073741824mon.errors.RecordTooLargeException: The m

22、essage is 36428062 bytes when serialized which is larger than the total memory buffer you have configured with the buffer.memory configuration.附件太大可能会内存溢出,还会涉及超时参数配置等。2 JAVA程序示例2.1 Producer程序示例2.1.1 Properties文件配置#producerbootstrap.servers=XX.XX.0.12:19092,XX.XX.0.13:19092,XX.XX.0.14:19092,XX.XX.0.1

23、5:19092,XX.XX.0.16:19092producer.type=syncrequest.required.acks=1#consumermit=true#latest, earliest, noneauto.offset.reset=earliest 建议公共参数(如服务地址)配置在properties文件里。其他参数根据接口需要程序中配置。/ 创建Producerprivate Producer createProducer() Properties props = new Properties();String path = ProducerDemo.class.getReso

24、urce(/).getFile().toString()+ kafka.properties; try FileInputStream fis = new FileInputStream(new File(path); props.load(fis);props.put(key.serializer, mon.serialization.IntegerSerializer);props.put(value.serializer,mon.serialization.StringSerializer);fis.close(); catch (Exception e) e.printStackTra

25、ce(); return new KafkaProducer(props);2.1.2 Properties配置详解# 0:producer不会等待broker发送ack# 1:当leader接收到消息后发送ack# all(-1):当所有的follower都同步消息成功后发送ackrequest.required.acks=02.1.3 主题+VALUEimport java.io.File;import java.io.FileInputStream;import java.util.Properties;import org.apache.kafka.clients.producer.K

26、afkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class TopicValue / 创建Producerprivate Producer createProducer() Properties props = new Properties();String path = ProducerDemo.class.getResource(/).getFile().toString() + kafk

27、a.properties; try FileInputStream fis = new FileInputStream(new File(path); props.load(fis); props.put(key.serializer, mon.serialization.StringSerializer); props.put(value.serializer,mon.serialization.StringSerializer);fis.close(); catch (Exception e) e.printStackTrace(); return new KafkaProducer(pr

28、ops);public static void main(String args) / 消息主题String topicName=test001;TopicValue topicValueProducer=new TopicValue();Producer producer = topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName, 消息: TopicValue);producer.flush();producer.close();System.out.println(Message sen

29、d successfully); 2.1.4 主题+KEY+VALUE2.1.4.1 package kjsp.kafka.producer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.Prod

30、ucerRecord;public class TopicIntegerString / 创建Producerprivate Producer createProducer() Properties props = new Properties();String path = ProducerDemo.class.getResource(/).getFile().toString() + kafka.properties; try FileInputStream fis = new FileInputStream(new File(path); props.load(fis); props.p

31、ut(key.serializer, mon.serialization.IntegerSerializer); props.put(value.serializer,mon.serialization.StringSerializer);fis.close(); catch (Exception e) e.printStackTrace(); return new KafkaProducer(props);public static void main(String args) / 消息主题String topicName=test001;TopicIntegerString topicVa

32、lueProducer=new TopicIntegerString();Producer producer = topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName, 1,消息: TopicIntegerString1);producer.flush();producer.close();System.out.println(Message send successfully); 2.1.4.2 import java.io.File;import java.io.FileInputStr

33、eam;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class TopicStringString / 创建Producerprivate Producer createProducer() Properties props = new Propert

34、ies();String path = ProducerDemo.class.getResource(/).getFile().toString() + kafka.properties; try FileInputStream fis = new FileInputStream(new File(path); props.load(fis); props.put(key.serializer, mon.serialization.StringSerializer); props.put(value.serializer,mon.serialization.StringSerializer);

35、fis.close(); catch (Exception e) e.printStackTrace(); return new KafkaProducer(props);public static void main(String args) / 消息主题String topicName=test001;TopicStringString topicValueProducer=new TopicStringString();Producer producer = topicValueProducer.createProducer();producer.send(new ProducerRec

36、ord(topicName, TopicStringString001, 消息: TopicStringString001);producer.flush();producer.close();System.out.println(Message send successfully); 2.1.4.3 package kjsp.kafka.producer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;import org.apache.kafka.clients.producer.

37、KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class TopicStringByte / 创建Producerprivate Producer createProducer() Properties props = new Properties();String path = ProducerDemo.class.getResource(/).getFile().toString()

38、+ kafka.properties; try FileInputStream fis = new FileInputStream(new File(path); props.load(fis); props.put(key.serializer, mon.serialization.StringSerializer); props.put(value.serializer,mon.serialization.ByteArraySerializer);fis.close(); catch (Exception e) e.printStackTrace(); return new KafkaPr

39、oducer(props);public static void main(String args) / 消息主题String topicName=test001;TopicStringByte topicValueProducer=new TopicStringByte();Producer producer = topicValueProducer.createProducer();producer.send(new ProducerRecord(topicName, TopicStringByte001, 消息: TopicStringByte001.getBytes();produce

40、r.flush();producer.close();System.out.println(Message send successfully); 2.1.4.4 package kjsp.kafka.producer;import java.io.File;import java.io.FileInputStream;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import

41、 org.apache.kafka.clients.producer.ProducerRecord;public class TopicByteByte / 创建Producerprivate Producer createProducer() Properties props = new Properties();String path = ProducerDemo.class.getResource(/).getFile().toString() + kafka.properties; try FileInputStream fis = new FileInputStream(new File(path); props.load(fis); props.put(key.serializer, mon.serialization.ByteArraySerializer); props.put(value.serializer,mon.serialization.ByteArraySerializer);fis.close(); cat

移动网页_全站_页脚广告1

关于我们      便捷服务       自信AI       AI导航        获赠5币

©2010-2025 宁波自信网络信息技术有限公司  版权所有

客服电话:4008-655-100  投诉/维权电话:4009-655-100

gongan.png浙公网安备33021202000488号   

icp.png浙ICP备2021020529号-1  |  浙B2-20240490  

关注我们 :gzh.png    weibo.png    LOFTER.png 

客服