はじめに
こんにちは、Media Do Tech Do Blog初執筆のogadyです。
メディアドゥには2019年の8月に入社して、この度ついにブログ執筆させていただくことになりました!
本記事では、私のチームで運用しているバッチツールをgoroutineで高速化した話をさせていただきます。
背景
現在メディアドゥでは、二つの電子書籍取次システムを片寄せし、統合する案件を進めています。
私のチームは、システムのDBマイグレーションを行う移行・突合システムをgoで開発・運用しています。 移行・突合システムでは、移行元の取次システムのデータを移行・突合システムにインポートして、ごちゃごちゃ加工してマイグレーションしています。
この「取次システムのデータを移行システムにインポートして」という部分が曲者で、最新状態を保つため定期的・突発的に移行システムのDBを総入替するのですが、なんせ書籍のデータだけでも200万近くあるため普通にバッチで総入替を行うと4〜5時間かかっていました。データ入替中は他の作業ができなくなるため、これは改善せねばなるまいとなった訳です。
本記事の前提
- 技術要素
- 言語:Go1.11(そろそろアップデートしたい。。。)
- ORマッパー:GORM
1. 改善前
データインポート用のバッチはざっくりこんな感じに処理を行っていました。
やはり、データを引き抜いてくる→データをインサートする、部分で一気に全量のデータをシリアルに扱っているのが遅い原因ですね。ここを、Go得意の並列処理など使って高速化していきます。
2. goroutineで高速化する
goroutineの待ち受けを考える
goroutineを使う場合、それら全て終了をどう待ち受けるかが問題になります。既に先人の知見が溢れてますが、改めてまとめてみます。
1. sync.WaitGroup
まずよく使われるのはsync.WaitGroup
です。
goroutineを発行するとき、sync.WaitGroup
のAdd()
メソッドで処理待ちするgoroutine数(waitgroup)を足していきます。goroutine処理の終了時はDone()
メソッドを呼び、waitgruopをデクリメントします。waitogroupが”0”になる(全てのgoroutineがwg.Done()を呼ぶ)と、wg.Wait()
以降の処理が実行されます。
これを使ってシンプルなgoroutine待ちを実装するとこうなります。
func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) // wgをインクリメントする go func() { defer wg.Done() // wgをデクリメントする。全て終わると ”0” になる }() } wg.Wait() // wgが ”0” になるまで待つ log.Println("goroutine待ち合わせ完了") }
注意点として、Add()
メソッドはgroutine内で使ってはいけません。goroutineのスケジューリングはこちらでコントロールできないため、全てのgoroutineが生成されても動いていないこともあります。その状態で、Wait()
行まで行ってしまうと、waitgroupが ”0” のままなのでその後の処理に移ってしまいます。
また、エラーハンドリングを行う場合は、channelを利用してgoroutine内から受け取ったり、error[]
に保持して待ち合わせ後に取り出すなどの実装を行うことで実現できます。
2. sync.ErrGroup
sync.ErrGroup
も基本的な用途は変わらず、Wait()
メソッドでgoroutineの待ち合わせを行います。
異なるのは、sync.ErrGroup
はGo()
メソッドでgoroutineを生成する点と、WithContext()
メソッドで新たなContextを生成し、エラー渡せる仕組みがある点です。これにより、1つのgoroutineでエラーが発生したときに他の全てのgoroutineをキャンセルすることができます。
sync.ErrGroup
を使ったgoroutine待ちの実装
func main() { eg, ctx := errgroup.WithContext(context.Background()) // Contextも新しく生成している for i := 0; i < 10; i++ { i := i eg.Go(func() error { // サブタスクとしてgoroutineを発行する select { case <-ctx.Done(): //default内でエラーがreturnされると以降はこちらのcase文に入りgoroutine処理が停止する return nil default: err := worker() // エラーを返す関数 if err != nil { return err } return nil } }) } if err := eg.Wait(); err != nil { log.Println(err) } log.Println("goroutine待ち合わせ完了") }
今回の移行システムのバッチ処理で使用する並行処理ではエラーハンドリングが重要なので、sync.ErrGroup
を使用していきます。
実際の実装
今回goroutineで改善した処理のメインとなる部分はこんな感じに実装しました。
func (r *ContentsImport) Execute() error { //取得対象のファイルIDを全取得 cIDs, err := r.getContentIDs() if err != nil { return err } r.contentsCount = len(cIDs) //goroutine実行数を算出 r.threadCount = calcThreadCount() // runtime.NumCPU() を元にgoroutineスレッド数を決定しています。(最低3、最大6) logger.Logger.Info(fmt.Sprintf("スレッド数:%d", r.threadCount)) //1スレッド当たりのインポート件数を決定(最後に実行するスレッドで余り分も含めてインポート対象とするので、ここでは余りを考慮していません) r.regCountPerThread = r.contentsCount / r.threadCount eg, ctx := errgroup.WithContext(context.Background()) // 1スレッド当たりの登録データ取得開始位置を算出 allocateStartPos := calcCountPerThreadStart(r, i) // 1スレッド当たりの登録データ取得終了位置を算出 allocateEndPos := calcCountPerThreadEnd(r, i) db, err := infra.GetDBConnection() if err != nil { return errors.Wrap(err, "取次システムDBのコネクション取得エラー") } defer db.Close() migDB, err := infra.GetMigrateDBConnection() if err != nil { return errors.Wrap(err, "移行DBのコネクション取得エラー") } defer migDBCon.Close() // goroutine発行 eg.Go(func() error { cIDsPerThread := cIDs[allocateStartPos:allocateEndPos] //取得処理のループ数を算出 getLoopCount := r.calcGetLoopCount(len(cIDsPerThread)) for l := 0; l < getLoopCount; l++ { select { case <-ctx.Done(): return nil default: //1ループあたりの取得件数を元に、取得sqlに渡すデータの対象範囲を算出 getStartPos := r.calcCountPerGetStartPos(l) getEndPos := r.calcCountPerGetEndPos(len(cIDsPerThread), getLoopCount, l) migArray, err := r.getProductMigrationData(db, cIDsPerThread[getStartPos:getEndPos]) if err != nil { return err } err = r.bulkInsert(migDB, migArray) // Bulk Insertを行います。 if err != nil { return err } } } return nil }) if err := eg.Wait(); err != nil { return err } return nil }
3. Bulk Insertを実装する
移行システムへインサートするときも、一気にBulk Insertして高速化したいところです。
今回のシステムで使っているORマッパー:GORMはBulk Insertの機能がないので自分で実装する必要があります。
//MigrateContent は、 migration mddc_fileの情報を格納します type MigrateContent struct { ContID string // コンテンツID ContName string // コンテンツ名 TitleID string // タイトルID InsertedDate time.Time // 登録日 UpdatedDate time.Time // 更新日 } type MigrateContents []*MigrateContent // values - Bulk Insertの valuesを文字列で生成する func (ms MigrateContents) values() string { v := make([]byte, 0) for _, m := range ms { if len(v) != 0 { v = append(v, ","...) } v = append(v, fmt.Sprintf( "(%s, %s, %s, %s, %s)", ms.string(m.ContID), ms.string(m.ContName), ms.string(m.TitleID), ms.string(m.InsertedDate.Format("2006/01/02 15:04:05")), ms.string(m.UpdatedDate.Format("2006/01/02 15:04:05")), )...) } return string(v) } func (ms *MigrateContents) escapeSingleQuotation(s string) string { if strings.Contains(s, "'") { return strings.Replace(s, "'", "''", -1) } return s } func (ms *MigrateContents) string(s string) string { return "'" + ms.escapeSingleQuotation(s) + "'" } // BulkInsert - Bulk Insertを行う func (r *MigrateContentsRepositoryImpl) BulkInsert(array MigrateContents) error { return r.db.Exec( fmt.Sprintf( "insert into mddc_file "+ "("+ "cont_id, "+ "cont_name, "+ "title_id, "+ "inserted_date, "+ "updated_date "+ ") "+ "values %s", array.values(), ), ).Error }
ここでメモリ問題が発生
このバッチツールを改修したとき、最初、Bulk Insert時のメモリが大量に使用されてしまいました。これは、values
部分の文字列を生成するときに、+=
演算子を用いて結合していたためでした。他の言語を触っている人や、Goに親しんだ人にとっては当たり前かもしれませんが、基本的に+=
を使用した文字列結合は値コピーを繰り返し、メモリを圧迫してしまいます。
Goでは、string
は単なるbyte
の配列なので、[]byte
をappend
で結合すればメモリ消費を抑えることができます。これは、slice
の実態が配列のポインタであり、アドレスの値を直接参照するので、値コピーが発生しないためです(stringにキャストするときには値コピーされるようです)。他にも文字列結合にはstring.Join()
や、bytes.Buffer()
などの方法があります。
5. 改善後
上記の改善を行った結果、4〜5時間かかっていた処理を、30分前後まで短縮することに成功しました。
改善後の処理フローはこんな感じになりました。(ちょっと複雑になっちゃいました。)
goroutine使った並行処理化、強い!
改訂2版 みんなのGo言語では過度な並列処理化は推奨されていませんが、こういったユースケースではgoroutine並列化を検討してみるとかなりのパフォーマンス改善が見込まれるので、おっそいバッチ処理などでは積極的に導入していきたいですね!
6. 後書き
今回は、データインポートのバッチ処理速度の改善を行いましたが、取次システムのオンライン処理でもかなり時間がかかっている機能があります。こうした処理のパフォーマンスを改善していくことで、ユーザー体験を向上させていきたいですね。その際は、短絡的にgoroutine等に頼るのではなく、「インフラで解決できないか?」、「そもそも遅いクエリ発行してるんじゃないのか?」と総合的に検討して適切な方法を選択するのが大切だと思っています。
ところで弊社ではエンジニア大募集中ですので、もし興味のあるエンジニアがいらっしゃいましたらぜひこちらからお願いいたします。