是的,Spring Kafka 可以實現消息過濾。在 Spring Kafka 中,你可以使用 KafkaMessageListenerContainer
和 MessageListenerAdapter
來處理接收到的消息。為了實現消息過濾,你可以在 MessageListenerAdapter
的實現類中編寫自定義的邏輯來過濾消息。
以下是一個簡單的示例:
ConsumerAwareErrorHandler
接口的類,用于處理接收到的錯誤消息:import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.springframework.kafka.listener.Message;
public class CustomErrorHandler implements ConsumerAwareErrorHandler {
@Override
public void handle(Exception thrownException, Message message, ConsumerRecord<?, ?> data) {
// 在這里編寫你的錯誤處理邏輯
}
}
MessageListener
接口的類,用于處理接收到的消息:import org.springframework.kafka.listener.MessageListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
public class CustomMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
// 在這里編寫你的消息過濾和處理邏輯
String payload = new String(message.getPayload());
String key = message.getKey();
// 示例:根據消息頭或消息體進行過濾
if (shouldFilter(payload)) {
// 處理過濾后的消息
} else {
// 忽略過濾后的消息
}
}
private boolean shouldFilter(String payload) {
// 在這里編寫你的過濾邏輯
return payload.contains("filtered");
}
}
KafkaListenerEndpoint
配置類中,將 CustomMessageListener
與 KafkaMessageListenerContainer
關聯起來:import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaListenerConfig implements KafkaListenerConfigurer {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
Map<String, Object> props = new HashMap<>();
// 配置你的消費者屬性,如 groupId、bootstrapServers 等
// ...
registrar.registerEndpoint(new MethodKafkaListenerEndpoint<>(
"custom-topic",
"customMethod",
getClass().getClassLoader(),
String.class,
String.class,
props
));
}
@Bean
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry(KafkaListenerEndpointRegistrar registrar) {
KafkaListenerEndpointRegistry registry = new KafkaListenerEndpointRegistry();
registrar.afterPropertiesSet();
registry.start();
return registry;
}
}
CustomMessageListener
實現類中,使用 @KafkaListener
注解指定要監聽的主題和組:import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class CustomMessageListener implements MessageListener {
@Override
@KafkaListener(topics = "custom-topic", groupId = "custom-group")
public void onMessage(Message message) {
// 在這里編寫你的消息過濾和處理邏輯
}
}
現在,當你的應用程序接收到發送到 custom-topic
主題的消息時,CustomMessageListener
將根據 shouldFilter
方法中的過濾邏輯來決定是否處理該消息。