Kafka 使用 Protocol Buffers(簡稱 Protobuf)作為其消息序列化格式,它是一種高效且可擴展的二進制序列化協議。處理二進制數據時,Protobuf 可以將復雜的數據結構轉換為緊湊的二進制格式,從而提高傳輸效率和減少數據大小。
要在 Kafka 中使用 Protobuf 處理二進制數據,請按照以下步驟操作:
Person 的消息:syntax = "proto3";
message Person {
string name = 1;
int32 age = 2;
bytes avatar = 3;
}
在這個例子中,我們定義了一個包含姓名、年齡和頭像(avatar)的 Person 消息。頭像是一個二進制字段,可以使用 bytes 類型表示。
protoc 編譯器根據 .proto 文件生成對應編程語言的代碼。例如,為 Java 生成代碼:protoc --java_out=. person.proto
這將生成一個名為 PersonOuterClass.java 的文件,其中包含 Person 消息的序列化和反序列化方法。
Person 消息序列化為二進制格式。例如,在 Java 中:import com.example.PersonOuterClass.Person;
Person person = Person.newBuilder()
.setName("John Doe")
.setAge(30)
.setAvatar(ByteString.copyFromUtf8("https://example.com/avatar.jpg"))
.build();
byte[] serializedPerson = person.toByteArray();
Person 對象。例如,在 Java 中:import com.example.PersonOuterClass.Person;
byte[] receivedSerializedPerson = ...; // 從 Kafka 接收到的二進制數據
Person deserializedPerson = Person.parseFrom(receivedSerializedPerson);
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>("person-topic", person.toByteArray()));
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("person-topic"));
while (true) {
ConsumerRecord<String, byte[]> record = consumer.poll(Duration.ofMillis(100));
byte[] receivedSerializedPerson = record.value();
Person deserializedPerson = Person.parseFrom(receivedSerializedPerson);
// 處理反序列化后的消息
}
通過以上步驟,你可以在 Kafka 中使用 Protobuf 處理二進制數據。這種處理方式可以提高傳輸效率,減少數據大小,并使數據結構更加緊湊。