こんにちは、昨年末に新しくノートPCを注文したら年明けに新モデルが発表されてしまったショックを未だ引きずっているエンジニアの回路(@qazx7412)です。
今回は昨年末に取り組んだAthenaを利用した集計バッチの高速化についての話をしようと思います。
あらすじ
さて、私が普段関わっている配信システムには「売上集計」と呼ばれている夜間バッチがあります。
これがなにかといえば名前のとおり売上を集計する夜間バッチなのですが、配信システムにはユーザーが購入を行ったときに発行した購入キーとコンテンツIDおよびユーザーIDを紐付けて購入履歴として管理するテーブルがあり、ここから毎晩その日の購入の集計を行います。
実際のものとは異なりますがたとえばこんな感じです。
(purchase_records)
key | content_id | user_id | price | unixtime | - |
---|---|---|---|---|---|
AAAAbbbb | ccc000 | user01 | 100 | 1554440845 | (2019-04-05 14:07:25) |
XXXXyyyy | zzz999 | user02 | 200 | 1554440846 | (2019-04-05 14:07:26) |
DDDDffff | ccc000 | user02 | 100 | 1554440847 | (2019-04-05 14:07:27) |
GGGGhhhh | ccc000 | user03 | 100 | 1554440848 | (2019-04-05 14:07:28) |
この購入履歴からこんな感じに日付とIDをキーとしてその日の売上を集計します。
(daily_sales)
date | content_id | count | price |
---|---|---|---|
2019-04-05 | ccc000 | 3 | 300 |
2019-04-05 | zzz999 | 1 | 200 |
この集計は、もともと下記のようなSELECTの結果をそのままUPSERTするだけの単純なバッチでした。
-- mysql SELECT FROM_UNIXTIME(unixtime, "%Y-%m-%d") as date, content_id, COUNT(content_id) as count, SUM(price) as price FROM purchase_records WHERE unixtime BETWEEN 1554390000 AND 1554476399 -- 2019-04-05 00:00:00 〜 23:59:59 GROUP BY date, content_id LIMIT 0 OFFSET 10000
-- mysql INSERT INTO daily_sales ( date, content_id, count, price ) VALUES ('2019-04-05', 'ccc000', 3, 300), ('2019-04-05', 'zzz999', 1, 100) ON DUPLICATE KEY UPDATE count = VALUES(count) price = VALUES(price)
ところがある日、配信システムで新たに大きな書店の受け入れを行ったところ、受け入れ以降、集計が終わるのにとてつもなく時間がかかるようになってしまいました。
なんと深夜0時から1時あたりに実行を開始して終わるのは7時から8時くらいになってしまっている…という有様です。
もちろん原因は明白で、集計元となる購入履歴テーブルのレコード数が増えた結果SELECTやGROUP BY、LIMIT OFFSETに膨大な時間がかかるようになってしまったためです。
Exadataってすごかったんだなぁ…と思いつつすでにMySQLでは処理しきれない規模となったテーブルを集計するために別の手段を考えなくてはならないわけですが、ここで都合のいいことにAWSにはAthenaという大容量なデータを集計するのにはもってこいのサービスがありました。
AthenaはS3に設置したcsvなどをデータソースとしてSQLを投げて実行ができるAWSのサービスです。
一般的にはログなどを集積して分析や調査を行うために使われるケースが多いと思います。
配信システムでもすでにCloudFrontの調査などで利用していました。
いつも膨大なログを一瞬で処理してくれるAthenaならば、Exadataが処理していた膨大な購入履歴でも瞬殺できるはずです。
Athenaで高速化しちゃうぞ
ということでAuroraにあるデータをAthenaで集計するために集計バッチをさらに下記のような動きへ変更します。
- 事前準備としてAthenaでテーブルを作成
- 定期的にAuroraから一定期間分のレコードをS3へエクスポート
- AthenaでGROUP BYを実行
- Athenaの実行結果をAuroraへインポート
順番に見ていきましょう。
1. 事前準備としてAthenaでテーブルを作成
まずAthenaを利用するにはまずデータベースとテーブルを作成する必要があります。
これらの作成には一般的なRDBMSと同様にクエリを実行する方法と、マネジメントコンソールの作成用の画面を利用する方法がありますが、今回は前者で作成します。
コンソールのAthenaのQuery Editorからクエリが実行できるのでここで実行します。
実行するクエリは下記2つです。
-- Athena CREATE DATABASE athena_db CREATE EXTERNAL TABLE IF NOT EXISTS athena_db.daily_sales ( content_id string, price decimal(18,3) ) PARTITIONED BY ( date bigint ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = ',', 'field.delim' = ',' ) LOCATION 's3://tally/athena_db/daily_sales/' TBLPROPERTIES ( 'has_encrypted_data'='false' );
それぞれ上からデータベースを作成するクエリ、テーブルを作成するクエリです。
2つ目のクエリの PARTITIONED BY 〜
という見慣れない句ですがこれはAthenaのパーティションという機能を利用するためのものです。
この機能はSELECT時に読み込み範囲を制限してくれるので、実行時間、コスト(Athenaはスキャンデータ量課金)を削減できます。
具体的にはS3の指定のディレクトリにさらに ./date=<日付>
といった階層を設けてSELECT時のWHERE句で検索条件にパーティションの変数(今回の例ならdate)を指定すると指定のディレクトリのみをスキャン対象にしてくれます。
パーティションを利用するにはこの他に パーティションの作成
を行わなければならないのですが、これは今回のバッチでは集計を行う前に毎回実行するのでその時にまた説明します。
一応パーティション数にはテーブルごとに作成数制限があるのですが、上限は20000となっており日時バッチなら55年位はもつのでまあ大丈夫でしょう。
2. 定期的にAuroraから一定期間分のレコードをS3へエクスポート
次にAuroraからS3へレコードを出力できるようにします。
都合のいいことにAuroraにはS3へのエクスポート機能というものがあり、必要なIAMポリシーがアタッチされていればクエリの実行だけでS3へ出力ができます。
まずクエリはこんなかんじです。
-- MySQL (only Aurora) SELECT content_id, price FROM purchase_records WHERE unixtime BETWEEN 1554390000 AND 1554400799 -- 2019-04-05 00:00:00 〜 02:59:59 INTO OUTFILE S3 's3://tally/athena_db/daily_sales/date=20190405/sales_20190405_00.csv' FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' OVERWRITE ON
この INTO OUTFILE S3
という構文でさきほどテーブルを作成したときに指定したS3へ出力することができるので、このクエリを一定時間ごとに実行してS3へデータを送ります。
先述した通り、このクエリの実行には適切なIAMの設定が必要なのですが、これには少しクセがあるので注意が必要です。
まずこのクエリによるS3への送付なのですが、実態として送付をおこなっているのはクエリにより指示を受けたAuroraなので Aurora自体にIAMロールを付与する
必要があります。
ロールの付与はRDSのコンソールから行うことができます。
データベースの該当クラスタの管理画面の 接続とセキュリティ
の IAM ロールの管理
という項目があるのでここから付与できます。
ただAuroraの場合はもうひとつ パラメータグループで使用するロールを設定する
という手順が必要です。
管理画面の 設定
-> データベース
-> DB クラスターのパラメータグループ
からパラメータグループを設定できます。
下記のパラメータのうちどちらかに設定をすればいいので 使用したいロールのARN
を記述してください。
- aws_default_s3_role
- 通常利用するIAMロールを設定
- aurora_select_into_s3_role
- AuroraからS3へファイルを出力する時に使うロールを設定
3. AthenaでGROUP BYを実行
これでS3にファイルを用意できたので今度はAthenaで集計をします。
流したいのはこんなクエリです。
-- Athena ALTER TABLE athena_db.daily_sales ADD IF NOT EXISTS PARTITION (date='20190405') SELECT content_id, COUNT(content_id) as count, SUM(price) as price FROM athena_db.daily_sales WHERE date = 20190405 GROUP BY content_id
まず最初のクエリは前述のパーティションの作成を行うためのクエリです。
毎回いちいち値を指定して作成をするのが面倒なら以下のようなクエリでも作成ができます。
MSCK REPAIR TABLE athena_db.daily_sales
次にGROUP BYで集計をするクエリですが、WHERE句にパーティションである date
を指定しています。
前述した通りこうすることでパーティションを利用してスキャン範囲を制限できます。
ということで2つのクエリをこのバッチから実行できるようにします。
たとえばGoであれば下記のようにすればAthenaへクエリを流す事ができます。
func RunQuery(query string) (string, error) { config, err := external.LoadDefaultAWSConfig() if err != nil { panic(err) } client := athena.New(config) input := &athena.StartQueryExecutionInput{ QueryString: aws.String(query), ResultConfiguration: &athena.ResultConfiguration{ OutputLocation: aws.String("s3://tally/athena_result/"), }, } req := client.StartQueryExecutionRequest(input) output, err := req.Send() if err != nil { return "", err } // wait until Athena query completes for { stateCheckInput := &athena.GetQueryExecutionInput{ QueryExecutionId: output.QueryExecutionId, } stateCheckReq := client.GetQueryExecutionRequest(stateCheckInput) stateCheckOutput, err := stateCheckReq.Send() if err != nil { return "", err } if stateCheckOutput.QueryExecution.Status.State == athena.QueryExecutionStateSucceeded { break } switch stateCheckOutput.QueryExecution.Status.State { case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning: time.Sleep(5 * time.Second) case athena.QueryExecutionStateSucceeded: break default: panic("error") } } return *output.QueryExecutionId, nil }
クエリを実行する StartQueryExecutionRequest
の引数になっている StartQueryExecutionInput
にクエリの他に OutputLocation
というものを指定していますがここからクエリの実行結果のS3の出力先を指定できます。
またAWS SDKではAthenaのクエリの実行が基本的に非同期になっているのでバッチ側で終了を待つ必要があります。
GetQueryExecutionRequest
に StartQueryExecutionRequest
が出力する QueryExecutionId
を指定することで実行状態が確認できるので終了まで待ちます。
4. Athenaの実行結果をAuroraへインポート
実行が終了したら最後に結果をまたAuroraへ取り込みます。
さきほどAthenaの実行時に指定した出力先へ <QueryExecutionId>.csv
という名前でcsvファイルが生成されています。
AuroraにはS3からcsvを取り込む機能もあるのですが、UPSERTには対応していないので今回はバッチ側でcsvを読み込んでクエリを生成することにしました。
cfg, err := external.LoadDefaultAWSConfig() if err != nil { panic(err) } svc := s3.New(r.cfg) input := &s3.GetObjectInput{ Bucket: aws.String("tally"), Key: aws.String(fmt.Sprintf("athena_result/%s.csv", queryExecutionId)), } req := svc.GetObjectRequest(input) output, err := req.Send() if err != nil { panic(err) } defer output.Body.Close() res, err := ioutil.ReadAll(output.Body) if err != nil { panic(err) } r := csv.NewReader(strings.NewReader(string(res))) dailySales := []*DailySales{} for index := 0; ; index++ { record, err := r.Read() if err == io.EOF { break } if index == 0 { continue } if err != nil { panic(err) } count, err := strconv.ParseFloat(record[1], 64) if err != nil { panic(err) } price, err := strconv.ParseFloat(record[2], 64) if err != nil { panic(err) } dailySales = append(dailySales, &DailySales{ Date; "2019-04-05", ContentId: record[0], Count: count, Price: price, }) }
生成するUPSERTクエリ自体は先述の INSERT 〜 ON DUPLICATE KEY
と同じです。
-- mysql (再掲) INSERT INTO daily_sales ( date, content_id, count, price ) VALUES ('2019-04-05', 'ccc000', 3, 300), ('2019-04-05', 'zzz999', 1, 100) ON DUPLICATE KEY UPDATE count = VALUES(count) price = VALUES(price)
ここまでを図にまとめるとこんな感じです。
これで今度こそ何事もなく売上を集計できるようになりました。
そしてこの変更の結果これまで何時間もかけて集計をしていたものがなんと1分未満で終わるようになりました!
Athena、改めてすごいですね…。
まとめ
ということで、Athenaを利用してバッチを高速化した話をさせていただきました。
今回のケースでは、システムが抱える問題に対してなかなかうまくAthenaの良さを引き出せたのではないかなと思います。
ただ一方、後日購入履歴の調査や抽出が必要になることがあり、テーブルには調査などにも使えるようにもう少し色々情報を入れておくべきだったなといった反省点もあったりしました。
また、こういう問題の解決に使えるような道具が揃っているのがAWSを始めとしたパブリッククラウドの利点の1つであるということを改めて認識することができたと思います。
弊社では新規開発や既存システムの移行などAWSを利用した開発を行っていますので、興味のあるエンジニアはぜひこちらからお願いいたします。