Lambda sample code for subscription filter
Sample Python code that you can use for the Lambda function in the CloudWatch subscription filter
Note: Before you use the sample code, make sure that AWS_REGION is set to the
proper AWS region. Change the value if necessary.
################################################################################
#
# Licensed Materials - Property of IBM
# 5900-B3O
# (C) Copyright IBM Corp. 2023.
#
# US Government Users Restricted Rights -
# Use, duplication or disclosure restricted by GSA ADP Schedule
# Contract with IBM Corporation
#
# DISCLAIMER OF WARRANTIES :
# Permission is granted to copy and modify this Sample code provided
# that both the copyright notice, and this permission notice and
# warranty disclaimer appear in all copies and modified versions.
#
# THIS SAMPLE CODE IS LICENSED TO YOU AS-IS.
# IBM AND ITS SUPPLIERS AND LICENSORS DISCLAIM ALL WARRANTIES,
# EITHER EXPRESS OR IMPLIED, IN SUCH SAMPLE CODE, INCLUDING THE
# WARRANTY OF NON-INFRINGEMENT AND THE IMPLIED WARRANTIES OF
# MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. IN NO EVENT
# WILL IBM OR ITS LICENSORS OR SUPPLIERS BE LIABLE FOR ANY DAMAGES
# ARISING OUT OF THE USE OF OR INABILITY TO USE THE SAMPLE CODE OR
# COMBINATION OF THE SAMPLE CODE WITH ANY OTHER CODE. IN NO EVENT
# SHALL IBM OR ITS LICENSORS AND SUPPLIERS BE LIABLE FOR ANY LOST
# REVENUE, LOST PROFITS OR DATA, OR FOR DIRECT, INDIRECT, SPECIAL,
# CONSEQUENTIAL,INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER CAUSED AND
# REGARDLESS OF THE THEORY OF LIABILITY, EVEN IF IBM OR ITS
# LICENSORS OR SUPPLIERS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGES.
################################################################################
import boto3
import time
import gzip
import json
import base64
import datetime
import dateutil
def lambda_handler(event, context):
    '''
    Lambda entry point for the CloudWatch subscription filter.

    Hands the incoming event to readCloudWatchLog. The context argument
    is accepted because Lambda supplies it, but it is not used here.
    '''
    readCloudWatchLog(event)
def readCloudWatchLog(event):
    '''
    Reads the incoming CloudWatch log event and re-publishes its entries.

    The payload under event['awslogs']['data'] is base64-decoded,
    gzip-decompressed and parsed as JSON. For every entry in 'logEvents',
    the entry's message (itself JSON carrying a 'message' field) is
    extended with any follow-up lines via getMoreLogs and then written
    out with writeToCloudWatch.

    Change AWS_REGION to desired region
    '''
    AWS_REGION = "us-east-2"
    client = boto3.client('logs', region_name=AWS_REGION)

    # Unwrap the subscription payload: base64 -> gzip -> JSON.
    payload = json.loads(
        gzip.decompress(base64.b64decode(event['awslogs']['data']))
    )
    sourceGroupName, sourceStreamName = getLogDetails(payload)

    for entry in payload['logEvents']:
        parsedEntry = json.loads(entry['message'])
        fullMessage = getMoreLogs(
            client,
            entry['timestamp'],
            parsedEntry['message'],
            sourceGroupName,
            sourceStreamName,
        )
        writeToCloudWatch(client, entry['timestamp'], fullMessage, sourceGroupName)
def getLogDetails(cloudwatchDataJSON):
    '''
    Return the (log group name, log stream name) pair of the incoming log.

    Both values are read from the 'logGroup' and 'logStream' keys of the
    decoded payload; a missing key raises KeyError.
    '''
    return cloudwatchDataJSON['logGroup'], cloudwatchDataJSON['logStream']
def writeToCloudWatch(client, logTimestamp, message, importCloudwatchLogGroupName):
    '''
    Write one log message to CloudWatch.

    The destination names are derived from the topic found in the message
    (see getTopic) and from the date of logTimestamp, which is treated as
    milliseconds since the epoch. The log group and log stream are created
    first when the existence checks report them missing.

    Edit logGroupName and/or logStreamName to customize the log group name
    or log stream name.
    '''
    topic = getTopic(message)
    # fromtimestamp expects seconds; without a tz argument it yields local time.
    datestamp = datetime.datetime.fromtimestamp(logTimestamp / 1e3).strftime('%Y-%m-%d')

    logGroupName = "".join((importCloudwatchLogGroupName, "-", topic, 'Log'))
    logStreamName = f"{datestamp}_{topic}_Log_Stream"

    if not logGroupExist(client, logGroupName):
        createLogGroup(client, logGroupName)
    if not logStreamExist(client, logGroupName, logStreamName):
        create_log_stream(client, logGroupName, logStreamName)

    client.put_log_events(
        logGroupName=logGroupName,
        logStreamName=logStreamName,
        logEvents=[
            {'timestamp': logTimestamp, 'message': message},
        ],
    )
