资源描述
storm 实战及实例讲解
先给大家打打气,看看效果。这是taobao对外公布的storm使用情况,请大家欣赏,这是一个系列文章希望自己能够完成。给自己加油,写出来有利于日后查询同时也惠及他人。该storm入门教程将从搭建集群到如何编写storm上可以稳定运行的代码。本文不采用twitter官方文档里的starter项目,读者可以对比学习。效果更佳。
转载请注明出处:comaple
1.Storm 在taobao的使用情况:
We make statistics of logs and extract useful information from thestatistics in almost real-time with Storm. Logs are read from Kafka-likepersistent message queues into spouts, then processed and emitted over thetopologies to compute desired results, which are then stored into distributeddatabases to be used elsewhere. Input log count varies from 2 millions to 1.5billion every day, whose size is up to 2 terabytes among the projects. The mainchallenge here is not only real-time processing of big data set; storing andpersisting result is also a challenge and needs careful design andimplementation.
淘宝使用storm和消息队列结合,每天能够处理2百万到15亿条日志,日志量达到2TB的近实时处理。
2.使用场景
上周开始学习storm的使用,现在探索出来两种使用场景。
1, 通过配置drpc服务器,将storm的topology发布为drpc服务。客户端程序可以调用drpc服务将数据发送到storm集群中,并接收处理结果的反馈。这种方式需要drpc服务器进行转发,其中drpc服务器底层通过thrift实现。适合的业务场景主要是实时计算。并且扩展性良好,可以增加每个节点的工作worker数量来动态扩展。
2, 第二种场景是通过beanstalkd来实现信息的导入,将topology任务提交到storm集群后可以通过开发beanstalkd客户端来向集群中发送信息,这种方式客户端收不到结果反馈。这个场景适合纯粹的数据分析处理的业务场景。
3.Strom drpc服务配置:
端口可以不用配置,默认是:3772
Nimbus节点的配置:
storm.zookeeper.servers:
- "10.10.249.195"
- "10.10.249.196"
#
# nimbus.host: "nimbus"
## Locations of the drpc servers
drpc.servers:
- "10.10.249.197"
# - "server2"
Supervisor节点的配置:
########### These MUST be filled in for astorm configuration
storm.zookeeper.servers:
- "10.10.249.195"
- "10.10.249.196"
#
nimbus.host: "10.10.249.195"
#
## Locations of the drpc servers
drpc.servers:
- "10.10.249.197"
# - "server2"
supervisor.slots.ports:
-6700
-6701
- 6702
Drpc服务器节点配置
该节点只需配置zookeeper地址即可。默认开放的端口:3772
storm.zookeeper.servers:
-"10.10.249.195"
-"10.10.249.196"
启动drpc服务:./storm drpc
如果想了解storm集群的详细配置过程可参看:点击打开链接
前面已近介绍了storm集群的搭建,和使用场景,那么现在让我们一起来探讨一下storm具体该怎么使用吧。
首先,我们要明白如何创建一个topology,topology是storm集群上面运行的基本单元,而一个topology又可以有若干个sport和bolt以某种策略组合而成,关于storm的流分组等概念我们可以,参考这里的一些资料。提醒一下这些概念很重要。我们开发storm应用的第一步就是定义一个topolog,下面我讲直接上代码,如果这些概念搞不懂的话很难弄清楚。我回尽量把注释写清楚。下面这个例子定义了一个简单的topology,它包括一个数据喷发节点spout,和一个数据处理节点bolt。
package aple.storm.test.topology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import aple.storm.test.bolt.SimpleBolt;
import aple.storm.test.spout.SimpleSpout;
/**
* Created by IntelliJ IDEA.
* User: comaple.zhang
* Date: 12-8-28
* Time: 下午2:11
* To change this template use File | Settings | File Templates.
*/
public class SimpleTopology {
public static void main(String[] args) {
try {
//实例化topologyBuilder类。
TopologyBuilder topologyBuilder = new TopologyBuilder();
//设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
topologyBuilder.setSpout("simple-spout", new SimpleSpout(), 1);
// 设置数据处理节点,并分配并发数。指定该几点接收喷发节点的策略为随机方式。
topologyBuilder.setBolt("simple-bolt", new SimpleBolt(), 3).shuffleGrouping("simple-spout");
Config config = new Config();
config.setDebug(false);
if (args != null && args.length > 0) {
/*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程
如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了
一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交
但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
*/
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
} else {
//这里是本地模式下运行的启动代码。
config.setMaxTaskParallelism(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("simple", config,
topologyBuilder.createTopology());
}
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
本讲将接着上一讲,把一个完成的topology完成。上一节主要介绍了一个基本的topology的构造过程,以及每一步所对应的storm集群中分配的资源情况。要想开发storm应用必须对上一讲我提到的那些概念有完全的了解,否则开发出来的应用很有可能有这样那样的问题而无法工作。那么接下来我们来一起定义一个spot节点和bolt节点。
spot节点:在实际的开发中这个节点可以起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务(该方式我会在后续章节中谈到)。这里我们将开发一个简单的模拟数据喷发的节点。具体方式见代码:
package aple.storm.test.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
* Created by IntelliJ IDEA.
* User: comaple.zhang
* Date: 12-8-28
* Time: 下午2:11
* To change this template use File | Settings | File Templates.
*/
public class SimpleSpout extends BaseRichSpout {
/**
* 用来发射数据的工具类
*/
private SpoutOutputCollector collector;
private static String[] info = new String[]{
"comaple\t,12424,44w46,654,12424,44w46,654,",
"lisi\t,435435,6537,12424,44w46,654,",
"lipeng\t,45735,6757,12424,44w46,654,",
"hujintao\t,45735,6757,12424,44w46,654,",
"jiangmin\t,23545,6457,2455,7576,qr44453",
"beijing\t,435435,6537,12424,44w46,654,",
"xiaoming\t,46654,8579,w3675,85877,077998,",
"xiaozhang\t,9789,788,97978,656,345235,09889,",
"ceo\t,46654,8579,w3675,85877,077998,",
"cto\t,46654,8579,w3675,85877,077998,",
"zhansan\t,46654,8579,w3675,85877,077998,"};
Random rd = new Random();
/**
* 这里初始化collector
* @param conf
* @param context
* @param collector
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* 该方法会在SpoutTracker类中被调用每调用一次就可以向storm集群中发射一条数据(一个tuple元组)
* 该方法会被不停的调用
*/
@Override
public void nextTuple() {
try {
String msg = info[rd.nextInt(10)];
//调用发射方法
collector.emit(new Values(msg));
//模拟等待100ms
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 这里定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
* 该declarer变量有很大作用,我们还可以调用 declarer.declareStream(); 来定义stramId,该id可以用来定义
* 更加复杂的流拓扑结构
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source"));
}
}
bolt节点: 处理节点,该节点接收喷发节点发送的数据进行简单的处理后,发射出去。
package aple.storm.test.bolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
* Created by IntelliJ IDEA.
* User: comaple.zhang
* Date: 12-8-28
* Time: 下午2:11
* To change this template use File | Settings | File Templates.
*/
public class SimpleBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( "info"));
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String mesg = input.getString(0);
if (mesg != null)
collector.emit(new Values( mesg+"mesg is processed!"));
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
展开阅读全文