使用Storm實現實時大數據分析


摘要:隨著數據體積的越來越大,實時處理成為了許多機構需要面對的首要挑戰。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上結合了汽車超速監視,為我們演示了使用Storm進行實時大數據分析。CSDN在此編譯、整理。


簡單和明了,Storm讓大數據分析變得輕松加愉快。

當今世界,公司的日常運營經常會生成TB級別的數據。數據來源囊括了互聯網裝置可以捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中創建的數據。考慮到數據的生成量,實時處理成為了許多機構需要面對的首要挑戰。我們經常用的一個非常有效的開源實時計算工具就是Storm —— Twitter開發,通常被比作“實時的Hadoop”。然而Storm遠比Hadoop來的簡單,因為用它處理大數據不會帶來新老技術的交替。

Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分別從事技術分析和研發工作。本文詳述了Storm的使用方法,例子中的項目名稱為“超速報警系統(Speeding Alert System)”。我們想實現的功能是:實時分析過往車輛的數據,一旦車輛數據超過預設的臨界值 —— 便觸發一個trigger并把相關的數據存入數據庫。

1.  Storm是什么

     全量數據處理使用的大多是鼎鼎大名的hadoop或者hive,作為一個批處理系統,hadoop以其吞吐量大、自動容錯等優點,在海量數據處理上得到了廣泛的使用。

      Hadoop下的Map/Reduce框架對于數據的處理流程是

      1、 將要處理的數據上傳到Hadoop的文件系統HDFS中。

      2、 Map階段

             a)   Master對Map的預處理:對于大量的數據進行切分,劃分為M個16~64M的數據分片(可通過參數自定義分片大小)

             b)   調用Mapper函數:Master為Worker分配Map任務,每個分片都對應一個Worker進行處理。各個Worker讀取并調用用戶定義的Mapper函數    處理數據,并將結果存入HDFS,返回存儲位置給Master。

一個Worker在Map階段完成時,在HDFS中,生成一個排好序的Key-values組成的文件。并將位置信息匯報給Master。

      3、 Reduce階段

             a)   Master對Reduce的預處理:Master為Worker分配Reduce任務,他會將所有Mapper產生的數據進行映射,將相同key的任務分配給某個Worker。

             b)   調用Reduce函數:各個Worker將分配到的數據集進行排序(使用工具類Merg),并調用用戶自定義的Reduce函數,并將結果寫入HDFS。

每個Worker的Reduce任務完成后,都會在HDFS中生成一個輸出文件。Hadoop并不將這些文件合并,因為這些文件往往會作為另一個Map/reduce程序的輸入。

         以上的流程,粗略概括,就是從HDFS中獲取數據,將其按照大小分片,進行分布式處理,最終輸出結果。從流程來看,Hadoop框架進行數據處理有以下要求:

1、 數據已經存在在HDFS當中。

2、 數據間是少關聯的。各個任務執行器在執行負責的數據時,無需考慮對其他數據的影響,數據之間應盡可能是無聯系、不會影響的。

使用Hadoop,適合大批量的數據處理,這是他所擅長的。由于基于Map/Reduce這種單級的數據處理模型進行,因此,如果數據間的關聯系較大,需要進行數據的多級交互處理(某個階段的處理數據依賴于上一個階段),需要進行多次map/reduce。又由于map/reduce每次執行都需要遍歷整個數據集,對于數據的實時計算并不合適,于是有了storm。

      對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統。同Hadoop一樣Storm也可以處理大批量的數據,然而Storm在保證高可靠性的前提下還可以讓處理進行的更加實時;也就是說,所有的信息都會被處理。Storm同樣還具備容錯和分布計算這些特性,這就讓Storm可以擴展到不同的機器上進行大批量的數據處理。他同樣還有以下的這些特性:

  • 易于擴展:對于擴展,伴隨著業務的發展,我們的數據量、計算量可能會越來越大,所以希望這個系統是可擴展的。你只需要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop Zookeeper進行集群協調,這樣可以充分的保證大型集群的良好運行。

  • 每條信息的處理都可以得到保證。

  • Storm集群管理簡易。

  • Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執行中出現錯誤時,也會由Storm重新分配任務。這是分布式系統中通用問題。一個節點掛了不能影響我的應用。

  • 低延遲。都說了是實時計算系統了,延遲是一定要低的。

  • 盡管通常使用Java,Storm中的topology可以用任何語言設計。

       在線實時流處理模型

       對于處理大批量數據的Map/reduce程序,在任務完成之后就停止了,但Storm是用于實時計算的,所以,相應的處理程序會一直執行(等待任務,有任務則執行)直至手動停止。

       對于Storm,他是實時處理模型,與hadoop的不同是,他是針對在線業務而存在的計算平臺,如統計某用戶的交易量、生成為某個用戶的推薦列表等實時性高的需求。他是一個“流處理”框架。何謂流處理?storm將數據以Stream的方式,并按照Topology的順序,依次處理并最終生成結果。

