2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > [Storm]分布式单词计数(一)一个简单的storm demo

[Storm]分布式单词计数(一)一个简单的storm demo

时间:2022-03-25 06:58:37

相关推荐

[Storm]分布式单词计数(一)一个简单的storm demo

目录

前言:

基本概念:

1.1 Spout

1.1 业务 SentenceSpout

1.2 SentenceSpout引用的部分类源码

BaseRichSpout源码

Values源码

ISpout的源码

2. Bolt

2.1 业务分隔语句SplitSentenceBolt

2.2 业务单词计数 bolt

2.3 业务实时上报 bolt

2.4 相关引用类源码

BaseRichBolt 源码

OutputCollector

3. 拓扑Topology

前言:

阅读《Storm分布式实时计算模式》书中的例子;

demo可以直接执行输出;

代码写在:wordcount chapter1 v1

代码链接

基本概念:

storm 包括 拓扑 数据流 和spout(数据流生成者) bolt(运算)组成。

拓扑很像hadoop中的job,但是会一直运行下去。

Stream

stream的核心数据结构是tuple,tuple是一个或者多个键值对的列表,

Stream 是由无限制的tuple组成的序列。

Spout

代表着拓扑的入口,充当采集器的角色,连接到数据源,将数据转化为一个个tuple

并将tuple作为数据流进行发射。

主要工作是:编写代码从数据源或者API消费数据。

比如采集:应用的日志时间

spout通常不会实现业务逻辑,所以在多个topology可以复用。

bolt

运算或者函数,将一个或者多个数据流作为输入,对于数据实施运算后,选择性的输出一个或者多个数据流。

bolt可以订阅多个由spout或者其他bolt发射的数据流,这样可以建立复杂的数据流转换网络。

bolt 可以进行:过滤 链接 计算 数据库读写

1.1 Spout

1.1 业务 SentenceSpout

import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.util.Map;import storm.blueprints.utils.Utils;/*** BaseRichSpout相当于一个比较简单的实现** */public class SentenceSpout extends BaseRichSpout {private SpoutOutputCollector collector;private String[] sentences = {"my dog has fleas","i like cold beverages","the dog ate my homework","don't have a cow man","i don't think i like fleas"};private int index = 0;/*** 所有的storm的组件都应该实现这个接口* 通过这个方法会告诉storm 这个组件这个类将会发射那些数据流* 就是发射sentence这个数据流* 这边声明的数据流,在下面算子中,也就是SplitSentenceBolt 会去获取这个值;* */public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}/*** 所有的spout的组件在初始化的时候调用这个方法* map是包含了Storm的配置信息的map,第二个TopologyContext对象提供了拓扑中组件的信息,SpoutOutputCollector提供了发射tuple的方法。* 但是这个地方因为open只是简单地将SpoutOutputCollector对象的引用保存在变量中* 保存在这个类中* */public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}/*** 核心实现;* storm通过这个方法向输出的collector发射tuple* 这个意思就是我们发射当前索引对应的语句,然后递增索引指向下一个语句* */public void nextTuple() {//这个sentences[index]相当于tuple,其实就是一个list//emit相当于发出发射的意思this.collector.emit(new Values(sentences[index]));index++;//这个应该是一个循环if (index >= sentences.length) {index = 0;}Utils.waitForMillis(1);}public static void main(String[] args) {String[] sentences = {"my dog has fleas","i like cold beverages","the dog ate my homework","don't have a cow man","i don't think i like fleas"};for (String str : sentences) {System.out.println(str);}}}

1.2 SentenceSpout引用的部分类源码

BaseRichSpout源码

package backtype.storm.topology.base;import backtype.storm.topology.IRichSpout;public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {public BaseRichSpout() {}public void close() {}public void activate() {}public void deactivate() {}public void ack(Object msgId) {}public void fail(Object msgId) {}}

Values源码

package backtype.storm.tuple;import java.util.ArrayList;public class Values extends ArrayList<Object> {public Values() {}public Values(Object... vals) {super(vals.length);Object[] arr$ = vals;int len$ = vals.length;for(int i$ = 0; i$ < len$; ++i$) {Object o = arr$[i$];this.add(o);}}}

其实就是一个ArrayList,只不过是入参不定数量的。

ISpout的源码

package backtype.storm.spout;import backtype.storm.task.TopologyContext;import java.io.Serializable;import java.util.Map;public interface ISpout extends Serializable {void open(Map var1, TopologyContext var2, SpoutOutputCollector var3);void close();void activate();void deactivate();void nextTuple();void ack(Object var1);void fail(Object var1);}

对于open方法所有的Spout在初始化的时候都需要调用这个方法。

map是包含了Storm的配置信息的map,第二个TopologyContext对象提供了拓扑中组件的信息,SpoutOutputCollector提供了发射tuple的方法。

2. Bolt

2.1 业务分隔语句SplitSentenceBolt

import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;/*** 业务代码* 实现语句分割bolt* BaseRichBolt 是一个简单的实现,继承这个类,就可以不用去实现本例子不需要关心的方法** */public class SplitSentenceBolt extends BaseRichBolt{private OutputCollector collector;/**** 类同于Spout 中的open方法;* 用于初始化;* 初始化可以做一些例如初始化数据库连接等操作;* 不过这个例子没有额外的操作;* */public void prepare(Map config, TopologyContext context, OutputCollector collector) {this.collector = collector;}/*** 核心功能执行;* 从哪来的数据?从订阅中的数据流中,接收到一个tuple;* 然后这个方法读取sentence* 然后去根据空格分隔成为单词,然后在发射出去;* */public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words){this.collector.emit(new Values(word));}}/*** 声明输出流;* 这里面声明了输出流中包含了word这个字段;** */public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

