こんにちは。
最近ボイロ界隈の人たちが宇宙人狼をする動画を見るのにはまってしまっているエンジニアの回路です。
今回はCloudWatch Logs Insightsでaws-sdk-goを使ってクエリを実行する方法を紹介します。
まえがき
AWSを使っているとログの管理にはCloudWatch Logsを使用することが多いと思います。
LambdaやFargateを使用している場合は、特に複雑な設定の必要はなく、IAMさえ適切に設定されていればテキストをprintすることでログに記録してくれるので大変便利です。
そんなCloudWatch LogsにはInsightsという便利な機能があります。
これはシェルスクリプトライクなInsights独自のクエリ言語を使用して、ログに対して検索や集計などができる機能です。
fields @timestamp, @message | filter @message like 'hoge' | sort @timestamp desc | limit 20
このようにパイプでつないでいく感じでログから検索ができます。
ロググループの画面から検索するよりも早くて楽に検索ができるので便利に使っている方も多いのではないでしょうか。
そんなInsights、使っていれば当然コンソールからだけではなく自動でログ監視とかをするのにLambdaあたりから実行できればいいのにと思いますよね?
ということでGoのプログラムからサクッと呼び出せるようにしていきましょう。
今回もサンプルは下記においてありますので参考にしてください。
本編
ではやっていきましょう。まずクライアントを用意します。
cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { panic(err) } client := cloudwatchlogs.NewFromConfig(cfg)
aws-sdkを使っていると頻出しているいつものやつですね。
次にクエリを実行します。
query := ` fields @timestamp, @requestId, @timestamp, @type, @message | sort @timestamp desc ` logGroup := "/aws/hoge/fuga" startQueryInput := &cloudwatchlogs.StartQueryInput{ StartTime: aws.Int64(from), EndTime: aws.Int64(to), LogGroupNames: []string{logGroup}, QueryString: aws.String(query), } startQueryOutput, err := client.StartQuery(context.TODO(), startQueryInput) if err != nil { panic(err) }
基本的にこの「StartQuery」に範囲とロググループ、クエリを指定してやれば問題なくクエリを実行できます…が以前Athenaについて記事にしたときと同じくaws-sdk-goでのクエリの実行は非同期になっています。
なのでクエリの完了を待ってから結果を取り出さなくてはなりません。
クエリの結果を取り出すには下記のようにします。
for { getQueryResultInput := &cloudwatchlogs.GetQueryResultsInput{ QueryId: startQueryOutput.QueryId, } getQueryResultOutput, err := client.GetQueryResults(context.TODO(), getQueryResultInput) if err != nil { panic(err) } switch getQueryResultOutput.Status { case types.QueryStatusRunning, types.QueryStatusScheduled: time.Sleep(5 * time.Second) case types.QueryStatusComplete: return getQueryResultOutput, nil default: panic(err) } }
まずforの無限ループで実行が完了するのを待ちます。
forループの中では「GetQueryResults」を毎回実行して返ってきた内容から結果を確認し続けます。
もし実行中なら適当な時間待ってからもう一度ループ、完了していたらループを抜ける、もしそれ以外の想定外の結果が返ってきたらなんらかのエラーがあったと判断してエラー処理をします。
結果が返ってきたら最後にこれをGoで取り扱いやすい形へ変更しましょう。
まず適当なstructを用意します。
type Log struct { RequestId string Type string Message string }
そして出力結果から値を取り出します。
purchaseRequests := []*Log{} for _, results := range getQueryResultOutput.Results { purchaseRequest := &Log{} for _, result := range results { switch *result.Field { case "@requestId": purchaseRequest.RequestId = *result.Value case "@type": purchaseRequest.Type = *result.Value case "@message": purchaseRequest.Message = *result.Value default: continue } } purchaseRequests = append(purchaseRequests, purchaseRequest) }
「GetQueryResults」が返却する「cloudwatchlogs.GetQueryResultsOutput」の「cloudwatchlogs.GetQueryResultsOutput.Results」に二次元配列の形でクエリの実行結果が入っているので、各フィールドを見てstructに当てはめ、appendしていきます。
結果が得られたあとは煮るなり焼くなり好きに使いましょう。
for _, item := range purchaseRequests { fmt.Printf("ID: %s, Type: %s, Message: %s", item.RequestId, item.Type, item.Message) }
最後にここまでの内容をRepositoryパターンっぽくまとめて使いやすい感じにしてみました。
package cloudwatch import ( "context" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" ) // InsightRepository is repo for cloudwatch insight. type InsightRepository interface { FindLogByRange(logGroup string, from, to int64) ([]*Log, error) } type insightRepoImpl struct { cfg aws.Config } // NewInsightRepoImpl returns a repository implementation. func NewInsightRepoImpl() (InsightRepository, error) { // Get aws default config cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { panic(err) } return &insightRepoImpl{ cfg: cfg, }, nil } func (r *insightRepoImpl) sendQuery(logGroup, query string, from, to int64) (*cloudwatchlogs.GetQueryResultsOutput, error) { client := cloudwatchlogs.NewFromConfig(r.cfg) startQueryInput := &cloudwatchlogs.StartQueryInput{ StartTime: aws.Int64(from), EndTime: aws.Int64(to), LogGroupNames: []string{logGroup}, QueryString: aws.String(query), } startQueryOutput, err := client.StartQuery(context.TODO(), startQueryInput) if err != nil { panic(err) } // wait for the end of query on insight for { getQueryResultInput := &cloudwatchlogs.GetQueryResultsInput{ QueryId: startQueryOutput.QueryId, } getQueryResultOutput, err := client.GetQueryResults(context.TODO(), getQueryResultInput) if err != nil { panic(err) } switch getQueryResultOutput.Status { case types.QueryStatusRunning, types.QueryStatusScheduled: time.Sleep(5 * time.Second) case types.QueryStatusComplete: return getQueryResultOutput, nil default: panic(err) } } } type Log struct { RequestId string Type string Message string } func (r *insightRepoImpl) FindLogByRange(logGroup string, from, to int64) ([]*Log, error) { query := ` fields @timestamp, @requestId, @timestamp, @type, @message | sort @timestamp desc ` res, err := r.sendQuery(logGroup, query, from, to) if err != nil { panic(err) } purchaseRequests := []*Log{} for _, results := range res.Results { purchaseRequest := &Log{} for _, result := range results { switch *result.Field { case "@requestId": purchaseRequest.RequestId = *result.Value case "@type": purchaseRequest.Type = *result.Value case "@message": purchaseRequest.Message = *result.Value default: continue } } purchaseRequests = append(purchaseRequests, purchaseRequest) } return purchaseRequests, nil }
あとがき
ということで、CloudWatch Logs Insightsの実行方法をご紹介してきました。
私が担当している配信サービスでも使用しているGoからのInsightsの実行について、後で確認できるようにするためにもまとめておきたいなと思ったのでまとめてみました。
以前にAthenaをGoから実行できるようにしたときと基本的には同じ考え方で開発できたのであまり苦労はしなかったです。ただ、aws-sdk-goのバージョン間の使用方法の違いがあり、実際に実行しているコードからサンプルを作ろうとすると新しいバージョンでは実行できないことがあり、その点は少し辛かったです。
ただ、一度書いてさえしまえばinsightで自由自在にログを監視できるようになっておすすめなので、皆さんもぜひお試しいただけたらと思います。
弊社ではAWSのログ監視をうまくやっていきたいエンジニアを募集中です。
興味がございましたらエントリーはこちらからぜひよろしくお願いします。 recruit.mediado.jp