Lambda sample code for subscription filter

Sample Python code that you can use for the Lambda function in the CloudWatch subscription filter

Note: Before you use the sample code, make sure that AWS_REGION is set to the proper AWS region. Change the value if necessary.
################################################################################
#
# Licensed Materials - Property of IBM
# 5900-B3O
# (C) Copyright IBM Corp. 2023.
#
# US Government Users Restricted Rights -
# Use, duplication or disclosure restricted by GSA ADP Schedule
# Contract with IBM Corporation
#
# DISCLAIMER OF WARRANTIES :
# Permission is granted to copy and modify this  Sample code provided
# that both the copyright  notice, and this permission notice and
# warranty disclaimer  appear in all copies and modified versions.
#
# THIS SAMPLE CODE IS LICENSED TO YOU AS-IS.
# IBM  AND ITS SUPPLIERS AND LICENSORS  DISCLAIM ALL WARRANTIES,
# EITHER EXPRESS OR IMPLIED, IN SUCH SAMPLE CODE, INCLUDING THE
# WARRANTY OF NON-INFRINGEMENT AND THE IMPLIED WARRANTIES OF
# MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. IN NO EVENT
# WILL IBM OR ITS LICENSORS OR SUPPLIERS BE LIABLE FOR ANY DAMAGES
# ARISING OUT OF THE USE OF OR INABILITY TO USE THE SAMPLE CODE OR
# COMBINATION OF THE SAMPLE CODE WITH ANY OTHER CODE. IN NO EVENT
# SHALL IBM OR ITS LICENSORS AND SUPPLIERS BE LIABLE FOR ANY LOST
# REVENUE, LOST PROFITS OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL,
# CONSEQUENTIAL,INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND
# REGARDLESS OF THE THEORY OF LIABILITY, EVEN IF IBM OR ITS
# LICENSORS OR SUPPLIERS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGES.
################################################################################

import boto3
import time
import gzip
import json
import base64
import datetime
import dateutil

def lambda_handler(event, context):
    '''
    Entry point of the Lambda function.
    Passes the incoming event to readCloudWatchLog; context is not used
    and nothing is returned.
    '''
    readCloudWatchLog(event)
    
def readCloudWatchLog(event):
    '''
    Reads the incoming CloudWatch log event and re-publishes its entries.
    Change AWS_REGION to the desired region.

    The payload under event['awslogs']['data'] is base64-decoded,
    gunzipped and parsed as JSON. For every entry in its 'logEvents'
    list, the entry's JSON message is parsed, any follow-up lines are
    gathered with getMoreLogs, and the result is written with
    writeToCloudWatch.
    '''
    AWS_REGION = "us-east-2"
    client = boto3.client('logs', region_name=AWS_REGION)

    # base64 text -> gzip bytes -> JSON document
    payload = json.loads(
        gzip.decompress(base64.b64decode(event['awslogs']['data']))
    )

    source_group, source_stream = getLogDetails(payload)

    for entry in payload['logEvents']:
        parsed = json.loads(entry['message'])
        timestamp = entry['timestamp']
        full_message = getMoreLogs(
            client, timestamp, parsed['message'], source_group, source_stream
        )
        writeToCloudWatch(client, timestamp, full_message, source_group)
        
def getLogDetails(cloudwatchDataJSON):
    '''
    Return the source log group name and log stream name of the incoming
    log as a (logGroup, logStream) tuple.
    A KeyError is raised when either key is absent.
    '''
    return cloudwatchDataJSON['logGroup'], cloudwatchDataJSON['logStream']
    
def writeToCloudWatch(client, logTimestamp, message, importCloudwatchLogGroupName):
    '''
    Write one log message to a topic-specific CloudWatch log group.

    The destination names are built from the message topic and the
    event date:
        log group  : <importCloudwatchLogGroupName>-<topic>Log
        log stream : <YYYY-MM-DD>_<topic>_Log_Stream
    Edit the two name expressions below to customize the log group name
    or the log stream name. The group and the stream are created first
    when the existence checks report them missing.

    client -- client object used for all log API calls
    logTimestamp -- event time in milliseconds
    message -- text to write; getTopic must be able to find its topic
    importCloudwatchLogGroupName -- name of the log group the event came from
    '''
    topic = getTopic(message)

    # logTimestamp is in milliseconds, fromtimestamp expects seconds.
    event_time = datetime.datetime.fromtimestamp(logTimestamp/1e3)
    datestamp = event_time.strftime('%Y-%m-%d')

    target_group = "".join((importCloudwatchLogGroupName, "-", topic, 'Log'))
    target_stream = "".join((datestamp, '_', topic, '_Log_Stream'))

    if not logGroupExist(client, target_group):
        createLogGroup(client, target_group)
    if not logStreamExist(client, target_group, target_stream):
        create_log_stream(client, target_group, target_stream)

    client.put_log_events(
        logGroupName=target_group,
        logStreamName=target_stream,
        logEvents=[{'timestamp': logTimestamp, 'message': message}],
    )