當然為了更好的理解文章,你首先需要安裝和設置Storm。需要通過以下幾個簡單的步驟:

  • 從Storm官方下載Storm安裝文件

  • 將bin/directory解壓到你的PATH上,并保證bin/storm腳本是可執行的。

      盡管 Storm 是使用 Clojure 語言開發的,您仍然可以在 Storm 中使用幾乎任何語言編寫應用程序。所需的只是一個連接到 Storm 的架構的適配器。已存在針對 Scala、JRuby、Perl 和 PHP 的適配器,但是還有支持流式傳輸到 Storm 拓撲結構中的結構化查詢語言適配器。

2.  Storm的組件

       Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。

       Storm集群主要由一個主節點(Nimbus后臺程序)和一群工作節點(worker node)Supervisor的節點組成,通過 Zookeeper進行協調。Nimbus類似Hadoop里面的JobTracker。Nimbus負責在集群里面分發代碼,分配計算任務給機器, 并且監控狀態。

      每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。

1.jpg

1、 Nimbus主節點:

     主節點通常運行一個后臺程序 —— Nimbus,用于響應分布在集群中的節點,分配任務和監測故障。這個很類似于Hadoop中的Job Tracker。

2、Supervisor工作節點:

      工作節點同樣會運行一個后臺程序 —— Supervisor,用于收聽工作指派并基于要求運行工作進程。每個工作節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則通過Zookeeper系統或者集群。

3、Zookeeper

     Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的“topology”。topology則是一組由Spouts(數據源)和Bolts(數據操作)通過Stream Groupings進行連接的圖。下面對出現的術語進行更深刻的解析。

4、Worker:

       運行具體處理組件邏輯的進程。

5、Task:

       worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之后,task不再與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱為executor。

6、Topology(拓撲):

       storm中運行的一個實時應用程序,因為各個組件間的消息流動形成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連接起來,如下圖:

      2.jpg

     一個topology會一直運行直到你手動kill掉,Storm自動重新分配執行失敗的任務, 并且Storm可以保證你不會有數據丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。

運行一個topology很簡單。首先,把你所有的代碼以及所依賴的jar打進一個jar包。然后運行類似下面的這個命令:

      storm jar allmycode.jar backtype.storm.MyTopology arg1 arg2

這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1arg2。這個類的main函數定義這個topology并且把它提交給Nimbus。storm jar負責連接到Nimbus并且上傳jar包。

Topology的定義是一個Thrift結構,并且Nimbus就是一個Thrift服務, 你可以提交由任何語言創建的topology。上面的方面是用JVM-based語言提交的最簡單的方法。

7、Spout:

       消息源spout是Storm里面一個topology里面的消息生產者。簡而言之,Spout從來源處讀取數據并放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。

       消息源可以發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,然后使用SpoutOutputCollector來發射指定的stream。

      而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,如果沒有新tuple發射則會簡單的返回。

       要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調用所有消息源spout的方法。另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用fail。storm只對可靠的spout調用ack和fail。

8、Bolt:

     Topology中所有的處理都由Bolt完成。即所有的消息處理邏輯被封裝在bolts里面。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數據庫、等等。

        Bolt從Spout中接收數據并進行處理,如果遇到復雜流的處理也可能將tuple發送給另一個Bolt進行處理。即需要經過很多blots。比如算出一堆圖片里面被轉發最多的圖片就至少需要兩步:第一步算出每個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。

        Bolts可以發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。

      而Bolt中最重要的方法是execute(),以新的tuple作為參數接收。不管是Spout還是Bolt,如果將tuple發射成多個流,這些流都可以通過declareStream()來聲明。

     bolts使用OutputCollector來發射tuple,bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 一般的流程是: bolts處理一個輸入tuple,  發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。

9、Tuple:

       一次消息傳遞的基本單元。本來應該是一個key-value的map,但是由于各個組件間傳遞的tuple的字段名稱已經事先定義好,所以tuple中只要按序填入各個value就行了,所以就是一個value list.

