Friday, February 19, 2021

 AWS Athena Table auto partition using 'ALTER TABLE'


Hello Geeks,

Below program is the solution to create automatic Athena table partition by using 'Alter table' command which is much faster than 'MSCK REPAIR' command specially when we dealing with large tables. Using AWS Lambda function(server-less solution) and DynamoDB (for storing latest partition information).

So, Lets' start..

first we will create Dynamo Table:

It's very straight forward, goto AWS dynamodb console, Create DynamoDB table.

> Table name* -> xxxxx  <this table name we have to provide as lamda env variable

> Primary key* 

    -> tab_name  -> String <put any string like test><this column I am using as Primary key> 

> day -> Number (put default 0)

> month -> Number (put default 0)

> Year -> Number (put default 0)

 --------------------------------------------

>AWS Lambda function 

>Function name: any

> runtime: python 3.7

> IAM role required: AmazonAthenaFullAccess, CloudWatchFullAccess,AmazonS3FullAccess or Athena Query Result Directory access

 

--------------

import boto3
import os
import _datetime
from botocore.exceptions import ClientError

#define client object
athena_client=boto3.client('athena')
dynamodb=boto3.resource('dynamodb')

#Environment Variables
athena_table_str=os.environ['ATHENA_TABLES']
athena_table_list=list(athena_table_str.split(','))
athena_db_name=os.environ['ATHENA_DB_NAME']
query_output=os.environ['QUERY_OUTPUT']
delta=int(os.environ['DELTA'])
dynamo_table_name=str(os.environ['DYNAMO_TABLE_NAME'])

#Initilization year, month and date (initial value)
init_y=int(os.environ['INIT_YEAR'])
init_m=int(os.environ['INIT_MONTH'])
init_d=int(os.environ['INIT_DAY'])

def get_partition_date(tablename):
    dynamoTable=dynamodb.Table(dynamo_table_name)
    try:
        response=dynamoTable.get_item(Key={'tab_name': tablename })
        if 'Item' not in response:
            dynamoTable.put_item(
                Item={
                    'tab_name':tablename,
                    'year' : init_y,
                    'month': init_m,
                    'day'  : init_d
                })
            response=dynamoTable.get_item(Key={'tab_name': tablename })  
            return response['Item']
        else:
            return response['Item']
    except ClientError as e:
        print(e.response['Error']['Message'])
    
def lambda_handler(event, context):
    for t in athena_table_list:
        d_obj=get_partition_date(t)

        y=d_obj['year']
        m=d_obj['month']
        d=d_obj['day']
        
        start_date = _datetime.date(y,m,d)
        day_delta=_datetime.timedelta(days=1)
        end_date=start_date + delta*day_delta
    
        queryContext={'Database' : athena_db_name}
        resultConfig = {'OutputLocation': query_output}
    
        try:
            for i in range ((end_date - start_date).days):
                end_date=start_date + i*day_delta
                sql = "alter table {} add partition (created_dt='{}')".format(t,end_date)
                result = athena_client.start_query_execution(QueryString=sql, QueryExecutionContext=queryContext, ResultConfiguration=resultConfig )
        
            dt_lst=(str(end_date).split('-'))
            yy=int(dt_lst[0])
            mm=int(dt_lst[1])
            dd=int(dt_lst[2])
        
            dynamoTable=dynamodb.Table(dynamo_table_name)
            dynamoTable.put_item(
                Item={
                    'tab_name':t,
                    'year' : yy,
                    'month': mm,
                    'day'  : dd
                })        
        except Exception as e:
            print(e)
            print('Error')
            raise e

--------------

Required Environment Variables:-

ATHENA_DB_NAME: <same db name which is using/creating in athena console>

ATHENA_TABLES : List of athena tables i.e table1,table2,table3,table4

DELTA: It's an integer value, showing no of days you want to create partition. Ex: 4

DYNAMO_TABLE_NAME: <Create dynamo table first and put table name here>

INIT_DAY:  <integer value:> from which day partitions start to create

INIT_MONTH: <integer value:> from which month partitions start to create

INIT_YEAR: <integer value:> from which year partitions start to create

QUERY_OUTPUT: Athena Query Result Location <you may find as , goto athena console, click settings then search "Query result Location"