溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么實現Cassandra與Hadoop MapReduce的整合

發布時間:2021-11-20 16:49:58 來源:億速云 閱讀:139 作者:小新 欄目:開發技術

這篇文章主要介紹怎么實現Cassandra與Hadoop MapReduce的整合,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

整合Cassandra與Hadoop MapReduce

看到這個標題,大家一定會問了。這個整合如何定義?

我個人認為,所謂的整合是指:我們可以編寫MapReduce程序,從HDFS中讀取數據然后插入到Cassandra中。也可以是直接從Cassandra中讀取數據,然后進行相應的計算。

從HDFS中讀取數據然后插入到Cassandra中

對于這種類型,我們可以按照以下幾個步驟來操作。

1將需要插入Cassandra的數據上傳到HDFS中。

2啟動Hadoop MapReduce程序。

這種類型的整合其實和Cassandra本身沒有什么聯系。我們只是運行普通的MapReduce程序,然后在Map或者Reduce端將計算好的數據插入到Cassandra中。僅此而已。

直接從Cassandra中讀取數據,然后進行相應的計算

這個功能是在Cassandra0.6.x版本中添加上去的。其可以從Cassandra直接讀取MapReduce需要的數據,實現對于Cassandra的全表掃描的功能。

操作步驟如下:

1在MapReduce程序中指定使用的KeySpace,ColumnFamily,和SlicePredicate等和Cassandra相關的參數。(關于這些概念,可以參考《大話Cassandra數據模型》和《談談Cassandra的客戶端》)

2啟動Hadoop MapReduce程序。

這種類型的整合和從HDFS讀取數據的整合相比,還是有許多不同的,主要有下面幾點區別:

1輸入數據來源不同:前一種是從HDFS中讀取輸入數據,后一種是從Cassandra中直接讀取數據。

2Hadoop的版本不同:前一種可以使用任何版本的Hadoop,后一種只能使用Hadoop0.20.x

整合Hadoop0.19.x與Cassandra0.6.x

在Cassandra0.6.x中,默認實現的是與Hadoop0.20.x的整合,我們無法直接在Hadoop0.19.x中使用。

所以,要實現這個目標,我們***步需要做的事情是,修改Cassandra的源代碼,提供一個可以在Hadoop0.19.x中使用的功能。

想要進行這項測試,我們可以按照如下步驟來進行:

1下載修改后的代碼。

2在MapReduce中指定如下內容(注意,這里的class使用的package都是com.alibaba.dw.cassandra.hadoop下面的):

ConfigHelper.setColumnFamily(conf,Keyspace,MemberCF,"/home/admin/apache-cassandra-0.6.1/conf");SlicePredicatepredicate=newSlicePredicate().setColumn_names(Arrays.asList("CITY".getBytes(UTF8),"EMPLOYEES_COUNT".getBytes(UTF8)));ConfigHelper.setSlicePredicate(conf,predicate);ConfigHelper.setRangeBatchSize(conf,512);ConfigHelper.setSuperColumn(conf,"MemberInfo");
3確保每一臺運行MapReduce的機器的指定目錄與MapReduce程序中設定的storage-conf.xml文件路徑一致。

4運行 Hadoop MapReduce程序。

存在的問題與改進
在實際的使用中,我們會發現Map端會出現這樣的錯誤信息:

java.lang.RuntimeException:TimedOutException()atcom.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.maybeInit(ColumnFamilyRecordReader.java:125)atcom.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.computeNext(ColumnFamilyRecordReader.java:164)atcom.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.computeNext(ColumnFamilyRecordReader.java:1)atcom.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:135)atcom.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:130)atcom.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader.next(ColumnFamilyRecordReader.java:224)atcom.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader.next(ColumnFamilyRecordReader.java:1)atorg.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)atorg.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)atorg.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)atorg.apache.hadoop.mapred.MapTask.run(MapTask.java:342)atorg.apache.hadoop.mapred.Child.main(Child.java:158)Causedby:TimedOutException()atorg.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:11015)atorg.apache.cassandra.thrift.Cassandra$Client.recv_get_range_slices(Cassandra.java:623)atorg.apache.cassandra.thrift.Cassandra$Client.get_range_slices(Cassandra.java:597)atcom.alibaba.dw.cassandra.hadoop.ColumnFamilyRecordReader$RowIterator.maybeInit(ColumnFamilyRecordReader.java:108)...11more

引起這樣的問題的原因就在于使用ThriftAPI從Cassandra讀取數據失敗了。所以我們可以優化這段代碼,提供想要的錯誤處理功能來提供程序的可用性。

以上是“怎么實現Cassandra與Hadoop MapReduce的整合”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女