10、Stream:

        源源不斷傳遞的tuple就組成了stream。消息流stream是storm里的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分布式的方式并行地創建和處理。通過對stream中tuple序列中每個字段命名來定義stream。在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定義類型(只要實現相應的序列化器)。

     每個消息流在定義的時候會被分配給一個id,因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為‘default’默認的id 。

      Storm提供的最基本的處理stream的原語是spout和bolt。你可以實現spout和bolt提供的接口來處理你的業務邏輯。

      3.png

11、Stream Groupings:

Stream Grouping定義了一個流在Bolt任務間該如何被切分。這里有Storm提供的6個Stream Grouping類型:

1). 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。

2). 字段分組(Fields grouping):根據指定字段分割數據流,并分組。例如,根據“user-id”字段,相同“user-id”的元組總是分發到同一個任務,不同“user-id”的元組可能分發到不同的任務。

3). 全部分組(All grouping):tuple被復制到bolt的所有任務。這種類型需要謹慎使用。

4). 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。

5). 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效于隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(如果可能)。

6). 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。

當然還可以實現CustomStreamGroupimg接口來定制自己需要的分組。

storm 和hadoop的對比來了解storm中的基本概念。

Hadoop Storm
系統角色 JobTracker Nimbus
TaskTracker Supervisor
Child Worker
應用名稱 Job Topology
組件接口 Mapper/Reducer Spout/Bolt

3.  Storm應用場景

       Storm 與其他大數據解決方案的不同之處在于它的處理方式。Hadoop 在本質上是一個批處理系統。數據被引入 Hadoop 文件系統 (HDFS) 并分發到各個節點進行處理。當處理完成時,結果數據返回到 HDFS 供始發者使用。Storm 支持創建拓撲結構來轉換沒有終點的數據流。不同于 Hadoop 作業,這些轉換從不停止,它們會持續處理到達的數據。

Twitter列舉了Storm的三大類應用:

1. 信息流處理{Stream processing}
Storm可用來實時處理新數據和更新數據庫,兼具容錯性和可擴展性。即Storm可以用來處理源源不斷流進來的消息,處理之后將結果寫入到某個存儲中去。

2. 連續計算{Continuous computation}
Storm可進行連續查詢并把結果即時反饋給客戶端。比如把Twitter上的熱門話題發送到瀏覽器中。

