Custom Serializer
Kafka stores and transports byte arrays in its topics. If you wish to use the Java Objects instead of byte arrays, Kafka provides interfaces to accomplish it.
To implement a serializer, create a class that implements the
org.apache.kafka.common.serialization.Serializer
interface.
To implement a deserializer, create a class that implements the
org.apache.kafka.common.serialization.Deserializer
interface.
- configure()
- close()
- serialize() / deserialize()
Serializing MyMessage in Producer
You create a serializer class that implements
org.apache.kafka.common.serialization.Serializer.
serialize() receives your object and returns a
serialized version as bytes array.
For example, Serializing MyMessage object in Producer is as follows:
public class MyValueSerializer implements Serializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public byte[] serialize(String topic, MyMessage message)
{
if (message == null) {
return null;
}
try {
(serialize your MyMessage object into bytes)
return bytes;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing value", e);
}
}
@Override
public void close()
{
}
}
Deserializing MyMessage in Consumer
You create a deserializer class that implements
org.apache.kafka.common.serialization.Deserializer
deserialize() receives serialized value as bytes
array and returns your object.
For example, Deserializing MyMessage object in Consumer is as follows:
public class MyValueDeserializer implements Deserializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public MyMessage deserialize(String s, byte[] value)
{
if (value == null) {
return null;
}
try {
(deserialize value into your MyMessage object)
MyMessage message = new MyMessage();
return message;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing value", e);
}
}
@Override
public void close()
{
}
}