
Storm Experiment: Word Count 4


Building on the previous word-count example, one change is made: a custom grouping strategy sends all words that share the same first letter to the same task for counting.

The custom CustomStreamGrouping implementation:

package com.zhch.v4;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ModuleGrouping implements CustomStreamGrouping, Serializable {
    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List<Integer> targetTasks) {
        this.tasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> taskIds = new ArrayList<Integer>();
        if (values.size() > 0) {
            String str = values.get(0).toString();
            if (str.isEmpty()) {
                // route empty strings to the first target task
                // (the original added the literal task ID 0, which need not be a valid target)
                taskIds.add(tasks.get(0));
            } else {
                Integer index = str.charAt(0) % tasks.size();
                taskIds.add(tasks.get(index));
            }
        }
        return taskIds;
    }
}
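Because chooseTasks keys only on the first character of the word, any two words that share a first letter always map to the same target task. A minimal standalone sketch of the same modulo logic (hypothetical class name and task IDs, for illustration only):

package com.zhch.v4;

import java.util.Arrays;
import java.util.List;

// Hypothetical demo, not part of the topology: shows how first letters map to target tasks.
public class ModuleGroupingDemo {
    public static void main(String[] args) {
        List<Integer> targetTasks = Arrays.asList(4, 5); // e.g., two count-bolt task IDs
        for (String word : Arrays.asList("apple", "ant", "bee", "bat")) {
            int index = word.charAt(0) % targetTasks.size();
            System.out.println(word + " -> task " + targetTasks.get(index));
        }
        // "apple" and "ant" land on task 5; "bee" and "bat" land on task 4.
    }
}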

The data source spout:

package com.zhch.v4;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    private ConcurrentHashMap<UUID, Values> pending; // tuples emitted but not yet acked
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return; // the file has already been read; don't rewrap the exhausted reader
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId); // anchor with msgId so ack/fail are called back
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId); // replay the failed tuple
    }
}

The sentence-splitting bolt:

package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(tuple, new Values(word)); // anchor each word to the input tuple
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

The word-count bolt:

package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // Rewrite the full, sorted result file on every tuple so the output stays current.
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            List<String> keys = new ArrayList<String>();
            keys.addAll(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Long c = this.counts.get(key);
                writer.write(key + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
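Note that WordCountBolt declares the output fields "word" and "count" but never emits them; counts only ever reach result.txt. If a downstream consumer were attached, the running count could also be emitted from execute(), for example (hypothetical one-line addition; it would also need the backtype.storm.tuple.Values import):

collector.emit(tuple, new Values(word, count)); // anchored emit of the updated count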

The word-count topology:

package com.zhch.v4;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v4";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); // use the custom grouping strategy

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            // cluster mode
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
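For comparison, Storm's built-in fields grouping would also pin each word to a fixed task, but it partitions by the whole word rather than by first letter. A hypothetical alternative wiring (it would need the backtype.storm.tuple.Fields import) could read:

builder.setBolt(COUNT_BOLT_ID, countBolt, 2).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

The custom grouping is only needed because this experiment specifically wants all words sharing a first letter co-located on one task.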

Submit to the Storm cluster:

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4
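Judging from main(), the topology name is taken from the second argument; when it is omitted, the topology runs in a LocalCluster instead. So a local test run with the same jar and input file (an assumption, not shown in the original) would be:

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt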

Run results (the two count-bolt tasks ended up on different nodes, hadoop5 and hadoop6, each writing its own result.txt, so the vocabulary is split by first letter between the two files):

[grid@hadoop5 stormData]$ cat result.txt
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1

[grid@hadoop6 stormData]$ cat result.txt
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1
