Tech Do | メディアドゥの技術ブログ 

株式会社メディアドゥのエンジニアによるブログです。

Goを使ってCloudWatch Logs Insightsでクエリを実行する

f:id:qazx7412:20210806163826j:plain

こんにちは。
最近ボイロ界隈の人たちが宇宙人狼をする動画を見るのにはまってしまっているエンジニアの回路です。
今回はCloudWatch Logs Insightsでaws-sdk-goを使ってクエリを実行する方法を紹介します。

まえがき

AWSを使っているとログの管理にはCloudWatch Logsを使用することが多いと思います。
LambdaやFargateを使用している場合は、特に複雑な設定の必要はなく、IAMさえ適切に設定されていればテキストをprintすることでログに記録してくれるので大変便利です。

そんなCloudWatch LogsにはInsightsという便利な機能があります。
これはシェルスクリプトライクなInsights独自のクエリ言語を使用して、ログに対して検索や集計などができる機能です。

fields @timestamp, @message
| filter @message like 'hoge'
| sort @timestamp desc
| limit 20

このようにパイプでつないでいく感じでログから検索ができます。
ロググループの画面から検索するよりも早くて楽に検索ができるので便利に使っている方も多いのではないでしょうか。

そんなInsights、使っていれば当然コンソールからだけではなく自動でログ監視とかをするのにLambdaあたりから実行できればいいのにと思いますよね?
ということでGoのプログラムからサクッと呼び出せるようにしていきましょう。

今回もサンプルは下記においてありますので参考にしてください。

github.com

本編

ではやっていきましょう。まずクライアントを用意します。

   cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        panic(err)
    }

    client := cloudwatchlogs.NewFromConfig(cfg)

aws-sdkを使っていると頻出しているいつものやつですね。
次にクエリを実行します。

   query := `
      fields @timestamp, @requestId, @timestamp, @type, @message
          | sort @timestamp desc
  `
    logGroup := "/aws/hoge/fuga"

    startQueryInput := &cloudwatchlogs.StartQueryInput{
        StartTime:     aws.Int64(from),
        EndTime:       aws.Int64(to),
        LogGroupNames: []string{logGroup},
        QueryString:   aws.String(query),
    }
    startQueryOutput, err := client.StartQuery(context.TODO(), startQueryInput)
    if err != nil {
        panic(err)
    }

基本的にこの「StartQuery」に範囲とロググループ、クエリを指定してやれば問題なくクエリを実行できます…が以前Athenaについて記事にしたときと同じくaws-sdk-goでのクエリの実行は非同期になっています。
なのでクエリの完了を待ってから結果を取り出さなくてはなりません。
クエリの結果を取り出すには下記のようにします。

   for {
        getQueryResultInput := &cloudwatchlogs.GetQueryResultsInput{
            QueryId: startQueryOutput.QueryId,
        }
        getQueryResultOutput, err := client.GetQueryResults(context.TODO(), getQueryResultInput)
        if err != nil {
            panic(err)
        }

        switch getQueryResultOutput.Status {
        case types.QueryStatusRunning, types.QueryStatusScheduled:
            time.Sleep(5 * time.Second)
        case types.QueryStatusComplete:
            return getQueryResultOutput, nil
        default:
            panic(err)
        }
    }

まずforの無限ループで実行が完了するのを待ちます。
forループの中では「GetQueryResults」を毎回実行して返ってきた内容から結果を確認し続けます。
もし実行中なら適当な時間待ってからもう一度ループ、完了していたらループを抜ける、もしそれ以外の想定外の結果が返ってきたらなんらかのエラーがあったと判断してエラー処理をします。

結果が返ってきたら最後にこれをGoで取り扱いやすい形へ変更しましょう。
まず適当なstructを用意します。

type Log struct {
    RequestId string
    Type      string
    Message   string
}

そして出力結果から値を取り出します。

   purchaseRequests := []*Log{}
    for _, results := range getQueryResultOutput.Results {
        purchaseRequest := &Log{}
        for _, result := range results {
            switch *result.Field {
            case "@requestId":
                purchaseRequest.RequestId = *result.Value
            case "@type":
                purchaseRequest.Type = *result.Value
            case "@message":
                purchaseRequest.Message = *result.Value
            default:
                continue
            }
        }
        purchaseRequests = append(purchaseRequests, purchaseRequest)
    }