3. 分布式遠程程序調用{Distributed RPC}
       Storm可用來并行處理密集查詢。Storm的拓撲結構是一個等待調用信息的分布函數,當它收到一條調用信息后,會對查詢進行計算,并返回查詢結果。舉個例子Distributed RPC可以做并行搜索或者處理大集合的數據。

        通過配置drpc服務器,將storm的topology發布為drpc服務??蛻舳顺绦蚩梢哉{用drpc服務將數據發送到storm集群中,并接收處理結果的反饋。這種方式需要drpc服務器進行轉發,其中drpc服務器底層通過thrift實現。適合的業務場景主要是實時計算。并且擴展性良好,可以增加每個節點的工作worker數量來動態擴展。

4.  項目實施,構建Topology

      當下情況我們需要給Spout和Bolt設計一種能夠處理大量數據(日志文件)的topology,當一個特定數據值超過預設的臨界值時促發警報。使用Storm的topology,逐行讀入日志文件并且監視輸入數據。在Storm組件方面,Spout負責讀入輸入數據。它不僅從現有的文件中讀入數據,同時還監視著新文件。文件一旦被修改Spout會讀入新的版本并且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發射給Bolt進行臨界分析,這樣就可以發現所有可能超臨界的記錄。

下一節將對用例進行詳細介紹。

臨界分析

這一節,將主要聚焦于臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。

  • 瞬間臨界值監測:一個字段的值在那個瞬間超過了預設的臨界值,如果條件符合的話則觸發一個trigger。舉個例子當車輛超越80公里每小時,則觸發trigger。

  • 時間序列臨界監測:字段的值在一個給定的時間段內超過了預設的臨界值,如果條件符合則觸發一個觸發器。比如:在5分鐘類,時速超過80KM兩次及以上的車輛。

Listing One顯示了我們將使用的一個類型日志,其中包含的車輛數據信息有:車牌號、車輛行駛的速度以及數據獲取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East  city
EF 123 90 South city
GH 123 50 West  city

這里將創建一個對應的XML文件,這將包含引入數據的模式。這個XML將用于日志文件的解析。XML的設計模式和對應的說明請見下表。

1.jpg

XML文件和日志文件都存放在Spout可以隨時監測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。

1.gif

Figure 1:Storm中建立的topology,用以實現數據實時處理

如圖所示:FilelistenerSpout接收輸入日志并進行逐行的讀入,接著將數據發射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數據將發送給DBWriterBolt,然后由DBWriterBolt存入給數據庫。下面將對這個過程的實現進行詳細的解析。

Spout的實現

Spout以日志文件和XML描述文件作為接收對象。XML文件包含了與日志一致的設計模式。不妨設想一下一個示例日志文件,包含了車輛的車牌號、行駛速度、以及數據的捕獲位置。(看下圖)

1.gif

Figure2:數據從日志文件到Spout的流程圖

Listing Two顯示了tuple對應的XML,其中指定了字段、將日志文件切割成字段的定界符以及字段的類型。XML文件以及數據都被保存到Spout指定的路徑。

Listing Two:用以描述日志文件的XML文件。

<TUPLEINFO>   
<FIELDLIST>   
<FIELD>   
<COLUMNNAME>vehicle_number</COLUMNNAME>   
<COLUMNTYPE>string</COLUMNTYPE>   
</FIELD>   
   
<FIELD>  
<COLUMNNAME>speed</COLUMNNAME>   
<COLUMNTYPE>int</COLUMNTYPE>   
</FIELD>   
   
<FIELD>   
<COLUMNNAME>location</COLUMNNAME>   
<COLUMNTYPE>string</COLUMNTYPE>   
</FIELD>   
</FIELDLIST>   
<DELIMITER>,</DELIMITER>   
</TUPLEINFO>

通過構造函數及它的參數Directory、PathSpout和TupleInfo對象創建Spout對象。TupleInfo儲存了日志文件的字段、定界符、字段的類型這些很必要的信息。這個對象通過XSTream序列化XML時建立。

Spout的實現步驟:

  • 對文件的改變進行分開的監聽,并監視目錄下有無新日志文件添加。

  • 在數據得到了字段的說明后,將其轉換成tuple。

  • 聲明Spout和Bolt之間的分組,并決定tuple發送給Bolt的途徑。

Spout的具體編碼在Listing Three中顯示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。    

public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )     
{     
           _collector = collector;     
         try     
         {     
         fileReader  =  new BufferedReader(new FileReader(new File(file)));    
         }    
         catch (FileNotFoundException e)    
         {    
         System.exit(1);     
         }    
}                                                            
   
public void nextTuple()    
{    
         protected void ListenFile(File file)    
         {    
         Utils.sleep(2000);    
         RandomAccessFile access = null;    
         String line = null;     
            try     
            {    
                while ((line = access.readLine()) != null)    
                {    
                    if (line !=null)    
                    {     
                         String[] fields=null;    
                          if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());     
                          else     
                          fields = line.split  (tupleInfo.getDelimiter());     
                          if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));    
                    }    
               }    
            }    
            catch (IOException ex){ }    
            }    
}    
   
public void declareOutputFields(OutputFieldsDeclarer declarer)    
{    
      String[] fieldsArr = new String [tupleInfo.getFieldList().size()];    
      for(int i=0; i<tupleInfo.getFieldList().size(); i++)    
      {    
              fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();    
      }    
declarer.declare(new Fields(fieldsArr));    
}

declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就可以用類似的方法將tuple譯碼。Spout持續對日志文件的數據的變更進行監聽,一旦有添加Spout就會進行讀入并且發送給Bolt進行處理。

Bolt的實現

Spout的輸出結果將給予Bolt進行更深一步的處理。經過對用例的思考,我們的topology中需要如Figure 3中的兩個Bolt。

Figure 3:Spout到Bolt的數據流程。

ThresholdCalculatorBolt

Spout將tuple發出,由ThresholdCalculatorBolt接收并進行臨界值處理。在這里,它將接收好幾項輸入進行檢查;分別是:

臨界值檢查

  • 臨界值欄數檢查(拆分成字段的數目)

  • 臨界值數據類型(拆分后字段的類型)

  • 臨界值出現的頻數

  • 臨界值時間段檢查

Listing Four中的類,定義用來保存這些值。

Listing Four:ThresholdInfo類

public class ThresholdInfo implementsSerializable    
   
