2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > apache-storm例子:统计句子中的单词数量

apache-storm例子:统计句子中的单词数量

时间:2020-06-08 01:04:20

相关推荐

apache-storm例子:统计句子中的单词数量

模型图

代码

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhyoulun</groupId>
    <artifactId>storm_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Pin the source encoding so compilation does not depend on the platform default charset
             (the sources may contain non-ASCII comments). -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Storm API (backtype.storm.*) plus LocalCluster for running the topology in-process. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.7</version>
        </dependency>
    </dependencies>
</project>

MainTopology.java

import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;public class MainTopology {public void runLocal(int waitSeconds) {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("sentenceSpout", new SentenceSpout(), 1);builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout");builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt");Config config = new Config();LocalCluster cluster = new LocalCluster();cluster.submitTopology("word_count", config, builder.createTopology());try {Thread.sleep(waitSeconds * 1000);} catch (InterruptedException e) {e.printStackTrace();}cluster.killTopology("word_count");cluster.shutdown();}public static void main(String[] args) {MainTopology topology = new MainTopology();topology.runLocal(60);}}

SentenceSpout.java

import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.util.Map;public class SentenceSpout extends BaseRichSpout {private SpoutOutputCollector collector;private String[] sentences = {"hello world", "study storm"};private int index = 0;public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;}public void nextTuple() {//将一句话拆分成单词,发送每一个词this.collector.emit(new Values(this.sentences[index]));index++;if (index >= sentences.length) {index = 0;}//等待500mstry {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("sentence"));}}

SplitBolt.java

import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;public class SplitBolt extends BaseRichBolt {private OutputCollector collector;public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");//将一句话拆分成单词,发送每一个词for (String word : words) {this.collector.emit(new Values(word));}}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word"));}}

CountBolt.java

import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import java.util.HashMap;import java.util.Map;import java.util.Set;public class CountBolt extends BaseRichBolt {private HashMap<String, Integer> wordMap = new HashMap<String, Integer>();public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {}public void execute(Tuple tuple) {//从tuple中读取单词String word = tuple.getStringByField("word");//计数int num;if (wordMap.containsKey(word)) {num = wordMap.get(word);} else {num = 0;}wordMap.put(word, 1 + num);//输出展示Set<String> keys = wordMap.keySet();for (String key : keys) {System.out.print(key + ":" + wordMap.get(key) + ",");}System.out.println();}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}}

运行结果

...5978 [Thread-9-countBolt] INFO backtype.storm.daemon.executor - Prepared bolt countBolt:(2)hello:1,world:1,hello:1,study:1,world:1,hello:1,study:1,world:1,storm:1,hello:1,study:1,world:1,storm:1,hello:2,study:1,world:2,storm:1,hello:2,study:2,world:2,storm:1,hello:2,study:2,world:2,storm:2,hello:2,study:2,world:2,storm:2,hello:3,...study:57,world:58,storm:57,hello:58,study:58,world:58,storm:57,hello:58,study:58,world:58,storm:58,hello:58,study:58,world:58,storm:58,hello:59,study:58,world:59,storm:58,hello:59,64444 [main] INFO backtype.storm.daemon.nimbus - Delaying event :remove for 30 secs for word_count-1-1511510371study:59,world:59,storm:58,hello:59,study:59,world:59,storm:59,hello:59,64490 [main] INFO backtype.storm.daemon.nimbus - Updated word_count-1-1511510371 with status {:type :killed, :kill-time-secs 30}...

提交到storm

修改MainTopology.java

import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;public class MainTopology {private TopologyBuilder builder;private Config config;public MainTopology() {this.builder = new TopologyBuilder();this.builder.setSpout("sentenceSpout", new SentenceSpout(), 1);this.builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout");this.builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt");this.config = new Config();}public void runCluster() {try {StormSubmitter.submitTopology("word_count",this.config,this.builder.createTopology());} catch (Exception e) {e.printStackTrace();}}public void runLocal(int waitSeconds) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("word_count", this.config, this.builder.createTopology());try {Thread.sleep(waitSeconds * 1000);} catch (InterruptedException e) {e.printStackTrace();}cluster.killTopology("word_count");cluster.shutdown();}public static void main(String[] args) {MainTopology topology = new MainTopology();// topology.runLocal(60);topology.runCluster();}}

修改pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhyoulun</groupId>
    <artifactId>storm_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Pin the source encoding so compilation does not depend on the platform default charset
             (the sources may contain non-ASCII comments). -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.7</version>
            <!-- The Storm cluster supplies storm-core at runtime; do not bundle it into the jar,
                 otherwise a second copy ends up on the worker classpath. -->
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds target/storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar during `mvn package`. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <!-- Pinned so the build is reproducible and Maven does not warn about a missing version. -->
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>MainTopology</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

打包

# The pom binds the assembly's `single` goal to the package phase, so `package` alone
# builds the fat jar; adding `assembly:single` would just run the assembly a second time.
mvn clean package

target文件夹中会生成文件storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar

上传到storm

# Submit the topology: storm jar <jar> <main class> [args...].
# Run from the directory that holds the jar (target/ unless it was copied elsewhere).
storm jar storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar MainTopology

查看运行状态

查看日志

tail -f apache-storm/logs/worker-6700.log

2017-11-24T08:27:37.260+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.260+0000 STDIO [INFO] hello:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
...

参考

《Storm分布式实时计算模式》、《Storm实时数据处理》

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。