Custom Serializer

Kafka stores and transports messages in its topics as byte arrays. To work with Java objects instead of raw byte arrays, implement the serializer and deserializer interfaces that Kafka provides.

To implement a serializer, create a class that implements the org.apache.kafka.common.serialization.Serializer interface.

To implement a deserializer, create a class that implements the org.apache.kafka.common.serialization.Deserializer interface.

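Both interfaces declare the following methods:
  • configure(): receives the client configuration and a flag that tells the instance whether it handles keys or values
  • close(): releases any resources held by the instance
  • serialize() / deserialize(): converts between your object and a byte array

In recent Kafka client versions, configure() and close() have default no-op implementations, so overriding them is optional.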

Serializing MyMessage in Producer

Create a serializer class that implements org.apache.kafka.common.serialization.Serializer.

serialize() receives your object and returns its serialized form as a byte array.

For example, a serializer for MyMessage objects in the producer looks as follows:

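import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MyValueSerializer implements Serializer<MyMessage>
{
    // True when this instance serializes record keys, false for values.
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        // A null message is passed through as a null payload.
        if (message == null) {
            return null;
        }

        try {
            // Serialize your MyMessage object into bytes here.
            // toBytes() is a placeholder for your own encoding logic.
            byte[] bytes = toBytes(message);
            return bytes;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {
        // Nothing to release.
    }
}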
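
The encoding step inside serialize() is left open above. As one possible way to fill it in, the following minimal sketch writes a message with java.io.DataOutputStream. The fields of MyMessage (id and text), the MyMessageEncoder class, and the byte layout are assumptions made for illustration; the original example does not define them, and any other encoding (JSON, Avro, and so on) would fit the same slot.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical message type: the fields are assumed for this sketch.
final class MyMessage {
    final long id;
    final String text;

    MyMessage(long id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Hypothetical helper that holds the encoding logic.
final class MyMessageEncoder {
    // Writes the id as 8 big-endian bytes, followed by the text in
    // DataOutputStream's length-prefixed modified UTF-8 format.
    static byte[] toBytes(MyMessage message) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(message.id);
            out.writeUTF(message.text);
        }
        return buffer.toByteArray();
    }
}
```

Inside serialize(), the encoding step could then be a call such as MyMessageEncoder.toBytes(message). Because the helper declares IOException, the existing catch clause wraps any failure in a SerializationException.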

Deserializing MyMessage in Consumer

Create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer.

deserialize() receives the serialized value as a byte array and returns your object.

For example, a deserializer for MyMessage objects in the consumer looks as follows:

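import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    // True when this instance deserializes record keys, false for values.
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String topic, byte[] value)
    {
        // A null payload is passed through as a null message.
        if (value == null) {
            return null;
        }

        try {
            // Deserialize value into your MyMessage object here.
            // fromBytes() is a placeholder for your own decoding logic.
            MyMessage message = fromBytes(value);
            return message;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {
        // Nothing to release.
    }
}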
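
The decoding step inside deserialize() depends entirely on how the bytes were produced. As a minimal sketch, assume a hypothetical layout of an 8-byte big-endian id followed by a string in DataOutputStream.writeUTF format. The MyMessage fields, the MyMessageDecoder class, and the layout itself are assumptions made for illustration and are not defined by the original example.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical message type: the fields are assumed for this sketch.
final class MyMessage {
    final long id;
    final String text;

    MyMessage(long id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Hypothetical helper that holds the decoding logic.
final class MyMessageDecoder {
    // Reads the fields in the same order the assumed layout stores them.
    // Truncated input raises EOFException, a subclass of IOException.
    static MyMessage fromBytes(byte[] value) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value))) {
            long id = in.readLong();
            String text = in.readUTF();
            return new MyMessage(id, text);
        }
    }
}
```

Inside deserialize(), the decoding step could then be a call such as MyMessageDecoder.fromBytes(value). Malformed or truncated payloads surface as an IOException, which the existing catch clause wraps in a SerializationException.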

Note: Kafka message keys are serialized and deserialized by the same process as message values.
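
Kafka clients locate serializer and deserializer classes through the key.serializer, value.serializer, key.deserializer, and value.deserializer configuration properties. The sketch below shows how the classes from this section might be registered. The com.example package, the broker address, and the group id are placeholders, and the use of Kafka's built-in StringSerializer and StringDeserializer for keys is an assumption chosen for illustration.

```java
import java.util.Properties;

// Hypothetical holder for the client settings used in this sketch.
final class SerdeConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // Keys use Kafka's built-in String serializer in this sketch.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values use the custom serializer; the package name is hypothetical.
        props.put("value.serializer", "com.example.MyValueSerializer");
        return props;
    }

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "my-group");                // placeholder consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values use the custom deserializer; the package name is hypothetical.
        props.put("value.deserializer", "com.example.MyValueDeserializer");
        return props;
    }
}
```

These properties would be passed to the KafkaProducer and KafkaConsumer constructors. To use a custom class for keys, point key.serializer and key.deserializer at that class in the same way.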