摘要:隨著數據體積的越來越大,實時處理成為了許多機構需要面對的首要挑戰。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上結合了汽車超速監視,為我們演示了使用Storm進行實時大數據分析。CSDN在此編譯、整理。
簡單和明了,Storm讓大數據分析變得輕松加愉快。
當今世界,公司的日常運營經常會生成TB級別的數據。數據來源囊括了互聯網裝置可以捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中創建的數據。考慮到數據的生成量,實時處理成為了許多機構需要面對的首要挑戰。我們經常用的一個非常有效的開源實時計算工具就是Storm —— Twitter開發,通常被比作“實時的Hadoop”。然而Storm遠比Hadoop來的簡單,因為用它處理大數據不會帶來新老技術的交替。
Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分別從事技術分析和研發工作。本文詳述了Storm的使用方法,例子中的項目名稱為“超速報警系統(Speeding Alert System)”。我們想實現的功能是:實時分析過往車輛的數據,一旦車輛數據超過預設的臨界值 —— 便觸發一個trigger并把相關的數據存入數據庫。
1. Storm是什么
全量數據處理使用的大多是鼎鼎大名的hadoop或者hive,作為一個批處理系統,hadoop以其吞吐量大、自動容錯等優點,在海量數據處理上得到了廣泛的使用。
Hadoop下的Map/Reduce框架對于數據的處理流程是:
1、 將要處理的數據上傳到Hadoop的文件系統HDFS中。
2、 Map階段
a) Master對Map的預處理:對于大量的數據進行切分,劃分為M個16~64M的數據分片(可通過參數自定義分片大小)
b) 調用Mapper函數:Master為Worker分配Map任務,每個分片都對應一個Worker進行處理。各個Worker讀取并調用用戶定義的Mapper函數 處理數據,并將結果存入HDFS,返回存儲位置給Master。
一個Worker在Map階段完成時,在HDFS中,生成一個排好序的Key-values組成的文件。并將位置信息匯報給Master。
3、 Reduce階段
a) Master對Reduce的預處理:Master為Worker分配Reduce任務,他會將所有Mapper產生的數據進行映射,將相同key的任務分配給某個Worker。
b) 調用Reduce函數:各個Worker將分配到的數據集進行排序(使用工具類Merg),并調用用戶自定義的Reduce函數,并將結果寫入HDFS。
每個Worker的Reduce任務完成后,都會在HDFS中生成一個輸出文件。Hadoop并不將這些文件合并,因為這些文件往往會作為另一個Map/reduce程序的輸入。
以上的流程,粗略概括,就是從HDFS中獲取數據,將其按照大小分片,進行分布式處理,最終輸出結果。從流程來看,Hadoop框架進行數據處理有以下要求:
1、 數據已經存在在HDFS當中。
2、 數據間是少關聯的。各個任務執行器在執行負責的數據時,無需考慮對其他數據的影響,數據之間應盡可能是無聯系、不會影響的。
使用Hadoop,適合大批量的數據處理,這是他所擅長的。由于基于Map/Reduce這種單級的數據處理模型進行,因此,如果數據間的關聯系較大,需要進行數據的多級交互處理(某個階段的處理數據依賴于上一個階段),需要進行多次map/reduce。又由于map/reduce每次執行都需要遍歷整個數據集,對于數據的實時計算并不合適,于是有了storm。
對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統。同Hadoop一樣Storm也可以處理大批量的數據,然而Storm在保證高可靠性的前提下還可以讓處理進行的更加實時;也就是說,所有的信息都會被處理。Storm同樣還具備容錯和分布計算這些特性,這就讓Storm可以擴展到不同的機器上進行大批量的數據處理。他同樣還有以下的這些特性:
-
易于擴展:對于擴展,伴隨著業務的發展,我們的數據量、計算量可能會越來越大,所以希望這個系統是可擴展的。你只需要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop Zookeeper進行集群協調,這樣可以充分的保證大型集群的良好運行。
-
每條信息的處理都可以得到保證。
-
Storm集群管理簡易。
-
Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執行中出現錯誤時,也會由Storm重新分配任務。這是分布式系統中通用問題。一個節點掛了不能影響我的應用。
-
低延遲。都說了是實時計算系統了,延遲是一定要低的。
-
盡管通常使用Java,Storm中的topology可以用任何語言設計。
在線實時流處理模型
對于處理大批量數據的Map/reduce程序,在任務完成之后就停止了,但Storm是用于實時計算的,所以,相應的處理程序會一直執行(等待任務,有任務則執行)直至手動停止。
對于Storm,他是實時處理模型,與hadoop的不同是,他是針對在線業務而存在的計算平臺,如統計某用戶的交易量、生成為某個用戶的推薦列表等實時性高的需求。他是一個“流處理”框架。何謂流處理?storm將數據以Stream的方式,并按照Topology的順序,依次處理并最終生成結果。
當然為了更好的理解文章,你首先需要安裝和設置Storm。需要通過以下幾個簡單的步驟:
-
從Storm官方下載Storm安裝文件
-
將bin/directory解壓到你的PATH上,并保證bin/storm腳本是可執行的。
盡管 Storm 是使用 Clojure 語言開發的,您仍然可以在 Storm 中使用幾乎任何語言編寫應用程序。所需的只是一個連接到 Storm 的架構的適配器。已存在針對 Scala、JRuby、Perl 和 PHP 的適配器,但是還有支持流式傳輸到 Storm 拓撲結構中的結構化查詢語言適配器。
2. Storm的組件
Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。
Storm集群主要由一個主節點(Nimbus后臺程序)和一群工作節點(worker node)Supervisor的節點組成,通過 Zookeeper進行協調。Nimbus類似Hadoop里面的JobTracker。Nimbus負責在集群里面分發代碼,分配計算任務給機器, 并且監控狀態。
每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。
1、 Nimbus主節點:
主節點通常運行一個后臺程序 —— Nimbus,用于響應分布在集群中的節點,分配任務和監測故障。這個很類似于Hadoop中的Job Tracker。
2、Supervisor工作節點:
工作節點同樣會運行一個后臺程序 —— Supervisor,用于收聽工作指派并基于要求運行工作進程。每個工作節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則通過Zookeeper系統或者集群。
3、Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程序實現實時的邏輯則被封裝進Storm中的“topology”。topology則是一組由Spouts(數據源)和Bolts(數據操作)通過Stream Groupings進行連接的圖。下面對出現的術語進行更深刻的解析。
4、Worker:
運行具體處理組件邏輯的進程。
5、Task:
worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之后,task不再與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱為executor。
6、Topology(拓撲):
storm中運行的一個實時應用程序,因為各個組件間的消息流動形成邏輯上的一個拓撲結構。一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連接起來,如下圖:
一個topology會一直運行直到你手動kill掉,Storm自動重新分配執行失敗的任務, 并且Storm可以保證你不會有數據丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。
運行一個topology很簡單。首先,把你所有的代碼以及所依賴的jar打進一個jar包。然后運行類似下面的這個命令:
storm jar all–my–code.jar backtype.storm.MyTopology arg1 arg2
這個命令會運行主類: backtype.strom.MyTopology, 參數是arg1, arg2。這個類的main函數定義這個topology并且把它提交給Nimbus。storm jar負責連接到Nimbus并且上傳jar包。
Topology的定義是一個Thrift結構,并且Nimbus就是一個Thrift服務, 你可以提交由任何語言創建的topology。上面的方面是用JVM-based語言提交的最簡單的方法。
7、Spout:
消息源spout是Storm里面一個topology里面的消息生產者。簡而言之,Spout從來源處讀取數據并放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。
消息源可以發射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,然后使用SpoutOutputCollector來發射指定的stream。
而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,如果沒有新tuple發射則會簡單的返回。
要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調用所有消息源spout的方法。另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用fail。storm只對可靠的spout調用ack和fail。
8、Bolt:
Topology中所有的處理都由Bolt完成。即所有的消息處理邏輯被封裝在bolts里面。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數據庫、等等。
Bolt從Spout中接收數據并進行處理,如果遇到復雜流的處理也可能將tuple發送給另一個Bolt進行處理。即需要經過很多blots。比如算出一堆圖片里面被轉發最多的圖片就至少需要兩步:第一步算出每個圖片的轉發數量。第二步找出轉發最多的前10個圖片。(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。
Bolts可以發射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。
而Bolt中最重要的方法是execute(),以新的tuple作為參數接收。不管是Spout還是Bolt,如果將tuple發射成多個流,這些流都可以通過declareStream()來聲明。
bolts使用OutputCollector來發射tuple,bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。 一般的流程是: bolts處理一個輸入tuple, 發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。
9、Tuple:
一次消息傳遞的基本單元。本來應該是一個key-value的map,但是由于各個組件間傳遞的tuple的字段名稱已經事先定義好,所以tuple中只要按序填入各個value就行了,所以就是一個value list.
10、Stream:
源源不斷傳遞的tuple就組成了stream。消息流stream是storm里的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分布式的方式并行地創建和處理。通過對stream中tuple序列中每個字段命名來定義stream。在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定義類型(只要實現相應的序列化器)。
每個消息流在定義的時候會被分配給一個id,因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為‘default’默認的id 。
Storm提供的最基本的處理stream的原語是spout和bolt。你可以實現spout和bolt提供的接口來處理你的業務邏輯。
11、Stream Groupings:
Stream Grouping定義了一個流在Bolt任務間該如何被切分。這里有Storm提供的6個Stream Grouping類型:
1). 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。
2). 字段分組(Fields grouping):根據指定字段分割數據流,并分組。例如,根據“user-id”字段,相同“user-id”的元組總是分發到同一個任務,不同“user-id”的元組可能分發到不同的任務。
3). 全部分組(All grouping):tuple被復制到bolt的所有任務。這種類型需要謹慎使用。
4). 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
5). 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效于隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(如果可能)。
6). 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。
當然還可以實現CustomStreamGroupimg接口來定制自己需要的分組。
storm 和hadoop的對比來了解storm中的基本概念。
Hadoop | Storm | |
系統角色 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
Child | Worker | |
應用名稱 | Job | Topology |
組件接口 | Mapper/Reducer | Spout/Bolt |
3. Storm應用場景
Storm 與其他大數據解決方案的不同之處在于它的處理方式。Hadoop 在本質上是一個批處理系統。數據被引入 Hadoop 文件系統 (HDFS) 并分發到各個節點進行處理。當處理完成時,結果數據返回到 HDFS 供始發者使用。Storm 支持創建拓撲結構來轉換沒有終點的數據流。不同于 Hadoop 作業,這些轉換從不停止,它們會持續處理到達的數據。
Twitter列舉了Storm的三大類應用:
1. 信息流處理{Stream processing}
Storm可用來實時處理新數據和更新數據庫,兼具容錯性和可擴展性。即Storm可以用來處理源源不斷流進來的消息,處理之后將結果寫入到某個存儲中去。
2. 連續計算{Continuous computation}
Storm可進行連續查詢并把結果即時反饋給客戶端。比如把Twitter上的熱門話題發送到瀏覽器中。
3. 分布式遠程程序調用{Distributed RPC}
Storm可用來并行處理密集查詢。Storm的拓撲結構是一個等待調用信息的分布函數,當它收到一條調用信息后,會對查詢進行計算,并返回查詢結果。舉個例子Distributed RPC可以做并行搜索或者處理大集合的數據。
通過配置drpc服務器,將storm的topology發布為drpc服務??蛻舳顺绦蚩梢哉{用drpc服務將數據發送到storm集群中,并接收處理結果的反饋。這種方式需要drpc服務器進行轉發,其中drpc服務器底層通過thrift實現。適合的業務場景主要是實時計算。并且擴展性良好,可以增加每個節點的工作worker數量來動態擴展。
4. 項目實施,構建Topology
當下情況我們需要給Spout和Bolt設計一種能夠處理大量數據(日志文件)的topology,當一個特定數據值超過預設的臨界值時促發警報。使用Storm的topology,逐行讀入日志文件并且監視輸入數據。在Storm組件方面,Spout負責讀入輸入數據。它不僅從現有的文件中讀入數據,同時還監視著新文件。文件一旦被修改Spout會讀入新的版本并且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發射給Bolt進行臨界分析,這樣就可以發現所有可能超臨界的記錄。
下一節將對用例進行詳細介紹。
臨界分析
這一節,將主要聚焦于臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。
-
瞬間臨界值監測:一個字段的值在那個瞬間超過了預設的臨界值,如果條件符合的話則觸發一個trigger。舉個例子當車輛超越80公里每小時,則觸發trigger。
-
時間序列臨界監測:字段的值在一個給定的時間段內超過了預設的臨界值,如果條件符合則觸發一個觸發器。比如:在5分鐘類,時速超過80KM兩次及以上的車輛。
Listing One顯示了我們將使用的一個類型日志,其中包含的車輛數據信息有:車牌號、車輛行駛的速度以及數據獲取的位置。
AB 123 | 60 | North city |
BC 123 | 70 | South city |
CD 234 | 40 | South city |
DE 123 | 40 | East city |
EF 123 | 90 | South city |
GH 123 | 50 | West city |
這里將創建一個對應的XML文件,這將包含引入數據的模式。這個XML將用于日志文件的解析。XML的設計模式和對應的說明請見下表。
XML文件和日志文件都存放在Spout可以隨時監測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。
Figure 1:Storm中建立的topology,用以實現數據實時處理
如圖所示:FilelistenerSpout接收輸入日志并進行逐行的讀入,接著將數據發射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數據將發送給DBWriterBolt,然后由DBWriterBolt存入給數據庫。下面將對這個過程的實現進行詳細的解析。
Spout的實現
Spout以日志文件和XML描述文件作為接收對象。XML文件包含了與日志一致的設計模式。不妨設想一下一個示例日志文件,包含了車輛的車牌號、行駛速度、以及數據的捕獲位置。(看下圖)
Figure2:數據從日志文件到Spout的流程圖
Listing Two顯示了tuple對應的XML,其中指定了字段、將日志文件切割成字段的定界符以及字段的類型。XML文件以及數據都被保存到Spout指定的路徑。
Listing Two:用以描述日志文件的XML文件。
<TUPLEINFO> <FIELDLIST> <FIELD> <COLUMNNAME>vehicle_number</COLUMNNAME> <COLUMNTYPE>string</COLUMNTYPE> </FIELD> <FIELD> <COLUMNNAME>speed</COLUMNNAME> <COLUMNTYPE>int</COLUMNTYPE> </FIELD> <FIELD> <COLUMNNAME>location</COLUMNNAME> <COLUMNTYPE>string</COLUMNTYPE> </FIELD> </FIELDLIST> <DELIMITER>,</DELIMITER> </TUPLEINFO>
通過構造函數及它的參數Directory、PathSpout和TupleInfo對象創建Spout對象。TupleInfo儲存了日志文件的字段、定界符、字段的類型這些很必要的信息。這個對象通過XSTream序列化XML時建立。
Spout的實現步驟:
-
對文件的改變進行分開的監聽,并監視目錄下有無新日志文件添加。
-
在數據得到了字段的說明后,將其轉換成tuple。
-
聲明Spout和Bolt之間的分組,并決定tuple發送給Bolt的途徑。
Spout的具體編碼在Listing Three中顯示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。
public void open( Map conf, TopologyContext context,SpoutOutputCollector collector ) { _collector = collector; try { fileReader = new BufferedReader(new FileReader(new File(file))); } catch (FileNotFoundException e) { System.exit(1); } } public void nextTuple() { protected void ListenFile(File file) { Utils.sleep(2000); RandomAccessFile access = null; String line = null; try { while ((line = access.readLine()) != null) { if (line !=null) { String[] fields=null; if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter()); else fields = line.split (tupleInfo.getDelimiter()); if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields)); } } } catch (IOException ex){ } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { String[] fieldsArr = new String [tupleInfo.getFieldList().size()]; for(int i=0; i<tupleInfo.getFieldList().size(); i++) { fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName(); } declarer.declare(new Fields(fieldsArr)); }
declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就可以用類似的方法將tuple譯碼。Spout持續對日志文件的數據的變更進行監聽,一旦有添加Spout就會進行讀入并且發送給Bolt進行處理。
Bolt的實現
Spout的輸出結果將給予Bolt進行更深一步的處理。經過對用例的思考,我們的topology中需要如Figure 3中的兩個Bolt。
Figure 3:Spout到Bolt的數據流程。
ThresholdCalculatorBolt
Spout將tuple發出,由ThresholdCalculatorBolt接收并進行臨界值處理。在這里,它將接收好幾項輸入進行檢查;分別是:
臨界值檢查
-
臨界值欄數檢查(拆分成字段的數目)
-
臨界值數據類型(拆分后字段的類型)
-
臨界值出現的頻數
-
臨界值時間段檢查
Listing Four中的類,定義用來保存這些值。
Listing Four:ThresholdInfo類
public class ThresholdInfo implementsSerializable { private String action; private String rule; private Object thresholdValue; private int thresholdColNumber; private Integer timeWindow; private int frequencyOfOccurence; }
基于字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執行。代碼大部分的功能是解析和接收值的檢測。
Listing Five:臨界值檢測代碼段
public void execute(Tuple tuple, BasicOutputCollector collector) { if(tuple!=null) { List<Object> inputTupleList = (List<Object>) tuple.getValues(); int thresholdColNum = thresholdInfo.getThresholdColNumber(); Object thresholdValue = thresholdInfo.getThresholdValue(); String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType(); Integer timeWindow = thresholdInfo.getTimeWindow(); int frequency = thresholdInfo.getFrequencyOfOccurence(); if(thresholdDataType.equalsIgnoreCase("string")) { String valueToCheck = inputTupleList.get(thresholdColNum-1).toString(); String frequencyChkOp = thresholdInfo.getAction(); if(timeWindow!=null) { long curTime = System.currentTimeMillis(); long diffInMinutes = (curTime-startTime)/(1000); if(diffInMinutes>=timeWindow) { if(frequencyChkOp.equals("==")) { if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("!=")) { if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else System.out.println("Operator not supported"); } } else { if(frequencyChkOp.equals("==")) { if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("!=")) { if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } } } else if(thresholdDataType.equalsIgnoreCase("int") || thresholdDataType.equalsIgnoreCase("double") || thresholdDataType.equalsIgnoreCase("float") || thresholdDataType.equalsIgnoreCase("long") || thresholdDataType.equalsIgnoreCase("short")) { String frequencyChkOp = thresholdInfo.getAction(); if(timeWindow!=null) { long valueToCheck = Long.parseLong(inputTupleList.get(thresholdColNum-1).toString()); long curTime = System.currentTimeMillis(); long diffInMinutes = (curTime-startTime)/(1000); System.out.println("Difference in minutes="+diffInMinutes); if(diffInMinutes>=timeWindow) { if(frequencyChkOp.equals("<")) { if(valueToCheck < Double.parseDouble(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals(">")) { if(valueToCheck > Double.parseDouble(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("==")) { if(valueToCheck == Double.parseDouble(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("!=")) { . . . } } } else splitAndEmit(null,collector); } else { System.err.println("Emitting null in bolt"); splitAndEmit(null,collector); } }
經由Bolt發送的的tuple將會傳遞到下一個對應的Bolt,在我們的用例中是DBWriterBolt。
DBWriterBolt
經過處理的tuple必須被持久化以便于觸發tigger或者更深層次的使用。DBWiterBolt做了這個持久化的工作并把tuple存入了數據庫。表的建立由prepare()函數完成,這也將是topology調用的第一個方法。方法的編碼如Listing Six所示。
Listing Six:建表編碼。
public void prepare( Map StormConf, TopologyContext context ) { try { Class.forName(dbClass); } catch (ClassNotFoundException e) { System.out.println("Driver not found"); e.printStackTrace(); } try { connection driverManager.getConnection( "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd); connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute(); StringBuilder createQuery = new StringBuilder( "CREATE TABLE IF NOT EXISTS "+tableName+"("); for(Field fields : tupleInfo.getFieldList()) { if(fields.getColumnType().equalsIgnoreCase("String")) createQuery.append(fields.getColumnName()+" VARCHAR(500),"); else createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+","); } createQuery.append("thresholdTimeStamp timestamp)"); connection.prepareStatement(createQuery.toString()).execute(); // Insert Query StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"("); String tempCreateQuery = new String(); for(Field fields : tupleInfo.getFieldList()) { insertQuery.append(fields.getColumnName()+","); } insertQuery.append("thresholdTimeStamp").append(") values ("); for(Field fields : tupleInfo.getFieldList()) { insertQuery.append("?,"); } insertQuery.append("?)"); prepStatement = connection.prepareStatement(insertQuery.toString()); } catch (SQLException e) { e.printStackTrace(); } }
數據分批次的插入數據庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現可能存在不同類型輸入的解析。
Listing Seven:數據插入的代碼部分。
public void execute(Tuple tuple, BasicOutputCollector collector) { batchExecuted=false; if(tuple!=null) { List<Object> inputTupleList = (List<Object>) tuple.getValues(); int dbIndex=0; for(int i=0;i<tupleInfo.getFieldList().size();i++) { Field field = tupleInfo.getFieldList().get(i); try { dbIndex = i+1; if(field.getColumnType().equalsIgnoreCase("String")) prepStatement.setString(dbIndex, inputTupleList.get(i).toString()); else if(field.getColumnType().equalsIgnoreCase("int")) prepStatement.setInt(dbIndex, Integer.parseInt(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("long")) prepStatement.setLong(dbIndex, Long.parseLong(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("float")) prepStatement.setFloat(dbIndex, Float.parseFloat(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("double")) prepStatement.setDouble(dbIndex, Double.parseDouble(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("short")) prepStatement.setShort(dbIndex, Short.parseShort(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("boolean")) prepStatement.setBoolean(dbIndex, Boolean.parseBoolean(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("byte")) prepStatement.setByte(dbIndex, Byte.parseByte(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("Date")) { Date dateToAdd=null; if (!(inputTupleList.get(i) instanceof Date)) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); try { dateToAdd = df.parse(inputTupleList.get(i).toString()); } catch (ParseException e) { System.err.println("Data type not valid"); } } else { dateToAdd = (Date)inputTupleList.get(i); java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime()); prepStatement.setDate(dbIndex, sqlDate); } } catch (SQLException e) { e.printStackTrace(); } } Date now = new Date(); try { prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime())); prepStatement.addBatch(); counter.incrementAndGet(); if (counter.get()== batchSize) executeBatch(); } catch (SQLException e1) { e1.printStackTrace(); } } else { long curTime = System.currentTimeMillis(); long diffInSeconds = (curTime-startTime)/(60*1000); if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds) { try { executeBatch(); startTime = System.currentTimeMillis(); } catch (SQLException e) { e.printStackTrace(); } } } } public void executeBatch() throws SQLException { batchExecuted=true; prepStatement.executeBatch(); counter = new AtomicInteger(0); }
一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會建立topology并準備執行。下面就來看一下執行步驟。
在本地集群上運行和測試topology
-
通過TopologyBuilder建立topology。
-
使用Storm Submitter,將topology遞交給集群。以topology的名字、配置和topology的對象作為參數。
-
提交topology。
Listing Eight:建立和執行topology。
public class StormMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { ParallelFileSpout parallelFileSpout = new ParallelFileSpout(); ThresholdBolt thresholdBolt = new ThresholdBolt(); DBWriterBolt dbWriterBolt = new DBWriterBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", parallelFileSpout, 1); builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout"); builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt"); if(this.argsMain!=null && this.argsMain.length > 0) { conf.setNumWorkers(1); StormSubmitter.submitTopology( this.argsMain[0], conf, builder.createTopology()); } else { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology( "Threshold_Test", conf, builder.createTopology()); } } }
topology被建立后將被提交到本地集群。一旦topology被提交,除非被取締或者集群關閉,它將一直保持運行不需要做任何的修改。這也是Storm的另一大特色之一。
這個簡單的例子體現了當你掌握了topology、spout和bolt的概念,將可以輕松的使用Storm進行實時處理。如果你既想處理大數據又不想遍歷Hadoop的話,不難發現使用Storm將是個很好的選擇。
5. storm常見問題解答
一、我有一個數據文件,或者我有一個系統里面有數據,怎么導入storm做計算?
你需要實現一個Spout,Spout負責將數據emit到storm系統里,交給bolts計算。怎么實現spout可以參考官方的kestrel spout實現:
https://github.com/nathanmarz/storm-kestrel
如果你的數據源不支持事務性消費,那么就無法得到storm提供的可靠處理的保證,也沒必要實現ISpout接口中的ack和fail方法。
二、Storm為了保證tuple的可靠處理,需要保存tuple信息,這會不會導致內存OOM?
Storm為了保證tuple的可靠處理,acker會保存該節點創建的tuple id的xor值,這稱為ack value,那么每ack一次,就將tuple id和ack value做異或(xor)。當所有產生的tuple都被ack的時候, ack value一定為0。這是個很簡單的策略,對于每一個tuple也只要占用約20個字節的內存。對于100萬tuple,也才20M左右。關于可靠處理看這個:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
三、Storm計算后的結果保存在哪里?可以保存在外部存儲嗎?
Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數據不大,你可以簡單地保存在內存里,也可以每次都更新數據庫,也可以采用NoSQL存儲。storm并沒有像s4那樣提供一個Persist API,根據時間或者容量來做存儲輸出。這部分事情完全交給用戶。
數據存儲之后的展現,也是你需要自己處理的,storm UI只提供對topology的監控和統計。
四、Storm怎么處理重復的tuple?
因為Storm要保證tuple的可靠處理,當tuple處理失敗或者超時的時候,spout會fail并重新發送該tuple,那么就會有tuple重復計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。因為實時計算通常并不要求很高的精確度,后續的批處理計算會更正實時計算的誤差。
(2)使用第三方集中存儲來過濾,比如利用mysql,memcached或者redis根據邏輯主鍵來去重。
(3)使用bloom filter做過濾,簡單高效。
五、Storm的動態增刪節點
我在storm和s4里比較里談到的動態增刪節點,是指storm可以動態地添加和減少supervisor節點。對于減少節點來說,被移除的supervisor上的worker會被nimbus重新負載均衡到其他supervisor節點上。在storm 0.6.1以前的版本,增加supervisor節點不會影響現有的topology,也就是現有的topology不會重新負載均衡到新的節點上,在擴展集群的時候很不方便,需要重新提交topology。因此我在storm的郵件列表里提了這個問題,storm的開發者nathanmarz創建了一個issue 54并在0.6.1提供了rebalance命令來讓正在運行的topology重新負載均衡,具體見:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的變更:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246
storm并不提供機制來動態調整worker和task數目。
六、Storm UI里spout統計的complete latency的具體含義是什么?為什么emit的數目會是acked的兩倍?
這個事實上是storm郵件列表里的一個問題。Storm作者marz的解答:
The complete latency is the time from the spout emitting a tuple to that tuple being acked on the spout . So it tracks the time for the whole tuple tree to be processed. If you dive into the spout component in the UI, you'll see that a lot of the emitted/transferred is on the __ack* stream. This is the spout communicating with the ackers which take care of tracking the tuple trees.
簡單地說,complete latency表示了tuple從emit到被acked經過的時間,可以認為是tuple以及該tuple的后續子孫(形成一棵樹)整個處理時間。其次spout的emit和transfered還統計了spout和acker之間內部的通信信息,比如對于可靠處理的spout來說,會在emit的時候同時發送一個_ack_init給acker,記錄tuple id到task id的映射,以便ack的時候能找到正確的acker task。
6. 其他開源的大數據解決方案
自 Google 在 2004 年推出 MapReduce 范式以來,已誕生了多個使用原始 MapReduce 范式(或擁有該范式的質量)的解決方案。Google 對 MapReduce 的最初應用是建立萬維網的索引。盡管此應用程序仍然很流行,但這個簡單模型解決的問題也正在增多。
表 1 提供了一個可用開源大數據解決方案的列表,包括傳統的批處理和流式處理應用程序。在將 Storm 引入開源之前將近一年的時間里,Yahoo! 的 S4 分布式流計算平臺已向 Apache 開源。S4 于 2010 年 10 月發布,它提供了一個高性能計算 (HPC) 平臺,向應用程序開發人員隱藏了并行處理的復雜性。S4 實現了一個可擴展的、分散化的集群架構,并納入了部分容錯功能。
表 1. 開源大數據解決方案
解決方案 | 開發商 | 類型 | 描述 |
---|---|---|---|
Storm | 流式處理 | Twitter 的新流式大數據分析解決方案 | |
S4 | Yahoo! | 流式處理 | 來自 Yahoo! 的分布式流計算平臺 |
Hadoop | Apache | 批處理 | MapReduce 范式的第一個開源實現 |
Spark | UC Berkeley AMPLab | 批處理 | 支持內存中數據集和恢復能力的最新分析平臺 |
Disco | Nokia | 批處理 | Nokia 的分布式 MapReduce 框架 |
HPCC | LexisNexis | 批處理 | HPC 大數據集群 |
csdn(編譯/仲浩 王旭東/審校):http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis
轉自:http://blog.csdn.net/hguisu/article/details/8454368
原創文章,作者:s19930811,如若轉載,請注明出處:http://www.www58058.com/2657