版權聲明:本文為博主原創文章,未經博主允許不得轉載。
目錄(?)[+]
學習心得(三)流配置中介紹多路復用流的時候,有說到Flume支持從一個源發送事件到多個通道中,這被稱為事件流的復用。這里需要在配置中定義事件流的復制/復用,選擇1個或者多個通道進行數據流向。
而關于selector配置前面也講過:
<Agent>.sources.<Source1>.selector.type= replicating
這個源的選擇類型為復制。這個參數不指定一個選擇的時候,默認情況下它復制
復用則是麻煩一下,流的事情是被篩選的發生到不同的渠道,需要指定源和扇出通道的規則,感覺與case when 類似。
復用的參數為:
<Agent>.sources.<Source1>.selector.type= multiplexing
這里需要配置1個代理作為源發送與2個代理作為接受復制事件,共3個flume配置
首先是作為源發送的代理配置
[html] view plain copy
#配置文件:replicate_source_case11.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.233.129
a1.sinks.k1.port = 50000
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.233.130
a1.sinks.k2.port = 50000
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
這里設置了2個channels與2個sinks,那么我們也要設置2個sinks對應的代理配置:
下面是第一個接受復制事件代理配置
[html] view plain copy
#配置文件:replicate_sink1_case11.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 192.168.233.129
a2.sources.r1.port = 50000
# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
下面是第二個接受復制事件代理配置:
[html] view plain copy
#配置文件:replicate_sink2_case11.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 192.168.233.130
a3.sources.r1.port = 50000
# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#敲命令
首先先啟動2個接受復制事件代理,如果先啟動源發送的代理,會報他找不到sinks的綁定,因為2個接事件的代理還未起來。
flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console
在啟動源發送的代理
flume-ng agent -cconf -f conf/replicate_source_case11.conf -n a1 -Dflume.root.logger=INFO,console
啟動成功后
打開另一個終端輸入,往偵聽端口送數據
echo "hello looklook5"| nc 192.168.233.128 50000
#在啟動源發送的代理終端查看console輸出
可以看到他的正常啟動以及發送數據成功
#在啟動源第一個接事件的代理終端查看console輸出
可以看到他的正常啟動,以及接受到源代理發送的數據
#在啟動源第二個接事件的代理終端查看console輸出
同樣可以可以看到他的正常啟動,以及接受到源代理發送的數據
Ok,成功
因為復用的流的事件要聲明一個頭部,然后我們檢查頭部對應的值,因為我們這邊源類用http source
下面是源代理的配置
[html] view plain copy
#配置文件:multi_source_case12.conf
a1.sources= r1
a1.sinks= k1 k2
a1.channels= c1 c2
#Describe/configure the source
a1.sources.r1.type= org.apache.flume.source.http.HTTPSource
a1.sources.r1.port= 50000
a1.sources.r1.host= 192.168.233.128
a1.sources.r1.selector.type= multiplexing
a1.sources.r1.channels= c1 c2
a1.sources.r1.selector.header= state
a1.sources.r1.selector.mapping.CZ= c1
a1.sources.r1.selector.mapping.US= c2
a1.sources.r1.selector.default= c1
#Describe the sink
a1.sinks.k1.type= avro
a1.sinks.k1.channel= c1
a1.sinks.k1.hostname= 192.168.233.129
a1.sinks.k1.port= 50000
a1.sinks.k2.type= avro
a1.sinks.k2.channel= c2
a1.sinks.k2.hostname= 192.168.233.130
a1.sinks.k2.port= 50000
# Usea channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100
a1.channels.c2.type= memory
a1.channels.c2.capacity= 1000
a1.channels.c2.transactionCapacity= 100
這里設置了2個channels與2個sinks 同時判斷頭部屬性,當CZ的時,事件發送到sinks1,US時發送到sink2,其他的都發送到sink2,因此我們還有配置2個sinks對于的代理。這里的2個接受代理我們沿用之前復制的接受代理。
#敲命令
與之前復制的情況一樣,首先先啟動2個接受復制事件代理,如果先啟動源發送的代理,會報他找不到sinks的綁定,因為2個接事件的代理還未起來。
flume-ng agent -cconf -f conf/multi_sink1_case12.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/multi_sink2_case12.conf -n a1 -Dflume.root.logger=INFO,console
在啟動源發送的代理
flume-ng agent -cconf -f conf/multi_source_case12.conf -n a1 -Dflume.root.logger=INFO,console
啟動成功后
打開另一個終端輸入,往偵聽端口送數據
curl -X POST -d '[{"headers" :{"state" : "CZ"},"body" :"TEST1"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "US"},"body" :"TEST2"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "SH"},"body" :"TEST3"}]' http://192.168.233.128:50000
#在啟動源發送的代理終端查看console輸出
可以看到他的正常啟動以及發送數據成功
#在啟動源第一個接事件的代理終端查看console輸出
這里可以清楚的看到,這個接事件代理只收到了2個事件,因為第二個事件因為我們設置復用,將頭部信息對于的事件分流的關系,發送到另一個接事件代理去了。
#在啟動源第二個接事件的代理終端查看console輸出
Ok,第二個接事件代理因為復用分流,果然只獲得了第二個事件信息。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。