以下整理了flume的各種采集方式,代碼可以直接使用。
一、source類型是netcat
# Agent a1: netcat source -> memory channel -> logger sink.

# Components that make up the agent.
a1.channels = c1
a1.sources = r1
a1.sinks = k1

# Channel c1 buffers events in memory.
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.capacity = 1000

# Source r1 listens for netcat-style text on linux1:666.
# NOTE(review): ports below 1024 usually require root privileges - confirm this is intended.
a1.sources.r1.type = netcat
a1.sources.r1.port = 666
a1.sources.r1.bind = linux1

# Sink k1 writes every event to the log.
a1.sinks.k1.type = logger

# Wiring: the source writes to c1 and the sink reads from c1.
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
命令:./flume-ng agent -n a1 -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
二、source類型是spooldir
# Agent a1: spooling-directory source -> memory channel -> logger sink.
# Comments are kept on their own lines: the properties format has no trailing
# comments, so any text after a value would become part of that value.

# Name the agent's source, sink and channel.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1 reads files dropped into a directory.
a1.sources.r1.type = spooldir
# Directory to collect files from.
a1.sources.r1.spoolDir = /root/flume
# Add a header to each event holding the absolute path of the file it came from.
a1.sources.r1.fileHeader = true

# Sink k1 writes every event to the log.
a1.sinks.k1.type = logger

# Channel c1 keeps buffered events in memory.
a1.channels.c1.type = memory
# Maximum number of events the channel can hold.
a1.channels.c1.capacity = 1000
# Maximum number of events handled per transaction.
a1.channels.c1.transactionCapacity = 100

# Connect the source to the channel.
a1.sources.r1.channels = c1
# Connect the sink to the channel.
a1.sinks.k1.channel = c1
命令:../bin/flume-ng agent -n a1 -f ../conf/spooldir.conf -Dflume.root.logger=INFO,console
三、source 類型是avro
這個配置的source是avro類型,相當于一個服務器。
這個服務器開啟端口8088,目的是接收數據。
# Agent a1: Avro source (server side) -> memory channel -> logger sink.
# Comments are kept on their own lines: the properties format has no trailing
# comments, so any text after a value would become part of that value.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1 accepts Avro events on port 8088.
a1.sources.r1.type = avro
# Host name / IP of the machine this agent runs on.
a1.sources.r1.bind = linux1
a1.sources.r1.port = 8088

# Sink k1 writes every event to the log.
a1.sinks.k1.type = logger

# Channel c1 keeps buffered events in memory.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wiring: the source writes to c1 and the sink reads from c1.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
命令:../bin/flume-ng agent -n a1 -f ../conf/server.conf -Dflume.root.logger=INFO,console
四、source類型是syslogtcp(socket)
# Agent a1: syslog TCP source -> memory channel -> logger sink.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1 receives syslog messages over TCP.
a1.sources.r1.type = syslogtcp
# The syslogtcp source takes its listen address from "host"; it has no "bind"
# property, so a "bind" entry would not restrict the listen address.
a1.sources.r1.host = linux1
a1.sources.r1.port = 8080

# Sink k1 writes every event to the log.
a1.sinks.k1.type = logger

# Channel c1 keeps buffered events in memory.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wiring: the source writes to c1 and the sink reads from c1.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
五、sink類型是avro
這個agent是數據發送者,相當于一個客戶端,目的就是發送數據。
# Agent a1 (client side): netcat source -> memory channel -> Avro sink.
# Comments are kept on their own lines: the properties format has no trailing
# comments, so any text after a value would become part of that value.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1 listens for netcat-style text on port 666.
a1.sources.r1.type = netcat
# Host name / IP of the machine this agent runs on.
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666

# Sink k1 forwards events over Avro to linux1:8088.
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
# Number of events sent together in one batch.
a1.sinks.k1.batch-size = 2

# Channel c1 keeps buffered events in memory.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wiring: the source writes to c1 and the sink reads from c1.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
命令:./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console
六、sink類型是兩個avro
# Agent a1: two independent pipelines, each netcat source -> memory channel -> Avro sink.
# Both Avro sinks deliver to the same endpoint, linux1:8088.

a1.channels = c1 c2
a1.sources = r1 r2
a1.sinks = k1 k2

# ---- Pipeline 1: r1 (port 666) -> c1 -> k1 ----
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2
a1.sinks.k1.channel = c1