def getMoreLogs(client, logTimestamp, firstMessage, importCloudwatchLogGroupName, importCloudwatchLogStreamName):
    '''
    Check for more log lines to append to the first message.

    Requests the events of the source log stream with
    startTime=logTimestamp and endTime=logTimestamp + 1, looks for the
    event whose 'message' field equals firstMessage, and then appends
    the 'message' of each following event (separated by newlines).
    Appending stops at the first event that has no 'message' field or
    whose message contains 'Topic'.

    Returns firstMessage followed by any extra lines that were found.
    '''
    reply = client.get_log_events(
        logGroupName=importCloudwatchLogGroupName,
        logStreamName=importCloudwatchLogStreamName,
        startTime=logTimestamp,
        endTime=logTimestamp + 1,
        startFromHead=True,
    )

    combined = firstMessage
    anchor_seen = False

    for record in reply['events']:
        payload = json.loads(record['message'])
        has_text = 'message' in payload

        if not anchor_seen:
            # Still searching for the event that matches the first message.
            if has_text and payload['message'] == firstMessage:
                anchor_seen = True
            continue

        # Past the first message: stop on a new topic or a non-message event.
        if not has_text or 'Topic' in payload['message']:
            break
        combined += "\n" + payload['message']

    return combined

def logGroupExist(client, logGroupName):
    '''
    Check whether a log group named exactly logGroupName exists.

    The lookup is done with 'logGroupNamePrefix', which also returns
    groups whose names merely start with logGroupName. Each returned
    group is therefore compared against the full name, and 'nextToken'
    is followed so that a match on a later result page is not missed.

    This function only checks; it does not create the log group.

    client -- client object providing describe_log_groups
    logGroupName -- full name of the log group to look for

    Returns True when the log group exists, otherwise False.
    '''
    request = {
        'logGroupNamePrefix' : logGroupName
    }

    while True:
        response = client.describe_log_groups(**request)

        for group in response.get('logGroups', []):
            if group.get('logGroupName') == logGroupName:
                return True

        next_token = response.get('nextToken')
        if not next_token:
            # No further pages and no exact match.
            return False
        request['nextToken'] = next_token
    
def logStreamExist(client, logGroupName, logStreamName):
    '''
    Check whether a log stream named exactly logStreamName exists in the
    log group logGroupName.

    The lookup is done with 'logStreamNamePrefix', which also returns
    streams whose names merely start with logStreamName. Each returned
    stream is therefore compared against the full name, and 'nextToken'
    is followed so that a match on a later result page is not missed.

    This function only checks; it does not create the log stream.

    client -- client object providing describe_log_streams
    logGroupName -- name of the log group to search in
    logStreamName -- full name of the log stream to look for

    Returns True when the log stream exists, otherwise False.
    '''
    request = {
        'logGroupName' : logGroupName,
        'logStreamNamePrefix' : logStreamName
    }

    while True:
        response = client.describe_log_streams(**request)

        for stream in response.get('logStreams', []):
            if stream.get('logStreamName') == logStreamName:
                return True

        next_token = response.get('nextToken')
        if not next_token:
            # No further pages and no exact match.
            return False
        request['nextToken'] = next_token

def createLogGroup(client, logGroupName):
    '''
    Create the log group named logGroupName through the given client.
    Nothing is returned.
    '''
    client.create_log_group(logGroupName=logGroupName)

def create_log_stream(client, logGroupName, logStreamName):
    '''
    Create the log stream logStreamName inside the log group
    logGroupName through the given client. Nothing is returned.
    '''
    client.create_log_stream(
        logGroupName=logGroupName,
        logStreamName=logStreamName,
    )

def getTopic (message):
    '''
    Get the topic from a message; it is used to name the log group.

    The topic is the text between the first '[Topic: ' marker and the
    next ']' after it. The search for ']' runs to the end of the
    message, so a message that ends with the closing bracket
    (for example '[Topic: Foo]') is handled.

    message -- log message text containing a '[Topic: <name>]' tag

    Returns the topic name as a string.
    Raises ValueError when the marker or the closing ']' is missing.
    '''
    marker = '[Topic: '
    start = message.index(marker) + len(marker)
    end = message.index(']', start)
    return message[start:end]