Configuring the Receiver cluster for single line logs

To implement a scalable data collection architecture, install and configure a cluster of Logstash servers that receive data from the IBM Tivoli Monitoring Log File Agent (LFA) and write it to Apache Kafka.

Before you begin

Install Logstash on the remote servers and create the required utility script. For more information, see Installing Logstash and the utility script.

About this task

You must set up at least one Logstash server to act as a receiver. In a production environment, use more than one instance of Logstash in a cluster.

You need to complete this task for each instance of Logstash in your cluster.

Procedure

  1. Stop the Logstash server.
  2. Edit the <logstash_install>/logstash/logstash-<version_number>/logstash-scala/logstash/config/<logstash_instance>.config file, where <version_number> is the Logstash version number, as defined in Other supported software.
  3. To allow Logstash to receive data from the Log File Agent, add the receiver port information to the input section:
    Add the parameters that are listed in the following table to the input section.
    Table 1. Input parameters
    Parameter Description
    port Specify the port that is used to connect to the IBM® Tivoli® Monitoring Log File Agent.
    type Specify the type of plug-in that you want to connect to, for example, lfa or syslog.
    The input section connects to the IBM Tivoli Monitoring Log File Agent to collect the data.
    
    input {
        ## TCP input plugin for the IBM Log File Agent (LFA)
        tcp {
            port => 18989
            type => "lfa"
        }
        ## TCP input plugin for syslog data
        tcp {
            port => 18969
            type => "syslog"
        }
        ## Beats input plugin for Filebeat data
        beats {
            port => 18979
            #type => "filebeat-ip"
        }
    }
    
    The type is referenced in the filter section to add the required fields and process the message before the message is sent to the Apache Kafka server.
  4. To process messages from the Log File Agent and add the required fields, update the filter section:

    The filter section matches each agent-specific log record against an appropriate pattern. You can find an example file called SCALAPATTERNS in the <HOME>/IBM/LogAnalysis/utilities/kafka/test-configs directory. Copy this file to the patterns directory in your Logstash instances. For example, the patterns directory can be <logstash_install>/Logstash/patterns.

    The filter also extracts the fields that are required to define the logical and physical data source. The filter uses these fields to add two further fields, datasource and resourceID, to the message. These fields are used to send the data to the appropriate partition or topic that is specified in the output section of your Apache Kafka configuration.

    The datasource field is used to create the topic in Apache Kafka. The resourceID field is used to map the data to partitions.

    For example:
    
    filter {
        if [fields][collector] == "filebeats" {
            mutate {
                ## Add the datasource field based on the site, module & type
                ## defined in the filebeat.yml config
                add_field => [ "datasource", "%{[fields][site]}_%{[fields][module]}_%{[fields][type]}" ]
                add_field => [ "resourceID", "%{[beat][hostname]}_%{source}_1" ]
                add_tag => [ "grok_filebeat" ]
            }
        }

        if [type] == "lfa" {
            grok {
                patterns_dir => "<patterns_dir_path>"
                match => [ "message", "%{LFAMESSAGE}" ]
                add_tag => [ "grok_lfa" ]
            }
        }
        if [type] == "syslog" {
            grok {
                match => [ "message", "%{SYSLOGLINE}" ]
                add_tag => [ "grok_syslog" ]
            }
        }

        if "grok_lfa" in [tags] {
            mutate {
                replace => [ "message", "%{LFA_ORIG_MSG}" ]
                ## Add the datasource field based on the site, module & type
                ## defined in the .fmt file for the LFA
                add_field => [ "datasource", "%{LFA_SITE}_%{LFA_MODULE}_%{LFA_TYPE}" ]
                add_field => [ "resourceID", "%{LFA_HOSTNAME}_%{LFA_LOGNAME}_1" ]
            }
        }
        if "grok_syslog" in [tags] {
            grok {
                match => [ "message", "<*>%{GREEDYDATA:SYSLOG_MSG}" ]
            }
            mutate {
                replace => [ "message", "%{SYSLOG_MSG}" ]
                add_field => [ "datasource", "Linux_OS_Syslog" ]
                add_field => [ "resourceID", "%{logsource}_1" ]
            }
        }
    }
    where <patterns_dir_path> is the directory where you store the patterns that Logstash uses, for example, <logstash_install>/Logstash/patterns.
  5. Create a new patterns file or use the <HOME>/IBM/LogAnalysis/kafka/test-configs/SCALAPATTERNS sample file. Save the file in the patterns directory. The following example is based on this file and is broken over several lines for readability. When you create your own pattern, enter each pattern definition on a single line. To check that your patterns parse sample records as expected, see the validation sketch that follows this procedure.
    LFAMESSAGE 
    <START>.*type='%{DATA:LFA_TYPE}';
    text='%{DATA:LFA_ORIG_MSG}';
    RemoteHost='%{DATA:LFA_REMOTE_HOST}';
    site='%{DATA:LFA_SITE}';instance='%{DATA:LFA_INSTANCE}';
    hostname='%{DATA:LFA_HOSTNAME}';
    cluster='.*';module='%{DATA:LFA_MODULE}';
    env='%{DATA:LFA_ENVIRONMENTNAME}';
    logpath='%{DATA:LFA_LOGNAME}';
    functional='%{DATA:LFA_FUNCTIONALNAME}';END
    LALFAMESSAGE 
    <START>.*text='%{DATA:LFA_OrigMsg}';
    RemoteHost='.*';
    hostname='%{DATA:LFA_HOSTNAME}';
    env='%{DATA:LFA_ENVIRONMENTNAME}';
    logpath='%{DATA:LFA_LOGNAME}';END
  6. To send messages that use the grok_lfa tag to the Apache Kafka cluster, add the parameters that are listed in the following table to the output section.
    Specify values for the variables that are listed in the following table.
    Table 2. Parameters for output section
    Parameters Description
    path Specify the full path to the directory where you store the log files for debugging.
    codec Specify the codec that you want to use as part of your output.
    bootstrap_servers List the Apache Kafka servers in the <Ip_address>:<port> format. Separate entries with commas.
    topic_id Specify %{datasource} to map the topic ID to the data source that is defined in the filter section.
    message_key Specify %{resourceID} to determine the Apache Kafka partition, which is mapped to the resource ID.
    batch_size Specify the number of records that must be created before a batch is sent to Apache Kafka. Specify 500000.
    acks Specify all.
    retries Specify the number of times Logstash sends a batch of data again if it fails initially. Specify 0.
    metadata_fetch_timeout_ms Specify 60000.
    metadata_max_age_ms Specify 300000.
    For example:
    
    output {
        if ("_grokparsefailure" in [tags]) {
            file {
                path => "<Log_directory_path>/receiver-sl-grokparsefailure.log"
                codec => rubydebug
            }
        }
        else {
            file {
                path => "<Log_directory_path>/receiver-sl-rubydebug.log"
                codec => rubydebug
            }
        }
        if ("grok_lfa" in [tags] or "grok_filebeat" in [tags] or "grok_syslog" in [tags]) and !("_grokparsefailure" in [tags]) {
            kafka {
                bootstrap_servers => "<Ip_address>:<port>"
                topic_id => "%{datasource}"
                message_key => "%{resourceID}"
                batch_size => 500000
                acks => "all"
                retries => 0
                metadata_fetch_timeout_ms => 60000
                metadata_max_age_ms => 300000
            }
        }
    }
  7. Start the Logstash instance.
  8. Set up extra receiver configurations for availability and failover if required.
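
To validate your patterns before you enable the Apache Kafka output, you can run a temporary Logstash instance with a minimal configuration such as the following sketch. The sketch is not part of the product configuration: it assumes that you paste sample LFA records on the console, that <patterns_dir_path> points to your patterns directory, and that the LFAMESSAGE pattern is defined as described in step 5. The parsed fields are printed with the rubydebug codec so that you can confirm the extracted values. If an event is tagged with _grokparsefailure, adjust the pattern before you configure the Kafka output.

input {
        ## Read sample records from the console instead of the LFA
        stdin {
                type => "lfa"
        }
}
filter {
        if [type] == "lfa" {
                grok {
                        ## Use the same patterns directory and LFAMESSAGE pattern
                        ## as the receiver configuration
                        patterns_dir => "<patterns_dir_path>"
                        match => [ "message", "%{LFAMESSAGE}" ]
                        add_tag => [ "grok_lfa" ]
                }
        }
}
output {
        ## Print each parsed event to the console for inspection
        stdout {
                codec => rubydebug
        }
}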

Example

The following example processes events where the type is lfa and matches them against the LFAMESSAGE pattern. The datasource and resourceID fields are added based on the metadata in the event.
filter {
        if [type] == "lfa" {
                grok {
                        patterns_dir => "/home/la/logstash/patterns"
                        match => [ "message", "%{LFAMESSAGE}" ]
                        add_tag => [ "grok_lfa" ]
                }
        }
        if "grok_lfa" in [tags] {
                mutate {
                        replace => [ "message", "%{LFA_ORIG_MSG}" ]
                        add_field => [ "datasource", "%{LFA_SITE}_%{LFA_MODULE}_%{LFA_TYPE}" ]
                        add_field => [ "resourceID", "%{LFA_HOSTNAME}_%{LFA_LOGNAME}_1" ]
                }
        }
}
The output section writes data to the Apache Kafka cluster while mapping the data source to the topic_id parameter. This configuration ensures that one topic is created for each logical data source. It also ensures that data from each physical data source is written to the same partition within the topic. For example:

output {
        if ("grok_lfa" in [tags]) and !("_grokparsefailure" in [tags]) {
                kafka {
                        bootstrap_servers => "kafkabroker1.example.com:17911,kafkabroker2.example.com:17911"
                        topic_id => "%{datasource}"
                        message_key => "%{resourceID}"
                }
        }
}
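
To see how the parts fit together, the following sketch combines the LFA input, filter, and output sections from this topic into a single receiver configuration. It is a minimal illustration of the LFA path only: the port, broker address, and patterns directory are the placeholders and sample values that are used in the earlier steps, and a production configuration also includes the syslog and Filebeat branches and the debug file outputs.

input {
        ## TCP input for the IBM Log File Agent (LFA)
        tcp {
                port => 18989
                type => "lfa"
        }
}
filter {
        if [type] == "lfa" {
                grok {
                        patterns_dir => "<patterns_dir_path>"
                        match => [ "message", "%{LFAMESSAGE}" ]
                        add_tag => [ "grok_lfa" ]
                }
        }
        if "grok_lfa" in [tags] {
                mutate {
                        ## Restore the original log text and add the routing fields
                        replace => [ "message", "%{LFA_ORIG_MSG}" ]
                        add_field => [ "datasource", "%{LFA_SITE}_%{LFA_MODULE}_%{LFA_TYPE}" ]
                        add_field => [ "resourceID", "%{LFA_HOSTNAME}_%{LFA_LOGNAME}_1" ]
                }
        }
}
output {
        if "grok_lfa" in [tags] and !("_grokparsefailure" in [tags]) {
                kafka {
                        ## datasource selects the topic; resourceID selects the partition
                        bootstrap_servers => "<Ip_address>:<port>"
                        topic_id => "%{datasource}"
                        message_key => "%{resourceID}"
                        batch_size => 500000
                        acks => "all"
                        retries => 0
                        metadata_fetch_timeout_ms => 60000
                        metadata_max_age_ms => 300000
                }
        }
}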