Example sender configuration (single line)
This example shows a possible configuration for your Logstash Sender servers that are used to send data from single line log file records to Log Analysis, as part of your scalable data collection architecture.
Input
The input section contains the parameters that are listed in the following table.
Parameter | Description |
---|---|
zk_connect | Specify the Apache ZooKeeper server and port in the following format: <zookeeper_server:port>. |
group_id | Specify the group ID. The group_id identifies the group of consumers. |
topic_id | Specify the topic ID. The topic_id identifies the topic from which the messages are consumed. Use the same name for the topic_id and the group_id. If you are running multiple Logstash servers in your Receiver cluster, ensure that two instances of Logstash do not read data from the same topic. |
consumer_threads | Ensure that the consumer_threads parameter matches the number of partitions that are specified in the Apache Kafka configuration. The consumer_threads parameter specifies the number of consumers that are created in a consumer group. Each thread or consumer maps to a partition for the specified topic or logical data source, which ensures that data is processed concurrently. For example, if a topic is created with four partitions, specify consumer_threads => 4 for that topic. If you specify fewer partitions than consumer threads, some threads remain idle while they wait for an available partition. |
consumer_restart_on_error | Set this value to true. |
consumer_restart_sleep_ms | Specify the number of milliseconds to wait before the consumer is restarted after an error. Specify 100. |
fetch_message_max_bytes | Specify the maximum number of bytes that are fetched in each request. Specify 500000. |
queue_size | Specify the size of the internal queue where messages are buffered before they are processed. Specify 2000. |
auto_offset_reset | Specify smallest. This value controls where Apache Kafka starts to read messages when no offset is available, either from the smallest (earliest) or the largest (latest) offset. For more information, see https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-auto_offset_reset. |
Add a Kafka
section for each Topic ID that you want to use. The topics should
correspond to the data sources in Log Analysis.
input {
  ## Kafka input plugin: add one kafka block for each topic or logical data source
  kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "PUNE_WAS_SystemOut"
    topic_id => "PUNE_WAS_SystemOut"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
  kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "ETZ_Apache_Hadoop"
    topic_id => "ETZ_Apache_Hadoop"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
  kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "Linux_OS_Syslog"
    topic_id => "Linux_OS_Syslog"
    consumer_threads => 4
    consumer_restart_on_error => true
    consumer_restart_sleep_ms => 100
    fetch_message_max_bytes => 500000
    queue_size => 2000
    auto_offset_reset => "smallest"
  }
}
where <IP_ADDRESS> is the IP address of the server where Apache ZooKeeper is installed.
Filter
The filter section maps the host and path fields to the corresponding data source in Log Analysis. Ensure that the patterns that are used here match the patterns that are specified in the patterns_dir directory. A sketch of the kind of upstream filter that supplies these tags and fields is shown after the example.
filter {
  mutate {
    ## Tag every event so that the output section can match on NO_OP
    add_tag => ["NO_OP"]
  }
  if "grok_lfa" in [tags] {
    mutate {
      replace => { "host" => "%{LFA_SITE}_%{LFA_MODULE}" }
      add_field => { "path" => "%{LFA_TYPE}" }
      add_field => { "Hostname" => "%{LFA_HOSTNAME}" }
    }
  }
  if "grok_filebeat" in [tags] {
    mutate {
      replace => { "host" => "%{[fields][site]}_%{[fields][module]}" }
      add_field => { "path" => "%{[fields][type]}" }
      add_field => { "Hostname" => "%{[beat][hostname]}" }
    }
  }
  if "grok_syslog" in [tags] {
    mutate {
      replace => { "host" => "Linux_OS" }
      add_field => { "path" => "/Syslog" }
    }
  }
}
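The grok_lfa, grok_filebeat, and grok_syslog tags, and fields such as LFA_SITE, LFA_MODULE, LFA_HOSTNAME, and LFA_TYPE, are expected to be set earlier in the pipeline, for example by a grok filter on the Receiver servers. The following fragment is only a minimal sketch of what such a filter might look like and is not part of the delivered configuration: the patterns_dir path, the assumed record layout, and the log_text field are illustrative assumptions that you must replace with the patterns from your own patterns_dir directory.
filter {
  ## Illustrative sketch only: derive the fields and tag that the Sender
  ## filter expects. Replace the pattern with the one from your patterns_dir.
  grok {
    ## Assumed location of the custom pattern files
    patterns_dir => ["/opt/logstash/patterns"]
    ## Assumed record layout: "<site> <module> <hostname> <type> <text>"
    match => { "message" => "%{WORD:LFA_SITE} %{WORD:LFA_MODULE} %{HOSTNAME:LFA_HOSTNAME} %{NOTSPACE:LFA_TYPE} %{GREEDYDATA:log_text}" }
    add_tag => ["grok_lfa"]
  }
}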
Output
The following example is based on the scala output plugin that is created when you install the version of Logstash that is delivered with Log Analysis. The rubydebug codec is used only for debugging. Remove it in production versions.
Replace the variables in the output section as outlined in the Variables for output section table:
Variable | Description |
---|---|
<ip_address> | The IP address of the server where Log Analysis is installed. |
<cache_directory_path> | The path to the cache-dir directory where cached files are stored. |
<log_directory_path> | The path to the directory where logs from the rubydebug codec are stored. |
The output section contains the parameters that are listed in the following table.
Parameter | Description |
---|---|
scala_url | Specify the URL that is used to connect to Log Analysis. For example, https://<ip_address>:9987/Unity/DataCollector. |
scala_user | Specify the user that is used to connect to Log Analysis. |
scala_password | Specify the password for the Log Analysis user. |
scala_keystore_path | Specify the path to the Log Analysis keystore file. |
batch_size | Specify the number of records in the batch that is sent to Log Analysis. Specify 500000. |
idle_flush_time | Specify the number of seconds that Logstash waits before it flushes the cache. Specify 5. |
num_concurrent_writers | Specify the number of concurrent writers. Specify 20. |
use_structured_api | Set this parameter to false. |
disk_cache_path | Specify the path to the directory where files are cached. For example, <cache_directory_path>/cache-dir. |
metadata_fields | For more information, see Adding metadata fields. |
output {
  if "NO_OP" in [tags] {
    ## scala output plugin that sends batched records to Log Analysis
    scala {
      scala_url => "https://<ip_address>:9987/Unity/DataCollector"
      scala_user => "unityadmin"
      scala_password => "unityadmin"
      ## Specify the path to the Log Analysis keystore file
      scala_keystore_path => ""
      batch_size => 500000
      idle_flush_time => 5
      sequential_flush => true
      num_concurrent_writers => 20
      use_structured_api => false
      disk_cache_path => "<cache_directory_path>/cache-dir"
      metadata_fields => {
        "PUNE_WAS@SystemOut" => {
          "field_names" => "Hostname"
          "field_paths" => "Hostname"
        }
        "PUN_OS@Syslog" => {
          "field_names" => "Hostname,Application"
          "field_paths" => "logsource,program"
        }
      }
      scala_fields => {
        "Linux@/Syslog" => "message,collector,host,priority,timestamp,logsource,program,severity,facility,facility_label,severity_label,tags"
      }
      date_format_string => "yyyy-MM-dd'T'HH:mm:ssX"
      log_file => "<log_directory_path>/scala_logstash.log"
      log_level => "info"
    }
    ## File output plugin that uses the rubydebug codec for troubleshooting
    ## the messages that are received and processed.
    ## Disable this plugin in production environments.
    file {
      path => "<log_directory_path>/singleline-rubydebug.log"
      codec => rubydebug
    }
  }
}