Tech Do | メディアドゥの技術ブログ

株式会社メディアドゥのエンジニアによるブログです。

重たい集計バッチをAthenaを利用して高速化した話

f:id:qazx7412:20200203171046j:plain

こんにちは、昨年末に新しくノートPCを注文したら年明けに新モデルが発表されてしまったショックを未だ引きずっているエンジニアの回路(@qazx7412)です。

今回は昨年末に取り組んだAthenaを利用した集計バッチの高速化についての話をしようと思います。

あらすじ

さて、私が普段関わっている配信システムには「売上集計」と呼ばれている夜間バッチがあります。
これがなにかといえば名前のとおり売上を集計する夜間バッチなのですが、配信システムにはユーザーが購入を行ったときに発行した購入キーとコンテンツIDおよびユーザーIDを紐付けて購入履歴として管理するテーブルがあり、ここから毎晩その日の購入の集計を行います。

実際のものとは異なりますがたとえばこんな感じです。

(purchase_records)

key content_id user_id price unixtime -
AAAAbbbb ccc000 user01 100 1554440845 (2019-04-05 14:07:25)
XXXXyyyy zzz999 user02 200 1554440846 (2019-04-05 14:07:26)
DDDDffff ccc000 user02 100 1554440847 (2019-04-05 14:07:27)
GGGGhhhh ccc000 user03 100 1554440848 (2019-04-05 14:07:28)

この購入履歴からこんな感じに日付とIDをキーとしてその日の売上を集計します。

(daily_sales)

date content_id count price
2019-04-05 ccc000 3 300
2019-04-05 zzz999 1 200

この集計は、もともと下記のようなSELECTの結果をそのままUPSERTするだけの単純なバッチでした。

-- mysql

SELECT
  FROM_UNIXTIME(unixtime,  "%Y-%m-%d") as date,
  content_id,
  COUNT(content_id) as count,
  SUM(price) as price
FROM
  purchase_records
WHERE
  unixtime BETWEEN 1554390000 AND 1554476399 -- 2019-04-05 00:00:00 〜 23:59:59
GROUP BY
  date, content_id
LIMIT 0
OFFSET 10000
-- mysql

INSERT INTO daily_sales (
  date,
  content_id,
  count,
  price
)
VALUES
  ('2019-04-05', 'ccc000', 3, 300),
  ('2019-04-05', 'zzz999', 1, 100)
ON DUPLICATE KEY UPDATE
  count = VALUES(count)
  price = VALUES(price)

f:id:qazx7412:20200203162505p:plain

ところがある日、配信システムで新たに大きな書店の受け入れを行ったところ、受け入れ以降、集計が終わるのにとてつもなく時間がかかるようになってしまいました。
なんと深夜0時から1時あたりに実行を開始して終わるのは7時から8時くらいになってしまっている…という有様です。

もちろん原因は明白で、集計元となる購入履歴テーブルのレコード数が増えた結果SELECTやGROUP BY、LIMIT OFFSETに膨大な時間がかかるようになってしまったためです。

f:id:qazx7412:20200206184851p:plain
処理しなきゃいけない件数がとにかくいっぱい…

Exadataってすごかったんだなぁ…と思いつつすでにMySQLでは処理しきれない規模となったテーブルを集計するために別の手段を考えなくてはならないわけですが、ここで都合のいいことにAWSにはAthenaという大容量なデータを集計するのにはもってこいのサービスがありました。

f:id:qazx7412:20200203160535p:plain
Amazon Athena

AthenaはS3に設置したcsvなどをデータソースとしてSQLを投げて実行ができるAWSのサービスです。
一般的にはログなどを集積して分析や調査を行うために使われるケースが多いと思います。

配信システムでもすでにCloudFrontの調査などで利用していました。
いつも膨大なログを一瞬で処理してくれるAthenaならば、Exadataが処理していた膨大な購入履歴でも瞬殺できるはずです。

Athenaで高速化しちゃうぞ

ということでAuroraにあるデータをAthenaで集計するために集計バッチをさらに下記のような動きへ変更します。

  1. 事前準備としてAthenaでテーブルを作成
  2. 定期的にAuroraから一定期間分のレコードをS3へエクスポート
  3. AthenaでGROUP BYを実行
  4. Athenaの実行結果をAuroraへインポート

順番に見ていきましょう。

1. 事前準備としてAthenaでテーブルを作成

まずAthenaを利用するにはまずデータベースとテーブルを作成する必要があります。
これらの作成には一般的なRDBMSと同様にクエリを実行する方法と、マネジメントコンソールの作成用の画面を利用する方法がありますが、今回は前者で作成します。
コンソールのAthenaのQuery Editorからクエリが実行できるのでここで実行します。 実行するクエリは下記2つです。