「GetQueryResults」が返却する「cloudwatchlogs.GetQueryResultsOutput」の「cloudwatchlogs.GetQueryResultsOutput.Results」に二次元配列の形でクエリの実行結果が入っているので、各フィールドを見てstructに当てはめ、appendしていきます。
結果が得られたあとは煮るなり焼くなり好きに使いましょう。

   for _, item := range purchaseRequests {
        fmt.Printf("ID: %s, Type: %s, Message: %s", item.RequestId, item.Type, item.Message)
    }

f:id:qazx7412:20210806160317p:plain
個人環境の適当なlambdaのログを出力してみましたの図

最後にここまでの内容をRepositoryパターンっぽくまとめて使いやすい感じにしてみました。

package cloudwatch

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
    "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
)

// InsightRepository is repo for cloudwatch insight.
type InsightRepository interface {
    FindLogByRange(logGroup string, from, to int64) ([]*Log, error)
}

type insightRepoImpl struct {
    cfg aws.Config
}

// NewInsightRepoImpl returns a repository implementation.
func NewInsightRepoImpl() (InsightRepository, error) {
    // Get aws default config
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        panic(err)
    }

    return &insightRepoImpl{
        cfg: cfg,
    }, nil
}

func (r *insightRepoImpl) sendQuery(logGroup, query string, from, to int64) (*cloudwatchlogs.GetQueryResultsOutput, error) {
    client := cloudwatchlogs.NewFromConfig(r.cfg)
    startQueryInput := &cloudwatchlogs.StartQueryInput{
        StartTime:     aws.Int64(from),
        EndTime:       aws.Int64(to),
        LogGroupNames: []string{logGroup},
        QueryString:   aws.String(query),
    }
    startQueryOutput, err := client.StartQuery(context.TODO(), startQueryInput)
    if err != nil {
        panic(err)
    }

    // wait for the end of query on insight
    for {
        getQueryResultInput := &cloudwatchlogs.GetQueryResultsInput{
            QueryId: startQueryOutput.QueryId,
        }
        getQueryResultOutput, err := client.GetQueryResults(context.TODO(), getQueryResultInput)
        if err != nil {
            panic(err)
        }

        switch getQueryResultOutput.Status {
        case types.QueryStatusRunning, types.QueryStatusScheduled:
            time.Sleep(5 * time.Second)
        case types.QueryStatusComplete:
            return getQueryResultOutput, nil
        default:
            panic(err)
        }
    }
}

type Log struct {
    RequestId string
    Type      string
    Message   string
}

func (r *insightRepoImpl) FindLogByRange(logGroup string, from, to int64) ([]*Log, error) {
    query := `
      fields @timestamp, @requestId, @timestamp, @type, @message
          | sort @timestamp desc
  `

    res, err := r.sendQuery(logGroup, query, from, to)
    if err != nil {
        panic(err)
    }

    purchaseRequests := []*Log{}
    for _, results := range res.Results {
        purchaseRequest := &Log{}
        for _, result := range results {
            switch *result.Field {
            case "@requestId":
                purchaseRequest.RequestId = *result.Value
            case "@type":
                purchaseRequest.Type = *result.Value
            case "@message":
                purchaseRequest.Message = *result.Value
            default:
                continue
            }
        }
        purchaseRequests = append(purchaseRequests, purchaseRequest)
    }

    return purchaseRequests, nil
}

あとがき

ということで、CloudWatch Logs Insightsの実行方法をご紹介してきました。
私が担当している配信サービスでも使用しているGoからのInsightsの実行について、後で確認できるようにするためにもまとめておきたいなと思ったのでまとめてみました。
以前にAthenaをGoから実行できるようにしたときと基本的には同じ考え方で開発できたのであまり苦労はしなかったです。ただ、aws-sdk-goのバージョン間の使用方法の違いがあり、実際に実行しているコードからサンプルを作ろうとすると新しいバージョンでは実行できないことがあり、その点は少し辛かったです。
ただ、一度書いてさえしまえばinsightで自由自在にログを監視できるようになっておすすめなので、皆さんもぜひお試しいただけたらと思います。

弊社ではAWSのログ監視をうまくやっていきたいエンジニアを募集中です。
興味がございましたらエントリーはこちらからぜひよろしくお願いします。 recruit.mediado.jp