2.2 业务单词计数 bolt

package storm.demo.chapter1.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.Map;/*** 业务代码* 实现单词计数bolt* */public class WordCountBolt extends BaseRichBolt{private OutputCollector collector;private HashMap<String, Long> counts = null;/*** 初始化;* 初始化counts 这是一个map,用来存储单词和对应的计数;* 在我们storm应用中其实很多也是各种维度分隔计数;* 重点:* 通常最好是在构造函数中对基本数据类型和可序列话的 对象进行复制和实例化;* prepare进行不可序列化的对象进行实例化;* HashMap 是可以序列化的;* */public void prepare(Map config, TopologyContext context, OutputCollector collector) {this.collector = collector;this.counts = new HashMap<String, Long>();}/*** 获得单词 ;* 然后获取单词的数量;* 进行单词数量++;* 重新刷新发射* */public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = this.counts.get(word);if(count == null){count = 0L;}count++;this.counts.put(word, count);this.collector.emit(new Values(word, count));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}}

2.3 业务实时上报 bolt

package storm.demo.chapter1.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import java.util.ArrayList;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;/*** 实现上报的BOLT* 对所有的单词计数生成一份报告;* 在这边是简单的将接收到的计数BOLT发射出的计数tuple 进行存储;* */public class ReportBolt extends BaseRichBolt {private HashMap<String, Long> counts = null;public void prepare(Map config, TopologyContext context, OutputCollector collector) {this.counts = new HashMap<String, Long>();}public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = tuple.getLongByField("count");this.counts.put(word, count);}/*** 数据流末端的bolt,不会发射tuple;* */public void declareOutputFields(OutputFieldsDeclarer declarer) {// this bolt does not emit anything}/*** stoorm在种植一个bolt之前会调用这个方法;* 通常用来释放bolt占用的资源,比如释放打开的句柄 或者数据库连接;* 重点:* 但是对于拓扑在storm集群上面运行的时候,这个cleanup方法是不可靠的;* 不能保证会执行,涉及到了strom的容错机制;* 如果是在开发环境中,是可以保证这个被调用的;* */@Overridepublic void cleanup() {System.out.println("--- FINAL COUNTS ---");List<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for (String key : keys) {System.out.println(key + " : " + this.counts.get(key));}System.out.println("--------------");}}

2.4 相关引用类源码

BaseRichBolt 源码

package backtype.storm.topology.base;import backtype.storm.topology.IRichBolt;public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {public BaseRichBolt() {}public void cleanup() {}}

* 但是对于拓扑在storm集群上面运行的时候,这个cleanup方法是不可靠的;

* 不能保证会执行,涉及到了strom的容错机制;

* 如果是在开发环境中,是可以保证这个被调用的;

OutputCollector

//// Source code recreated from a .class file by IntelliJ IDEA// (powered by FernFlower decompiler)//package backtype.storm.task;import backtype.storm.tuple.Tuple;import java.util.Arrays;import java.util.Collection;import java.util.List;public class OutputCollector implements IOutputCollector {private IOutputCollector _delegate;public OutputCollector(IOutputCollector delegate) {this._delegate = delegate;}public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) {return this.emit(streamId, (Collection)Arrays.asList(anchor), tuple);}public List<Integer> emit(String streamId, List<Object> tuple) {return this.emit(streamId, (Collection)((List)null), tuple);}public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {return this.emit("default", anchors, tuple);}public List<Integer> emit(Tuple anchor, List<Object> tuple) {return this.emit("default", anchor, tuple);}public List<Integer> emit(List<Object> tuple) {return this.emit("default", tuple);}public void emitDirect(int taskId, String streamId, Tuple anchor, List<Object> tuple) {this.emitDirect(taskId, streamId, (Collection)Arrays.asList(anchor), tuple);}public void emitDirect(int taskId, String streamId, List<Object> tuple) {this.emitDirect(taskId, streamId, (Collection)((List)null), tuple);}public void emitDirect(int taskId, Collection<Tuple> anchors, List<Object> tuple) {this.emitDirect(taskId, "default", anchors, tuple);}public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) {this.emitDirect(taskId, "default", anchor, tuple);}public void emitDirect(int taskId, List<Object> tuple) {this.emitDirect(taskId, "default", tuple);}public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {return this._delegate.emit(streamId, anchors, tuple);}public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {this._delegate.emitDirect(taskId, streamId, anchors, tuple);}public void ack(Tuple input) {this._delegate.ack(input);}public void fail(Tuple input) {this._delegate.fail(input);}public void reportError(Throwable error) {this._delegate.reportError(error);}}

对于这个发射方法,其实就是发射List类型的tuple;

我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。

在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。

在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

引用自:/zengqiang1/article/details/71124004

3. 拓扑Topology

package storm.demo.chapter1.v1;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;import static storm.demo.utils.Utils.*;/*** 业务代码* 实现拓扑;* 定义好了输入 和计算单元bolt进行整合成为一个可以运行的拓扑;* */public class WordCountTopology {private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String REPORT_BOLT_ID = "report-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";/*** 输出结果:* --- FINAL COUNTS ---* a : 1344* ate : 1344* beverages : 1344* cold : 1344* cow : 1344* dog : 2688* don't : 2687* fleas : 2687* has : 1344* have : 1344* homework : 1344* i : 4030* like : 2687* man : 1344* my : 2688* the : 1344* think : 1343* --------------** */public static void main(String[] args) throws Exception {//输入SentenceSpout spout = new SentenceSpout();//boltSplitSentenceBolt splitBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();//官方类,这个类用来提供流式的接口风格的API来定义拓扑组件之间的数据流TopologyBuilder builder = new TopologyBuilder();builder.setSpout(SENTENCE_SPOUT_ID, spout);/*** 对于这个builder,我们首先注册一个bolt订阅* 使用.shuffleGrouping(SENTENCE_SPOUT_ID); 来订阅语句的数据源;* shuffleGrouping这个方法用来告诉storm,要将spout发射的tuple随机均匀的分发给SPLIT_BOLT* 对于setBolt这个方法会注册bolt,并且返回BoltDeclarer 这个实例;* 这边涉及到了"数据流分组"的内容;* */// SentenceSpout --> SplitSentenceBoltbuilder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);/*** 将特定的tuple路由到特殊的bolt实例中;* 重点:* 可以使用fieldsGrouping方法来保证所有的word字段值相同的tuple 会路由到同一个wordCountBolt中* */// SplitSentenceBolt --> WordCountBoltbuilder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));/*** globalGrouping 统一路由到唯一的ReportBolt的任务重** */// WordCountBolt --> ReportBoltbuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);//上述数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上Config config = new Config();//这边使用的是Storm本地模式 LocalCluster在本地的开发环境中来模拟一个完整的storm集群LocalCluster cluster = new LocalCluster();//当一个拓扑提交的时候,storm会将默认配置和config实例中的配置合并然后作为参数传递给submitTopology//合并之后的配置将会分发给各个spout的bolt的open() prepare()方法cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());//控制执行的时长,修改这个会输出结果不一样waitForSeconds(10);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。