Configuring the Sender cluster for single line logs
To implement a scalable data collection architecture, install and configure a cluster of Logstash servers to send data to Log Analysis.
Before you begin
Install Logstash on the servers and create the required utility script. For more information, see Installing Logstash and the utility script.
When Log Analysis is not available, Logstash caches data on the local disk as specified in the sender-logstash-single_line.conf or sender-logstash-multi_line.conf configuration file. Ensure that there is sufficient space on your local disk in case this happens. Assign 1 gigabyte (GB) of disk space for every 1 GB of data that you load into Log Analysis with Logstash. For example, if you stream 200 GB per day, ensure that the local disk has 200 GB of free space.
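The cache location itself is set by the disk_cache_path parameter in the scala output plugin of the Sender configuration file, which is described later in this task. In the following sketch the directory path is a placeholder only; point it at a file system that has the free space that you calculated:

output {
  scala {
    ## Other scala output parameters are described in the output section step
    disk_cache_path => "/opt/scala/cache-dir"  ## placeholder path: allow 1 GB of free space for each GB that you stream
  }
}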
About this task
Repeat these steps for each instance of Logstash in your cluster.
Procedure
- Stop the Logstash instance.
- Edit the Logstash configuration file.
For example, if you use the Logstash that is delivered with Log Analysis, edit the <logstash_install>/logstash/logstash-<version_number>/logstash-scala/logstash/config/<logstash_instance>.config file, where <version_number> is the Logstash version number, as defined in Other supported software.
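The configuration file contains input, filter, and output sections, which you complete in the steps that follow. As an outline only (the comments are placeholders, not working configuration):

input {
  ## Apache Kafka input plugin stanzas, one for each topic
}
filter {
  ## mutate filters that tag and enrich the log records
}
output {
  ## scala output plugin that sends data to Log Analysis, plus an optional file output for troubleshooting
}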
- To ensure that the Logstash instance can read the data that is sent from the Apache Kafka cluster, add the following information to the input section for each topic. The Sender Logstash instance reads data from the topic or partition that you specify in the input section. The important parameters are group_id, topic_id, and consumer_threads. Add the parameters that are listed in the following table.

Table 1. Parameters for Logstash configuration

zk_connect
    Specify the Apache ZooKeeper server and port in the format <zookeeper_server:port>.
group_id
    Specify the group ID. The group_id identifies the group of consumers.
topic_id
    Specify the topic ID. The topic_id identifies the topic that consumes the messages. Use the same name for the topic_id and the group_id. For example:
    group_id => MY_WAS_SystemOut
    topic_id => MY_WAS_SystemOut
    If you run multiple Logstash servers in your Sender cluster, ensure that two instances of Logstash do not read data from the same topic_id. Each instance must read data from a different topic_id. The topic_id is specified in the input section of the Apache Kafka configuration file.
consumer_threads
    Ensure that the consumer_threads parameter matches the number of partitions that are specified in the Apache Kafka configuration. The consumer_threads parameter specifies the number of consumers that are created in a consumer group. Each thread or consumer maps to a partition for the specified topic or logical data source, which ensures that data is processed concurrently. If you specify fewer partitions than consumer threads, some threads remain idle while they wait for an available partition.
consumer_restart_on_error
    Set this value to true.
consumer_restart_sleep_ms
    Specify the time, in milliseconds, to wait before the consumer is restarted after an error. Specify 100.
fetch_message_max_bytes
    Specify the maximum message size, in bytes, that is fetched in each request. Specify 500000.
queue_size
    Specify the size of the internal message queue. Specify 2000.
auto_offset_reset
    Specify smallest. This value controls whether Apache Kafka starts to read data from the earliest (smallest) or the latest (largest) available offset. For more information, see https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-auto_offset_reset.

The following example is the input section of a configuration for log file records with a single line:
input {
  ## Kafka input plugin
  kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "PUNE_WAS_SystemOut"
    topic_id => "PUNE_WAS_SystemOut"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
  kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "ETZ_Apache_Hadoop"
    topic_id => "ETZ_Apache_Hadoop"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
  kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "Linux_OS_Syslog"
    topic_id => "Linux_OS_Syslog"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
}
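If your Sender cluster contains more than one Logstash instance, give each instance an input section that reads different topics so that no two instances read the same topic_id. The following sketch shows a possible input section for a second instance; the group and topic names are hypothetical placeholders:

input {
  ## Kafka input for a second Sender instance; its topics differ from the first instance
  kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "MY_DB2_Diaglog"
    topic_id => "MY_DB2_Diaglog"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
}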
- To ensure that the Logstash instance tags the log files with the required data before they are sent to Log Analysis, update the filter section. The required parameters for the filter section are different for single-line and multi-line log file records. The following example is the filter section of a configuration for log file records with a single line:

filter {
  mutate {
    add_tag => [ "NO_OP" ]
  }
  if "grok_lfa" in [tags] {
    mutate {
      replace => { "host" => "%{LFA_SITE}_%{LFA_MODULE}" }
      add_field => { "path" => "%{LFA_TYPE}" }
      add_field => { "Hostname" => "%{LFA_HOSTNAME}" }
    }
  }
  if "grok_filebeat" in [tags] {
    mutate {
      replace => { "host" => "%{[fields][site]}_%{[fields][module]}" }
      add_field => { "path" => "%{[fields][type]}" }
      add_field => { "Hostname" => "%{[beat][hostname]}" }
    }
  }
  if "grok_syslog" in [tags] {
    mutate {
      replace => { "host" => "Linux_OS" }
      add_field => { "path" => "/Syslog" }
    }
  }
}
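If your Receiver cluster applies additional tags, you can extend the filter section with more conditional blocks that follow the same pattern. The tag name, host value, and path value in this sketch are hypothetical placeholders, not values from this task:

filter {
  ## Hypothetical block that tags records from one more data source
  if "grok_db2" in [tags] {
    mutate {
      replace => { "host" => "MY_DB2" }
      add_field => { "path" => "/db2diag.log" }
    }
  }
}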
- Review the output section and ensure that everything is correct.
Specify the variables in the output section as outlined in the following table.

Table 2. Variables for the output section

<ip_address>
    The IP address of the server where Log Analysis is installed.
<cache_directory_path>
    The path to the cache-dir directory where cached files are stored.
<log_directory_path>
    The path to the directory where logs from the rubydebug codec are stored.

The output section contains the parameters that are listed in the following table.

Table 3. Parameters for the output section

scala_url
    Specify the URL that is used to connect to Log Analysis. For example, https://<ip_address>:9987/Unity/DataCollector.
scala_user
    Specify the user that is used to connect to Log Analysis.
scala_password
    Specify the password for the Log Analysis user.
scala_keystore_path
    Specify the path to the Log Analysis keystore file.
batch_size
    Specify the number of records in the batch that is sent to Log Analysis. Specify 500000.
idle_flush_time
    Specify the number of seconds that Logstash idles before it flushes the cache. Specify 5.
num_concurrent_writers
    Specify the number of concurrent writers that send data to Log Analysis. Specify 20.
use_structured_api
    Set this parameter to false.
disk_cache_path
    Specify the path to the directory where files are cached. For example, <cache_directory_path>/cache-dir.
metadata_fields
    For more information, see Adding metadata fields.

For example, if you install Logstash with the remote tool, the output section is as follows:

output {
  if "NO_OP" in [tags] {
    scala {
      scala_url => "https://<ip_address>:9987/Unity/DataCollector"
      scala_user => "unityadmin"
      scala_password => "unityadmin"
      scala_keystore_path => ""
      batch_size => 500000
      idle_flush_time => 5
      sequential_flush => true
      num_concurrent_writers => 20
      use_structured_api => false
      disk_cache_path => "<cache_directory_path>/cache-dir"
      metadata_fields => {
        "PUNE_WAS@SystemOut" => {
          "field_names" => "Hostname"
          "field_paths" => "Hostname"
        }
        "PUN_OS@Syslog" => {
          "field_names" => "Hostname,Application"
          "field_paths" => "logsource,program"
        }
      }
      scala_fields => {
        "Linux@/Syslog" => "message,collector,host,priority,timestamp,logsource,program,severity,facility,facility_label,severity_label,tags"
      }
      date_format_string => "yyyy-MM-dd'T'HH:mm:ssX"
      log_file => "<log_directory_path>/scala_logstash.log"
      log_level => "info"
    }
    ## File output plugin using the rubydebug codec for troubleshooting messages received and processed
    ## Should be disabled in a production environment
    file {
      path => "<log_directory_path>/singleline-rubydebug.log"
      codec => rubydebug
    }
  }
}
- Start the Logstash cluster.