We haven't followed the course schedule arranged by Ma Ge for the past two weeks; because of company needs, everyone has been experimenting with big data. The homework still has to be done, though, and I don't know in which week Ma Ge will assign a Storm exercise, so for now I'll record these experiments bit by bit (Flume, Kafka, Spark and the rest will be filled in later this week) and polish them once I know the actual assignment.
Experiment goals
Understand how Storm works, and deepen that understanding with a standalone Storm experiment, in preparation for the big-data work to come.
Understand what Topology, spout, bolt, Nimbus and Supervisor are, and how to use them.
This post does not cover the underlying theory or related explanations; those are easy to look up online.
Experiment task
RandomSpout class: reads external data and wraps it into tuples to emit; here it simulates randomly picking a product name from the goods array, wrapping it in a tuple and emitting it;
UpperBolt class: converts each received raw product name to upper case and emits it;
SuffixBolt class: appends a suffix to the product name, then writes the data to a file;
TopoMain class: describes the structure of the topology, creates it and submits it to the cluster;
RandomSpout.java
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// Reads external data, wraps it in a tuple and emits it
public class RandomSpout extends BaseRichSpout {

    SpoutOutputCollector collector = null;
    String[] goods = {"iphone", "xiaomi", "meizu", "zhongxing", "huawei", "moto", "sumsung", "simens"};

    /**
     * Fetches a message and sends it to the next component; called by Storm
     * over and over again (the most important method).
     * Randomly picks a product name from the goods array, wraps it in a
     * tuple and emits it.
     */
    @Override
    public void nextTuple() {
        // Randomly pick a product name
        Random random = new Random();
        String good = goods[random.nextInt(goods.length)];
        // Wrap it in a tuple and emit it
        collector.emit(new Values(good));
        // Sleep for 500 milliseconds
        Utils.sleep(500);
    }

    // Initialization; called only once, at startup
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the schema of the emitted tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("src_word")); // the product name field
    }
}
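Because nextTuple() only runs inside Storm's runtime, the random-selection logic it wraps can be sanity-checked on its own. Below is a minimal plain-Java sketch with no Storm dependencies; the class name RandomGoodDemo and the pickGood helper are made up for illustration:

```java
import java.util.Random;

// Standalone sketch of the selection logic inside RandomSpout.nextTuple(),
// without any Storm dependencies.
public class RandomGoodDemo {

    static final String[] GOODS = {"iphone", "xiaomi", "meizu", "zhongxing",
                                   "huawei", "moto", "sumsung", "simens"};

    // Pick one product name at random, as the spout does before emitting
    static String pickGood(Random random) {
        return GOODS[random.nextInt(GOODS.length)];
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Print a few picks; each one is always an element of GOODS
        for (int i = 0; i < 5; i++) {
            System.out.println(pickGood(random));
        }
    }
}
```

One design note: the spout above allocates a new Random inside every nextTuple() call; creating it once (for example in open(), or as a field) would be slightly cheaper and is the more common pattern.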
UpperBolt.java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Converts the received raw product name to upper case and emits it.
 */
public class UpperBolt extends BaseBasicBolt {

    /**
     * execute: invoked once for every incoming message.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Get the data from the tuple: the raw product name
        String src_word = tuple.getString(0);
        // Convert it to upper case
        String upper_word = src_word.toUpperCase();
        // Emit it
        collector.emit(new Values(upper_word));
    }

    // Declare the fields of the tuples this bolt emits
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper_word"));
    }
}
SuffixBolt.java
import java.io.FileWriter;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Appends a suffix to the product name, then writes the data to a file.
 * @author zhouyong
 */
public class SuffixBolt extends BaseBasicBolt {

    FileWriter fileWriter = null;

    // Initialization; called once
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            fileWriter = new FileWriter("/home/hadoop/" + UUID.randomUUID());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Get the data sent by the previous component from the tuple
        String upper_word = tuple.getString(0);
        // Append a suffix to the product name
        String result = upper_word + "_suffix";
        try {
            fileWriter.append(result);
            fileWriter.append("\n");
            fileWriter.flush();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // This bolt is the end of the topology and emits no tuples
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output fields
    }
}
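The suffix-and-write logic in execute() can likewise be tried outside Storm. The sketch below is not the bolt itself but a plain-Java illustration; the class name SuffixWriterDemo and the addSuffix helper are hypothetical, and it writes to a temporary directory instead of /home/hadoop/ so it runs anywhere:

```java
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Standalone sketch of SuffixBolt's append-and-flush behavior,
// using a temp directory so it has no environment requirements.
public class SuffixWriterDemo {

    // Append the suffix exactly as the bolt does
    static String addSuffix(String word) {
        return word + "_suffix";
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("suffixbolt");
        Path file = dir.resolve(UUID.randomUUID().toString());
        try (FileWriter fileWriter = new FileWriter(file.toFile())) {
            for (String word : new String[]{"IPHONE", "HUAWEI"}) {
                fileWriter.append(addSuffix(word));
                fileWriter.append("\n");
                fileWriter.flush(); // flush after every line, like the bolt
            }
        }
        // Read the file back and show what was written
        System.out.println(Files.readAllLines(file));
    }
}
```

Note that the real bolt keeps its FileWriter open across execute() calls and flushes after every line, which is why the output files keep growing for as long as the topology runs.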
Main class: TopoMain.java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/**
 * Describes the topology's structure, creates the topology and submits it to the cluster.
 * @author zhouyong
 */
public class TopoMain {

    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // Set RandomSpout as the message source
        // Arguments: unique id, spout instance, parallelism
        topologyBuilder.setSpout("randomspout", new RandomSpout(), 4);

        // Set the processing component UpperBolt, receiving messages from randomspout
        topologyBuilder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");

        // Set the processing component SuffixBolt, receiving messages from upperbolt
        topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");

        // Create a topology
        StormTopology topo = topologyBuilder.createTopology();

        // Create a Storm configuration object
        Config conf = new Config();
        // Number of worker processes the cluster starts for this topology
        conf.setNumWorkers(4);
        conf.setDebug(true);
        conf.setNumAckers(0);

        // Submit the topology to the Storm cluster
        StormSubmitter.submitTopology("demotopo", conf, topo);
    }
}
Package these four Java classes into a jar named demotopo.jar.
Environment setup
1. Install ZooKeeper;
2. Install Storm;
The OS is CentOS 6.5; we install both ZooKeeper and Storm under /opt/hadoop/.
Installing ZooKeeper
ZooKeeper version: zookeeper-3.4.8.tar.gz
For a standalone ZooKeeper deployment, unpacking the tarball is essentially enough; the default settings need no editing (you may still need to copy conf/zoo_sample.cfg to conf/zoo.cfg so zkServer.sh can find a config file).
Starting ZooKeeper
1>, Start ZooKeeper
# ./zkServer.sh start
2>, Check whether ZooKeeper is running
# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/hadoop/zookeeper/zookeeper/bin/../conf/zoo.cfg
Mode: standalone
Installing Storm
1>, Version: apache-storm-0.9.6.tar.gz
2>, Installation and configuration
After unpacking into /opt/hadoop/storm/, edit storm.yaml.
The file is at /opt/hadoop/storm/apache-storm-0.9.6/conf/storm.yaml; its contents:
storm.zookeeper.servers:
    - 127.0.0.1
storm.zookeeper.port: 2181
nimbus.host: "127.0.0.1"
storm.local.dir: "/tmp/storm"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
Note: be especially careful with leading and trailing whitespace here; YAML is indentation-sensitive, and Storm will fail to start if it cannot parse the file.
3>, Start the master node: nimbus
# ./storm nimbus                       \\ foreground start; the window must stay open
# bin/storm nimbus 1>/dev/null 2>&1 &  \\ background start
Check that it started:
# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker
Screenshot of the master node nimbus started in a foreground window:
4>, Start the web UI
# bin/storm ui                       \\ foreground start; the window must stay open
# bin/storm ui 1>/dev/null 2>&1 &    \\ background start
# jps
11255 core
http://172.31.3.148:8080/index.html
5>, Start the worker node: supervisor
# ./storm supervisor                       \\ foreground start
# ./storm supervisor 1>/dev/null 2>&1 &    \\ background start
# jps
10790 nimbus
11030 worker
10870 supervisor
Use jps to check that all services are running normally:
[root@localhost apache-storm-0.9.6]# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker
Submitting the topology
1>, Upload the jar
Upload demotopo.jar to the Storm installation directory /opt/hadoop/storm/:
2>, Hand the jar to Storm for execution
Command format: storm jar [jar path] [package.topology-class] [storm IP] [storm port] [topology name] [args]
# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain
# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain
Running: /usr/local/java/bin/java -client -Dstorm.options= -Dstorm.home=/opt/hadoop/storm/apache-storm-0.9.6 -Dstorm.log.dir=/opt/hadoop/storm/apache-storm-0.9.6/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/hadoop/storm/apache-storm-0.9.6/lib/carbonite-1.4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-util-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/compojure-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jgrapht-core-0.9.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.logging-0.2.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-time-0.4.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-codec-1.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/servlet-api-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/storm-core-0.9.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/hiccup-0.3.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-stacktrace-0.2.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-lang-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/chill-java-0.3.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/kryo-2.21.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-classic-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/minlog-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/objenesis-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clout-1.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.cli-0.2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-devel-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-jetty-adapter-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-servlet-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/snakeyaml-1.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/log4j-over-slf4j-1.6.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/core.incubator-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-core-1.1.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/asm-4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/math.numeric-tower-0.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/disruptor-2.10.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/joda-time-2.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-exec-1.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jline-2.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/slf4j-api-1.7.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.macro-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clojure-1.5.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-fileupload-1.2.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/reflectasm-1.07-shaded.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-core-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-io-2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-logging-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/json-simple-1.1.jar:../../demotopo.jar:/opt/hadoop/storm/apache-storm-0.9.6/conf:/opt/hadoop/storm/apache-storm-0.9.6/bin -Dstorm.jar=../../demotopo.jar cn.itcast.storm.TopoMain
654 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
708 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar ../../demotopo.jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752 [main] INFO backtype.storm.StormSubmitter - Submitting topology demotopo in distributed mode with conf {"topology.workers":4,"topology.acker.executors":0,"topology.debug":true}
973 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: demotopo
3>, Verify
Check that the topology (named demotopo) is running.
Check that four files have been generated under /home/hadoop/.
There are four files because four worker processes were configured (conf.setNumWorkers(4), matching the four supervisor.slots.ports).
Check that the file sizes keep growing.
Check that the product names in the files are random, and that each line ends with _suffix.
Note: the attachment is the experiment's jar; please rename the .rar extension to .jar.
Original article by 365. If you repost it, please credit the source: http://www.www58058.com/46231