Kafka in Spring Boot on Docker

The article presents the basic configuration of Apache Kafka on a Docker container and a microservice built in the Spring Boot framework. This article is a good introduction to understand how applications interact with each other. In our example, we will send messages using the JSON format and then display them. 

We will create a simple application simulating banking processes, where the user will be able to withdraw a certain amount of funds. We will use the REST interface. Kafka’s task will be to queue orders.

Note the following material is intended for readers who are familiar with the basics of Kafka and Docker. If you hear about them for the first time, I encourage you to read: https://devapo.io/blog/meet-apache-kafka-a-streaming-data-platform/ and documentation https://docs.docker.com/.

We also offer custom development services, so you can take advantage of Kafka’s potential without having your in-house IT team – for this purpose, contact us.

In the main catalogue of the project, let’s create the docker-compose.yml file. It is responsible for running multiple containers at the same time and automates their creation. The images appear in a specific order and time, which is important to us as Zookeeper should launch first. 

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
    - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'

Now let’s look at the above configuration. We created two docker instances and called them zookeeper and kafka. We can download the image successively from https://hub.docker.com/r/wurstmeister/zookeeper and https://hub.docker.com/r/wurstmeister/kafka. We encourage you to read the documentation.

Note the configuration of the second container:

KAFKA_ADVERTISED_HOST_NAME → enter the IP of the docker host, in our case we do it locally, so enter IP localhost.

KAFKA_ZOOKEEPER_CONNECT → give the port on which the zookeeper cluster is monitoring

KAFKA_AUTO_CREATE_TOPICS_ENABLE: → we don’t want Kafka to create topics automatically, so we set the value to ‘false’. Later in the article, we will create them ourselves, using the Spring Boot application.

We have configured Kafke to run on a single Node.

To start the containers, just enter the docker-compose up command.

The fastest and easiest way to build a simple application based on the Spring Boot framework is to use spring initializer. It offers a quick way to select the dependencies we need in a project. In our case, it will be Spring for Apache Kafka and Spring Web.

Go to the website https://start.spring.io/, then follow the instructions. 

In the finished project, we create the application.yml file in src/main/java/resources. We will define there the configuration values necessary for our project.

server:
  port: 8081
spring:
  kafka:
    consumer:
      group-id: group-1
      auto-offset-reset: earliest
      bootstrap-servers: localhost:9092

At this point, we encourage you to read the official documentation released by Spring on the Apache Kafka configuration:
https://docs.spring.io/spring-kafka/reference/html/ 

We have created two blocks of the kafka and topic properties. Let us now move on to explaining them. The group-id attribute is a string of characters that uniquely identifies the group of consumer processes this consumer belongs to, let’s call it: group-1.

Setting the value to earliest in auto-offset-reset will cause consumers to read offset messages starting as early as possible, when no other offset exists for that consumer.

POJO class

Now let’s reate a POJO class responsible for deserialization. We will use the Jackson library for this. This will allow us to map the REST sent values to the java object.

package com.kafka.example.app.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;

public class Account {

    private String holder;
    private String funds;

    public Account(
            @JsonProperty String holder,
            @JsonProperty String funds) {
        this.holder = holder;
        this.funds = funds;
    }

    public Account() {
    }

    public String getHolder() {
        return holder;
    }

    public void setHolder(String holder) {
        this.holder = holder;
    }

    public String getFunds() {
        return funds;
    }

    public void setFunds(String funds) {
        this.funds = funds;
    }

Producer Configuration Class

Now we will create a new Producer class. It will be another class with the @Configuration annotation, where we will define the producer’s settings, i.e. the kafka client responsible for publishing records to the Kafka cluster.

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class Producer {

    final KafkaProperties kafkaProperties;

    public Producer(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }


    @Bean
    public Map<String, Object> producerConfiguration() {
        Map<String, Object> properties = new HashMap<>(kafkaProperties.buildProducerProperties());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return properties;
    }

    @Bean
    ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfiguration());
    }

    @Bean
    KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("transaction-1", 2, (short) 1);
    }
}

We used the KafkaTemplate instance. It is a template used to perform high-level operations. We are going to use it to send messages to Kafka.

Now let’s discuss Producer Configurations. We do it in producerConfiguration by filling the <String, Object> map with appropriate values. For this we will use the kafkaProperties instance that has been injected by the constructor. Next, we overwrite the default Kafka serializer. The manufacturer will serialize the keys as Strings using the kafka library StringSerializer and JSON values using JsonSerializer. This is fully provided and supported by Spring Kafka.

