Reach Us

Extracting and Analyzing AWS CloudWatch Logs using Python and Boto3

Ensuring the reliability, availability, and performance of modern cloud-based systems is critical for businesses and users. Site Reliability Engineering (SRE) is an approach that focuses on merging software engineering practices with IT operations to build and maintain scalable and highly reliable systems. In this blog, we will explore how we leveraged Boto3, the AWS SDK for Python to automate the extraction and analysis of AWS CloudWatch Logs.

The customer and the project background:

Our customer is a major Indian MNC manufacturer of Internal combustion engine motorcycles and scooters, and had recently forayed into the electric vehicle business. Our customer had their IOT software component designed at a design center in Europe. The European design center had deployed their proprietary IOT software in AWS on EC2 instances running Docker containers. The container instances receive massive amounts of diagnostic data from the electric vehicle’s Telematic Control Unit (TCU), which are streamed into log files. The unified CloudWatch agent was installed on the server and the logs were being sent to multiple AWS CloudWatch log groups. The European team had recently handed over the AWS account to our SRE team. Another team in our customer’s development center was handling the development of mobile apps based on the IOT APIs.

Now, the problem statement:

The development team wanted to extract the TCU logs (https://en.wikipedia.org/wiki/Telematic_control_unit), from a specific AWS CloudWatch log group. The TCU is represented as a unique 15-digit number and each line in the log file was associated with a specific TCU number. Each connected TCU from the electric vehicle would send millions of lines of diagnostic information as logs.

Initial analysis:

CloudWatch logs provide a Log Insights feature, where logs can be queried using a domain-specific query language. The query feature had a “hard limit” of 10,000 hits. This means only 10,000 lines matching a specific query could be extracted as CSV from the Log Insights. Initially, we did this the hard way by manually running the below query multiple times, to extract 10,000 lines at a time by adjusting the timestamp field.

fields @timestamp, @message, @logStream, @log
| sort @timestamp asc
| filter @message like /TCU NUMBER/
| limit 10000

The limit directive in the above query is a forced one, as this domain-specific query forces a limit directive. The maximum value of the limit is 10,000. It can be lower but removing the limit or giving a higher limit, errors out the query.

Manual querying was tedious. We created AWS accounts for developers with access only to fetch logs from the CloudWatch log groups. Despite sharing this with the developers, it was time-consuming to extract the logs. Remember, to extract a match of 1,00,000 lines in the log group, the query had to be run a minimum of 10 times by adjusting the timestamp field, and 10 files were downloaded as only 10,000 matches could be fetched from the Log Insights. Also, the downloaded Excel file had a 13-digit timestamp in Unix epoch format, which had to be converted to human readable time using Excel formulas.

Our solution:

We quickly realized the futility of extracting the logs manually and started exploring options.

  1. One was to stream the CloudWatch logs to ElasticSearch/OpenSearch and then use a frontend to query the logs. This involved setting up ElasticSearch and a Kibana frontend and required budget approvals. It would have taken at least a month to materialize. Considering a major chunk of our working time in the last 2 days was spent extracting logs from the AWS Management Console, this was not an immediate solution.
  2. Use the AWS SDK for Python, and write a script to fetch the logs, convert the timestamp, and download the logs. This was a simple solution achievable in a day.

We created this solution on the third day. Below is the script that was created and passed to the team and developers, and the task of downloading the logs was simplified and transferred to the developers. The script fetches the “timestamp” and “message” fields from the CloudWatch log group for a particular time period and writes to a file.

This simple and readable script emphasizes the fact that complex problems can be solved through simple automation. Additionally, knowing a scripting language can save the day for you as a Site Reliability Engineer.

Disclaimer: This is a boto3 script. It uses the filter_log_events method of the CloudWatch logs client. Multiple online blogs were referred and ChatGPT was leveraged to create this script.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/filter_log_events.html

import boto3
import csv
from datetime import datetime


""" Fetch Response from the User. The user should have the credentials and permission to fetch Cloudwatch Logs """

access_key = input("Enter the AWS_ACCESS_KEY_ID: ")
secret_key = input("Enter the AWS_SECRET_ACCESS_KEY: ")
session_token = input("Enter the AWS_SESSION_TOKEN: ")

# Use the credentials to create a new boto3 session
temporary_session = boto3.Session(
   aws_access_key_id=access_key,
   aws_secret_access_key=secret_key,
   aws_session_token=session_token
)

# Initialize cloudwatch logs client
cloudwatch = temporary_session.client('logs')



""" The function get_matching_logs_csv uses filter_log_events method to fetch the logs from cloudwatch Log Group and store it in the variable “log_events”. The logs get appended to this variable and it will store all the matching logs. The human-readable time format is converted to epoch format in milliseconds for querying Cloudwatch Logs. """ 

def get_matching_logs_csv(log_group_name, start_time, end_time, search_string):
   log_events = []
   next_token = None

   while True:
       kwargs = {
           'logGroupName': log_group_name,
           'startTime': int(start_time.timestamp() * 1000),
           'endTime': int(end_time.timestamp() * 1000),
           'filterPattern': search_string,
       }

       if next_token:
           kwargs['nextToken'] = next_token

       response = cloudwatch.filter_log_events(**kwargs)

       if 'events' in response:
           log_events.extend(response['events'])

       if 'nextToken' in response:
           next_token = response['nextToken']
       else:
           break

   return log_events

""" The function convert_logs_to_csv writes the contents of log_events in a CSV file with the name 'TCU_'+search_string+'_matched_logs.csv'. This function reconverts the 13 digit Unix Epoch timestamp to Human readable Timestamp and writes into the timestamp field in the log file """

def convert_logs_to_csv(log_events, csv_filename):
   with open(csv_filename, 'w', newline='') as csvfile:
       fieldnames = ['timestamp', 'message']
       writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
       writer.writeheader()
       for log in log_events:
           timestamp = datetime.fromtimestamp(log['timestamp'] / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
           message = log['message']
           writer.writerow({'timestamp': timestamp, 'message': message})


log_group_name = '/logs/iot-sock-server'

StartTime=input("Enter a Start Time in Format %Y-%m-%d %H:%M:%S: ")
EndTime=input("Enter a End Time in Format %Y-%m-%d %H:%M:%S: ")
search_string=input("Enter the TCU Number to Search: ")

start_time=datetime.strptime(StartTime, "%Y-%m-%d %H:%M:%S")
end_time=datetime.strptime(EndTime, "%Y-%m-%d %H:%M:%S")

# Get the Number of Lines in the csvfile for printing below
def get_csv_line_count(filename):
   line_count = 0

   with open(filename, 'r') as csvfile:
       reader = csv.reader(csvfile)
       for _ in reader:
           line_count += 1

   return line_count

logs = get_matching_logs_csv(log_group_name, start_time, end_time, search_string)

if logs:
   csv_filename = 'TCU_'+search_string+'_matched_logs.csv'
   convert_logs_to_csv(logs, csv_filename)
   line_count = get_csv_line_count(csv_filename)
   print(f"Logs matching '{search_string}' between {start_time} and {end_time} downloaded to '{csv_filename}'.")
   print(f"Number of records in '{csv_filename}': {line_count}")
else:
   print("No matching logs found.")

Please feel free to reuse or modify it to suit your requirements. If you have any suggestions to improve the script, a different approach, or the next steps you would consider, do share them with us in a comment below.

Looking for Site Reliability Engineering expertise for your cloud environment? Write to us at sales@cloudifyops.com to know more.

Privacy Settings
We use cookies to enhance your experience while using our website. If you are using our Services via a browser you can restrict, block or remove cookies through your web browser settings. We also use content and scripts from third parties that may use tracking technologies. You can selectively provide your consent below to allow such third party embeds. For complete information about the cookies we use, data we collect and how we process them, please check our Privacy Policy
Youtube
Consent to display content from - Youtube
Vimeo
Consent to display content from - Vimeo
Google Maps
Consent to display content from - Google
Spotify
Consent to display content from - Spotify
Sound Cloud
Consent to display content from - Sound
Contact Us