Tech Do | メディアドゥの技術ブログ 

株式会社メディアドゥのエンジニアによるブログです。

goroutineでバッチ処理時間を大幅に改善した話

f:id:ogady:20200225142714p:plain

はじめに

こんにちは、Media Do Tech Do Blog初執筆のogadyです。

メディアドゥには2019年の8月に入社して、この度ついにブログ執筆させていただくことになりました!

本記事では、私のチームで運用しているバッチツールをgoroutineで高速化した話をさせていただきます。

背景

現在メディアドゥでは、二つの電子書籍取次システムを片寄せし、統合する案件を進めています。

私のチームは、システムのDBマイグレーションを行う移行・突合システムgoで開発・運用しています。 移行・突合システムでは、移行元の取次システムのデータを移行・突合システムにインポートして、ごちゃごちゃ加工してマイグレーションしています。

f:id:ogady:20200222170720p:plain
移行・突合システムイメージ図

この「取次システムのデータを移行システムにインポートして」という部分が曲者で、最新状態を保つため定期的・突発的に移行システムのDBを総入替するのですが、なんせ書籍のデータだけでも200万近くあるため普通にバッチで総入替を行うと4〜5時間かかっていました。データ入替中は他の作業ができなくなるため、これは改善せねばなるまいとなった訳です。

本記事の前提

  • 技術要素
    • 言語:Go1.11(そろそろアップデートしたい。。。)
    • ORマッパー:GORM

1. 改善前

データインポート用のバッチはざっくりこんな感じに処理を行っていました。

f:id:ogady:20200222162818p:plain
改善前フロー図

やはり、データを引き抜いてくる→データをインサートする、部分で一気に全量のデータをシリアルに扱っているのが遅い原因ですね。ここを、Go得意の並列処理など使って高速化していきます。

2. goroutineで高速化する

goroutineの待ち受けを考える

goroutineを使う場合、それら全て終了をどう待ち受けるかが問題になります。既に先人の知見が溢れてますが、改めてまとめてみます。

1. sync.WaitGroup

まずよく使われるのはsync.WaitGroupです。

golang.org

goroutineを発行するとき、sync.WaitGroupAdd()メソッドで処理待ちするgoroutine数(waitgroup)を足していきます。goroutine処理の終了時はDone()メソッドを呼び、waitgruopをデクリメントします。waitogroupが”0”になる(全てのgoroutineがwg.Done()を呼ぶ)と、wg.Wait()以降の処理が実行されます。

これを使ってシンプルなgoroutine待ちを実装するとこうなります。

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1) // wgをインクリメントする
        go func() {
            defer wg.Done() // wgをデクリメントする。全て終わると ”0” になる
        }()
    }
    wg.Wait() // wgが ”0” になるまで待つ
    log.Println("goroutine待ち合わせ完了")
}

注意点として、Add()メソッドはgroutine内で使ってはいけません。goroutineのスケジューリングはこちらでコントロールできないため、全てのgoroutineが生成されても動いていないこともあります。その状態で、Wait()行まで行ってしまうと、waitgroupが ”0” のままなのでその後の処理に移ってしまいます。

また、エラーハンドリングを行う場合は、channelを利用してgoroutine内から受け取ったり、error[]に保持して待ち合わせ後に取り出すなどの実装を行うことで実現できます。

2. sync.ErrGroup

godoc.org

sync.ErrGroupも基本的な用途は変わらず、Wait()メソッドでgoroutineの待ち合わせを行います。

異なるのは、sync.ErrGroupGo()メソッドでgoroutineを生成する点と、WithContext()メソッドで新たなContextを生成し、エラー渡せる仕組みがある点です。これにより、1つのgoroutineでエラーが発生したときに他の全てのgoroutineをキャンセルすることができます。

sync.ErrGroupを使ったgoroutine待ちの実装

func main() {
    eg, ctx := errgroup.WithContext(context.Background()) // Contextも新しく生成している
    for i := 0; i < 10; i++ {
        i := i
        eg.Go(func() error { // サブタスクとしてgoroutineを発行する
            select {
            case <-ctx.Done():
                //default内でエラーがreturnされると以降はこちらのcase文に入りgoroutine処理が停止する
                return nil
            default:
                err := worker() // エラーを返す関数
                if err != nil {
                    return err
                }
                return nil
            }
        })
    }
    if err := eg.Wait(); err != nil {
        log.Println(err)
    }
    log.Println("goroutine待ち合わせ完了")
}

今回の移行システムのバッチ処理で使用する並行処理ではエラーハンドリングが重要なので、sync.ErrGroupを使用していきます。 

実際の実装

今回goroutineで改善した処理のメインとなる部分はこんな感じに実装しました。

