N21_第x周_Storm_01_單機實踐篇

   這2周沒有按馬哥安排的課程走,因公司需要,大家一直在試嘗大數據這塊。作業不能不做,也不知道馬哥哪周的作業會有storm,只好先將這段時間的實驗慢慢記錄下來(其它flume、kafka、spark等本周會慢慢補充),等知道具體的作業題目后,再完善。

實驗目的

  了解storm的原理,并用storm單機版實驗加深理解,為后面的大數據做準備。

  了解Topology、spout、bolt、Nimbus、Suppervisor,怎么用。

  本篇不涉及原理及相關解釋,可以度娘。   

實驗題目

  1473770496158165.jpg

  1473770824893767.jpg

    RandomSpout類:讀取外部數據并封裝為tuple發送出去,模擬從goods數據中隨機取一個商品名稱封裝到tuple中發送出去;

    UpperBolt類:將收到的原始商品名稱,轉換成大寫再發送出去;

    SuffixBolt類:給商品名稱添加后綴,然后將數據寫入文件中;

    TopoMain類:描述topology的結構,以及創建topology并提交給集群;

RandomSpout.java

// 讀取外部數據并封裝為tuple發送出去
public class RandomSpout extends BaseRichSpout{
SpoutOutputCollector collector = null;
String [] goods = {"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"};
/**
 *  獲取消息并發送給下一個組件的方法,會被storm不斷地調用(最重要的一個方法)
 * 
 *  從goods數據中隨機取一個商品名稱封裝到tuple中發送出去
 * */
@Override
public void nextTuple() {
// 隨機取到一個商品名稱
Random random = new Random();
String good = goods[random.nextInt(goods.length)];
//封裝到tuple中發送出去
collector.emit(new Values(good));
//休眠500毫秒
Utils.sleep(500);
}
//進行初始化,只在開始的時候調用一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
 *  定義tuple的scheme
 * */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("src_word"));   //第一個商品名稱
}
}

UpperBolt.java

/**
 *  將收到的原始商品名稱,轉換成大寫再發送出去
 * */
public class UpperBolt extends BaseBasicBolt{
/**
 *  execute:每來一次消息,就會被執行一次
 * */
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 從tuple中拿到我們的數據----原始商品名
String src_word = tuple.getString(0);
//轉換成大寫
String upper_word = src_word.toUpperCase();
//發送出去
collector.emit(new Values(upper_word));
}
//聲明bolt組件要發送tuple的字段定義
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("upper_word"));
}
}

SuffixBolt.java

/**
 *  給商品名稱添加后綴,然后將數據寫入文件中
 *  @author zhouyong
 * */
public class SuffixBolt extends BaseBasicBolt {
FileWriter fileWriter = null;
//初始化方法,會被調用一次
@Override
public void prepare(Map stormConf, TopologyContext context) {
try{
fileWriter = new FileWriter("/home/hadoop/" + UUID.randomUUID());
}catch(Exception ex){
ex.printStackTrace();
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 從消息元組tuple中拿到上一個組件發送過來的數據
String upper_word = tuple.getString(0);
//給商品名稱添加后綴
String result = upper_word + "_suffix";
try{
fileWriter.append(result);
fileWriter.append("\n");
fileWriter.flush();
}catch(Exception ex){
ex.printStackTrace();
}
}
//聲明該組件要發送出去的tuple的字段定義
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
}

主類TopoMain.java

/**
 *  描述topology的結構,以及創建topology并提交給集群
 *  @author zhouyong
 * */
public class TopoMain {
public static void main(String [] args) throws Exception{
 TopologyBuilder topologyBuilder = new TopologyBuilder();
 
 //設置消息源組件為RandomSpout
 //唯為標識,spout實例,并發數
 topologyBuilder.setSpout("randomspout", new RandomSpout(), 4);
 
 //設置邏輯處理組件UpperBolt,并指定接收randomspout的消息
 topologyBuilder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
 
 //設置邏輯處理組件SuffixBolt,并指定接收upperbolt的消息
 topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
 
 //創建一個topology
 StormTopology topo = topologyBuilder.createTopology();
 
 //創建一個storm的配置參數對象
 Config conf = new Config();
 //設置storm集群為這個topo啟動的進程數
 conf.setNumWorkers(4);
 conf.setDebug(true);
 conf.setNumAckers(0);
 
 //提交topo到storm集群中
 StormSubmitter.submitTopology("demotopo", conf, topo);
 
}
}

將這4個java類打包成jar包,jar包名稱為demotopo.jar。

1473771658505019.jpg

環境部署

1,安裝zookeeper;

2,安裝storm;

CentOS6.5,我們統一將zookeeper和storm安裝到/opt/hadoop/下。

安裝zookeeper

zookeeper版本:zookeeper-3.4.8.tar.gz

單機版部署zookeeper,只要解壓就可以了,可以不做配置上的修改。

啟動zookeeper

1>, 啟動zookeeper

# ./zkServer.sh start

2>, 檢查zookeeper是否成功

# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config:
/opt/hadoop/zookeeper/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