def getMoreLogs(client, logTimestamp, firstMessage, importCloudwatchLogGroupName, importCloudwatchLogStreamName):
    '''
    Check for more logs to append to the first message.

    Queries the source log stream for the window from logTimestamp to
    logTimestamp + 1, reading from the head. Each returned event message
    is parsed as JSON. Once an event whose 'message' equals firstMessage
    is seen, the 'message' of each following event is appended on a new
    line until an event has no 'message' field or its message contains
    'Topic' (the start of a new entry).

    Returns firstMessage, extended with any continuation lines.
    '''
    response = client.get_log_events(
        logGroupName=importCloudwatchLogGroupName,
        logStreamName=importCloudwatchLogStreamName,
        startTime=logTimestamp,
        endTime=logTimestamp + 1,
        startFromHead=True,
    )

    combined = firstMessage
    anchorSeen = False
    for record in response['events']:
        payload = json.loads(record['message'])
        hasText = 'message' in payload

        if not anchorSeen:
            # Still searching for the event that carries the first message.
            if hasText and payload['message'] == firstMessage:
                anchorSeen = True
            continue

        # Past the anchor: stop at the first event that is not a continuation.
        if not hasText:
            break
        if 'Topic' in payload['message']:
            break
        combined += "\n" + payload['message']

    return combined
def logGroupExist(client, logGroupName):
    '''
    Check whether a log group named exactly logGroupName exists.

    This function only checks; it does not create anything.

    describe_log_groups filters by name *prefix*, so a different group
    that merely starts with logGroupName would also be returned. Each
    returned group is therefore compared by its full name, and further
    result pages are followed through 'nextToken' until a match is found
    or the results are exhausted.

    Returns True if an exact match is found, otherwise False.
    '''
    request = {
        'logGroupNamePrefix': logGroupName
    }
    while True:
        response = client.describe_log_groups(**request)
        for group in response['logGroups']:
            if group.get('logGroupName') == logGroupName:
                return True
        nextToken = response.get('nextToken')
        if not nextToken:
            return False
        # More matches for the prefix remain; fetch the next page.
        request['nextToken'] = nextToken
def logStreamExist(client, logGroupName, logStreamName):
    '''
    Check whether a log stream named exactly logStreamName exists in
    the log group logGroupName.

    This function only checks; it does not create anything.

    describe_log_streams filters by name *prefix*, so a different stream
    that merely starts with logStreamName would also be returned. Each
    returned stream is therefore compared by its full name, and further
    result pages are followed through 'nextToken' until a match is found
    or the results are exhausted.

    Returns True if an exact match is found, otherwise False.
    '''
    request = {
        'logGroupName': logGroupName,
        'logStreamNamePrefix': logStreamName
    }
    while True:
        response = client.describe_log_streams(**request)
        for stream in response['logStreams']:
            if stream.get('logStreamName') == logStreamName:
                return True
        nextToken = response.get('nextToken')
        if not nextToken:
            return False
        # More matches for the prefix remain; fetch the next page.
        request['nextToken'] = nextToken
def createLogGroup(client, logGroupName):
    '''
    Create the log group logGroupName.

    Issues a single create_log_group call on the given client; the
    response is not used and nothing is returned.
    '''
    client.create_log_group(logGroupName=logGroupName)
def create_log_stream(client, logGroupName, logStreamName):
    '''
    Create the log stream logStreamName inside the log group logGroupName.

    Issues a single create_log_stream call on the given client; the
    response is not used and nothing is returned.
    '''
    client.create_log_stream(
        logGroupName=logGroupName,
        logStreamName=logStreamName,
    )
def getTopic(message):
    '''
    Get topic from message for log group.

    Returns the text between the first '[Topic: ' marker and the first
    ']' that follows it.

    Raises ValueError if the marker, or a closing ']' after it, is not
    present in the message.
    '''
    marker = '[Topic: '
    start = message.index(marker) + len(marker)
    # Search through to the end of the message so that a closing bracket
    # in the very last position is found as well.
    end = message.index(']', start)
    return message[start:end]