Introduction
This article belongs to a set of 3, where the aim is to introduce Kafka and how to use it.
The 3 parts are:
This part shows a small example on how to create a Kafka cluster and how to send data to it and get data from it.
There's a code example on my GitHub.
The setup comprises a Kafka cluster with 3 brokers and two Spring applications one to send data - the Producer and the other to get data - the Consumer.
Fig. 1: Use Case
Creating a Simple Kafka Cluster
It is created a 3 Kafka brokers cluster and a Graphical User Interface (GUI) provided by AKHQ and all are created by Docker Compose.
Fig. 2: System's diagram
To configure a Kafka cluster it is needed to configure 4 settings on each broker:
KAFKA_LISTENERS: here are the ports used for brokers to communicate with each other;
KAFKA_ADVERTISED_LISTENERS: these are the ports used for communication with Kafka producers and consumers. If this setting is empty, the KAFKA_LISTENERS setting will be used;
KAFKA_BROKER_ID: unique number inside the cluster the broker belongs to;
Producer
This producer simulates a car that sends data to two topics: ADAS and LIDAR. The data sent is the result of toString()
of each Data Transfer Object (DTO) because there's no particular need for the data format to be sent to Kafka.
The KafkaProducerConfig
class is used as a means to instantiate a KafkaTemplate
bean configured with a KafkaDefaultFactory
to be used later on.
@Configuration
public class KafkaProducerConfig {
/**
* Kafka address to connect to
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Creates a Kafka Producer Factory
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
* High level instance of an enitty ready to produce messages to a Kafka cluster
* @return
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Code Fragment 1: Configuring Spring resources to use Kafka
On the next code fragment, it can be seen how KafkaTemplate
is used to send data. Only have to define de data and the topic to send to.
The process is initiated automatically by annotating the init()
method with @PostConstruct
.
@Component
public class InitProducer {
private final Logger LOG = LoggerFactory.getLogger(getClass());
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* Runs automaticaly as soon Spring starts
*
* @throws InterruptedException
*/
@PostConstruct
public void init() throws InterruptedException {
Car car1 = new Car("CAR_1");
Car car2 = new Car("CAR_2");
String adasData = null;
String lidarData = null;
while (true) {
// CAR 1
adasData = car1.getAdasData().toString();
LOG.info("Car 1 ADAS: " + adasData);
kafkaTemplate.send("ADAS", adasData);
lidarData = car1.getLidarData().toString();
LOG.info("Car 1 LIDAR: " + lidarData);
kafkaTemplate.send("LIDAR", lidarData);
// CAR 2
adasData = car2.getAdasData().toString();
LOG.info("Car 2 ADAS: " + adasData);
kafkaTemplate.send("ADAS", adasData);
lidarData = car2.getLidarData().toString();
LOG.info("Car 2 LIDAR: " + lidarData);
kafkaTemplate.send("LIDAR", lidarData);
Thread.sleep(1000);
}
}
}
Code Fragment 2: sending data
When running the application generates the following logs:
Fig. 4: Producer's log output
Consumer
Create a KafkaConsumerConfig
to set up the consumer. It is necessary to provide the broker to connect to and the consumer group the application belongs to. Those are defined in the properties.yaml file.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroup;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Code Fragment 3: configuring the consumer
The InitConsume
class will start all the process of
@Component
public class InitConsume {
private final Logger LOG = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = "LIDAR")
public void consumeLidar(String message) {
LOG.info("TOPIC: LIDAR -- message: " + message);
}
@KafkaListener(topics = "ADAS")
public void consumeADAS(String message) {
LOG.info("TOPIC: ADAS -- message: " + message);
}
}
When running you can see something similar like the following log:
Fig. 5: Consumer's log output
Monitoring the cluster
Alongside the Kafka cluster, there's also a GUI provided by AKHQ. It can be accessed by opening the browser at the address localhost:8080.
Fig. 6: AKHQ Topics view
Fig. 7: AKHQ Topic ADAS messages
Fig. 8: AKHQ Topic LIDAR messages