In this module, we will create a Lambda function for a specific use case. The function uses Amazon Athena to query the data processed in S3 and return the 5 most popular songs by hits.
In this section, we provide the code for the Lambda function we just created. The code uses boto3 to create an Athena client.
Boto3 is the Amazon Web Services (AWS) Software Development Kit (SDK) for Python. It allows Python developers to create, configure, and manage AWS services such as Amazon EC2 and Amazon S3. Boto3 provides an easy-to-use, object-oriented API as well as low-level access to AWS services. Read more about Boto3 here: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html?id=docs_gateway
Read more about the Boto3 Athena API methods here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html
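The Athena calls in this function follow a start, poll, fetch pattern. `start_query_execution` returns a `QueryExecutionId`, `get_query_execution` reports a `State` of QUEUED, RUNNING, SUCCEEDED, FAILED or CANCELLED, and `get_query_results` returns the rows. The sketch below isolates only the polling step so it can be tried without an AWS account. `wait_for_query` and `FakeAthenaClient` are hypothetical names, and the fake client simply replays a list of states instead of calling Athena.

```python
import time
from typing import Callable, List

# States after which Athena will not change the query status again (other than SUCCEEDED)
TERMINAL_FAILURE_STATES = {"FAILED", "CANCELLED"}


def wait_for_query(client, query_execution_id: str, retry_count: int = 10,
                   sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_query_execution until the query succeeds, fails, or retries run out."""
    for attempt in range(retry_count):
        status = client.get_query_execution(QueryExecutionId=query_execution_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            return state
        if state in TERMINAL_FAILURE_STATES:
            reason = status["QueryExecution"]["Status"].get("StateChangeReason", "")
            raise RuntimeError(f"Query {state}: {reason}")
        sleep(attempt)  # linear backoff: 0 s, 1 s, 2 s, ...
    # Retry budget exhausted: stop the query so it does not keep running (and billing)
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    raise TimeoutError("Query did not finish within the retry budget")


class FakeAthenaClient:
    """Hypothetical stand-in that replays a fixed sequence of states (no AWS calls)."""

    def __init__(self, states: List[str]):
        self._states = list(states)
        self.stopped = False

    def get_query_execution(self, QueryExecutionId):
        # Pop states until one is left, then keep returning the last one
        state = self._states.pop(0) if len(self._states) > 1 else self._states[0]
        return {"QueryExecution": {"Status": {"State": state}}}

    def stop_query_execution(self, QueryExecutionId):
        self.stopped = True
        return {}


if __name__ == "__main__":
    fake = FakeAthenaClient(["QUEUED", "RUNNING", "SUCCEEDED"])
    print(wait_for_query(fake, "example-id", sleep=lambda s: None))  # SUCCEEDED
```

Passing `sleep` in as a parameter is only a testing convenience; with a real `boto3.client('athena')` the default `time.sleep` applies.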
```python
import os
import time

import boto3

# Environment variables (set in the Lambda console)
DATABASE = os.environ['DATABASE']
TABLE = os.environ['TABLE']
# Number of top songs to return
TOPX = 5
# S3 location where Athena writes the query results
S3_OUTPUT = f's3://{os.environ["BUCKET_NAME"]}/query_results/'
# Number of times to poll the query status before giving up
RETRY_COUNT = 10


def lambda_handler(event, context):
    client = boto3.client('athena')

    # Query built from two environment variables and a constant
    query = f"""
        SELECT track_name AS "Track Name",
               artist_name AS "Artist Name",
               count(1) AS "Hits"
        FROM {DATABASE}.{TABLE}
        GROUP BY 1, 2
        ORDER BY 3 DESC
        LIMIT {TOPX};
    """

    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': DATABASE},
        ResultConfiguration={'OutputLocation': S3_OUTPUT}
    )
    query_execution_id = response['QueryExecutionId']

    # Poll the execution status, waiting a little longer after each attempt
    for i in range(RETRY_COUNT):
        query_status = client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        exec_status = query_status['QueryExecution']['Status']['State']
        if exec_status == 'SUCCEEDED':
            print(f'STATUS: {exec_status}')
            break
        elif exec_status in ('FAILED', 'CANCELLED'):
            raise Exception(f'STATUS: {exec_status}')
        else:
            # QUEUED or RUNNING: wait i seconds before polling again
            print(f'STATUS: {exec_status}')
            time.sleep(i)
    else:
        # The loop finished without a break: stop the query and fail
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')

    # Get the query results (the first row holds the column names)
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result['ResultSet']['Rows'])
    # The function can return the results to your application or service:
    # return result['ResultSet']['Rows']
```
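Because iteration `i` of the polling loop sleeps for `i` seconds, a query that never finishes makes the handler sleep 0 + 1 + ... + 9 = 45 seconds before it gives up, not counting API latency. The default Lambda timeout is 3 seconds, so you will likely need to raise the function's timeout (Configuration, General configuration in the Lambda console) above this budget. The hypothetical helper below just computes that sleep budget for a given `RETRY_COUNT`.

```python
def worst_case_wait_seconds(retry_count: int) -> int:
    """Total time.sleep() in the handler's loop if every poll returns QUEUED/RUNNING.

    Iteration i sleeps i seconds, so the total is 0 + 1 + ... + (retry_count - 1).
    """
    return sum(range(retry_count))


print(worst_case_wait_seconds(10))  # 45
```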
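Scroll down to the Environment variables section and add the following three environment variables:
- DATABASE: analyticsworkshopdb
- TABLE: processed_data
- BUCKET_NAME: the name of your workshop S3 bucket (Athena writes the query results under query_results/ in this bucket)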
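With DATABASE set to analyticsworkshopdb and TABLE set to processed_data (the names used in the verification query), the handler's f-string renders to the SQL that Athena receives. The hypothetical helper below reproduces that rendering locally, ignoring indentation, so you can check the values before deploying.

```python
TOPX = 5


def build_top_songs_query(database: str, table: str, topx: int = TOPX) -> str:
    """Render the same top-songs SQL the handler sends to Athena (whitespace aside)."""
    return (
        'SELECT track_name AS "Track Name",\n'
        '       artist_name AS "Artist Name",\n'
        '       count(1) AS "Hits"\n'
        f"FROM {database}.{table}\n"
        "GROUP BY 1, 2\n"
        "ORDER BY 3 DESC\n"
        f"LIMIT {topx};"
    )


print(build_top_songs_query("analyticsworkshopdb", "processed_data"))
```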
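After saving and testing the function, open the Athena query editor and run the same query directly:
```sql
SELECT track_name AS "Track Name",
       artist_name AS "Artist Name",
       count(1) AS "Hits"
FROM analyticsworkshopdb.processed_data
GROUP BY 1, 2
ORDER BY 3 DESC
LIMIT 5;
```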
Compare the results of this query with the rows printed by the Lambda function; they should match. Note that the Lambda output also includes the column names as its first row.
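The rows printed by the function are raw Athena `ResultSet` rows: each row has a `Data` list of cells, each value arrives as a string under `VarCharValue` (a NULL cell has no such key), and for a SELECT the first row holds the column labels. To compare them with the console output more easily, a minimal sketch could split the header from the data. `split_header` is a hypothetical name, and the sample rows are illustrative placeholders, not real query results.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Tuple[Optional[str], ...]


def split_header(rows: List[Dict[str, Any]]) -> Tuple[Row, List[Row]]:
    """Turn Athena ResultSet rows into (header, data rows); NULL cells become None."""
    values = [tuple(datum.get("VarCharValue") for datum in row["Data"]) for row in rows]
    if not values:
        return (), []
    return values[0], values[1:]


# Illustrative rows in the shape returned by get_query_results
sample_rows = [
    {"Data": [{"VarCharValue": "Track Name"}, {"VarCharValue": "Artist Name"}, {"VarCharValue": "Hits"}]},
    {"Data": [{"VarCharValue": "Song A"}, {"VarCharValue": "Artist A"}, {"VarCharValue": "42"}]},
]
header, data = split_header(sample_rows)
print(header)  # ('Track Name', 'Artist Name', 'Hits')
print(data)    # [('Song A', 'Artist A', '42')]
```

Hit counts stay strings here, as Athena returns them; convert with `int()` if you need numeric comparisons.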
GREAT!
You have now created a Lambda function from scratch and tested it.
The next module covers the final topic: building a data warehouse on Amazon Redshift.