 1473772366993930.jpg

安裝storm

1>,版本:apache-storm-0.9.6.tar.gz

2>,安裝配置

   解壓到/opt/hadoop/storm/后,配置storm.yaml,

文件在/usr/local/storm/conf/storm.yaml,內容:
 storm.zookeeper.servers:
     - 127.0.0.1
 storm.zookeeper.port: 2181
 nimbus.host: "127.0.0.1"
 storm.local.dir: "/tmp/storm"
 supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

注意: 這里要特別注意前后空格問題,否則在啟動時會不通過。


3>, 啟動主節點:nimbus

# ./storm nimbus\\前臺啟動,窗口不能關閉
## bin/storm nimbus 1>/dev/null 2>&1 &\\后臺啟動
檢查是否啟動
# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker

前臺窗口啟動主節點nimbus截圖:

1473772984288521.jpg


4>,啟動一個前端UI

# bin/storm ui\\前端啟動,窗口不能關
## bin/storm ui 1>/dev/null 2>&1 &\\后臺啟動
# jps
11255 core

http://172.31.3.148:8080/index.html  

       1473773190267950.jpg


5>,啟動從節點: supervisor

# ./storm supervisor\\前端啟動
## ./storm supervisor\\后臺啟動
# jps
10790 nimbus
11030 worker
10870 supervisor

1473773344524546.jpg

用jps檢查下所有服務是否都正常:

[root@localhost apache-storm-0.9.6]# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker

——-

提交Topologies

1>,上傳jar包

將demotopo.jar包上傳到storm的安裝目錄/opt/hadoop/storm/下:

1473773781461054.jpg

2>,將jar包發送給storm去執行

命令格式:storm jar 【jar路徑】 【拓撲包名.拓撲類名】【stormIP地址】【storm端口】【拓撲名稱】【參數】

# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain

# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain
Running: /usr/local/java/bin/java -client -Dstorm.options= -Dstorm.home=/opt/hadoop/storm/apache-storm-0.9.6 -Dstorm.log.dir=/opt/hadoop/storm/apache-storm-0.9.6/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/hadoop/storm/apache-storm-0.9.6/lib/carbonite-1.4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-util-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/compojure-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jgrapht-core-0.9.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.logging-0.2.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-time-0.4.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-codec-1.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/servlet-api-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/storm-core-0.9.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/hiccup-0.3.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-stacktrace-0.2.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-lang-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/chill-java-0.3.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/kryo-2.21.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-classic-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/minlog-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/objenesis-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clout-1.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.cli-0.2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-devel-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-jetty-adapter-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-servlet-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/snakeyaml-1.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/log4j-over-slf4j-1.6.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/core.incubator-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-core-1.1.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/asm-4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/math.numeric-tower-0.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/disruptor-2.10.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/joda-time-2.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-exec-1.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jline-2.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/slf4j-api-1.7.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.macro-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clojure-1.5.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-fileupload-1.2.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/reflectasm-1.07-shaded.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-core-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-io-2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-logging-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/json-simple-1.1.jar:../../demotopo.jar:/opt/hadoop/storm/apache-storm-0.9.6/conf:/opt/hadoop/storm/apache-storm-0.9.6/bin -Dstorm.jar=../../demotopo.jar cn.itcast.storm.TopoMain
654  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
708  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar ../../demotopo.jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Submitting topology demotopo in distributed mode with conf {"topology.workers":4,"topology.acker.executors":0,"topology.debug":true}
973  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: demotopo

3>,檢查

檢查Topology(名為demotopo)是否運行

