タグ情報をもとにCloudWatchLogsをS3にエクスポートする

CloudWatchLogsは各リージョンに保存されるものなので、災害対策などで別のリージョンや他のサービスへデータを複製する要件が発生することがあります。
その時に、CloudWatchLogsはS3へデータをエクスポート機能がありますので、他のサイトではLambdaとStepFunctionsを使って自動でエクスポートするような構成をよく見ます。

ただ、よく見る構成だと全CloudWathLogsをS3エクスポートするようなものが多く、ちょっと過剰だなと思っていました。
なので、特定のタグを設定しているCloudWatchLogsのみを選定して、エクスポートするLambdaを作ってみました。
また、CloudWatchLogsのエクスポート処理はログの量によっては、15分を超えるためStepFunctionsと併用されることが多いですが、わざわざStepFunctionsを作るのも面倒だなと思い、Lambdaだけで15分超えても処理を続けられる機構を実装してみました。

15分を超えても処理を継続させるLambdaはなんとか作ってみたいと思っていましたので、実際に動くものが作れてよかったです。
ただ、ちょっと長いソースになってしまったので、今回はloggerでログ出力もさせるようにしておきました。

使用例

このページのLambdaを使用することで以下のようなことが実現できます。
  • 特定のタグを設定したCloudWatchLogsをS3にエクスポート
  • AWS EventBridgeと連携することで日時エクスポート
エクスポートするログは前日出力された分のログのみを抽出してS3にエクスポートするようになっています。
なので、毎日実行するようにAWS EventBridgeを設定することを推奨します。
※AWS EventBridgeの解説は、本ページでは省略いたします。
 作成方法は、こちらをご確認下さい。

構成図

本ページにて紹介するLambdaは、CloudWatchLogsを検索し、特定のタグが設定されているものに対して、エクスポート処理を実行させます。
boto3関数は、CloudWatchLogsの「describe_log_groups」、 「create_export_task」、 「describe_export_tasks」を使用します。

また、「list_tags_log_group」に使用していますが、現在こちらの関数は使用が非推奨となっています。
本来は使わないほうが良いのですが、代替として使うべきと推奨されている「list_tags_for_resource」がいまのLambdaが標準でimportできるboto3では使用できません。(2023/1/11時点)
なので、仕方なく非推奨のほうを使っています。

さらに15分以上の処理をLambdaに実行させるために、Lambdaの「invoke」を使用します。
処理が10分を超えた段階で、現在の処理状態を引き継ぐ形で同じLambaを実行し直します。
これをエクスポート処理が全てのロググループで完了するまでに繰り返して、Lambdaの15分制限を超えて処理を続けさせます。

Lambdaソースコード

エクスポート先のS3バケット名を入力いただくと、すぐに使えるエクスポート用のLambdソースコードが作成できます。
import boto3
import json
import os
import time
from datetime import datetime, timedelta, timezone
from logging import getLogger

lambda_client = boto3.client('lambda')
cwl_client = boto3.client('logs')
cwl_paginator = cwl_client.get_paginator('describe_log_groups')

account_id = boto3.client('sts').get_caller_identity()['Account']
region = os.environ['AWS_REGION']
destination_s3=''

logger = getLogger(__name__)
logger.setLevel('DEBUG')

