Build Messaging Solutions with Apache Kafka or Event Streams for IBM Cloud: Part 2

13 min read

By: Anuj Jain

Build messaging solutions: Evaluate the viability of the solution

This post is a part of the multi-part blog series: Build Messaging Solutions with Apache Kafka or Event Streams for IBM Cloud

In the previous post in this multi-part series on building messaging solutions, we quantified requirements across several specific categories. In the second step of this process, we can now use those quantified requirements to build a preliminary design and evaluate the viability of the solution. Various concepts and features come into play, and the relevant options need to be weighed to determine which are best suited to the use case and requirements. We will evaluate those options as we design.

Evaluate: How do I start putting these together in my design?

Designing for the sources and sinks

The first step in the design is identifying your data sources and sinks and getting deeper into their details. The main goal is to put together the various options you may have for the flow of your data messages. This can happen through a single stage or multiple stages of processing, applying logic, state management, and persistence—traversing from data producers to final consumers. The flow options should be laid out end-to-end for each type of message.

In our IoT example from the previous blog post, we identified the health-beat message type that comes from a wearable health-band device. The sources are the health-bands’ microcontrollers. The JavaScript code (Producers) creates messages and sends them to the Kafka brokers. After some processing and event handling done by another set of JavaScript processes (Consumers), the sinks are the persisted messages written to a separate database system.

Knowing the sources and sinks (and with a general idea of the flows), it’s time to get deeper into how to make the processing happen.

Message content and identifiers

We’ll first cover identifiers. Processing messages almost always requires identifiers to track state and progress. In our use-case example, the best identifier is the unique device ID of the health-band. There is no session context in our use case, but it could be relevant in other scenarios. The device ID can be the “key” in the messages (more on this later). As a best practice, it also makes sense to include a message-type identifier in the message (e.g., the health-beat type and a device model number).

Both of these identifiers can make processing more efficient. Another best practice is to add a timestamp that indicates when the message was created. Together, these establish who is sending, what is being sent, and when, and they help determine the type of processing needed. The message in our example will contain a key (the device ID) and the rest of the content, including the type, model number, and timestamp, as name/value pairs in JSON format.
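For illustration, a single health-beat message might look something like the following; the specific field names and values are hypothetical and only show the key plus the JSON name/value pairs:

Key: "band-000123" (the device ID)
Value:
{
  "type": "health-beat",
  "deviceId": "band-000123",
  "model": "HB-200",
  "timestamp": "2021-06-01T10:15:30Z",
  "heartRate": 72,
  "location": { "lat": 40.71, "lon": -74.00 }
}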

Topics

As a general practice, Topics are not something you want in large numbers or to create/drop frequently and dynamically. Doing so imposes overhead on the system and makes routine operations and administration more complex. Also, the ability for Consumers to subscribe to and work with such dynamically created Topics would need to be incorporated into the design. This type of design is possible—Consumers subscribe to Topic names matching a certain pattern—but it adds to the overall complexity.

Each Topic is typically where messages with the same type of processing needs are sent. The Producer code maintains the knowledge (configuration) of where to send a particular type of message, along with which key and message structure to use. The received messages are stored in the Topic in the order they are received. Consumer processes are usually dedicated to one Topic and have the relevant logic for efficiently handling the type of messages in that Topic.

Another way to handle messages is to have all of them (for all Producers and message types) go to the same Topic; a set of main Consumers will then read and send the messages to different Topics for subsequent processing. This “receive and delegate” approach provides some centralized control over the incoming messages (e.g., the Producer does not need to know much) and is a good option if changing the Producer code is not convenient. It can also provide more throughput scalability with less risk of data loss.

This is possible by keeping the main Consumers lightweight and doing quick persistence to a separate database as messages are received, possibly with some additional enrichment of data that can be useful in subsequent processing. The heavier logic is delegated to the specialized Consumers working on the individual Topics where messages were routed by the main Consumers.
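As a rough sketch of this “receive and delegate” approach, the main Consumer below reads from a single incoming Topic and forwards each message to a type-specific Topic. It assumes the kafkajs client library (v1-style API) with hypothetical Topic names and broker address; it illustrates the shape of the design, not an exact implementation.

const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'main-delegator', brokers: ['broker-1:9092'] });
const consumer = kafka.consumer({ groupId: 'main-delegators' });
const producer = kafka.producer();

// Hypothetical routing of message types to their specialized Topics
const routes = { 'health-beat': 'health-beats', 'device-status': 'device-status-events' };

async function run() {
  await Promise.all([consumer.connect(), producer.connect()]);
  await consumer.subscribe({ topic: 'all-incoming' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const payload = JSON.parse(message.value.toString());
      // Lightweight step: quick persistence/enrichment could happen here before delegating
      const target = routes[payload.type];
      if (target) {
        await producer.send({
          topic: target,
          messages: [{ key: message.key, value: message.value }],
        });
      }
    },
  });
}

run().catch(console.error);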

Numerous designs are possible, and the best option is usually dictated by a combination of requirements and capabilities. For our IoT use-case example, we will take the approach where all the Producers send the health-beat messages to a specific Topic named “health-beats.”

Next, it is time to weigh data consistency and fault tolerance.

Consumer groups and topic partitions

A quick recap—the processing of the health-beat messages has two types of consumption requirements:

  1. Check geolocation and health stats

  2. Trigger actions in some cases

A single Consumer processing from each Topic will not scale and is not fault-tolerant. We will use enough Consumers to meet the throughput requirements for each Topic. Also, the Consumers will be part of corresponding Consumer Groups—one for each Topic.

Based on latency calculations for the health-beat messages (45 ms latency per message), one Consumer provides a consumption rate of roughly 22 messages per second (single-threaded consumption from each Consumer). We already calculated that the expected message production rate is 167 health-beat messages per second. So, to prevent any lag from building up, we need multiple Consumers (167/22 ≈ 7.6, rounded up to 8). Here are the calculations:

Health-beats message latency: 45ms
Health-beats message consumption rate:
  = 1/(45/1000) ≈ 22 messages per second per Consumer
Minimum number of Consumers needed (steady state):
  = 167 messages produced per second / 22 messages consumed per second ≈ 7.6, rounded up to 8 Consumers

Starting eight Consumers on the health-beats Topic does not by itself provide a consumption capacity of 167 messages per second. Each Topic, like health-beats, has one or more Partitions, and within a Consumer Group, each Partition can have only one Consumer processing it at any time (we will use Consumer Groups along with manual offset management). That means we also need at least eight Partitions for the health-beats Topic.

Why, you ask? Topics have Partitions (how the data is physically stored) to provide control over scalability and fault tolerance. Within a Consumer Group, each Topic Partition is assigned to exactly one Consumer (a single Consumer can, however, be assigned multiple Partitions). So, with eight Partitions, you need eight Consumers in a Consumer Group to get the maximum consumption throughput. After that, any additional Consumers will sit idle as standby for processing.

Having more Consumers than Partitions is good for Consumer fault tolerance (e.g., idle Consumers can become active if another Consumer crashes) but does not help with processing throughput. Alternatively, having fewer Consumers than Partitions keeps you ready to scale out the consumption throughput when needed. If there are processing lags (for whatever reason—recent outage, burst, etc.), you can get more consumption throughput and clear the lag by starting more Consumers, without having to add more Partitions at that time.

Adding Partitions later does not move or rebalance the existing messages; it only distributes new messages across the Partitions from that point onward. It is better to plan ahead and have enough Partitions set up beforehand.
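For example, on a self-managed cluster (or an Event Streams plan that allows topic administration), the Topic and its Partition count can be set up ahead of time with an admin client. Below is a minimal sketch using kafkajs; the replication factor and broker address are assumptions:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'admin-setup', brokers: ['broker-1:9092'] });
const admin = kafka.admin();

async function setup() {
  await admin.connect();
  await admin.createTopics({
    topics: [{
      topic: 'health-beats',
      numPartitions: 8,      // sized from the consumption throughput calculation above
      replicationFactor: 3,  // assumed; depends on your cluster/plan
    }],
  });
  await admin.disconnect();
}

setup().catch(console.error);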

I briefly mentioned single-threaded consumption in the calculation. Multi-threaded consumption or processing within a Consumer is not recommended because, in most use cases (like our IoT example), message positions or offsets have to be managed and committed. Coordinating the activities of multiple threads within a Consumer leads to more complex, harder-to-manage code.

Keys

We know Producers send messages to a Topic, but that can be done in a few different ways:

  1. Manual: The message is sent to the Topic, and a specific Partition number is also specified by the Producer. This type of manual assignment requires the Producer to know which specific Partition it should write to, which usually means more configuration and maintenance.

  2. Non-key: The message is sent to the Topic without any key or Partition number. By default, messages go to Partitions in a round-robin manner, which automatically keeps the Partitions well balanced.

  3. With key: The message is sent to the Topic, but a message “key” is also sent with the message. Based on the hash value of the specified key and the current number of Partitions, Kafka keeps messages with the same key (e.g., device ID) going to the same Partition. This way, a “key-partition stickiness” is maintained as long as the number of Partitions stays the same. This option helps in cases where knowing any context or relation between a series of same-key messages is important for processing. It is important to note that this “key-partition stickiness” or assignment can change, and the Consumers should be able to handle that change gracefully without breaking consistency and other requirements.

Which option to use depends on what best suits the processing goal. In our IoT example, we will use the health-bands’ device ID as the message key, and the message will have the rest of the content as name/value pairs in JSON format.
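A minimal Producer sketch for the “with key” option, again assuming kafkajs (broker address and field names are placeholders); the device ID is passed as the message key and the rest of the content as JSON:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'health-band-producer', brokers: ['broker-1:9092'] });
const producer = kafka.producer();

async function run() {
  await producer.connect();

  const beat = {
    type: 'health-beat',
    deviceId: 'band-000123',
    model: 'HB-200',
    timestamp: new Date().toISOString(),
  };

  await producer.send({
    topic: 'health-beats',
    messages: [{
      key: beat.deviceId,           // the same key always hashes to the same Partition
      value: JSON.stringify(beat),  // remaining content as JSON name/value pairs
    }],
  });

  await producer.disconnect();
}

run().catch(console.error);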

Processing buffers and offset management

When processing, you need to know what has been read and processed and be able to mark it as completed. Manual offset management provides the most control over that. Each message from a Topic Partition can be individually consumed, processed, and marked as committed, and the Consumer then continues to read from the next offset position. This “read-process-commit” pattern is safer and more fault-tolerant. If processing completes but the Consumer fails before committing, it will receive the uncommitted message again as a duplicate the next time.

Reading, processing, and committing individual messages one at a time can be inefficient, especially in high-throughput scenarios. If the use case permits, messages should be read and processed as they arrive but committed in batches. When committing in batches, the code maintains a buffer of the state of the messages read, and the offset position is committed every 100 or 500 messages (or based on other logical criteria).

This is also a good option when there is context or there are relationships between messages that need to be evaluated in the logic before marking the messages as completely processed or committed. It can also have performance benefits, since any persistence can be done in batches instead of one message at a time (which typically defeats the purpose of having Kafka). The downsides are that Consumers need more memory for the buffers and that, in case of a failure, any buffered messages are lost from memory and remain uncommitted. You might then get the same messages more than once as duplicates when the Consumer connects again.
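A minimal sketch of a Consumer with manual offset management and batched commits, assuming kafkajs (v1-style API) with auto-commit disabled; the batch size of 100 and the processing function are placeholders:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'health-beat-consumer', brokers: ['broker-1:9092'] });
const consumer = kafka.consumer({ groupId: 'health-beat-processors' });

const COMMIT_EVERY = 100;   // assumed batch size
const pending = new Map();  // partition -> next offset to commit
let buffered = 0;

async function processHealthBeat(beat) {
  // Placeholder: geolocation/health-stat checks and persistence would go here
}

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'health-beats' });

  await consumer.run({
    autoCommit: false,   // read-process-commit is controlled by our code
    eachMessage: async ({ topic, partition, message }) => {
      await processHealthBeat(JSON.parse(message.value.toString()));

      // Remember the next offset to read for this Partition; commit in batches
      pending.set(partition, (Number(message.offset) + 1).toString());
      buffered += 1;

      if (buffered >= COMMIT_EVERY) {
        await consumer.commitOffsets(
          [...pending].map(([p, offset]) => ({ topic, partition: p, offset }))
        );
        pending.clear();
        buffered = 0;
      }
    },
  });
}

run().catch(console.error);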

From the above scenarios, you might have guessed that to meet a zero-data-loss guarantee, having duplicate messages is inevitable. This is the “at least once” delivery guarantee, and it needs to be handled gracefully by the Consumer. It is the Consumer’s responsibility to handle any duplicates correctly, in a manner suited to the use case, without causing consistency issues downstream (more on this later).
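One common way for a Consumer to handle duplicates gracefully is to make the processing idempotent, for example by deriving a unique identity per message (such as device ID plus creation timestamp) and skipping anything already processed. Below is a tiny in-memory sketch of the idea; a real implementation would typically rely on a unique key or upsert in the downstream database:

// Identities of already-processed messages; in practice, a unique key or upsert in the database
const processed = new Set();

function handleOnce(beat, processFn) {
  const identity = `${beat.deviceId}:${beat.timestamp}`;  // assumed unique per health-beat
  if (processed.has(identity)) {
    return;  // duplicate from at-least-once delivery; safe to skip
  }
  processFn(beat);
  processed.add(identity);
}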

Capacity limits—size and duration

Knowing your capacity limits and designing appropriately within them is critical for multiple reasons. These limits depend on your chosen Event Streams plan; if you are managing your own cluster, you can check and configure the Kafka values yourself.

Basically, you need to make sure your sizes and capacity are enough that you do not lose any data. For example, Consumers can slow down or become inoperable for various reasons—planned outages, network or infrastructure issues, slow processing in a dependency service, etc. These outages can be intermittent, short, or last for longer periods, but all of them increase the risk of data loss.

Below are some example scenarios to consider.

Size limits:

If Consumers are not consuming fast enough, it can result in lag that will cause the Topics to start filling up. If there is a maximum capacity limit on your Topic-Partition (retention.bytes), you may start losing data once that maximum storage limit is reached.

Consider our health-beats example use case:

Throughput: 167 health-beats/sec
Number of Partitions: 8
Message size throughput per Partition: 3KB * 167 / 8 ≈ 63KB/sec
Time taken to reach a 1 GB Topic Partition (no messages are consumed):
  = 1*1024*1024 KB / 63 KB per second
  = ~16,644s = ~4.6 hours

The figures above show that the health-beat messages are susceptible to data loss if a Consumer outage lasts longer than about 4.6 hours with a Topic-Partition size limit of 1 GB (e.g., on the Event Streams Standard plan).

Another size limit to check is the message or message-batch size. With a 3KB size for individual health-beat messages, we are well within the 1 MB per-message maximum for Event Streams for IBM Cloud, which is also the default for Kafka (message.max.bytes at the Broker level and max.message.bytes at the Topic level).
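For reference, on a self-managed cluster these size limits map to ordinary Kafka configuration properties; the values below are examples only:

# Per-Partition size cap for the Topic (1 GB in this example)
retention.bytes=1073741824
# Broker-level maximum message/batch size (the Kafka default is about 1 MB)
message.max.bytes=1048576
# Topic-level override of the maximum message size
max.message.bytes=1048576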

Duration/Time period limits:

Topics retain messages for a maximum configured period (retention.ms) and then start dropping the older data. If you receive messages throughout the day but do batch processing only once a day or less often, make sure to check the Topic retention period; a 24-hour retention might not suffice. For the next example, let’s say messages are set to a 30-day maximum retention.

Consumer offset positions are also retained only for a maximum configured period (offsets.retention.minutes). If the Consumers of a certain Consumer Group are started again only after a period of seven days (e.g., a weekly or monthly batch), the last committed offsets of that group may no longer exist, even though the messages themselves are still retained, and the Consumers wouldn’t know up to which offset they completed processing last time.
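On a self-managed cluster, these duration limits correspond to configuration properties like the following (example values only; Event Streams plans fix some of these for you):

# Topic-level message retention (30 days in this example)
retention.ms=2592000000
# Broker-level retention of committed Consumer offsets (7 days in this example)
offsets.retention.minutes=10080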

Beyond the retention periods needed for the routine message processing, there could be a need for much longer-term access to messages (e.g., for future analytics, policy, compliance, etc.). For such historical long-term persistence, the messages are best stored in a separate database repository.

In our use case example, we require messages to be available for up to one year. We achieve this by persisting the messages to a separate database as they get processed. That adds latency, but we accounted for that in the message-processing latency calculations. There are various other configurations and limits that can be evaluated—not just for data loss, but for performance as well—but the ones above are some of the most important to consider when getting started with designing your solution.

Monitoring the health and performance

In a high-throughput use case (or even otherwise), one of the most important metrics to monitor is the processing lag between message production and consumption. Ideally, there should be zero sustained processing lag or, at worst, a lag that can be overcome within acceptable time limits.

Consumer lag is mainly monitored by tracking the current end-of-log offset vs. the current committed offset positions of the messages in the Topic. Utility scripts (like kafka-consumer-groups.sh, available with Apache Kafka) provide the information for checking lag and can be wrapped into custom scripts for more advanced monitoring.
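For example, a lag check with that utility looks something like this (the broker address and Consumer Group name are placeholders); the LAG column in its output is the difference between the log-end offset and the committed offset for each Partition:

bin/kafka-consumer-groups.sh --bootstrap-server broker-1:9092 \
  --describe --group health-beat-processors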

Another good indicator of activity is data volumes—Topic bytes in and out will help to quantify normal vs. abnormal behavior and trends. Such monitoring is available on Event Streams via the Grafana dashboard.

The Producers should also be monitored for errors and retries. They should not be dropping or losing data, and the Producer client code should log and monitor these conditions. Similarly, in the Consumer client code, if batched buffers are used between reads and commits, the code should log their state (sizes, growth, etc.), because large or growing buffers can create issues with the rate of processing or committing and with the extent of duplicate processing. To get deeper into the available options, refer to the various Kafka metrics.

To summarize the evaluation/design process and provide a high-level architecture diagram, below is a brief video presentation (no audio) that goes through the steps:

Once you have done some preliminary design and evaluated the viability of your solution, you can review the essential proactive planning and preparation described in the next blog—Step 3: Be Proactive.
