Кафка поведение потребителей при неудачном десериализации

голоса
0

Мне было интересно, как сказать Кафка потребителю при десериализации объекта не может уронить запись из Кафки, журнал ошибок и продолжать слушать другие входящие сообщения.

У меня есть мой потребитель настроен, как показано ниже: ConsumerProperties:

Map<String, Object> consumerProperties = new HashMap<>();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaPropertiesConfiguration.getBootstrapServers());
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());

    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            NullDeserializer.class.getName());

    consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG,
            kafkaTelecontrolEventConsumerProperties.getClientIdConfig());
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
            kafkaTelecontrolEventConsumerProperties.getGroupIdConfig());

    return new DefaultKafkaConsumerFactory<>(consumerProperties); 

NullDeserializer.class мой десериализатор используется для тестирования:

@Log4j2
public class NullDeserializer implements Deserializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        try {
            TeleControl.Event.parseFrom(data) ;
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    @Override
    public void close() {

    }
}

IntegrationFlow:

return IntegrationFlows
                .from(Kafka
                        .messageDrivenChannelAdapter(telecontrolEventConsumerFactory,
                                eventConsumerProperties.getTelecontrolEventTopic())
                )
                .handle(System.err::println)
                .get();

Моя главная проблема в том, когда новый RuntimeException выбрасывается при разборе терпит неудачу, как сказать kafkaConsumer зарегистрировать ошибку, уронить запись и продолжить обработку следующего сообщения Кафки.

Задан 27/11/2018 в 15:03
источник пользователем
На других языках...                            


1 ответов

голоса
1

См ErrorHandlingDeserializer введены в 2.2.

Предстоящий релиз 2.2.1 (на завтра) заменяет его , ErrorHandlingDeserializer2который типобезопасен.

В обоих случаях, исключение десериализации передается слушателю контейнер , который, в свою очередь, передает его непосредственно к ErrorHandlerвместо вызова слушателя.

Ответил 27/11/2018 в 15:38
источник пользователем

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more