-- Athena

CREATE DATABASE athena_db

CREATE EXTERNAL TABLE IF NOT EXISTS athena_db.daily_sales (
  content_id string,
  price decimal(18,3)
) PARTITIONED BY (
  date bigint
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = ',', 'field.delim' = ',' ) LOCATION 's3://tally/athena_db/daily_sales/' TBLPROPERTIES ( 'has_encrypted_data'='false' );

それぞれ上からデータベースを作成するクエリ、テーブルを作成するクエリです。
2つ目のクエリの PARTITIONED BY 〜 という見慣れない句ですがこれはAthenaのパーティションという機能を利用するためのものです。
この機能はSELECT時に読み込み範囲を制限してくれるので、実行時間、コスト(Athenaはスキャンデータ量課金)を削減できます。
具体的にはS3の指定のディレクトリにさらに ./date=<日付> といった階層を設けてSELECT時のWHERE句で検索条件にパーティションの変数(今回の例ならdate)を指定すると指定のディレクトリのみをスキャン対象にしてくれます。

パーティションを利用するにはこの他に パーティションの作成 を行わなければならないのですが、これは今回のバッチでは集計を行う前に毎回実行するのでその時にまた説明します。
一応パーティション数にはテーブルごとに作成数制限があるのですが、上限は20000となっており日時バッチなら55年位はもつのでまあ大丈夫でしょう。

2. 定期的にAuroraから一定期間分のレコードをS3へエクスポート

次にAuroraからS3へレコードを出力できるようにします。
都合のいいことにAuroraにはS3へのエクスポート機能というものがあり、必要なIAMポリシーがアタッチされていればクエリの実行だけでS3へ出力ができます。

まずクエリはこんなかんじです。

-- MySQL (only Aurora)

SELECT
  content_id,
  price
FROM
  purchase_records
WHERE
  unixtime BETWEEN 1554390000 AND 1554400799 -- 2019-04-05 00:00:00 〜 02:59:59
INTO OUTFILE S3 's3://tally/athena_db/daily_sales/date=20190405/sales_20190405_00.csv'
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
OVERWRITE ON

この INTO OUTFILE S3 という構文でさきほどテーブルを作成したときに指定したS3へ出力することができるので、このクエリを一定時間ごとに実行してS3へデータを送ります。
先述した通り、このクエリの実行には適切なIAMの設定が必要なのですが、これには少しクセがあるので注意が必要です。

まずこのクエリによるS3への送付なのですが、実態として送付をおこなっているのはクエリにより指示を受けたAuroraなので Aurora自体にIAMロールを付与する 必要があります。

ロールの付与はRDSのコンソールから行うことができます。
データベースの該当クラスタの管理画面の 接続とセキュリティIAM ロールの管理 という項目があるのでここから付与できます。

f:id:qazx7412:20200203164346p:plain

ただAuroraの場合はもうひとつ パラメータグループで使用するロールを設定する という手順が必要です。
管理画面の 設定 -> データベース -> DB クラスターのパラメータグループ からパラメータグループを設定できます。
下記のパラメータのうちどちらかに設定をすればいいので 使用したいロールのARN を記述してください。

  • aws_default_s3_role
    • 通常利用するIAMロールを設定
  • aurora_select_into_s3_role
    • AuroraからS3へファイルを出力する時に使うロールを設定

f:id:qazx7412:20200203164356p:plain

3. AthenaでGROUP BYを実行

これでS3にファイルを用意できたので今度はAthenaで集計をします。
流したいのはこんなクエリです。

-- Athena

ALTER TABLE athena_db.daily_sales
ADD IF NOT EXISTS PARTITION (date='20190405')

SELECT
  content_id,
  COUNT(content_id) as count,
  SUM(price) as price
FROM
  athena_db.daily_sales
WHERE
  date = 20190405
GROUP BY
  content_id

まず最初のクエリは前述のパーティションの作成を行うためのクエリです。
毎回いちいち値を指定して作成をするのが面倒なら以下のようなクエリでも作成ができます。

MSCK REPAIR TABLE athena_db.daily_sales

次にGROUP BYで集計をするクエリですが、WHERE句にパーティションである date を指定しています。
前述した通りこうすることでパーティションを利用してスキャン範囲を制限できます。

ということで2つのクエリをこのバッチから実行できるようにします。
たとえばGoであれば下記のようにすればAthenaへクエリを流す事ができます。