# ---- Pipeline 2: r2 (port 777) -> c2 -> k2 ----
a1.sources.r2.type = netcat
a1.sources.r2.bind = linux2
a1.sources.r2.port = 777
a1.sources.r2.channels = c2

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux1
a1.sinks.k2.port = 8088
a1.sinks.k2.batch-size = 2
a1.sinks.k2.channel = c2
七、sink 類型是hdfs
# Agent a1: netcat source -> memory channel -> HDFS sink.
# Comments are kept on their own lines: the properties format has no trailing
# comments, so any text after a value would become part of that value.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1 listens for netcat-style text on linux1:666.
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666

# Sink k1 writes events to HDFS.
a1.sinks.k1.type = hdfs
# Target directory; the % escapes are expanded from the event time
# (taken from the local clock here because useLocalTimeStamp is true below).
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# filePrefix (default: FlumeData)
# Prefix of the files written to HDFS; Flume's date escapes and %{host} may be used.
a1.sinks.k1.hdfs.filePrefix = events-
# rollInterval (default: 30)
# Seconds after which the sink rolls the temporary file into the final target file.
# 0 disables time-based rolling.
# Note: "rolling" means the sink renames the temporary file to its final name
# and opens a new temporary file for subsequent data.
a1.sinks.k1.hdfs.rollInterval = 30
# rollSize (default: 1024)
# Size in bytes at which the temporary file is rolled into the target file.
# 0 disables size-based rolling.
a1.sinks.k1.hdfs.rollSize = 0
# rollCount (default: 10)
# Number of events after which the temporary file is rolled into the target file.
# 0 disables count-based rolling.
a1.sinks.k1.hdfs.rollCount = 0
# batchSize (default: 100)
# Number of events flushed to HDFS per batch.
a1.sinks.k1.hdfs.batchSize = 1
# useLocalTimeStamp (default: false)
# Whether to use the local time.
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type to generate; the default is SequenceFile, DataStream produces plain text.
a1.sinks.k1.hdfs.fileType = DataStream

# Channel c1 keeps buffered events in memory.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wiring: the source writes to c1 and the sink reads from c1.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
八、sink類型是kafka類型的
# Agent a1: netcat source -> memory channel -> Kafka sink.

a1.channels = c1
a1.sources = r1
a1.sinks = k1

# Channel c1 keeps buffered events in memory.
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.capacity = 1000

# Source r1 listens for netcat-style text on linux1:666.
a1.sources.r1.type = netcat
a1.sources.r1.port = 666
a1.sources.r1.bind = linux1

# Sink k1 publishes events to the Kafka topic "Hellokafka" via the three listed brokers.
# NOTE(review): topic / brokerList / requiredAcks / batchSize look like the older
# Kafka sink property names - confirm they match the Flume version in use.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.topic = Hellokafka
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# Wiring: the source writes to c1 and the sink reads from c1.
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
九、Source 是mysql
# Agent a1: SQL source reading from MySQL -> memory channel -> logger sink.

a1.channels = c1
a1.sources = r1
a1.sinks = k1

# ---- Source r1: org.keedio SQLSource (third-party plugin) ----
a1.sources.r1.type = org.keedio.flume.source.SQLSource

# JDBC / Hibernate connection settings.
# NOTE(review): the database password is stored here in plain text - restrict
# access to this file or supply the credential another way.
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.url = jdbc:mysql://localhost:3306/test
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true

# C3P0 connection pool, sized between 1 and 10 connections.
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size = 1
a1.sources.r1.hibernate.c3p0.max_size = 10

# Query settings.
# NOTE(review): $@$ is presumably replaced by the last value recorded in the
# status file, beginning at start.from - verify against the SQLSource docs.
a1.sources.r1.custom.query = select id,userName from user where id > $@$ order by id asc
a1.sources.r1.start.from = 0
# Delay between query runs (presumably milliseconds - confirm).
a1.sources.r1.run.query.delay = 10000
a1.sources.r1.max.rows = 1000
a1.sources.r1.batch.size = 1000

# Directory and file name of the source's status file.
a1.sources.r1.status.file.path = /root/data/flume/
a1.sources.r1.status.file.name = sqlSource.status

# ---- Channel c1: in-memory buffer ----
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.capacity = 1000

# ---- Sink k1: write every event to the log ----
a1.sinks.k1.type = logger

# Wiring: the source writes to c1 and the sink reads from c1.
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。