Sender configuration example (multi-line)
This example shows a possible configuration for your Logstash Sender servers that are used to send data from multi-line log file records to Log Analysis, as part of your scalable data collection architecture.
Input
The input section contains the parameters that are listed in the following table.
Parameter | Description |
---|---|
zk_connect | Specify the Apache ZooKeeper server and port in the following format: <zookeeper_server:port>. |
group_id | Specify the group ID. The group_id identifies the group of consumers. |
topic_id | Specify the topic ID. The topic_id identifies the topic that consumes the messages. Use the same name for the topic_id and the group_id. For example, in the sample configuration that follows, both are set to BGL_WAS_SystemML. If you are running multiple Logstash servers in your Receiver cluster, ensure that 2 instances of Logstash do not read data from the same topic. |
consumer_threads | Ensure that the consumer_threads parameter matches the number of partitions that are specified in the Apache Kafka configuration. The consumer_threads parameter specifies the number of consumers that are created in a consumer group. Each thread or consumer maps to a partition for the specified topic or logical data source, which ensures that data is processed concurrently. If you specify fewer partitions than consumer threads, some threads remain idle while they wait for an available partition. |
consumer_restart_on_error | Set this value to true. |
consumer_restart_sleep_ms | Specify the restart sleep time in milliseconds. Specify 100. |
fetch_message_max_bytes | Specify the maximum number of bytes of messages to fetch in each fetch request. Specify 500000. |
queue_size | Specify the size of the internal message queue. Specify 2000. |
auto_offset_reset | Specify smallest. This value controls where Apache Kafka starts to read the log data when no initial offset exists: the earliest (smallest) or the latest (largest) available offset. For more information, see https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-auto_offset_reset. |
Add a kafka section for each topic ID that you want to use. The topics correspond to the data sources in Log Analysis.
input {
  kafka {
    zk_connect => "<Ip_address>:17981"
    group_id => "BGL_WAS_SystemML"
    topic_id => "BGL_WAS_SystemML"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
  kafka {
    zk_connect => "<Ip_address>:17981"
    group_id => "ETZ_DB_DB2Diag"
    topic_id => "ETZ_DB_DB2Diag"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
}
<Ip_address> is the IP address of the Apache ZooKeeper server.
Filter
The filter section maps the host and path fields to the corresponding data source in Log Analysis. Ensure that the patterns here match the patterns that are specified in the patterns_dir directory.
filter {
  mutate {
    add_tag => ["NO_OP"]
  }
  if "grok_lfa" in [tags] {
    mutate {
      replace => { "host" => "%{LFA_SITE}_%{LFA_MODULE}" }
      add_field => {
        "path" => "%{LFA_TYPE}"
        "Hostname" => "%{LFA_HOSTNAME}"
      }
    }
  }
  if "grok_filebeat" in [tags] {
    mutate {
      replace => { "host" => "%{[fields][site]}_%{[fields][module]}" }
      add_field => {
        "path" => "%{[fields][type]}"
        "Hostname" => "%{[beat][hostname]}"
      }
    }
  }
}
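The grok_lfa and grok_filebeat tags, and the LFA_* and [fields] fields that this filter references, are expected to be added earlier in the data collection pipeline, for example by a grok filter on your Receiver servers that uses the patterns in the patterns_dir directory. The following is a minimal sketch only; the LFA_MESSAGE pattern name and the patterns directory path are assumptions, not part of the delivered configuration.
filter {
  grok {
    # Assumption: LFA_MESSAGE is a custom pattern that you define in the patterns_dir directory,
    # and it captures the LFA_SITE, LFA_MODULE, LFA_TYPE, and LFA_HOSTNAME fields.
    patterns_dir => ["/opt/logstash/patterns"]
    match => { "message" => "%{LFA_MESSAGE}" }
    # The tag is added only when the pattern matches, so the Sender filter can act on it.
    add_tag => ["grok_lfa"]
  }
}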
Output
The following example is based on the scala output plugin for Logstash, which is created if you install the version of Logstash that is delivered with Log Analysis. The rubydebug codec is only used for debugging. Remove it in production versions.
To maintain the line structure of multi-line log file records, add the metadata_fields section, as shown in the output example and in the sketch that follows it.
Variable | Description |
---|---|
<Ip_address> | The IP address of the server where Log Analysis is installed. |
<Cache_directory_path> | The path to the cache-dir directory where cached files are stored. |
<Log_directory_path> | The path to the directory where logs from the rubydebug codec are stored. |
The output section contains the parameters that are listed in the following table.
Parameter | Description |
---|---|
scala_url | Specify the URL that is used to connect to Log Analysis. For example, https://<Ip_address>:9987/Unity/DataCollector. |
scala_user | Specify the user that is used to connect to Log Analysis. |
scala_password | Specify the password for the Log Analysis user. |
scala_keystore_path | Specify the path to the Log Analysis keystore file. |
batch_size | Specify the number of records in the batch that is sent to Log Analysis. Specify 500000. |
idle_flush_time | Specify the number of seconds that Logstash idles before it flushes the cache. Specify 5. |
num_concurrent_writers | Specify the number of concurrent writers. Specify 20. |
use_structured_api | Set this parameter to false. |
disk_cache_path | Specify the path to the directory where files are cached. For example, <Cache_directory_path>/cache-dir. |
metadata_fields | For more information, see Adding metadata fields. |
output {
  if "NO_OP" in [tags] {
    scala {
      scala_url => "https://<Ip_address>:9987/Unity/DataCollector"
      scala_user => "unityadmin"
      scala_password => "unityadmin"
      scala_keystore_path =>
      batch_size => 500000
      idle_flush_time => 5
      sequential_flush => true
      num_concurrent_writers => 20
      use_structured_api => false
      disk_cache_path => "<Cache_directory_path>/cache-dir"
      metadata_fields => {
        "BGL_WAS@SystemML" => {
          "field_names" => "Hostname,Service,Middleware,resourceID"
          "field_paths" => "Hostname,LFA_SERVICE,MIDDLEWARE,resourceID"
        }
      }
      date_format_string => "yyyy-MM-dd'T'HH:mm:ssX"
      log_file => "<Log_directory_path>/scala_ml_logstash.log"
      log_level => "info"
    }
    ## File output plugin that uses the rubydebug codec for troubleshooting received and processed messages
    ## Disable this plugin in production environments
    file {
      path => "<Log_directory_path>/multiline-rubydebug.log"
      codec => rubydebug
    }
  }
}
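The metadata_fields section in the example covers only the BGL_WAS@SystemML data source. If the Sender also forwards the ETZ_DB_DB2Diag topic that is defined in the input section, you can add an entry for that data source as well. The following is a sketch only; the ETZ_DB@DB2Diag key and the reuse of the same field names and field paths are assumptions that you adjust to match your own data source and metadata fields.
metadata_fields => {
  "BGL_WAS@SystemML" => {
    "field_names" => "Hostname,Service,Middleware,resourceID"
    "field_paths" => "Hostname,LFA_SERVICE,MIDDLEWARE,resourceID"
  }
  # Assumption: a second data source named ETZ_DB@DB2Diag that uses the same metadata fields.
  "ETZ_DB@DB2Diag" => {
    "field_names" => "Hostname,Service,Middleware,resourceID"
    "field_paths" => "Hostname,LFA_SERVICE,MIDDLEWARE,resourceID"
  }
}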