Now let’s discuss the topic field that we created by the NewTopic Bean injection. The first parameter is responsible for the name of the new topic, the second for the number of partitions to be divided into, and the third is the replication ratio. Leaving it at level , we will use one node.

Consumer Configuration Class

Now let’s create a configuration class responsible for consuming messages from the Kafka cluster.

package com.kafka.example.app.kafka.Configuration;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@org.springframework.context.annotation.Configuration
public class Consumer {

    final KafkaProperties kafkaProperties;

    public Consumer(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }



    @Bean
    ConsumerFactory&lt;String, Object&gt; consumerFactory() {
        final JsonDeserializer&lt;Object&gt; jsonDeserializer = new JsonDeserializer&lt;&gt;();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory&lt;&gt;(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory&lt;String, Object&gt; kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory&lt;String, Object&gt; concurrentKafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return concurrentKafkaListenerContainerFactory;
    }

}




As before, let’s create the Consumer class. Let’s inject a kafkaProperties instance through the constructor. Next, let’s discuss the deserializer configurations. We have created a ConsumerFactory that will be used by the KafkaListenerCointanerFactory object. We have defined that KafkaListener will handle messages in JSON format.

The addTrustedPackages functionality is responsible for determining trusted packages. For our needs, we will not define them, so ‘*’ means that we trust all packages.

Kafka Controler 

In this part we will create a rest controller to handle the POST operation, and pass the message to the kafka topic. Next, we will use the @KafkaListener annotation.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RequestMapping ("/api/kafka")
@RestController
public class Controller {

    final KafkaTemplate kafkaTemplate;


    Logger logger = LoggerFactory.getLogger(Controller.class);

    public Controller(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping
    public String sentMessage(@RequestBody Account account) {
        this.kafkaTemplate.send("transaction-1", new Account(account.getHolder(), account.getFunds()));

        return "Hello World!";
    }

    @KafkaListener(topics = "transaction-1")
    public void listener(@Payload Account account,  ConsumerRecord&lt;String, Account&gt; cr) {
        logger.info("Topic [transaction-1] Received message from {} with {} PLN ", account.getHolder(), account.getFunds());
        logger.info(cr.toString());
    }


}

We defined the controller using Spring @RestController and @RequestMapping annotations – setting the endpoint to api/kafka.


In the next step we will use the LoggerFactory class from the slf4j library. It is a logger production tool that will help us better understand how the topic works.

The sendMessage method is responsible for sending a new Account type object to the Kafka cluster, in our application we have defined one topic called transation-1.

Now let’s discuss the listener method, we used the @KafkaListener annotation in it, we used it to define the message receiver in the topic transaction-1.

Then we will log the message that was summarized by topic transation-1 and display the data provided by the ConsumerRecord class, in which we will find the topic name and partition number from which the record is received and an offset pointing to the record in the Kafka partition.

Let’s start by entering the docker-compose up command in the main project folder.

We can see that the container with Kafka and Zookeeper has started.

C:\Users\Krystian\Desktop\kafka-spring>docker-compose up

Starting kafka-spring-example-app_kafka_1     ... done

Starting kafka-spring-example-app_zookeeper_1 ... done

Attaching to kafka-spring-example-app_kafka_1, kafka-spring-example-app_zookeeper_1

Then we run our spring application. Please note the message below. We can see that the example-group, according to our assumptions, assigned two partitions transaction-1-0 and transaction-1-1.

2021-02-1 08:08:21.483  INFO 67372 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : transation-topic: partitions assigned: [transaction-1-0, transaction-1-1]

Now let’s perform the POST operation

POST https://localhost:8081/api/kafka
Content-Type: application/json
Accept: application/json

{
"holder": "Tonny",
"funds": "2000"
}

The output should look like this

2021-02-19 08:12:37.024  INFO 67372 --- [ntainer#0-0-C-1] com.kafka.example.app.kafka.Controller   : Topic [transaction-1] Received message from Tonny with 2000 PLN

2021-02-19 08:12:37.024  INFO 67372 --- [ntainer#0-0-C-1] com.kafka.example.app.kafka.Controller   : ConsumerRecord(topic = transaction-1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1613718756987, serialized key size = -1, serialized value size = 33, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = com.kafka.example.app.kafka.Account@2538a849)

We can see that the message we sent earlier went to the first topic as first. You should pay attention to the next log, the topic and the allocated partition in particular. 



Check out other articles in the technology bites category

Discover tips on how to make better use of technology in projects

Do you have any questions?