logger.info('処理を開始します。')
def lambda_handler(event, context):
    start_time = time.time()
    list_source = []
    task_id = ''
    year = datetime.now().strftime('%Y')
    month = datetime.now().strftime('%m')
    day = datetime.now().strftime('%d')
    
    #リストアップ(初回実行) or 前回の処理の引継ぎ
    if not 'list_source' in event:
        from_time = (datetime.now()-timedelta(days=2)).replace(hour=15, minute=0, second=0, microsecond=0)
        to_time = (datetime.now()-timedelta(days=1)).replace(hour=14, minute=59, second=59, microsecond=9999)
        from_time_unix = int(time.mktime(from_time.timetuple()))*1000
        to_time_unix = int(time.mktime(to_time.timetuple()))*1000
        
        for page in cwl_paginator.paginate():
            for logGroup in page['logGroups']:
                retry = 0
                # リトライ付きロググループリストアップ処理
                while retry < 3:
                    try:
                        response = cwl_client.list_tags_log_group(logGroupName=logGroup['logGroupName'])
                        if 'AutoExport' in response['tags']:
                            if response['tags']['AutoExport']=='True':
                                list_source.append(logGroup['logGroupName'])
                        break
                    except Exception as e:
                        logger.warn(e)
                        logger.warn('エラーが発生しました。リストアップ処理をリトライします。')
                        retry += 1 
                if retry >= 3:
                    logger.error('リストアップ処理に失敗しました。エラーを確認してください。')
                    return 9
        logger.info('リストアップ処理が完了しました。')
        if len(list_source) <= 0:
            logger.info('エクスポート対象がありません。処理を終了します。')
            return 0
    else:
        logger.debug('前回のLambdaからパラメータの引継ぎを行います。')
        list_source = event['list_source']
        task_id = event['task_id']
        from_time_unix = event['from_time_unix']
        to_time_unix = event['to_time_unix']
    
    logger.debug('対象ロググループ数:' + str(len(list_source)))
    logger.debug('対象ロググループ:' + str(list_source))

    #エクスポートタスク作成 & ステータス監視 (10分間無限ループ)
    while (time.time() - start_time) < 600:
        if task_id == '':
            try:
                folder_name = list_source[0][1:] if list_source[0].startswith('/') else list_source[0]
                task_id = cwl_client.create_export_task(
                    logGroupName = list_source[0],
                    fromTime = from_time_unix,
                    to = to_time_unix,
                    destination = destination_s3,
                    destinationPrefix = year + '/' + month + '/'+day + '/' + folder_name.replace('/','-')
                )['taskId']
            except Exception as e:
                logger.warn(e)
                logger.warn(list_source[0] + 'のエクスポート処理をスキップします。')
        else:
            # リトライ付きエクスポートタスク検索処理
            retry=0
            while retry < 3:
                try:
                    response = cwl_client.describe_export_tasks(taskId=task_id)
                    break
                except Exception as e:
                    retry += 1
            if response['exportTasks'][0]['status']['code']=='COMPLETED':
                logger.debug(list_source[0] + 'のエクスポート処理が完了しました。')
                list_source.pop(0)
                logger.debug('残りの対象ロググループ数:' + str(len(list_source)))
                logger.debug('残りの対象ロググループ:' + str(list_source))
                
                if len(list_source) == 0:
                    logger.info('エクスポート処理が完了しました。')
                    return 0
                try:
                    folder_name = list_source[0][1:] if list_source[0].startswith('/') else list_source[0]
                    task_id = cwl_client.create_export_task(
                        logGroupName = list_source[0],
                        fromTime = from_time_unix,
                        to = to_time_unix,
                        destination = destination_s3,
                        destinationPrefix = year + '/' + month+ '/' +day + '/' + folder_name.replace('/','-')
                    )['taskId']
                except Exception as e:
                    logger.warn(e)
                    logger.warn(list_source[0] + 'のエクスポート処理をスキップします。')
    
    #パラメータととも次のLambdaに処理の引継ぎ
    if len(list_source) > 0:
        logger.info('引継ぎ処理を開始します。')
        lambda_client.invoke(
            FunctionName=context.function_name,
            InvocationType = 'Event',
            Payload = json.dumps({
                'list_source': list_source,
                'task_id':task_id,
                'from_time_unix':from_time_unix,
                'to_time_unix':to_time_unix
                }
            )
        )
        
    return 0

▼基本設定

S3バケット名*

権限(IAMロール設定)

Lambdaに付与するIAMロールに「logs:DescribeLogGroups」、「logs:ListTagsLogGroup」、「logs:DescribeExportTasks」、「logs:CreateExportTask」、「lambda:InvokeFunction」の権限を付与してください。

解説

紹介したLambdaのフローは以下の通りです。
  1. 初回実行 or 引継ぎ実行判定
  2. 初回実行の場合は、「AutoExport」タグがTrueのロググループをリスト化
  3. 引継ぎ実行の場合は、前回の処理状態を引継ぎ
  4. リストアップしたロググループに繰り返し処理
    1. エクスポートタスク作成
    2. エクスポートタスクのステータス確認
    3. エクスポート完了の場合は、次のロググループの処理
  5. 処理開始から10分開始した場合、次のLambdaに引継ぎ実行

CloudWatchLogsのリストアップには、「paginator」という機能を使っています。
「paginator」は、大量の応答結果が発生しうる関数に設定されていることが多いです。
通常の「describe_log_groups」を使用すると、ロググループの数によっては1回の検索で全てをリストアップすることができません。
検索しきれなかった物は、ネクストトークンと呼ばれるものを指定して再度検索する必要があります。

そのネクストトークンを使って何度も検索を行う手間を省略してくれるのが「paginator」です。
「paginator」の処理をfor文で回すことで、ネクストトークンを使用せずとも全検索結果を取得することができます。

処理時間の計測には、timeライブラリを使っています。
Lambdaであれば、標準ライブラリとして入っているのですぐに使うことができます。
「start_time = time.time()」で実行開始時間を記録して、「time.time() - start_time」の部分で現時点での処理時間を毎回計算しています。
ここで計算して出る値は、実行開始からの経過時間が秒で出ますので、600秒超えていたら繰り返し処理を抜けるようになっています。

最後にリストアップしたロググループの処理が終わっていない場合は、次のLambdaを実行開始します。
処理の状態を引継ぐ必要があるので、「Payload」に残りのロググループリスト、現在ステータス確認途中のタスクID、エクスポート期間(開始と終了)を入れて、引数として渡します。

使い方

今回紹介したソースコードのLambdaを作成し、AWS EventBridgeで日時実行するように設定してください。
S3にエクスポートしたいロググループには、タグ名「AutoExport」で値「True」のタグを設定してください。
日時で実行されるLambdaがタグ設定されたロググループの前日出力された分をS3に出力してくれます。