{      
        private String action;     
        private String rule;     
        private Object thresholdValue;    
        private int thresholdColNumber;     
        private Integer timeWindow;     
        private int frequencyOfOccurence;     
}

基于字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執行。代碼大部分的功能是解析和接收值的檢測。

Listing Five:臨界值檢測代碼段

public void execute(Tuple tuple, BasicOutputCollector collector)     {        if(tuple!=null)         {            List<Object> inputTupleList = (List<Object>) tuple.getValues();            int thresholdColNum = thresholdInfo.getThresholdColNumber();             Object thresholdValue = thresholdInfo.getThresholdValue();             String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();             Integer timeWindow = thresholdInfo.getTimeWindow();             int frequency = thresholdInfo.getFrequencyOfOccurence();             if(thresholdDataType.equalsIgnoreCase("string"))             {                 String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();                 String frequencyChkOp = thresholdInfo.getAction();                 if(timeWindow!=null)                 {                     long curTime = System.currentTimeMillis();                     long diffInMinutes = (curTime-startTime)/(1000);                     if(diffInMinutes>=timeWindow)                     {                         if(frequencyChkOp.equals("=="))                         {                              if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))                              {                                  count.incrementAndGet();                                  if(count.get() > frequency)                                      splitAndEmit(inputTupleList,collector);                              }                         }                         else if(frequencyChkOp.equals("!="))                         {                             if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))                             {                                  count.incrementAndGet();                                  if(count.get() > frequency)                                      splitAndEmit(inputTupleList,collector);                              }                          }                          else                         System.out.println("Operator not supported");                       }                  }                  else                 {                      if(frequencyChkOp.equals("=="))                      {                          if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))                          {                              count.incrementAndGet();                              if(count.get() > frequency)                                  splitAndEmit(inputTupleList,collector);                              }                      }                      else if(frequencyChkOp.equals("!="))                      {                           if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))                           {                               count.incrementAndGet();                               if(count.get() > frequency)                                   splitAndEmit(inputTupleList,collector);                              }                       }                   }                }                else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))                {                    String frequencyChkOp = thresholdInfo.getAction();                    if(timeWindow!=null)                    {                         long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());                         long curTime = System.currentTimeMillis();                         long diffInMinutes = (curTime-startTime)/(1000);                         System.out.println("Difference in minutes="+diffInMinutes);                         if(diffInMinutes>=timeWindow)                         {                              if(frequencyChkOp.equals("<"))                              {                                  if(valueToCheck < Double.parseDouble(thresholdValue.toString()))                                  {                                       count.incrementAndGet();                                       if(count.get() > frequency)                                           splitAndEmit(inputTupleList,collector);                                  }                              }                              else if(frequencyChkOp.equals(">"))                              {                                   if(valueToCheck > Double.parseDouble(thresholdValue.toString()))                                    {                                       count.incrementAndGet();                                       if(count.get() > frequency)                                           splitAndEmit(inputTupleList,collector);                                   }                               }                               else if(frequencyChkOp.equals("=="))                               {                                  if(valueToCheck == Double.parseDouble(thresholdValue.toString()))                                  {                                      count.incrementAndGet();                                      if(count.get() > frequency)                                          splitAndEmit(inputTupleList,collector);                                   }                               }                               else if(frequencyChkOp.equals("!="))                               {        . . .                                }                           }                 }          else             splitAndEmit(null,collector);          }          else        {               System.err.println("Emitting null in bolt");               splitAndEmit(null,collector);        }    }

經由Bolt發送的的tuple將會傳遞到下一個對應的Bolt,在我們的用例中是DBWriterBolt。

DBWriterBolt

經過處理的tuple必須被持久化以便于觸發tigger或者更深層次的使用。DBWiterBolt做了這個持久化的工作并把tuple存入了數據庫。表的建立由prepare()函數完成,這也將是topology調用的第一個方法。方法的編碼如Listing Six所示。

Listing Six:建表編碼。