      1473774107376858.jpg

檢查/home/hadoop/下是否生成了4個文件

  1473774207108853.jpg

  這里的4個worker是在配置文件中配置的。

檢查文件大小是否在不停變化

   1473774307357070.jpg

檢查文件中單詞是否隨機,每個單詞后是否以suffix結尾

  1473774437982428.jpg

注:附件是實驗jar包,請將.rar改為.jar。

N21_第x周_Storm_01_單機實踐篇demotopo.rar

原創文章,作者:365,如若轉載,請注明出處:http://www.www58058.com/46231

(0)
365365
上一篇 2016-09-15 22:21
下一篇 2016-09-15 22:21

相關推薦

  • bash腳本編程之select語句、函數

    概述     承接上篇,繼續介紹一下另一個循環語句select,還有腳本中函數的相關內容,分為三個部分:         1、select語句的介紹和效果演示       &nbs…

    Linux干貨 2016-08-21
  • Linux基礎之部分命令使用實例一

    1、 Linux上的文件管理類命令都有哪些,其常用的使用方法及其相關示例演示。 常用有:cp、mv、rm  cp [OPTION] SOURCE DEST //復制    option:      -i:交換式,用來提醒用戶是否覆蓋,當將源文件復制并粘貼到目的路徑時,如果目的路徑…

    Linux干貨 2016-11-08
  • HAproxy對wordpress的代理

    一,實驗環境 網絡拓撲結構 Web2,master是centos7.3 Web1,backup是centos6.8 客服端的window主機首win7 所有的主機通過switch相連,處于172.18.0.0/16的網段中 二,安裝的軟件 使用wordpress版本4.3.1 Web1安裝httpd,php,php-mysql Web2安裝nfs-utils…

    Linux干貨 2017-05-17
  • 文本處理工具Sed及VIM的使用

    1.文本處理工具Sed   Stream Editor 行編輯器                            圖1-1 sed幫助手冊  用法:      se…

    Linux干貨 2016-08-10
  • HAProxy實現請求的80端口轉發至后端的8000端口并實現動靜分離

    一 實驗目的 用HAProxy作為負載均衡器,實現把前端請求調度到后端,前端監聽80端口,轉發至后端的8000端口,并會對訪問資源進行判斷實現不同的訪問內容轉發至相對應的服務器。 二 實驗拓撲 三 實驗環境 IP 功能 192.168.20.108 HAProxy 192.168.237.129 Nginx 192.168.237.130 Nginx+PHP…

    Linux干貨 2016-12-19
  • 馬哥教育網絡班21期+第14周課程練習

    iptables關鍵點 表:filter(過濾,防火墻);nat(網絡地址轉換);mangle(拆解報文,做出修改,封裝報文);raw(關閉nat表啟用的鏈接追蹤機制);上述是根據功能來區分的,寫規則要明白要實現的功能 鏈:PREROUTING,INPUT,FORWARD,OUTPUT,POSTROUTING 數據流向:流入PREROUTING–…

    Linux干貨 2016-08-26
欧美性久久久久