Configuring the Sender cluster for single line logs

To implement a scalable data collection architecture, install and configure a cluster of Logstash servers to send data to Log Analysis.

Before you begin

Install Logstash on the servers and create the required utility script. For more information, see Installing Logstash and the utility script.

When Log Analysis is not available, Logstash caches data on the local disk in the location that is specified in the sender-logstash-single_line.conf or sender-logstash-multi_line.conf configuration file. Ensure that the local disk has sufficient space for this cache. Assign 1 gigabyte (GB) of disk space for every 1 GB of data that you load into Log Analysis with Logstash. For example, if you stream 200 GB per day, ensure that the local disk has 200 GB of free space.
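
For example, as a minimal sketch, you can check the free space on the file system that contains the cache directory with a standard disk usage command. Substitute your own cache directory path:

    ## Show the free space on the file system that holds the Logstash disk cache
    df -h <cache_directory_path>/cache-dir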

About this task

Repeat these steps for each instance of Logstash in your cluster.

Procedure

  1. Stop the Logstash instance.
  2. Edit the Logstash configuration file.
    For example, if you use the Logstash that is delivered with Log Analysis, edit the <logstash_install>/logstash/logstash-version_number/logstash-scala/logstash/config/<logstash_instance>.config file, where version_number is the Logstash version number, as defined in Other supported software.
  3. To ensure that the Logstash instance can read data that is sent from the Apache Kafka cluster, add the following information in the input section for each topic. The Sender Logstash instance reads data from the topic or partition that you specify in the input section. The important parameters are group_id, topic_id, and consumer_threads.
    Add the parameters that are listed in the table below:
    Table 1. Parameters for Logstash configuration
    Parameter Description
    zk_connect Specify the Apache ZooKeeper server and port in the format <zookeeper_server>:<port>.
    group_id Specify the group ID. The group_id identifies the group of consumers.
    topic_id Specify the topic ID. The topic_id identifies the topic from which the messages are consumed. Use the same name for the topic_id and the group_id. For example:
       group_id => "MY_WAS_SystemOut"
       topic_id => "MY_WAS_SystemOut"

    If you run multiple Logstash servers in your Sender cluster, ensure that two instances of Logstash do not read data from the same topic_id. Each instance must read data from a different topic_id. The topic_id is specified in the Apache Kafka input section of the Logstash configuration file.

    consumer_threads Ensure that the consumer_threads parameter matches the number of partitions that are specified in the Apache Kafka configuration. The consumer_threads parameter specifies the number of consumers that are created in a consumer group. Each thread or consumer maps to a partition for the specified topic or logical data source, which ensures that data is processed concurrently. If you specify fewer partitions than consumer threads, some threads remain idle while they wait for an available partition. To check the partition count for a topic, see the sketch after the input example below.
    consumer_restart_on_error Set this value to true.
    consumer_restart_sleep_ms Specify the time in milliseconds that the consumer waits before it restarts after an error. Specify 100.
    fetch_message_max_bytes Specify the maximum number of bytes of messages that is fetched in each request. Specify 500000.
    queue_size Specify the size of the message queue. Specify 2000.
    auto_offset_reset Specify smallest. This value controls where Apache Kafka starts to read messages when no committed offset exists, either from the earliest (smallest) offset or the latest (largest) offset. For more information, see https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-auto_offset_reset.
    The following example is the input section of a configuration for log file records with a single line:
    
    input {
      ## Kafka input plugin
      kafka {
        zk_connect => "<IP_ADDRESS>:17981"
        group_id => "PUNE_WAS_SystemOut"
        topic_id => "PUNE_WAS_SystemOut"
        consumer_threads => 4
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 100
        fetch_message_max_bytes => 500000
        queue_size => 2000
        auto_offset_reset => "smallest"
      }
      kafka {
        zk_connect => "<IP_ADDRESS>:17981"
        group_id => "ETZ_Apache_Hadoop"
        topic_id => "ETZ_Apache_Hadoop"
        consumer_threads => 4
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 100
        fetch_message_max_bytes => 500000
        queue_size => 2000
        auto_offset_reset => "smallest"
      }
      kafka {
        zk_connect => "<IP_ADDRESS>:17981"
        group_id => "Linux_OS_Syslog"
        topic_id => "Linux_OS_Syslog"
        consumer_threads => 4
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 100
        fetch_message_max_bytes => 500000
        queue_size => 2000
        auto_offset_reset => "smallest"
      }
    }
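
    As a minimal sketch, you can verify that the partition count of a topic matches the consumer_threads value. This sketch assumes that the kafka-topics.sh tool from the Apache Kafka distribution is on your path and that Apache ZooKeeper listens on port 17981, as in the example above:

      ## Describe the topic and check the reported PartitionCount (tool location and port are assumptions)
      kafka-topics.sh --describe --zookeeper <zookeeper_server>:17981 --topic PUNE_WAS_SystemOut

    If the reported PartitionCount differs from consumer_threads, adjust one of the two values.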
  4. To ensure that the Logstash instance tags the log records with the required data before the data is sent to Log Analysis, update the filter section.
    The required parameters for the filter section are different for single and multi-line log file records.
    The following example is the filter section of a configuration for log file records with a single line:
    
    filter {
      mutate {
        add_tag => ["NO_OP"]
      }
      if "grok_lfa" in [tags] {
        mutate {
          replace => { "host" => "%{LFA_SITE}_%{LFA_MODULE}" }
          add_field => { "path" => "%{LFA_TYPE}" }
          add_field => { "Hostname" => "%{LFA_HOSTNAME}" }
        }
      }
      if "grok_filebeat" in [tags] {
        mutate {
          replace => { "host" => "%{[fields][site]}_%{[fields][module]}" }
          add_field => { "path" => "%{[fields][type]}" }
          add_field => { "Hostname" => "%{[beat][hostname]}" }
        }
      }
      if "grok_syslog" in [tags] {
        mutate {
          replace => { "host" => "Linux_OS" }
          add_field => { "path" => "/Syslog" }
        }
      }
    }
  5. Review the output section and ensure that the parameters are correct for your environment.
    Specify the variables in the output section as outlined in the Variables for output section table:
    Table 2. Variables for output section
    Variable Description
    <ip_address> The IP address of the server where Log Analysis is installed.
    <cache_directory_path> The path to the cache-dir directory where cached files are stored.
    <log_directory_path> The path to the directory where logs from the Ruby-debug codec are stored.
    The output section contains the parameters that are listed in the following table.
    Table 3. Parameters for output section
    Parameter Description
    scala_url Specify the URL that is used to connect to Log Analysis. For example, https://<ip_address>:9987/Unity/DataCollector.
    scala_user Specify the user that is used to connect to Log Analysis.
    scala_password Specify the password for the Log Analysis user.
    scala_keystore_path Specify the path to the Log Analysis keystore file.
    batch_size Specify the number of records in the batch that is sent to Log Analysis. Specify 500000.
    idle_flush_time Specify the number of seconds that Logstash idles before flushing the cache. Specify 5.
    num_concurrent_writers Specify 20.
    use_structured_api Set this parameter to false.
    disk_cache_path Specify the path to the directory where files are cached. For example, <cache_directory_path>/cache-dir.
    metadata_fields For more information, see Adding metadata fields.
    For example, if you install Logstash with the remote tool, the output section is as follows:
    
    output {
      if "NO_OP" in [tags] {
        scala {
          scala_url => "https://<ip_address>:9987/Unity/DataCollector"
          scala_user => "unityadmin"
          scala_password => "unityadmin"
          scala_keystore_path => ""
          batch_size => 500000
          idle_flush_time => 5
          sequential_flush => true
          num_concurrent_writers => 20
          use_structured_api => false
          disk_cache_path => "<cache_directory_path>/cache-dir"
          metadata_fields => {
            "PUNE_WAS@SystemOut" => {
              "field_names" => "Hostname"
              "field_paths" => "Hostname"
            }
            "PUN_OS@Syslog" => {
              "field_names" => "Hostname,Application"
              "field_paths" => "logsource,program"
            }
          }
          scala_fields => {
            "Linux@/Syslog" => "message,collector,host,priority,timestamp,logsource,program,severity,facility,facility_label,severity_label,tags"
          }
          date_format_string => "yyyy-MM-dd'T'HH:mm:ssX"
          log_file => "<log_directory_path>/scala_logstash.log"
          log_level => "info"
        }
        ## File output plugin that uses the rubydebug codec for troubleshooting
        ## the messages that are received and processed.
        ## Disable this output in a production environment.
        file {
          path => "<log_directory_path>/singleline-rubydebug.log"
          codec => rubydebug
        }
      }
    }
  6. Start the Logstash instance.
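    For example, as a minimal sketch that assumes a manual start rather than the utility script, and a Logstash 2.x-style command line, you can start each instance with its edited configuration file:

      ## Start Logstash with the Sender configuration file (paths follow the layout described in step 2)
      cd <logstash_install>/logstash/logstash-version_number
      bin/logstash agent -f logstash-scala/logstash/config/<logstash_instance>.config

    On newer Logstash versions, omit the agent subcommand and run bin/logstash -f <configuration_file> instead.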