To implement scalable data loading, you must configure at least one Apache Kafka broker.
About this task
Apache Kafka is bundled with Log Analysis. The sample configuration files for Apache Kafka are in the <HOME>/IBM®/LogAnalysis/kafka/test-configs/kafka-configs directory.
Create one partition per topic for every two physical processors on the server where the broker is installed. For example, if the server has eight processor cores, create four partitions per topic in the Apache Kafka broker.
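As a quick sketch, the partition rule can be derived from the core count. The value 8 here is an illustrative stand-in for the server's actual core count (on Linux, a command such as nproc reports it):

```shell
# Rule of thumb from this task: one partition per topic
# for every two physical processors on the broker's server.
cores=8                       # on a live server: cores=$(nproc)
partitions=$(( cores / 2 ))
echo "num.partitions=$partitions"
```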
To implement high availability messaging, create multiple brokers on different servers. To set up multiple brokers, update the configuration files as described in step 3: point each broker to the same Apache ZooKeeper server, and update the broker ID so that it is unique for each broker.
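For example, a two-broker setup might use configuration files along these lines. The host name is a placeholder for illustration; only broker.id differs between the two files, while zookeeper.connect is the same:

```properties
# server0.properties on the first server (illustrative values)
broker.id=0
port=17991
zookeeper.connect=zkhost.example.com:17981

# server1.properties on the second server:
# same Apache ZooKeeper server, unique broker ID
broker.id=1
port=17991
zookeeper.connect=zkhost.example.com:17981
```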
Procedure
- Copy the kafka_version_number.tgz file to an appropriate directory on the server where you want to install Apache Kafka, where version_number is the Kafka version number.
- Extract the kafka_version_number.tgz file.
- Update each section of the configuration files as outlined in the following tables:
Table 1. Server basics and socket server settings

| Parameter | Description |
|---|---|
| broker.id=0 | Specify a unique ID for each broker. |
| port=17991 | Specify the port that the socket server listens on. For example, 17991. |
| #host.name=<local_host> | Specify the host name that the broker binds to. If you do not specify a value, the broker binds to all interfaces. |
| #advertised.host.name=<hostname_routable_by_clients> | Specify the host name that is published to Apache ZooKeeper for producers and consumers to use. If you do not specify a value, the value of java.net.InetAddress.getCanonicalHostName() is used. |
| #advertised.port=<port accessible by clients> | Specify the port that is published to Apache ZooKeeper for clients to connect to. If you do not specify a value, the port that the broker listens on is used. |
| num.network.threads=3 | Specify the number of threads that are used to process network requests. |
| num.io.threads=8 | Specify the number of threads that are used for input and output operations. |
| socket.send.buffer.bytes=1048576 | Specify the send buffer (SO_SNDBUF) that is used by the socket server. |
| socket.receive.buffer.bytes=1048576 | Specify the receive buffer (SO_RCVBUF) that is used by the socket server. |
| socket.request.max.bytes=104857600 | Specify the maximum size of requests that the socket server accepts. This setting helps to protect against memory issues. |
| queued.max.requests=16 | Specify the maximum number of requests that can be queued before the network threads are blocked. |
| fetch.purgatory.purge.interval.requests=100 | Specify the interval, in number of requests, that triggers the purging of fetch requests. |
| producer.purgatory.purge.interval.requests=100 | Specify the interval, in number of requests, that triggers the purging of producer requests. |
Table 2. Log basics

| Parameter | Description |
|---|---|
| log.dirs=<Log_dir_path> | Specify the directory or directories where you want to store the log files. For example, la/home/logs1, la/home/logs2, la/home/logs3. |
| num.partitions=4 | Specify the number of partitions that are used to process a topic. Specify half the number of physical processors on the server. For example, if you have 8 processors, specify 4. |
| num.recovery.threads.per.data.dir=1 | Specify the maximum number of threads that are used for log recovery in each data directory. |
| log.index.size.max.bytes=154624 | Specify the maximum size of the offset index, in bytes. |
| log.index.interval.bytes=4096 | Specify the interval, in bytes, at which an entry is added to the offset index. |
| message.max.bytes=1000000 | Specify the maximum size of a message that the server can receive. |
| auto.create.topics.enable=true | Enable the automatic creation of topics on the server. |
Table 3. Log flush policy

| Parameter | Description |
|---|---|
| default.replication.factor=1 | Specify the replication factor that is used for automatically created topics. |
| log.flush.interval.messages=100000 | Specify the maximum number of messages that can accumulate before they are flushed to disk. |
| log.flush.interval.ms=50000 | Specify the maximum amount of time, in milliseconds, that messages can accumulate in a log before they are flushed to disk. |
| log.flush.scheduler.interval.ms=2000 | Specify the interval, in milliseconds, at which the log flusher checks whether any log needs to be flushed. |
Table 4. Log retention policy

| Parameter | Description |
|---|---|
| log.retention.hours=168 | Specify the minimum age of a log file before it is deleted. |
| #log.retention.bytes=1073741824 | Specify the minimum number of bytes that must be retained in the log. Segments are deleted only if the remaining segments do not drop below this size. |
| log.segment.bytes=1073741824 | Specify the maximum size of a segment. When this limit is reached, a new segment is created. |
| log.retention.check.interval.ms=300000 | Specify the interval, in milliseconds, at which Apache Kafka checks whether any log files can be deleted according to the rules that are specified in the log retention policies. |
| log.cleaner.enable=false | The log cleaner is disabled by default. With this setting, segments are deleted after the retention period that is specified in the log retention policy expires. |
| log.roll.hours=168 | Specify the maximum time that can elapse before a new log segment is rolled out. |
Table 5. Apache ZooKeeper parameters

| Parameter | Description |
|---|---|
| zookeeper.connect=<localhost>:17981 | Specify the host name and port of the Apache ZooKeeper server. |
| zookeeper.connection.timeout.ms=6000 | Specify the timeout, in milliseconds, after which connections to Apache ZooKeeper time out. |
| zk.sync.time.ms=2000 | Specify the interval, in milliseconds, at which Apache ZooKeeper synchronizes with the connected servers. |
| num.replica.fetchers=4 | Specify the number of threads that are used to replicate messages from a source broker. Increasing this value can increase the parallelism of I/O operations in the broker. |
| replica.fetch.max.bytes=1048576 | Specify the number of bytes per partition that are used in fetch requests. |
| replica.fetch.wait.max.ms=500 | Specify the maximum wait time for each fetch request that is issued by the follower replicas. This value must not be greater than the value of replica.lag.time.max.ms. |
| replica.high.watermark.checkpoint.interval.ms=5000 | Specify the interval at which the high watermark is saved to disk. |
| replica.socket.timeout.ms=30000 | Specify the socket timeout for network requests. This value must be equal to or greater than the value of replica.fetch.wait.max.ms. |
| replica.socket.receive.buffer.bytes=65536 | Specify the receive buffer that is used by the replication socket. |
| replica.lag.time.max.ms=10000 | Specify the maximum time that a follower can lag behind the leader before it is removed from the set of in-sync replicas. |
| controller.socket.timeout.ms=30000 | Specify the socket timeout for the channels that controllers use to send messages to brokers. |
Table 6. Durability and hardening parameters

| Parameter | Description |
|---|---|
| retries=0 | Specify the number of times that a failed send is retried. If you specify a value that is greater than zero, failed messages are sent again. |
| acks=all | Specify the number of acknowledgments that the producer requires before it considers a request complete. |
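For stronger delivery guarantees, these two settings are typically combined, for example (illustrative values, not defaults from this task):

```properties
# Illustrative durability settings: retry failed sends a few times,
# and wait until all in-sync replicas acknowledge each write.
retries=3
acks=all
```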
- Start Apache ZooKeeper. To start it in console mode, enter the following command:
<Kafka_home>/bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Apache Kafka broker. Enter the following command:
<Kafka_home>/bin/kafka-server-start.sh -daemon config/server0.properties
To stop the broker, enter the following command:
<Kafka_home>/bin/kafka-server-stop.sh
To stop Apache ZooKeeper, enter the following command:
<Kafka_home>/bin/zookeeper-server-stop.sh
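To confirm that the broker registered correctly, you can list the topics that are known to Apache ZooKeeper. This sketch assumes the kafka-topics.sh script that ships with the bundled Kafka version and the ZooKeeper port from Table 5:

```
<Kafka_home>/bin/kafka-topics.sh --list --zookeeper localhost:17981
```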
Example
############################# Server Basics #############################
broker.id=0
############################# Socket Server Settings #############################
port=17991
#host.name=<local_host>
#advertised.host.name=<hostname_routable_by_clients>
#advertised.port=<port accessible by clients>
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
############################# Log Basics #############################
log.dirs=<Log_dir_path>
num.partitions=4
num.recovery.threads.per.data.dir=1
log.index.size.max.bytes=154624
log.index.interval.bytes=4096
message.max.bytes=1000000
auto.create.topics.enable=true
############################# Log Flush Policy #############################
default.replication.factor=1
log.flush.interval.messages=100000
log.flush.interval.ms=50000
log.flush.scheduler.interval.ms=2000
############################# Log Retention Policy #############################
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
log.roll.hours=168
############################# Zookeeper #############################
zookeeper.connect=<localhost>:17981
zookeeper.connection.timeout.ms=6000
zk.sync.time.ms=2000
# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
###################################
# Durability and hardening
###################################
retries=0
acks=all