Sender configuration example (multi-line)

This example shows a possible configuration for your Logstash Sender servers that are used to send data from multi-line log file records to Log Analysis, as part of your scalable data collection architecture.

Input

Add the parameters that are listed in the following table to the input section.
Table 1. Input parameters for sender configurations
Parameter Description
zk_connect Specify the Apache ZooKeeper server and port in the following format: <zookeeper_server>:<port>.
group_id Specify the group ID. The group_id identifies the group of consumers.
topic_id Specify the topic ID. The topic_id identifies the topic from which messages are consumed. Use the same name for the topic_id and the group_id. For example:
   group_id => "MY_WAS_SystemOut"
   topic_id => "MY_WAS_SystemOut"

If you are running multiple Logstash servers in your Receiver cluster, ensure that no two instances of Logstash read data from the same topic_id. Each instance must read data from a different topic_id. The topic_id is specified in the input section of the Apache Kafka configuration file. For an example of splitting topics across two instances, see the sketch after the input example.

consumer_threads Ensure that the consumer_threads parameter matches the number of partitions that are specified in the Apache Kafka configuration. The consumer_threads parameter specifies the number of consumers that are created in a consumer group. Each thread or consumer maps to a partition for the specified topic or logical data source, which ensures that data is processed concurrently. If you specify fewer partitions than consumer threads, some threads remain idle while they wait for an available partition. For an example of creating a topic with a matching number of partitions, see the sketch after this table.
consumer_restart_on_error Set this value to true.
consumer_restart_sleep_ms Specify the number of milliseconds that the consumer waits before it restarts after an error. Specify 100.
fetch_message_max_bytes Specify the maximum number of bytes to fetch from each partition in each request. Specify 500000.
queue_size Specify the size of the message queue. Specify 2000.
auto_offset_reset Specify smallest. This value controls where Apache Kafka starts to consume messages when no committed offset exists: at the earliest (smallest) or the latest (largest) offset. For more information, see https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-auto_offset_reset.
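
For example, the consumer_threads => 4 setting in the input configuration that follows assumes that each topic was created with four partitions. The following is a minimal sketch of creating such a topic with the standard Apache Kafka kafka-topics.sh tool; the replication factor is an assumption, adjust it for your environment:

   # Create a topic with four partitions so that all four consumer
   # threads can read concurrently. Replication factor 1 is an
   # assumption for a single-broker test setup.
   kafka-topics.sh --create --zookeeper <Ip_address>:17981 \
      --replication-factor 1 --partitions 4 --topic BGL_WAS_SystemML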

Add a Kafka section for each topic ID that you want to use. The topics correspond to the data sources in Log Analysis.


input {
	kafka {
		zk_connect => "<Ip_address>:17981"
		group_id => "BGL_WAS_SystemML"
		topic_id => "BGL_WAS_SystemML"
		consumer_threads => 4
		consumer_restart_on_error => true
		consumer_restart_sleep_ms => 100
		fetch_message_max_bytes => 500000
		queue_size => 2000
		auto_offset_reset => "smallest"
	}
	kafka {
		zk_connect => "<Ip_address>:17981"
		group_id => "ETZ_DB_DB2Diag"
		topic_id => "ETZ_DB_DB2Diag"
		consumer_threads => 4
		consumer_restart_on_error => true
		consumer_restart_sleep_ms => 100
		fetch_message_max_bytes => 500000
		queue_size => 2000
		auto_offset_reset => "smallest"
	}
}
<Ip_address> is the IP address of the Apache ZooKeeper server.
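
If you run more than one Logstash instance, one possible way to keep each instance on its own topic_id is to give each instance a separate configuration file. The following is a minimal sketch, assuming two instances and the topics from the previous example; the file names sender1.conf and sender2.conf are hypothetical:

	# sender1.conf (hypothetical file name): the first instance
	# consumes only the BGL_WAS_SystemML topic.
	input {
		kafka {
			zk_connect => "<Ip_address>:17981"
			group_id => "BGL_WAS_SystemML"
			topic_id => "BGL_WAS_SystemML"
			consumer_threads => 4
		}
	}

	# sender2.conf (hypothetical file name): the second instance
	# consumes only the ETZ_DB_DB2Diag topic.
	input {
		kafka {
			zk_connect => "<Ip_address>:17981"
			group_id => "ETZ_DB_DB2Diag"
			topic_id => "ETZ_DB_DB2Diag"
			consumer_threads => 4
		}
	}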

Filter

Use the filter to add the host and path fields to the corresponding data source in Log Analysis. Ensure that the patterns here match the patterns that are specified in the patterns_dir directory. A hypothetical sketch of an upstream filter that sets these tags and fields follows the filter configuration.

filter {
	mutate {
		add_tag => [ "NO_OP" ]
	}
	if "grok_lfa" in [tags] {
		mutate {
			replace => { "host" => "%{LFA_SITE}_%{LFA_MODULE}" }
			add_field => {
				"path" => "%{LFA_TYPE}"
				"Hostname" => "%{LFA_HOSTNAME}"
			}
		}
	}
	if "grok_filebeat" in [tags] {
		mutate {
			replace => { "host" => "%{[fields][site]}_%{[fields][module]}" }
			add_field => {
				"path" => "%{[fields][type]}"
				"Hostname" => "%{[beat][hostname]}"
			}
		}
	}
}
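
The grok_lfa and grok_filebeat tags, and the LFA_SITE, LFA_MODULE, LFA_TYPE, and LFA_HOSTNAME fields, are set before events reach this filter. The following is a minimal, hypothetical sketch of an upstream grok filter that could produce them; the source field name and the file naming convention are assumptions, your patterns_dir patterns define the real layout:

	filter {
		grok {
			# Hypothetical: derive the site, module, type, and host name
			# from a file path such as /logs/BGL_WAS_SystemML.host01.log.
			patterns_dir => [ "/opt/logstash/patterns" ]
			match => { "source" => "(?<LFA_SITE>[^_/]+)_(?<LFA_MODULE>[^_]+)_(?<LFA_TYPE>[^_.]+)\.(?<LFA_HOSTNAME>[^.]+)\.log" }
			add_tag => [ "grok_lfa" ]
		}
	}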

Output

The following example is based on the scala output plugin for Logstash, which is installed when you install the version of Logstash that is delivered with Log Analysis. The rubydebug codec is used only for debugging. Remove it from production deployments. A suggestion for verifying the debug output follows the example.

To maintain the line structure of multi-line records, add the metadata_fields section.

Specify the variables as defined in the following table:
Table 2. Variables for output section
Variable Description
<Ip_address> The IP address of the server where Log Analysis is installed.
<Cache_directory_path> The path to the cache-dir directory where cached files are stored.
<Log_directory_path> The path to the directory where logs from the rubydebug codec are stored.
<Keystore_path> The path to the Log Analysis keystore file.
The output section contains the parameters that are listed in the following table.
Table 3. Parameters for output section
Parameter Description
scala_url Specify the URL that is used to connect to Log Analysis. For example, https://<Ip_address>:9987/Unity/DataCollector.
scala_user Specify the user that is used to connect to Log Analysis.
scala_password Specify the password for the Log Analysis user.
scala_keystore_path Specify the path to the Log Analysis keystore file.
batch_size Specify the number of records in the batch that is sent to Log Analysis. Specify 500000.
idle_flush_time Specify the number of seconds that Logstash idles before flushing the cache. Specify 5.
num_concurrent_writers Specify the number of concurrent writer threads. Specify 20.
use_structured_api Set this parameter to false.
disk_cache_path Specify the path to the directory where files are cached. For example, <Cache_directory_path>/cache-dir.
metadata_fields For more information, see Adding metadata fields.

output {
	if "NO_OP" in [tags] {
		scala {
			scala_url => "https://<Ip_address>:9987/Unity/DataCollector"
			scala_user => "unityadmin"
			scala_password => "unityadmin"
			scala_keystore_path => "<Keystore_path>"
			batch_size => 500000
			idle_flush_time => 5
			sequential_flush => true
			num_concurrent_writers => 20
			use_structured_api => false
			disk_cache_path => "<Cache_directory_path>/cache-dir"
			metadata_fields => {
				"BGL_WAS@SystemML" => {
					"field_names" => "Hostname,Service,Middleware,resourceID"
					"field_paths" => "Hostname,LFA_SERVICE,MIDDLEWARE,resourceID"
				}
			}
			date_format_string => "yyyy-MM-dd'T'HH:mm:ssX"
			log_file => "<Log_directory_path>/scala_ml_logstash.log"
			log_level => "info"
		}
		## File output plugin that uses the rubydebug codec for
		## troubleshooting the messages that are received and processed.
		## Disable this plugin in production environments.
		file {
			path => "<Log_directory_path>/multiline-rubydebug.log"
			codec => rubydebug
		}
	}
}
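
To confirm that events are flowing through the pipeline while you test, you can watch the rubydebug output file. This check is a suggestion only; the path matches the file output plugin in the example:

   # Watch the debugging output while you test. Disable the file
   # output plugin before you move the configuration to production.
   tail -f <Log_directory_path>/multiline-rubydebug.log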