Part 3 - Producing and Consuming with Spring

Kafka 101

Introduction

This article is part of a set of 3 whose aim is to introduce Kafka and how to use it.

The 3 parts are:

This part shows a small example of how to create a Kafka cluster, how to send data to it, and how to get data from it.

There's a code example on my GitHub.

The setup comprises a Kafka cluster with 3 brokers and two Spring applications: one to send data (the Producer) and the other to get data (the Consumer).

Fig. 1: Use Case

Creating a Simple Kafka Cluster

The cluster consists of 3 Kafka brokers plus a Graphical User Interface (GUI) provided by AKHQ, all created with Docker Compose.

Fig. 2: System's diagram

To configure a Kafka cluster, a few settings need to be configured on each broker (a Docker Compose sketch follows the list):

  • KAFKA_LISTENERS: the addresses and ports the broker binds to; these are also the endpoints brokers use to communicate with each other;

  • KAFKA_ADVERTISED_LISTENERS: the addresses and ports advertised to Kafka producers and consumers, i.e. the endpoints clients connect to. If this setting is empty, the KAFKA_LISTENERS setting will be used;

  • KAFKA_BROKER_ID: a number that uniquely identifies the broker inside the cluster it belongs to.
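
As a sketch only, one broker's service could be declared as follows; the image, ports, and ZooKeeper service name are assumptions, and the docker-compose.yml in the repository may differ:

# Sketch of one broker in docker-compose.yml; kafka2 and kafka3 would be
# analogous, each with its own KAFKA_BROKER_ID, hostname, and port.
kafka1:
  image: confluentinc/cp-kafka:7.4.0
  depends_on:
    - zookeeper                                  # assumed ZooKeeper service
  ports:
    - "9092:9092"
  environment:
    KAFKA_BROKER_ID: 1                           # unique within the cluster
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092    # socket the broker binds to
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092   # endpoint clients use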

Producer

This producer simulates a car that sends data to two topics: ADAS and LIDAR. The data sent is the result of calling toString() on each Data Transfer Object (DTO), because there's no particular need for a structured data format in this example.
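
The Car class and its DTOs are not shown in this article. A minimal sketch of what they might look like (the names and fields below are assumptions; the classes in the repository may differ):

import java.util.Random;

// Hypothetical sketch of the DTOs and the Car class; only their toString()
// output matters here, since that is what is written to Kafka.
record AdasData(String car, int speedKmh) { }

record LidarData(String car, double distanceMeters) { }

class Car {

    private final String name;
    private final Random random = new Random();

    Car(String name) {
        this.name = name;
    }

    // Simulated ADAS reading, e.g. the car's current speed
    AdasData getAdasData() {
        return new AdasData(name, random.nextInt(120));
    }

    // Simulated LIDAR reading, e.g. distance to the nearest obstacle
    LidarData getLidarData() {
        return new LidarData(name, random.nextDouble() * 100);
    }
}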

The KafkaProducerConfig class is used as a means to instantiate a KafkaTemplate bean, configured with a DefaultKafkaProducerFactory, to be used later on.

@Configuration
public class KafkaProducerConfig {

    /**
     * Kafka address to connect to
     */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Creates a Kafka producer factory
     * @return a factory configured with the cluster address and String serializers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapServers);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * High-level entry point, ready to produce messages to a Kafka cluster
     * @return a template backed by the producer factory above
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Code Fragment 1: Configuring Spring resources to use Kafka
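
As an aside (not from the example repository): with Spring Boot, an equivalent KafkaTemplate is auto-configured from properties alone, so an explicit configuration class like the one above is mostly useful when the factory needs customizing. The same serializers could instead be declared declaratively:

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer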

The next code fragment shows how KafkaTemplate is used to send data: only the data and the destination topic have to be provided.

The process is initiated automatically by annotating the init() method with @PostConstruct. Note that the endless loop below keeps the startup thread busy forever; that is fine for this demo, but a real application would hand this work to a separate thread.

@Component
public class InitProducer {

    private final Logger LOG = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Runs automatically as soon as Spring starts
     *
     * @throws InterruptedException if the pause between sends is interrupted
     */
    @PostConstruct
    public void init() throws InterruptedException {
        Car car1 = new Car("CAR_1");
        Car car2 = new Car("CAR_2");

        String adasData = null;
        String lidarData = null;
        while (true) {

            // CAR 1
            adasData = car1.getAdasData().toString();
            LOG.info("Car 1 ADAS: " + adasData);
            kafkaTemplate.send("ADAS", adasData);

            lidarData = car1.getLidarData().toString();
            LOG.info("Car 1 LIDAR: " + lidarData);
            kafkaTemplate.send("LIDAR", lidarData);

            // CAR 2
            adasData = car2.getAdasData().toString();
            LOG.info("Car 2 ADAS: " + adasData);
            kafkaTemplate.send("ADAS", adasData);

            lidarData = car2.getLidarData().toString();
            LOG.info("Car 2 LIDAR: " + lidarData);
            kafkaTemplate.send("LIDAR", lidarData);

            Thread.sleep(1000);
        }
    }

}

Code Fragment 2: sending data
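
One detail the fragment glosses over: kafkaTemplate.send() is asynchronous, so the log lines only confirm that a record was handed to the template, not that it reached a broker. A hedged sketch of attaching a delivery callback, assuming a recent spring-kafka where send() returns a CompletableFuture (older versions return a ListenableFuture instead):

// Confirm delivery or log the failure (sketch; spring-kafka 3.x API)
String adasData = car1.getAdasData().toString();   // effectively final for the lambda
kafkaTemplate.send("ADAS", adasData)
    .whenComplete((result, ex) -> {
        if (ex != null) {
            LOG.error("Failed to send: " + adasData, ex);
        } else {
            LOG.info("Sent to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset());
        }
    });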

When running, the application generates logs like the following:

Fig. 4: Producer's log output

Consumer

Create a KafkaConsumerConfig class to set up the consumer. It is necessary to provide the brokers to connect to and the consumer group the application belongs to; both are defined in the properties.yaml file.
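
A minimal sketch of those properties (the ports and the group name below are assumptions; adjust them to the cluster created earlier):

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094   # the 3 brokers
    consumer:
      group-id: car-consumer-group                                    # assumed group name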

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroup;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Code Fragment 3: configuring the consumer

The InitConsume class starts the whole consuming process: each method annotated with @KafkaListener is invoked automatically whenever a message arrives on its topic.

@Component
public class InitConsume {

    private final Logger LOG = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = "LIDAR")
    public void consumeLidar(String message) {
        LOG.info("TOPIC: LIDAR -- message: " + message);
    }

    @KafkaListener(topics = "ADAS")
    public void consumeADAS(String message) {
        LOG.info("TOPIC: ADAS -- message: " + message);
    }
}

Code Fragment 4: consuming data

When running, you can see something similar to the following log:

Fig. 5: Consumer's log output

Monitoring the cluster

Alongside the Kafka cluster, there's also a GUI provided by AKHQ. It can be accessed by opening the browser at the address localhost:8080.
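
For reference, AKHQ locates the cluster through its own configuration file; a minimal sketch (the connection name and broker addresses are assumptions, and the repository may wire this differently through Docker Compose):

# Sketch of AKHQ's application.yml
akhq:
  connections:
    kafka-cluster:                       # arbitrary connection name
      properties:
        bootstrap.servers: "kafka1:9092,kafka2:9092,kafka3:9092"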

Fig. 6: AKHQ Topics view

Fig. 7: AKHQ Topic ADAS messages

Fig. 8: AKHQ Topic LIDAR messages