
【Storm】Storm installation, configuration, and usage, with a walkthrough of a Storm word count program


Preface: reading notes

A Storm cluster looks a lot like a Hadoop cluster. Hadoop runs MapReduce jobs, while Storm runs topologies. The key difference is that a MapReduce job eventually finishes, whereas a topology runs forever until you kill it. A Storm cluster has two kinds of nodes: master and worker. The master runs a daemon called Nimbus, similar to Hadoop's JobTracker. Nimbus is responsible for distributing code around the cluster, assigning tasks to worker nodes, and monitoring for failures.

Each worker node runs a daemon called the Supervisor. The Supervisor listens for the work assigned to its machine and starts and stops worker processes to handle the tasks Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a ZooKeeper cluster. In addition, the Nimbus and Supervisor daemons are fail-fast and stateless; all state is kept in ZooKeeper or on local disk. The daemons can die or be restarted without affecting the health of the cluster. To run a topology on Storm, you execute:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

The storm jar command connects to Nimbus and uploads the jar. The core primitives Storm provides for stream transformations are spouts and bolts.

Spouts and bolts are the interfaces you implement to run your application logic.

A spout is a source of streams: it reads data in and emits it as a stream. A bolt consumes input streams, processes them, and possibly emits new streams.

Storm automatically reassigns failed tasks, and it guarantees that no data is lost, even when machines go down.
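That guarantee hinges on the spout tagging every tuple it emits with a message id: Storm calls ack on the spout once the whole tuple tree has been processed, and fail if processing fails or times out, so the spout can replay. Below is a minimal sketch of the pattern, with a hypothetical in-memory pending map and a hard-coded stand-in sentence as the source; the WordReader spout later in this post uses the same mechanism, with the file line itself as the message id.

package com.cmcc.chiwei.storm;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ReliableSentenceSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    // Tuples that have been emitted but not yet acked, keyed by message id.
    private Map<String, String> pending;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<String, String>();
    }

    public void nextTuple() {
        String sentence = "the quick brown fox";     // stand-in for a real data source
        String msgId = UUID.randomUUID().toString(); // id Storm will hand back to ack/fail
        pending.put(msgId, sentence);
        collector.emit(new Values(sentence), msgId); // emit the tuple tagged with its id
    }

    public void ack(Object msgId) {
        pending.remove(msgId);                       // fully processed, forget it
    }

    public void fail(Object msgId) {
        String sentence = pending.get(msgId);        // failed or timed out: replay it
        if (sentence != null) {
            collector.emit(new Values(sentence), msgId);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}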

Download and installation

/downloads.html

1. Install dependencies

yum install uuid -y
yum install e2fsprogs -y
yum install libuuid*
yum install libtool -y
yum install *c++* -y
yum install git -y

2. ZooKeeper cluster

/simonchi/article/details/43019401

3. zeromq & jzmq

zeromq:
tar -xzvf zeromq-4.0.5.tar.gz
./autogen.sh
./configure && make && make install

jzmq:
git clone git:///nathanmarz/jzmq.git
./autogen.sh
./configure && make && make install

4. Python

./configure
make
make install
rm -f /usr/bin/python
ln /usr/local/bin/python3.4 /usr/bin/python
python -V
vi /usr/bin/yum    # change the first line "#!/usr/bin/python" to "#!/usr/bin/python2.4" so yum keeps using Python 2

Configuration and startup

storm.yaml

storm.zookeeper.servers:
    - 192.168.11.176
    - 192.168.11.177
    - 192.168.11.178
storm.zookeeper.port: 2181
nimbus.host: "192.168.11.176"
storm.local.dir: "/home/storm/workdir"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

1. Nimbus: on the Storm master node, run "bin/storm nimbus >/dev/null 2>&1 &" to start the Nimbus daemon and leave it running in the background.
2. Supervisor: on each Storm worker node, run "bin/storm supervisor >/dev/null 2>&1 &" to start the Supervisor daemon and leave it running in the background.
3. UI: on the Storm master node, run "bin/storm ui >/dev/null 2>&1 &" to start the UI daemon in the background. Once it is up, visit http://{nimbus host}:8080 to see worker resource usage, the status of running topologies, and other cluster information.

Word count program

Spout

package com.cmcc.chiwei.storm;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public boolean isDistributed() {
        return false;
    }

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {
    }

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit each file line.
     */
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // Emit a new tuple for each line, using the line itself as the message id
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the file and get the collector object.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

Bolt1

package com.cmcc.chiwei.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    public void cleanup() {
    }

    /**
     * The bolt receives a line from the words file and normalizes it:
     * the line is lower-cased and split into its individual words.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt only emits the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
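WordNormalizer extends BaseBasicBolt, which anchors each emitted tuple to the input and acks it automatically. For explicit control over anchoring and acknowledgement you would use BaseRichBolt instead; the following is a rough sketch of the same normalizer written in that style, not part of the original example.

package com.cmcc.chiwei.storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizerRich extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String sentence = input.getString(0);
        for (String word : sentence.split(" ")) {
            word = word.trim().toLowerCase();
            if (!word.isEmpty()) {
                // Anchor the emitted word to the input tuple so failures propagate back to the spout.
                collector.emit(input, new Values(word));
            }
        }
        // With BaseRichBolt the input tuple must be acked explicitly.
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}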

Bolt2

package com.cmcc.chiwei.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the run (when the cluster is shut down)
     * we show the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}
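declareOutputFields is left empty here because WordCounter is the last bolt in the chain and only prints its counters in cleanup. If the counts were needed by a further bolt or an external store, a variant could declare and emit a (word, count) pair instead; a hypothetical sketch, not part of the original example:

package com.cmcc.chiwei.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class EmittingWordCounter extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private Map<String, Integer> counters = new HashMap<String, Integer>();

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getString(0);
        Integer count = counters.containsKey(word) ? counters.get(word) + 1 : 1;
        counters.put(word, count);
        // Emit the running count so another bolt (or an external store) can consume it.
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}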

Topology

package com.cmcc.chiwei.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Topology definition: wire up the components and how they exchange data
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);

        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Topology", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}
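LocalCluster runs the entire topology inside a single JVM, which is convenient for development. To run the same topology on the cluster configured above, the main method would use StormSubmitter and the jar would be uploaded with the storm jar command shown earlier. A sketch under those assumptions (the class name and worker count are illustrative, not from the original post):

package com.cmcc.chiwei.storm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class ClusterTopologyMain {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word"));

        Config conf = new Config();
        conf.put("wordsFile", args[0]);   // the file must exist on the worker machines
        conf.setNumWorkers(2);            // request two of the slots defined by supervisor.slots.ports

        // Uploads the jar to Nimbus and starts the topology; stop it later with "storm kill <name>".
        StormSubmitter.submitTopology("Getting-Started-Topology", conf, builder.createTopology());
    }
}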

words.txt

hello
world
storm flume hadoop hdfs
what's wrong flume ?
what's up hdfs ?
Hi,storm,what are you doing ?

Execution output

OK:hello
OK:world
OK:storm flume hadoop hdfs
OK:what's wrong flume ?
OK:what's up hdfs ?
OK:Hi,storm,what are you doing ?

-- Word Counter [word-counter-2] --
what's: 2
flume: 2
hdfs: 2
you: 1
storm: 1
up: 1
hello: 1
hadoop: 1
hi,storm,what: 1
are: 1
doing: 1
wrong: 1
?: 3
world: 1

Analysis of the pieces involved:

spout: reads the raw data and feeds it to the bolts.
bolt: receives data from a spout or from other bolts and processes it; the result can serve as input for further bolts or as the final output.
nimbus: the daemon on the master node, responsible for distributing tasks to the worker nodes.
topology: the wiring of the job, Storm's unit of work.
declared fields: the fields declared by spouts and bolts and consumed by the receiving bolts.

A Storm topology is essentially a chain of bolts transforming the data a spout passes in. For example: the spout reads a line of text and hands the line to one bolt, which splits it into words and hands them to another bolt, and that second bolt accumulates the counts.
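In this example the word-counter bolt runs with a parallelism of 1. If you raise the parallelism, it is the fieldsGrouping on "word" that keeps the counts correct, because every occurrence of a given word is routed to the same task. A small sketch of that wiring, with illustrative parallelism values that are not part of the original post:

package com.cmcc.chiwei.storm;

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class ParallelWiring {
    // Returns a builder like the one in TopologyMain, but with two tasks per bolt.
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        // Any normalizer task may take any line, so shuffleGrouping is fine here.
        builder.setBolt("word-normalizer", new WordNormalizer(), 2)
               .shuffleGrouping("word-reader");
        // fieldsGrouping on "word" routes every occurrence of the same word to the
        // same WordCounter task, so each task's HashMap stays internally consistent.
        builder.setBolt("word-counter", new WordCounter(), 2)
               .fieldsGrouping("word-normalizer", new Fields("word"));
        return builder;
    }
}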

Spout

open --> nextTuple

Bolt1

declareOutputFields --> execute

Bolt2

prepare --> execute --> cleanup

More details will be covered in follow-up posts as I continue studying Storm.


Comments and corrections are very welcome!