func (r *ContentsImport) Execute() error {
    //取得対象のファイルIDを全取得
    cIDs, err := r.getContentIDs()
    if err != nil {
        return err
    }

    r.contentsCount = len(cIDs)

    //goroutine実行数を算出
    r.threadCount = calcThreadCount() // runtime.NumCPU() を元にgoroutineスレッド数を決定しています。(最低3、最大6)
    logger.Logger.Info(fmt.Sprintf("スレッド数:%d", r.threadCount))

    //1スレッド当たりのインポート件数を決定(最後に実行するスレッドで余り分も含めてインポート対象とするので、ここでは余りを考慮していません)
    r.regCountPerThread = r.contentsCount / r.threadCount

    eg, ctx := errgroup.WithContext(context.Background())

    // 1スレッド当たりの登録データ取得開始位置を算出
    allocateStartPos := calcCountPerThreadStart(r, i)
    // 1スレッド当たりの登録データ取得終了位置を算出
    allocateEndPos := calcCountPerThreadEnd(r, i)

    db, err := infra.GetDBConnection()
    if err != nil {
        return errors.Wrap(err, "取次システムDBのコネクション取得エラー")
    }
    defer db.Close()

    migDB, err := infra.GetMigrateDBConnection()
    if err != nil {
        return errors.Wrap(err, "移行DBのコネクション取得エラー")
    }
    defer migDBCon.Close()

    // goroutine発行
    eg.Go(func() error {
        cIDsPerThread := cIDs[allocateStartPos:allocateEndPos]

        //取得処理のループ数を算出
        getLoopCount := r.calcGetLoopCount(len(cIDsPerThread))

        for l := 0; l < getLoopCount; l++ {
            select {
            case <-ctx.Done():
                return nil

            default:
                //1ループあたりの取得件数を元に、取得sqlに渡すデータの対象範囲を算出
                getStartPos := r.calcCountPerGetStartPos(l)
                getEndPos := r.calcCountPerGetEndPos(len(cIDsPerThread), getLoopCount, l)

                migArray, err := r.getProductMigrationData(db, cIDsPerThread[getStartPos:getEndPos])
                if err != nil {
                    return err
                }

                err = r.bulkInsert(migDB, migArray) // Bulk Insertを行います。
                if err != nil {
                    return err
                }
            }
        }
        return nil
    })

    if err := eg.Wait(); err != nil {
        return err
    }
    return nil
}

3. Bulk Insertを実装する

移行システムへインサートするときも、一気にBulk Insertして高速化したいところです。

今回のシステムで使っているORマッパー:GORMはBulk Insertの機能がないので自分で実装する必要があります。

//MigrateContent は、 migration mddc_fileの情報を格納します
type MigrateContent struct {
    ContID          string         // コンテンツID
    ContName        string         // コンテンツ名
    TitleID         string         // タイトルID
    InsertedDate    time.Time       // 登録日
    UpdatedDate     time.Time       // 更新日
}


type MigrateContents []*MigrateContent

// values - Bulk Insertの valuesを文字列で生成する
func (ms MigrateContents) values() string {
    v := make([]byte, 0)
    for _, m := range ms {
        if len(v) != 0 {
            v = append(v, ","...)
        }

        v = append(v, fmt.Sprintf(
            "(%s, %s, %s, %s, %s)",
            ms.string(m.ContID),
            ms.string(m.ContName),
            ms.string(m.TitleID),
            ms.string(m.InsertedDate.Format("2006/01/02 15:04:05")),
            ms.string(m.UpdatedDate.Format("2006/01/02 15:04:05")),
        )...)
    }
    return string(v)
}

func (ms *MigrateContents) escapeSingleQuotation(s string) string {
    if strings.Contains(s, "'") {
        return strings.Replace(s, "'", "''", -1)
    }
    return s
}

func (ms *MigrateContents) string(s string) string {
    return "'" + ms.escapeSingleQuotation(s) + "'"
}

// BulkInsert - Bulk Insertを行う
func (r *MigrateContentsRepositoryImpl) BulkInsert(array MigrateContents) error {
    return r.db.Exec(
        fmt.Sprintf(
            "insert into mddc_file "+
                "("+
                "cont_id, "+
                "cont_name, "+
                "title_id, "+
                "inserted_date, "+
                "updated_date "+
                ") "+
                "values %s",
            array.values(),
        ),
    ).Error
}

ここでメモリ問題が発生

このバッチツールを改修したとき、最初、Bulk Insert時のメモリが大量に使用されてしまいました。これは、values部分の文字列を生成するときに、+=演算子を用いて結合していたためでした。他の言語を触っている人や、Goに親しんだ人にとっては当たり前かもしれませんが、基本的に+=を使用した文字列結合は値コピーを繰り返し、メモリを圧迫してしまいます。

Goでは、stringは単なるbyteの配列なので、[]byteappendで結合すればメモリ消費を抑えることができます。これは、sliceの実態が配列のポインタであり、アドレスの値を直接参照するので、値コピーが発生しないためです(stringにキャストするときには値コピーされるようです)。他にも文字列結合にはstring.Join()や、bytes.Buffer()などの方法があります。

5. 改善後

上記の改善を行った結果、4〜5時間かかっていた処理を、30分前後まで短縮することに成功しました。

改善後の処理フローはこんな感じになりました。(ちょっと複雑になっちゃいました。)

f:id:ogady:20200222164607p:plain
改善後の処理フロー

goroutine使った並行処理化、強い!

改訂2版 みんなのGo言語では過度な並列処理化は推奨されていませんが、こういったユースケースではgoroutine並列化を検討してみるとかなりのパフォーマンス改善が見込まれるので、おっそいバッチ処理などでは積極的に導入していきたいですね!

6. 後書き

今回は、データインポートのバッチ処理速度の改善を行いましたが、取次システムのオンライン処理でもかなり時間がかかっている機能があります。こうした処理のパフォーマンスを改善していくことで、ユーザー体験を向上させていきたいですね。その際は、短絡的にgoroutine等に頼るのではなく、「インフラで解決できないか?」、「そもそも遅いクエリ発行してるんじゃないのか?」と総合的に検討して適切な方法を選択するのが大切だと思っています。

ところで弊社ではエンジニア大募集中ですので、もし興味のあるエンジニアがいらっしゃいましたらぜひこちらからお願いいたします。

www.wantedly.com

7. 参考資料など