Kafka
Retrying in Kafka
Distributed streaming
Data pipelines
Spring Kafka

Kafka Retry Mechanism in Spring Kafka

by: Eby Augustine

January 04, 2024

titleImage

Introduction

Kafka is a distributed streaming platform widely used for building real-time data pipelines and streaming applications. However, in distributed systems, failures are inevitable. Handling failures gracefully is crucial to ensure data integrity and system reliability. One of the critical aspects of building resilient Kafka applications is implementing a robust retry mechanism.

Why Retry Mechanism is Important in Kafka?

In Kafka-based systems, producers publish messages to topics, and consumers consume these messages asynchronously. However, due to various reasons such as network issues, temporary unavailability of resources, or transient errors, message processing might fail. In such cases, retrying failed messages can help ensure that important data is not lost and processing is eventually successful.

Retry Strategies

There are several retry strategies that can be employed when retrying failed Kafka messages. Some common strategies include:

  1. Exponential Backoff: This strategy increases the delay between retries exponentially. It helps reduce the load on the system during peak times and prevents overwhelming downstream services with retry attempts.
  2. Fixed Delay: In this strategy, the delay between retries remains constant. It's simpler than exponential backoff but may not be as effective in reducing load spikes.
  3. Maximum Attempts: Limiting the maximum number of retry attempts prevents infinite retry loops and ensures that retries don't continue indefinitely.

Implementation with Spring Kafka

Now, let's see how to implement a retry mechanism using Spring Kafka. We'll use Spring Kafka's RetryTopicConfiguration to configure retry behavior for Kafka topics.

Define DLT (Dead Letter Topic) Handler

We start by defining a Dead Letter Topic handler, which handles messages that have exceeded the maximum retry attempts. This handler typically logs or stores information about failed messages for further analysis. DLT and retry topics will be automatically created as per our configuration.

@Component
@Slf4j
public class CustomDltProcessor {

    @Autowired KafkaService kafkaService;

    public void processDltMessage(
            String message,
            Exception exception,
            @Header(KafkaHeaders.ORIGINAL_OFFSET) byte[] offset,
            @Header(KafkaHeaders.EXCEPTION_FQCN) String descException,
            @Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {

        KafkaErrorResponse kafkaErrorResponse =
                KafkaErrorResponse.builder()
                        .payload(message)
                        .exceptionMessage("Error while processing kafka message :: " + errorMessage)
                        .exceptionStacktrace(stacktrace)
                        .build();
        log.error("Error while processing kafka message :: " + kafkaErrorResponse.toString());
    }
}

Configure Retry Behavior

Next, we configure retry behavior for Kafka topics using RetryTopicConfiguration. We specify parameters such as maximum attempts, retry interval, and the DLT handler method.

@Configuration
@Slf4j
public class KafkaRetryConfig {

    @Value("${ksm.kafka.topic.update-topic}")
    String updateTopic;

    @Autowired private CustomDltProcessor customDltProcessor;
    
    /* Below bean won't retry for exceptions in notRetryOn list */

    @Bean
    public RetryTopicConfiguration retryTopicCreation(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .exponentialBackoff(1000, 2, 5000)
                .maxAttempts(3)
                .excludeTopics(List.of(updateTopic))
                .dltHandlerMethod("customDltProcessor", "processDltMessage")
                .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
                .notRetryOn(
                        List.of(
                                ResubmitInvalidRequestException.class,
                                NullPointerException.class,
                                SerializationException.class))
                .create(template);
    }

    /* Below bean only retires when ResubmitInvalidRequestException is thrown */ 
    @Bean
    public RetryTopicConfiguration retryTopicCreationForOtherCases(
            KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .maxAttempts(1)
                .excludeTopics(List.of(updateTopic))
                .dltHandlerMethod("customDltProcessor", "processDltMessage")
                .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
                .retryOn(
                        List.of(
                                ResubmitInvalidRequestException.class))
                .create(template);
    }
}

Custom Producer Configuration

To ensure retry topics and DLT receives messages in proper format, we have to configure Producer

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put("security.protocol", "SSL");
        ProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(
                        config, new StringSerializer(), new StringSerializer());
        return producerFactory;
    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "producerObject")
    public ProducerFactory<String, Object> producerObjectFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put("security.protocol", "SSL");
        return new DefaultKafkaProducerFactory<>(
                config, new StringSerializer(), new JsonSerializer<>());
    }

    @Bean(name = "kafkaObjectTemplate")
    public KafkaTemplate<String, Object> kafkaObjectTemplate() {
        return new KafkaTemplate<>(producerObjectFactory());
    }
}

Kafka Service To Send Message

With Producer Configuration setup done, we can use the same while sending kafka messages.


@Service
@Slf4j
public class KafkaService {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public KafkaService(
            @Qualifier("kafkaObjectTemplate") KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendKafkaMessage(String topic, Object message) {
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, message);
        RecordMetadata metadata;
        try {
            metadata = kafkaTemplate.send(record).get().getRecordMetadata();
            long offset = metadata.offset();
            int partition = metadata.partition();
            log.info(
                    "Message sent successfully to partition "
                            + partition
                            + " with offset "
                            + offset
                            + " for the topic"
                            + topic);
        } catch (Exception e) {
            // Handle any exceptions that may occur
            throw new RuntimeException(e);
        }
    }
}

Apply Retry Configuration

Finally, we apply the retry configuration to Kafka consumers by associating it with specific topics.

@Service
@Slf4j
@RequiredArgsConstructor
public class CreateKafkaMessageListener {
 @KafkaListener(topics = "#{ '${ksm.kafka.topic.create-topic}' }")
    public void listen(@NotNull ConsumerRecord<String, String> record)
            throws JsonProcessingException {
        // Process the incoming Kafka message here
        String key = record.key();
        String value = record.value();
        MDC.put("requestId", UUID.randomUUID().toString());
        log.debug("Received message: key=" + key + ", value=" + value);
        ObjectMapper objectMapper = new ObjectMapper();

        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        CustomRequest request = objectMapper.readValue(value, new TypeReference<>() {});
        
        // Process request as per requirement
        
    }
    

Conclusion

In conclusion, implementing a robust retry mechanism is crucial for building reliable Kafka applications. By employing appropriate retry strategies and configuring retry behavior using Spring Kafka, developers can ensure that their systems can handle failures gracefully and maintain data integrity even in the face of transient errors.

contact us

Get started now

Get a quote for your project.
logofooter
title_logo

USA

Edstem Technologies LLC
254 Chapman Rd, Ste 208 #14734
Newark, Delaware 19702 US

INDIA

Edstem Technologies Pvt Ltd
Office No-2B-1, Second Floor
Jyothirmaya, Infopark Phase II
Ernakulam, Kerala
Pin: 682303

Work With Us

Blogs

Contact Us

© 2024 — Edstem All Rights Reserved

Privacy PolicyTerms of Use