public void prepare( Map StormConf, TopologyContext context )     
{           
    try     
    {    
        Class.forName(dbClass);    
    }     
    catch (ClassNotFoundException e)     
    {    
        System.out.println("Driver not found");    
        e.printStackTrace();    
    }    
     
    try     
    {    
       connection driverManager.getConnection(     
           "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);    
       connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();    
     
       StringBuilder createQuery = new StringBuilder(    
           "CREATE TABLE IF NOT EXISTS "+tableName+"(");    
       for(Field fields : tupleInfo.getFieldList())    
       {    
           if(fields.getColumnType().equalsIgnoreCase("String"))    
               createQuery.append(fields.getColumnName()+" VARCHAR(500),");    
           else   
               createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");    
       }    
       createQuery.append("thresholdTimeStamp timestamp)");    
       connection.prepareStatement(createQuery.toString()).execute();    
     
       // Insert Query    
       StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");    
       String tempCreateQuery = new String();    
       for(Field fields : tupleInfo.getFieldList())    
       {    
            insertQuery.append(fields.getColumnName()+",");    
       }    
       insertQuery.append("thresholdTimeStamp").append(") values (");    
       for(Field fields : tupleInfo.getFieldList())    
       {    
           insertQuery.append("?,");    
       }    
     
       insertQuery.append("?)");    
       prepStatement = connection.prepareStatement(insertQuery.toString());    
    }    
    catch (SQLException e)     
    {           
        e.printStackTrace();    
    }           
}

數據分批次的插入數據庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現可能存在不同類型輸入的解析。

Listing Seven:數據插入的代碼部分。

public void execute(Tuple tuple, BasicOutputCollector collector)     {        batchExecuted=false;        if(tuple!=null)        {           List<Object> inputTupleList = (List<Object>) tuple.getValues();           int dbIndex=0;           for(int i=0;i<tupleInfo.getFieldList().size();i++)           {               Field field = tupleInfo.getFieldList().get(i);               try {                   dbIndex = i+1;                   if(field.getColumnType().equalsIgnoreCase("String"))                                    prepStatement.setString(dbIndex, inputTupleList.get(i).toString());                   else if(field.getColumnType().equalsIgnoreCase("int"))                       prepStatement.setInt(dbIndex,                           Integer.parseInt(inputTupleList.get(i).toString()));                   else if(field.getColumnType().equalsIgnoreCase("long"))                       prepStatement.setLong(dbIndex,                            Long.parseLong(inputTupleList.get(i).toString()));                   else if(field.getColumnType().equalsIgnoreCase("float"))                       prepStatement.setFloat(dbIndex,                            Float.parseFloat(inputTupleList.get(i).toString()));                   else if(field.getColumnType().equalsIgnoreCase("double"))                       prepStatement.setDouble(dbIndex,                            Double.parseDouble(inputTupleList.get(i).toString()));                   else if(field.getColumnType().equalsIgnoreCase("short"))                       prepStatement.setShort(dbIndex,                            Short.parseShort(inputTupleList.get(i).toString()));                   else if(field.getColumnType().equalsIgnoreCase("boolean"))                       prepStatement.setBoolean(dbIndex,                            Boolean.parseBoolean(inputTupleList.get(i).toString()));                   else if(field.getColumnType().equalsIgnoreCase("byte"))                       prepStatement.setByte(dbIndex,                            Byte.parseByte(inputTupleList.get(i).toString()));                   else if(field.getColumnType().equalsIgnoreCase("Date"))                   {                      Date dateToAdd=null;                      if (!(inputTupleList.get(i) instanceof Date))                        {                             DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");                           try                            {                               dateToAdd = df.parse(inputTupleList.get(i).toString());                           }                           catch (ParseException e)                            {                               System.err.println("Data type not valid");                           }                       }                         else                      {                dateToAdd = (Date)inputTupleList.get(i);                java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());                prepStatement.setDate(dbIndex, sqlDate);                }                   }             catch (SQLException e)             {                 e.printStackTrace();            }        }        Date now = new Date();                  try       {            prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));            prepStatement.addBatch();            counter.incrementAndGet();            if (counter.get()== batchSize)             executeBatch();        }         catch (SQLException e1)         {            e1.printStackTrace();        }                  }       else      {            long curTime = System.currentTimeMillis();           long diffInSeconds = (curTime-startTime)/(60*1000);           if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)           {                try {                    executeBatch();                    startTime = System.currentTimeMillis();                }                catch (SQLException e) {                     e.printStackTrace();                }           }       }    }         public void executeBatch() throws SQLException    {        batchExecuted=true;        prepStatement.executeBatch();        counter = new AtomicInteger(0);    }

一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會建立topology并準備執行。下面就來看一下執行步驟。

在本地集群上運行和測試topology

  • 通過TopologyBuilder建立topology。

  • 使用Storm Submitter,將topology遞交給集群。以topology的名字、配置和topology的對象作為參數。

  • 提交topology。

Listing Eight:建立和執行topology。

public class StormMain    
{    
     public static void main(String[] args) throws AlreadyAliveException,     
                                                   InvalidTopologyException,     
                                                   InterruptedException     
     {    
          ParallelFileSpout parallelFileSpout = new ParallelFileSpout();    
          ThresholdBolt thresholdBolt = new ThresholdBolt();    
          DBWriterBolt dbWriterBolt = new DBWriterBolt();    
          TopologyBuilder builder = new TopologyBuilder();    
          builder.setSpout("spout", parallelFileSpout, 1);    
          builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");    
          builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");    
          if(this.argsMain!=null && this.argsMain.length > 0)     
          {    
              conf.setNumWorkers(1);    
              StormSubmitter.submitTopology(     
                   this.argsMain[0], conf, builder.createTopology());    
          }    
          else   
          {        
              Config conf = new Config();    
              conf.setDebug(true);    
              conf.setMaxTaskParallelism(3);    
              LocalCluster cluster = new LocalCluster();    
              cluster.submitTopology(    
              "Threshold_Test", conf, builder.createTopology());    
          }    
     }    
}

topology被建立后將被提交到本地集群。一旦topology被提交,除非被取締或者集群關閉,它將一直保持運行不需要做任何的修改。這也是Storm的另一大特色之一。

這個簡單的例子體現了當你掌握了topology、spout和bolt的概念,將可以輕松的使用Storm進行實時處理。如果你既想處理大數據又不想遍歷Hadoop的話,不難發現使用Storm將是個很好的選擇。

5.  storm常見問題解答

一、我有一個數據文件,或者我有一個系統里面有數據,怎么導入storm做計算?
你需要實現一個Spout,Spout負責將數據emit到storm系統里,交給bolts計算。怎么實現spout可以參考官方的kestrel spout實現:
https://github.com/nathanmarz/storm-kestrel
如果你的數據源不支持事務性消費,那么就無法得到storm提供的可靠處理的保證,也沒必要實現ISpout接口中的ack和fail方法。
二、Storm為了保證tuple的可靠處理,需要保存tuple信息,這會不會導致內存OOM?
Storm為了保證tuple的可靠處理,acker會保存該節點創建的tuple id的xor值,這稱為ack value,那么每ack一次,就將tuple id和ack value做異或(xor)。當所有產生的tuple都被ack的時候, ack value一定為0。這是個很簡單的策略,對于每一個tuple也只要占用約20個字節的內存。對于100萬tuple,也才20M左右。關于可靠處理看這個:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
三、Storm計算后的結果保存在哪里?可以保存在外部存儲嗎?
Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數據不大,你可以簡單地保存在內存里,也可以每次都更新數據庫,也可以采用NoSQL存儲。storm并沒有像s4那樣提供一個Persist API,根據時間或者容量來做存儲輸出。這部分事情完全交給用戶。
數據存儲之后的展現,也是你需要自己處理的,storm UI只提供對topology的監控和統計。
四、Storm怎么處理重復的tuple?
因為Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail并重新發送該tuple,那么就會有tuple重復計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。因為實時計算通常并不要求很高的精確度,后續的批處理計算會更正實時計算的誤差。
(2)使用第三方集中存儲來過濾,比如利用mysql,memcached或者redis根據邏輯主鍵來去重。
(3)使用bloom filter做過濾,簡單高效。
五、Storm的動態增刪節點
我在storm和s4里比較里談到的動態增刪節點,是指storm可以動態地添加和減少supervisor節點。對于減少節點來說,被移除的supervisor上的worker會被nimbus重新負載均衡到其他supervisor節點上。在storm 0.6.1以前的版本,增加supervisor節點不會影響現有的topology,也就是現有的topology不會重新負載均衡到新的節點上,在擴展集群的時候很不方便,需要重新提交topology。因此我在storm的郵件列表里提了這個問題,storm的開發者nathanmarz創建了一個issue 54并在0.6.1提供了rebalance命令來讓正在運行的topology重新負載均衡,具體見:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的變更:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246
storm并不提供機制來動態調整worker和task數目。
六、Storm UI里spout統計的complete latency的具體含義是什么?為什么emit的數目會是acked的兩倍?
這個事實上是storm郵件列表里的一個問題。Storm作者marz的解答:

The complete latency is the time 
from the spout emitting a tuple to that
tuple being acked on the spout
. So it tracks the time 
for
 the whole tuple
tree to be processed.
If you dive into the spout component in the UI, you'll see that a lot of
the emitted/transferred is on the __ack* stream. 
This is the spout
communicating with the ackers which take care of tracking the tuple trees.

簡單地說,complete latency表示了tuple從emit到被acked經過的時間,可以認為是tuple以及該tuple的后續子孫(形成一棵樹)整個處理時間。其次spout的emit和transfered還統計了spout和acker之間內部的通信信息,比如對于可靠處理的spout來說,會在emit的時候同時發送一個_ack_init給acker,記錄tuple id到task id的映射,以便ack的時候能找到正確的acker task。

6.  其他開源的大數據解決方案

自 Google 在 2004 年推出 MapReduce 范式以來,已誕生了多個使用原始 MapReduce 范式(或擁有該范式的質量)的解決方案。Google 對 MapReduce 的最初應用是建立萬維網的索引。盡管此應用程序仍然很流行,但這個簡單模型解決的問題也正在增多。

表 1 提供了一個可用開源大數據解決方案的列表,包括傳統的批處理和流式處理應用程序。在將 Storm 引入開源之前將近一年的時間里,Yahoo! 的 S4 分布式流計算平臺已向 Apache 開源。S4 于 2010 年 10 月發布,它提供了一個高性能計算 (HPC) 平臺,向應用程序開發人員隱藏了并行處理的復雜性。S4 實現了一個可擴展的、分散化的集群架構,并納入了部分容錯功能。

表 1. 開源大數據解決方案

解決方案 開發商 類型 描述
Storm Twitter 流式處理 Twitter 的新流式大數據分析解決方案
S4 Yahoo! 流式處理 來自 Yahoo! 的分布式流計算平臺
Hadoop Apache 批處理 MapReduce 范式的第一個開源實現
Spark UC Berkeley AMPLab 批處理 支持內存中數據集和恢復能力的最新分析平臺
Disco Nokia 批處理 Nokia 的分布式 MapReduce 框架
HPCC LexisNexis 批處理 HPC 大數據集群

csdn(編譯/仲浩 王旭東/審校):http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis

轉自:http://blog.csdn.net/hguisu/article/details/8454368

原創文章,作者:s19930811,如若轉載,請注明出處:http://www.www58058.com/2657

(0)
s19930811s19930811
上一篇 2015-04-04
下一篇 2015-04-04

相關推薦

  • bash編程函數select語句的使用

    £select             select循環主要用于創建菜單,按數字順序排列的菜單項將顯示在標準錯誤上,并顯示PS3提示符,等待用戶的輸入,用戶輸入菜單列表中的某個數字,執行相應的命令,用戶輸入被保存在變量REPLY中。 select是個…

    Linux干貨 2016-08-24
  • 壓縮、解壓縮和歸檔工具

    1  compress和uncompress 用法:     compress [-dfvcVr] [-b maxbits] [file…] 選項:     -d   解壓縮     -c   結果…

    Linux干貨 2016-08-18
  • 第五周作業

    1、顯示/boot/grub/grub.conf中以至少一個空白字符開頭的行; [root@unclez ~]#  grep -E "^[[:space:]]+.*" /boot/grub/grub.cong 2、顯示/etc/rc.d/rc.sysinit文件中以#開頭,后面跟…

    Linux干貨 2016-12-31
  • CentOS7之Systemd管理

    Systemd 本章節內容: CentOS7啟動 Unit介紹 服務管理和查看 啟動排錯 破解口令 修復grub2 1、 CentOS啟動流程:POST –> Boot Sequence –> Bootloader –> kernel + initramfs(initrd) –> roo…

    Linux干貨 2016-09-23
  • CentOS7的啟動及Systemd的管理?

    CentOS7的啟動流程     POST–>Boot Sequence–>Bootloader–>kernel+initramfs(initrd)–>rootfs–>/sbin/init    &n…

    Linux干貨 2016-09-21
  • 【社招】【小米-北京】大數據運維工程師

    【社招】【小米-北京】大數據運維工程師 【工作地點】北京市海淀區安寧莊東路72號科利源大廈 【薪酬福利】15k-30k  期權獎勵、六險一金、水果花茶、班車、健身房、食堂 【投遞方式】郵件主題“崗位+姓名”發送至lipengcheng3@xiaomi.com   工作職責: 1、負責大數據平臺相關系統的運維保障,包括:Hadoo…

    Linux干貨 2017-07-28
欧美性久久久久