收藏 分销(赏)

storm-实战及实例讲解.docx

上传人:仙人****88 文档编号:9445346 上传时间:2025-03-26 格式:DOCX 页数:8 大小:19.29KB 下载积分:10 金币
下载 相关 举报
storm-实战及实例讲解.docx_第1页
第1页 / 共8页
storm-实战及实例讲解.docx_第2页
第2页 / 共8页


点击查看更多>>
资源描述
storm 实战及实例讲解                                                                                                                                                                    先给大家打打气,看看效果。这是taobao对外公布的storm使用情况,请大家欣赏,这是一个系列文章希望自己能够完成。给自己加油,写出来有利于日后查询同时也惠及他人。该storm入门教程将从搭建集群到如何编写storm上可以稳定运行的代码。本文不采用twitter官方文档里的starter项目,读者可以对比学习。效果更佳。 转载请注明出处:comaple 1.Storm 在taobao的使用情况: We make statistics of logs and extract useful information from thestatistics in almost real-time with Storm. Logs are read from Kafka-likepersistent message queues into spouts, then processed and emitted over thetopologies to compute desired results, which are then stored into distributeddatabases to be used elsewhere. Input log count varies from 2 millions to 1.5billion every day, whose size is up to 2 terabytes among the projects. The mainchallenge here is not only real-time processing of big data set; storing andpersisting result is also a challenge and needs careful design andimplementation. 淘宝使用storm和消息队列结合,每天能够处理2百万到15亿条日志,日志量达到2TB的近实时处理。 2.使用场景 上周开始学习storm的使用,现在探索出来两种使用场景。 1,  通过配置drpc服务器,将storm的topology发布为drpc服务。客户端程序可以调用drpc服务将数据发送到storm集群中,并接收处理结果的反馈。这种方式需要drpc服务器进行转发,其中drpc服务器底层通过thrift实现。适合的业务场景主要是实时计算。并且扩展性良好,可以增加每个节点的工作worker数量来动态扩展。 2,  第二种场景是通过beanstalkd来实现信息的导入,将topology任务提交到storm集群后可以通过开发beanstalkd客户端来向集群中发送信息,这种方式客户端收不到结果反馈。这个场景适合纯粹的数据分析处理的业务场景。 3.Strom drpc服务配置: 端口可以不用配置,默认是:3772 Nimbus节点的配置: storm.zookeeper.servers:     - "10.10.249.195"     - "10.10.249.196" # # nimbus.host: "nimbus" ## Locations of the drpc servers drpc.servers:     - "10.10.249.197" #    - "server2"   Supervisor节点的配置: ########### These MUST be filled in for astorm configuration storm.zookeeper.servers:     - "10.10.249.195"     - "10.10.249.196" # nimbus.host: "10.10.249.195" # ## Locations of the drpc servers drpc.servers:     - "10.10.249.197" #    - "server2" supervisor.slots.ports:     -6700     -6701 - 6702 Drpc服务器节点配置 该节点只需配置zookeeper地址即可。默认开放的端口:3772 storm.zookeeper.servers:      -"10.10.249.195"      -"10.10.249.196" 启动drpc服务:./storm drpc 如果想了解storm集群的详细配置过程可参看:点击打开链接 前面已近介绍了storm集群的搭建,和使用场景,那么现在让我们一起来探讨一下storm具体该怎么使用吧。            首先,我们要明白如何创建一个topology,topology是storm集群上面运行的基本单元,而一个topology又可以有若干个sport和bolt以某种策略组合而成,关于storm的流分组等概念我们可以,参考这里的一些资料。提醒一下这些概念很重要。我们开发storm应用的第一步就是定义一个topolog,下面我讲直接上代码,如果这些概念搞不懂的话很难弄清楚。我回尽量把注释写清楚。下面这个例子定义了一个简单的topology,它包括一个数据喷发节点spout,和一个数据处理节点bolt。 package aple.storm.test.topology; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import aple.storm.test.bolt.SimpleBolt; import aple.storm.test.spout.SimpleSpout; /**  * Created by IntelliJ IDEA.  * User: comaple.zhang  * Date: 12-8-28  * Time: 下午2:11  * To change this template use File | Settings | File Templates.  */ public class SimpleTopology {     public static void main(String[] args) {         try {             //实例化topologyBuilder类。             TopologyBuilder topologyBuilder = new TopologyBuilder();             //设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。             topologyBuilder.setSpout("simple-spout", new SimpleSpout(), 1);             // 设置数据处理节点,并分配并发数。指定该几点接收喷发节点的策略为随机方式。             topologyBuilder.setBolt("simple-bolt", new SimpleBolt(), 3).shuffleGrouping("simple-spout");             Config config = new Config();             config.setDebug(false);             if (args != null && args.length > 0) {                 /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程                  如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了                  一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交                  但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。                 */                 config.setNumWorkers(1);                 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());             } else {                 //这里是本地模式下运行的启动代码。                 config.setMaxTaskParallelism(1);                 LocalCluster cluster = new LocalCluster();                 cluster.submitTopology("simple", config,                         topologyBuilder.createTopology());             }         } catch (Exception e) {             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.         }     } } 本讲将接着上一讲,把一个完成的topology完成。上一节主要介绍了一个基本的topology的构造过程,以及每一步所对应的storm集群中分配的资源情况。要想开发storm应用必须对上一讲我提到的那些概念有完全的了解,否则开发出来的应用很有可能有这样那样的问题而无法工作。那么接下来我们来一起定义一个spot节点和bolt节点。 spot节点:在实际的开发中这个节点可以起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务(该方式我会在后续章节中谈到)。这里我们将开发一个简单的模拟数据喷发的节点。具体方式见代码: package aple.storm.test.spout; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import java.util.Map; import java.util.Random; /**  * Created by IntelliJ IDEA.  * User: comaple.zhang  * Date: 12-8-28  * Time: 下午2:11  * To change this template use File | Settings | File Templates.  */ public class SimpleSpout extends BaseRichSpout {     /**      * 用来发射数据的工具类      */     private SpoutOutputCollector collector;     private static String[] info = new String[]{             "comaple\t,12424,44w46,654,12424,44w46,654,",             "lisi\t,435435,6537,12424,44w46,654,",             "lipeng\t,45735,6757,12424,44w46,654,",             "hujintao\t,45735,6757,12424,44w46,654,",             "jiangmin\t,23545,6457,2455,7576,qr44453",             "beijing\t,435435,6537,12424,44w46,654,",             "xiaoming\t,46654,8579,w3675,85877,077998,",             "xiaozhang\t,9789,788,97978,656,345235,09889,",             "ceo\t,46654,8579,w3675,85877,077998,",             "cto\t,46654,8579,w3675,85877,077998,",             "zhansan\t,46654,8579,w3675,85877,077998,"};     Random rd = new Random();     /**      * 这里初始化collector      * @param conf      * @param context      * @param collector      */     @Override     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {         this.collector = collector;     }     /**      * 该方法会在SpoutTracker类中被调用每调用一次就可以向storm集群中发射一条数据(一个tuple元组)      * 该方法会被不停的调用      */     @Override     public void nextTuple() {         try {             String msg = info[rd.nextInt(10)];             //调用发射方法             collector.emit(new Values(msg));             //模拟等待100ms             Thread.sleep(100);         } catch (InterruptedException e) {             e.printStackTrace();         }     }     /**      * 这里定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。      * 该declarer变量有很大作用,我们还可以调用  declarer.declareStream();  来定义stramId,该id可以用来定义      * 更加复杂的流拓扑结构      * @param declarer      */     @Override     public void declareOutputFields(OutputFieldsDeclarer declarer) {         declarer.declare(new Fields("source"));     } } bolt节点: 处理节点,该节点接收喷发节点发送的数据进行简单的处理后,发射出去。 package aple.storm.test.bolt; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /**  * Created by IntelliJ IDEA.  * User: comaple.zhang  * Date: 12-8-28  * Time: 下午2:11  * To change this template use File | Settings | File Templates.  */ public class SimpleBolt extends BaseBasicBolt {     @Override     public void declareOutputFields(OutputFieldsDeclarer declarer) {         declarer.declare(new Fields( "info"));     }     @Override     public void execute(Tuple input, BasicOutputCollector collector) {         try {             String mesg = input.getString(0);             if (mesg != null)                 collector.emit(new Values( mesg+"mesg is processed!"));         } catch (Exception e) {             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.         }     } }
展开阅读全文

开通  VIP会员、SVIP会员  优惠大
下载10份以上建议开通VIP会员
下载20份以上建议开通SVIP会员


开通VIP      成为共赢上传

当前位置:首页 > 教育专区 > 小学其他

移动网页_全站_页脚广告1

关于我们      便捷服务       自信AI       AI导航        抽奖活动

©2010-2025 宁波自信网络信息技术有限公司  版权所有

客服电话:0574-28810668  投诉电话:18658249818

gongan.png浙公网安备33021202000488号   

icp.png浙ICP备2021020529号-1  |  浙B2-20240490  

关注我们 :微信公众号    抖音    微博    LOFTER 

客服