func RunQuery(query string) (string, error) {
  config, err := external.LoadDefaultAWSConfig()
  if err != nil {
    panic(err)
  }
  client := athena.New(config)

  input := &athena.StartQueryExecutionInput{
    QueryString: aws.String(query),
    ResultConfiguration: &athena.ResultConfiguration{
      OutputLocation: aws.String("s3://tally/athena_result/"),
    },
  }
  req := client.StartQueryExecutionRequest(input)
  output, err := req.Send()
  if err != nil {
    return "", err
  }

  // wait until Athena query completes
  for {
    stateCheckInput := &athena.GetQueryExecutionInput{
      QueryExecutionId: output.QueryExecutionId,
    }
    stateCheckReq := client.GetQueryExecutionRequest(stateCheckInput)
    stateCheckOutput, err := stateCheckReq.Send()
    if err != nil {
      return "", err
    }

    if stateCheckOutput.QueryExecution.Status.State == athena.QueryExecutionStateSucceeded {
      break
    }

    switch stateCheckOutput.QueryExecution.Status.State {
    case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
      time.Sleep(5 * time.Second)
    case athena.QueryExecutionStateSucceeded:
      break
    default:
      panic("error")
    }
  }

  return *output.QueryExecutionId, nil
}

クエリを実行する StartQueryExecutionRequest の引数になっている StartQueryExecutionInput にクエリの他に OutputLocation というものを指定していますがここからクエリの実行結果のS3の出力先を指定できます。
またAWS SDKではAthenaのクエリの実行が基本的に非同期になっているのでバッチ側で終了を待つ必要があります。
GetQueryExecutionRequestStartQueryExecutionRequest が出力する QueryExecutionId を指定することで実行状態が確認できるので終了まで待ちます。

4. Athenaの実行結果をAuroraへインポート

実行が終了したら最後に結果をまたAuroraへ取り込みます。
さきほどAthenaの実行時に指定した出力先へ <QueryExecutionId>.csv という名前でcsvファイルが生成されています。
AuroraにはS3からcsvを取り込む機能もあるのですが、UPSERTには対応していないので今回はバッチ側でcsvを読み込んでクエリを生成することにしました。

cfg, err := external.LoadDefaultAWSConfig()
if err != nil {
  panic(err)
}

svc := s3.New(r.cfg)
input := &s3.GetObjectInput{
  Bucket: aws.String("tally"),
  Key:    aws.String(fmt.Sprintf("athena_result/%s.csv", queryExecutionId)),
}
req := svc.GetObjectRequest(input)
output, err := req.Send()
if err != nil {
  panic(err)
}
defer output.Body.Close()

res, err := ioutil.ReadAll(output.Body)
if err != nil {
  panic(err)
}

r := csv.NewReader(strings.NewReader(string(res)))

dailySales := []*DailySales{}
for index := 0; ; index++ {
  record, err := r.Read()
  if err == io.EOF {
    break
  }
  if index == 0 {
    continue
  }
  if err != nil {
    panic(err)
  }

  count, err := strconv.ParseFloat(record[1], 64)
  if err != nil {
    panic(err)
  }
  price, err := strconv.ParseFloat(record[2], 64)
  if err != nil {
    panic(err)
  }

  dailySales = append(dailySales, &DailySales{
    Date;      "2019-04-05",
    ContentId: record[0],
    Count:     count,
    Price:     price,
  })
}

生成するUPSERTクエリ自体は先述の INSERT 〜 ON DUPLICATE KEY と同じです。

-- mysql (再掲)

INSERT INTO daily_sales (
  date,
  content_id,
  count,
  price
)
VALUES
  ('2019-04-05', 'ccc000', 3, 300),
  ('2019-04-05', 'zzz999', 1, 100)
ON DUPLICATE KEY UPDATE
  count = VALUES(count)
  price = VALUES(price)

ここまでを図にまとめるとこんな感じです。 f:id:qazx7412:20200203163553p:plain

これで今度こそ何事もなく売上を集計できるようになりました。
そしてこの変更の結果これまで何時間もかけて集計をしていたものがなんと1分未満で終わるようになりました!
Athena、改めてすごいですね…。

f:id:qazx7412:20200207151249p:plain
Athena化した集計バッチのログ

まとめ

ということで、Athenaを利用してバッチを高速化した話をさせていただきました。
今回のケースでは、システムが抱える問題に対してなかなかうまくAthenaの良さを引き出せたのではないかなと思います。
ただ一方、後日購入履歴の調査や抽出が必要になることがあり、テーブルには調査などにも使えるようにもう少し色々情報を入れておくべきだったなといった反省点もあったりしました。

また、こういう問題の解決に使えるような道具が揃っているのがAWSを始めとしたパブリッククラウドの利点の1つであるということを改めて認識することができたと思います。

弊社では新規開発や既存システムの移行などAWSを利用した開発を行っていますので、興味のあるエンジニアはぜひこちらからお願いいたします。

www.wantedly.com

参考資料