溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java中kafka自定義分區類和攔截器的實現方法

發布時間:2020-07-17 15:11:45 來源:億速云 閱讀:205 作者:小豬 欄目:編程語言

這篇文章主要講解了Java中kafka自定義分區類和攔截器的實現方法,內容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。

生產者發送到對應的分區有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對應的java api, 有多種參數)

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

但是kafka提供了,自定義分區算法的功能,由業務手動實現分布:

1、實現一個自定義分區類,CustomPartitioner實現Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

  /**
   *
   * @param topic 當前的發送的topic
   * @param key  當前的key值
   * @param keyBytes 當前的key的字節數組
   * @param value 當前的value值
   * @param valueBytes 當前的value的字節數組
   * @param cluster
   * @return
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //這邊根據返回值就是分區號, 這邊就是固定發送到三號分區
    return 3;
  }

  @Override
  public void close() {

  }
  @Override
  public void configure(Map<String, &#63;> configs) {

  }

}

2、producer配置文件指定,具體的分區類

// 具體的分區類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor<String, String> {

  @Override
  public void configure(Map<String, &#63;> configs) {
    System.out.println("這是MessageInterceptor的configure方法");
  }

  /**
   * 這個是消息發送之前進行處理
   *
   * @param record
   * @return
   */
  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    // 創建一個新的record,把uuid入消息體的最前部
    System.out.println("為消息添加uuid");
    return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        UUID.randomUUID().toString().replace("-", "") + "," + record.value());
  }

  /**
   * 這個是生產者回調函數調用之前處理
   * @param metadata
   * @param exception
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("MessageInterceptor攔截器的onAcknowledgement方法");
  }

  @Override
  public void close() {
    System.out.println("MessageInterceptor close 方法");
  }
}

2、定義計數攔截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
  private int errorCounter = 0;
  private int successCounter = 0;

  @Override
  public void configure(Map<String, &#63;> configs) {
    System.out.println("這是CounterInterceptor的configure方法");
  }

  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    System.out.println("CounterInterceptor計數過濾器不對消息做任何操作");
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // 統計成功和失敗的次數
    System.out.println("CounterInterceptor過濾器執行統計失敗和成功數量");
    if (exception == null) {
      successCounter++;
    } else {
      errorCounter++;
    }
  }

  @Override
  public void close() {
    // 保存結果
    System.out.println("Successful sent: " + successCounter);
    System.out.println("Failed sent: " + errorCounter);
  }
}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Producer1 {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Kafka服務端的主機名和端口號
    props.put("bootstrap.servers", "localhost:9092");
    // 等待所有副本節點的應答
    props.put("acks", "all");
    // 消息發送最大嘗試次數
    props.put("retries", 0);
    // 一批消息處理大小
    props.put("batch.size", 16384);
    // 請求延時,可能生產數據太快了
    props.put("linger.ms", 1);
    // 發送緩存區內存大小,數據是先放到生產者的緩沖區
    props.put("buffer.memory", 33554432);
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 具體的分區類
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
    //定義攔截器
    List<String> interceptors = new ArrayList<>();
    interceptors.add("kafka.MessageInterceptor");
    interceptors.add("kafka.CounterInterceptor");
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 1; i++) {
      producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          System.out.println("這是producer回調函數");
        }
      });
    }
    /*System.out.println("現在執行關閉producer");
    producer.close();*/
    producer.close();
  }
}

總結,我們可以知道攔截器鏈各個方法的執行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執行A的configure方法,執行B的configure方法

(2)執行A的onSend方法,B的onSend方法

(3)生產者發送完畢后,執行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執行producer自身的callback回調函數。

(5)執行A的close方法,B的close方法。

看完上述內容,是不是對Java中kafka自定義分區類和攔截器的實現方法有進一步的了解,如果還想學習更多內容,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女