Example sender configuration (single line)

This example shows a possible configuration for the Logstash Sender servers that send single-line log file records to Log Analysis as part of your scalable data collection architecture.

Input

Add the parameters that are listed in the following table to the input section.
Table 1. Input parameters for sender configurations
Parameter Description
zk_connect Specify the Apache ZooKeeper server and port in the format <zookeeper_server>:<port>.
group_id Specify the group ID. The group_id identifies the group of consumers.
topic_id Specify the topic ID. The topic_id identifies the topic from which the messages are consumed. Use the same name for the topic_id and the group_id. For example:
   group_id => "MY_WAS_SystemOut"
   topic_id => "MY_WAS_SystemOut"

If you are running multiple Logstash servers in your Sender cluster, ensure that two instances of Logstash do not read data from the same topic_id. Each instance must read data from a different topic_id. The topic_id is specified in the kafka section of the Logstash input configuration.

consumer_threads Ensure that the consumer_threads parameter matches the number of partitions that are specified in the Apache Kafka configuration. The consumer_threads parameter specifies the number of consumers that are created in a consumer group. Each thread or consumer maps to a partition for the specified topic or logical data source, which ensures that data is processed concurrently. If you specify fewer partitions than consumer threads, some threads remain idle while they wait for an available partition. A sketch that shows this correspondence follows the table.
consumer_restart_on_error Set this value to true.
consumer_restart_sleep_ms Specify the time, in milliseconds, that the consumer waits before it restarts after an error. Specify 100.
fetch_message_max_bytes Specify the maximum number of bytes that are fetched from each topic partition in each request. Specify 500000.
queue_size Specify the size of the message queue. Specify 2000.
auto_offset_reset Specify smallest. This value controls where Apache Kafka starts to consume data when no committed offset exists: from the earliest (smallest) offset or from the latest (largest) offset. For more information, see https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-auto_offset_reset.
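The following sketch illustrates the correspondence between consumer_threads and the partition count of a topic. It is a minimal sketch, assuming a topic that was created with four partitions; the kafka-topics.sh command in the comment is one common way to create such a topic and is shown for illustration only.

## A minimal sketch, assuming the PUNE_WAS_SystemOut topic was created with
## four partitions, for example with Kafka's kafka-topics.sh tool:
##   kafka-topics.sh --create --zookeeper <IP_ADDRESS>:17981
##     --replication-factor 1 --partitions 4 --topic PUNE_WAS_SystemOut
kafka {
    zk_connect => "<IP_ADDRESS>:17981"
    group_id => "PUNE_WAS_SystemOut"
    topic_id => "PUNE_WAS_SystemOut"
    ## One consumer thread for each of the four partitions.
    consumer_threads => 4
}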

Add a kafka section for each topic_id that you want to use. The topics correspond to the data sources in Log Analysis.


input {
    ## Kafka input plugin: one section for each topic
    kafka {
        zk_connect => "<IP_ADDRESS>:17981"
        group_id => "PUNE_WAS_SystemOut"
        topic_id => "PUNE_WAS_SystemOut"
        consumer_threads => 4
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 100
        fetch_message_max_bytes => 500000
        queue_size => 2000
        auto_offset_reset => "smallest"
    }
    kafka {
        zk_connect => "<IP_ADDRESS>:17981"
        group_id => "ETZ_Apache_Hadoop"
        topic_id => "ETZ_Apache_Hadoop"
        consumer_threads => 4
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 100
        fetch_message_max_bytes => 500000
        queue_size => 2000
        auto_offset_reset => "smallest"
    }
    kafka {
        zk_connect => "<IP_ADDRESS>:17981"
        group_id => "Linux_OS_Syslog"
        topic_id => "Linux_OS_Syslog"
        consumer_threads => 4
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 100
        fetch_message_max_bytes => 500000
        queue_size => 2000
        auto_offset_reset => "smallest"
    }
}
where <IP_ADDRESS> is the IP address of the server where Apache ZooKeeper is installed.
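If Apache ZooKeeper runs as an ensemble rather than on a single server, zk_connect can also take a comma-separated list of servers. The following minimal sketch uses illustrative host names; replace them with your own ZooKeeper servers.

kafka {
    ## Comma-separated list of ZooKeeper servers in the ensemble.
    zk_connect => "<ZK_HOST1>:17981,<ZK_HOST2>:17981,<ZK_HOST3>:17981"
    group_id => "PUNE_WAS_SystemOut"
    topic_id => "PUNE_WAS_SystemOut"
    consumer_threads => 4
}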

Filter

Use the filter section to set the host and path fields that map each record to the corresponding data source in Log Analysis. Ensure that the patterns that are used here match the patterns that are specified in the patterns_dir directory.
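The LFA_SITE, LFA_MODULE, LFA_TYPE, and LFA_HOSTNAME fields and the grok_lfa, grok_filebeat, and grok_syslog tags are expected to be set earlier in the pipeline, typically by grok filters in the Receiver configuration. The following sketch is hypothetical and shows only the general shape of such a filter; LFA_EVENT, the patterns_dir path, and the source field are illustrative names that must be replaced with the patterns that are defined in your own Receiver configuration and patterns files.

## Hypothetical Receiver-side grok filter. LFA_EVENT is assumed to be a
## custom pattern that is defined in patterns_dir and that extracts the
## LFA_SITE, LFA_MODULE, LFA_TYPE, and LFA_HOSTNAME fields.
filter {
    grok {
        patterns_dir => ["/opt/logstash/patterns"]
        match => { "message" => "%{LFA_EVENT}" }
        ## The tag is added only when the pattern matches.
        add_tag => ["grok_lfa"]
    }
}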

filter {
    ## Tag every event so that the output section can match it.
    mutate {
        add_tag => ["NO_OP"]
    }
    ## Events that are forwarded by the IBM Tivoli Log File Agent (LFA).
    if "grok_lfa" in [tags] {
        mutate {
            replace => { "host" => "%{LFA_SITE}_%{LFA_MODULE}" }
            add_field => {
                "path" => "%{LFA_TYPE}"
                "Hostname" => "%{LFA_HOSTNAME}"
            }
        }
    }
    ## Events that are forwarded by Filebeat.
    if "grok_filebeat" in [tags] {
        mutate {
            replace => { "host" => "%{[fields][site]}_%{[fields][module]}" }
            add_field => {
                "path" => "%{[fields][type]}"
                "Hostname" => "%{[beat][hostname]}"
            }
        }
    }
    ## Syslog events.
    if "grok_syslog" in [tags] {
        mutate {
            replace => { "host" => "Linux_OS" }
            add_field => { "path" => "/Syslog" }
        }
    }
}
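For example, assuming an event that is tagged grok_lfa and that carries the values LFA_SITE=PUNE, LFA_MODULE=WAS, and LFA_TYPE=SystemOut, the filter sets host to PUNE_WAS and path to SystemOut. Together, these values identify the data source that the record is written to, and they correspond to the PUNE_WAS@SystemOut key that is used in the metadata_fields section of the output example.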

Output

The following example is based on the scala output plugin, which is installed with the version of Logstash that is delivered with Log Analysis. The rubydebug codec is used only for debugging. Remove it in production deployments.

Specify the variables in the output section as outlined in the Variables for output section table:
Table 2. Variables for output section
Variable Description
<ip_address> The IP address of the server where Log Analysis is installed.
<cache_directory_path> The path to the cache-dir directory where cached files are stored.
<log_directory_path> The path to the directory where logs from the rubydebug codec are stored.
The output section contains the parameters that are listed in the following table.
Table 3. Parameters for output section
Parameter Description
scala_url Specify the URL that is used to connect to Log Analysis. For example, https://<ip_address>:9987/Unity/DataCollector.
scala_user Specify the user that is used to connect to Log Analysis.
scala_password Specify the password for the Log Analysis user.
scala_keystore_path Specify the path to the Log Analysis keystore file.
batch_size Specify the number of records in the batch that is sent to Log Analysis. Specify 500000.
idle_flush_time Specify the number of seconds that Logstash idles before flushing the cache. Specify 5.
num_concurrent_writers Specify the number of concurrent writers that send data to Log Analysis. Specify 20.
use_structured_api Set this parameter to false.
disk_cache_path Specify the path to the directory where files are cached. For example, <cache_directory_path>/cache-dir.
metadata_fields For more information, see Adding metadata fields.

output {
    if "NO_OP" in [tags] {
        scala {
            scala_url => "https://<ip_address>:9987/Unity/DataCollector"
            scala_user => "unityadmin"
            scala_password => "unityadmin"
            scala_keystore_path => ""
            batch_size => 500000
            idle_flush_time => 5
            sequential_flush => true
            num_concurrent_writers => 20
            use_structured_api => false
            disk_cache_path => "<cache_directory_path>/cache-dir"
            metadata_fields => {
                "PUNE_WAS@SystemOut" => {
                    "field_names" => "Hostname"
                    "field_paths" => "Hostname"
                }
                "PUN_OS@Syslog" => {
                    "field_names" => "Hostname,Application"
                    "field_paths" => "logsource,program"
                }
            }
            scala_fields => {
                "Linux@/Syslog" => "message,collector,host,priority,timestamp,logsource,program,severity,facility,facility_label,severity_label,tags"
            }
            date_format_string => "yyyy-MM-dd'T'HH:mm:ssX"
            log_file => "<log_directory_path>/scala_logstash.log"
            log_level => "info"
        }
        ## File output plugin that uses the rubydebug codec for troubleshooting
        ## the messages that are received and processed.
        ## Disable this plugin in production environments.
        file {
            path => "<log_directory_path>/singleline-rubydebug.log"
            codec => rubydebug
        }
    }
}