読者です 読者をやめる 読者になる 読者になる

GoとMySQLを用いたジョブキューシステムを作るときに考えたこと

Go MySQL JobQueue PostgreSQL

この記事ははてなエンジニアアドベントカレンダー2014の4日目です。 前回は Mackerelで採用している技術一覧とその紹介 - Hatena Developer Blog でした。

今回は、社内の開発合宿でGo言語でジョブキューシステムを実装したときに考えたことのうち、主にサーバ運用視点でのアーキテクチャ設計について紹介します。 ジョブキュー(メッセージキュー)は、単なる非同期処理をしたいというだけでなく、今年流行したマイクロサービスアーキテクチャにおけるサービス間の連携などにもよく用いられているという点で、今後ホットになる話題だと思います。

社内ではジョブキューと呼んでいますが、ジョブのレスポンスを受け取らないため、厳密にはメッセージキューと呼ぶ気がします。ジョブキュー=メッセージキューとして読んでもらっても差し支えないと思います。

背景

社内では伝統的に、TheSchwartz + WorkerManager という MySQL をストレージとして用いたジョブキュー&ワーカーシステムを使用していました。 長らく安定稼働していたのですが、ジョブの投入と処理部分を基本的には Perl で書かなければならないため、Scala などの他言語から扱いにくいという問題があります。 その他にも例えば、CPU/メモリの使用量が多いまたは実行時間の長いジョブがワーカーノードのリソースを専有するため、比較的軽いジョブがキューに滞留してしまう問題があります。 さらに、キューに一定数以上のジョブが滞留したときに、ワーカーが処理するジョブを取得するクエリが遅くなるという問題があり、この場合ワーカーの並列数を増やしても結局ジョブ取得クエリがボトルネックになってワーカー数に対してジョブの処理スループットがスケールしません。

まとめると、以下の3つの問題があります。

  • 言語依存
  • 一部の重いジョブによるワーカーのリソース占有
  • キューにジョブが滞留したときのジョブ取得クエリの速度低下

これらの問題を解決できる既存の実装はないと考えて、自分たちでジョブキューシステムを実装することを考えました。

関連調査

実践ハイパフォーマンスMySQL 第3版 O'REILLY

p265-268の 6.8.1章 「MySQL でキューテーブルを作成する」にて、MySQL でのキュー作成についての記述があります。 MySQL をキューに使用する 5 つの微妙な方法とその落とし穴 (翻訳版) - Engine Yard Blog にもほぼ同じような内容が書かれています。

一般的なキューテーブルのパターンは、未処理の行、処理中の行、処理済みの行という3種類の行を含んだテーブルを作成します。 1つ以上のワーカープロセスが未処理の行を検索し、それらの行を更新して「claimed」マークを付け、処理が実行されたら「完了」マークを付けます。 この方法には、2つの問題点があります。

1つは、ワーカーの未処理の行取得にポーリングとロックが使用されるということです。ポーリングはサーバに負荷をかけ、ロックはワーカープロセス間に競合と直接をもたらします。 ポーリングの回避するには、ワーカープロセスに通知すればよいです。通知方法は、ワーカーが非常に長いSLEEP() 関数を使って待機しておいて、通知するときに、KILLするというやり方や、GET_LOCK()関数とRELEASE_LOCK関数を使ったり、別のメッセージングサービスを使うという方法があります。

もう1つの問題は、ワーカーが行のマーキングにSELECT FOR UPDATEを使う実装が多いということです。 そうすると、トランザクションが互いにブロックして待機するため、通常はスケーラビリティが大きく損なわれてしまうことになります。 ほとんどの場合はもっとよい方法があります。 単純なUPDATEを使って行をマークした後、SELECTすればよいだけです。 マークには、ジョブを処理しているのは誰かという点と、ジョブの状態の2種類あります。誰がジョブを処理しているかはownerカラムを用意して、MySQLのCONNECTION_ID()を格納します。ジョブの状態については、別途stateカラムを用意して、必要なジョブの状態を管理します。

最後に、マークされたものの、ワーカープロセスが異常終了したなどの理由で処理されなかった行をクリーンアップする必要があります。 これは簡単で、UPDATEを定期的に実行してそれらをリセットすればよいです。 SHOW PROCESSLISTで現在サーバに接続しているすべてのスレッドID以外のスレッドIDでマークされてる行は、もはや処理されることがないため、クリーンアップします。

より詳しい内容については書籍の方を参照してください。 クリーンアップについては、MySQLのイベントスケジューラを使えばMySQLだけで完結してできそうだなと思いました。

TheSchwartz について

TheSchwartz は奇しくも Go の生みの親である bradfitz のプロダクトです。 TheSchwartz のキュー実装も基本的には上述の手法に則っていますが、「claimed」マークを付けるといったことはせずに、各ワーカープロセスが50件ずつ取得してランダムに1つのジョブを選択することにより、他のワーカープロセスが同じジョブをある程度取得しないようにしています。雑なやり方ですが、一応ロックなしでジョブを取得できていることになります。

TheSchwartz のジョブテーブルは以下のようになっています。(https://github.com/saymedia/TheSchwartz/blob/master/doc/schema.sql#L7-20)

CREATE TABLE job (
        jobid           BIGINT UNSIGNED PRIMARY KEY NOT NULL AUTO_INCREMENT,
        funcid          INT UNSIGNED NOT NULL,
        arg             MEDIUMBLOB,
        uniqkey         VARCHAR(255) NULL,
        insert_time     INTEGER UNSIGNED,
        run_after       INTEGER UNSIGNED NOT NULL,
        grabbed_until   INTEGER UNSIGNED NOT NULL,
        priority        SMALLINT UNSIGNED,
        coalesce        VARCHAR(255),
        INDEX (funcid, run_after),
        UNIQUE(funcid, uniqkey),
        INDEX (funcid, coalesce)
);

さらに、ジョブ取得クエリは以下のようになっており、funcid により現存するジョブの種類で絞り込んで、run_after, grabbed_until により実行時間が未来のものを除外して、 priority の高い順にソートしています。このあとランダムに1件取得し、ワーカーに処理させます。run_after が特に設定されていないようなジョブばかり投入された場合、インデックスをみる限り、funcid による絞り込みしか効かなさそうということがわかります。

SELECT * FROM job WHERE (job.funcid IN ('')) AND (job.run_after <= UNIX_TIMESTAMP()) AND (job.grabbed_until
 <= UNIX_TIMESTAMP()) ORDER BY priority DESC LIMIT 50

世の中のジョブキュー

TheSchwartz 以外のジョブキューとして、Rescue(JVM系言語 の場合は Jescue) 、QueRabbitMQQ4M などを検討しました。

まず、キューのストレージとして Redis を選択すると、オンメモリデータベースであるため、当然スループットの向上を期待できますが、フェイルオーバ時のデータの一貫性に問題があるためジョブをロストしてしまう可能性があります。 (厳密には、逐次書き込みモードにしておけば一貫性は向上するが、パフォーマンスは低下するというトレードオフがあるというのが正しい。また、ロストしてしまっても問題ないようなジョブの場合には問題ない。)

次に、RabbitMQ についてはメッセージ・キューとして、様々な機能を揃えており、開発も活発で、かなり大規模なサービスで運用されているという実績もあります。ただし、機能が多すぎて運用が大変そう、Erlangで書かれているのでいざというときコードを読めるか不安、Perl のクライアントがまともに使えるのか、そもそも AMQP が結構複雑ということもあって、採用を見送っています。

卜部昌平のあまりreblogしないtumblr - RabbitMQ と再送について

さらに、PostgreSQL を用いた Que は実践ハイパフォーマンスMySQLで書かれていたポイントを抑えられており、かなり筋がよさそうと思ったのですが、Ruby からでないと使えないので、実装を参考にする程度にしています。(PostgreSQLとジョブキューについては、後ほどでてきます)

最後に、Q4Mは、MySQLのストレージエンジンとしてキューを実装しており、キューとしてのパフォーマンスも最適化されているという点と信頼のMySQLという点でかなり魅力的な選択肢だったのですが、いざ問題が起きたときにMySQLのストレージエンジンのレイヤまで潜らなければならない点がネックでした。

その他、汎用的な実装ではないですが MogileFS 内部のジョブキュー実装も参考にしました(https://github.com/mogilefs/MogileFS-Server/blob/master/lib/MogileFS/Store.pm#L751-767) これもまた、bradfitz 作のプロダクトですね。

提案実装

上記を踏まえて、さいきょうのじょぶきゅーしすてむのアーキテクチャを考えました。

言語非依存なアーキテクチャ

ジョブキューを言語に依存させないために、以下の2つの工夫を考えました。

  • ジョブをHTTPにより投入できるようにする。
  • ジョブを処理するワーカー部分をWebアプリケーションのエンドポイント(POST /jobs/send-too-many-mails)として実装する。

ジョブ投入の際に、パラメータとしてワーカーのエンドポイントを含めておき、ジョブの dispatcher がそのエンドポイントに対してリクエストするという流れになります。 パラメータはJSONにしておき、キューにもJSONで格納されて、エンドポイントにもそのままJSONで渡されます。 TheSchwartz のようにキューテーブルでジョブの種類(funcid)を管理せずに、素朴にジョブに含まれるエンドポイントを叩きます。 ジョブの投入もWebアプリケーションサーバから行い、ジョブの処理もWebアプリケーションサーバが行うことになるので、一周しているように見えますが、ユーザのリクエスト処理にワーカーの処理が影響しないように、実際にはワーカー用のアプリケーションサーバを別途用意します。

信頼性のあるデータストレージ

ジョブをロストするリスクをできるかぎり小さくしたいという要求があります。 したがって、データ構造としてのキューを実装するには向いているとはいえないものの、実績のある RDBMS(MySQLやPostgreSQL)を使うのが自然だと思いました。 自分たちでジョブをディスクにストアする仕組みを実装する方針もありえなくはないですが、実装コストやメンテナンスコスト、さらに高可用性のための仕組みを考えると、既存のストレージに乗っかるのがどう考えても安心感があります。(高可用性については、DRBDを使えばなんとでもなる気がしますが、あまり低レイヤな仕組みに頼るといざ問題が起こった時の調査に困るという問題があります。)

Go 言語の採用

サーバ運用の観点だけに限定すると、以下の2つの理由で Go を採用しました。

まず、多数のワーカープロセスに、HTTPでコネクションを張り続けることを考えると、非同期的にリクエストを投げやすいものがよいという点です。(Go は非同期的にI/Oを実行するために、AIO や epoll(7)&ノンブロッキングI/Oではなく、スレッドを1つ作成してブロッキングI/Oさせるらしい(未確認)ので、コネクション数がどこまでスケールするかはわからない)

次に、どのサービスでもジョブキューシステムを使うことが多く、ローカルで環境構築をすることも多いので、依存がないかつクロスコンパイルができる Go がよいと思いました。

全体構成

以下に、全体構成を図示します。図の矢印はデータの流れを示しています。

f:id:y_uuki:20141204232859p:plain

アプリケーションサーバから HTTP の POST でジョブが投入されて、webでジョブを受けます。webは API サーバで、ジョブの投入を受け付けると、DB のキューテーブルにジョブを格納します。 dispatcherがキューテーブルに対して、定期的にポーリングをかけて、新規ジョブを取得します。取得したジョブの中のエンドポイントに対して、ジョブに含まれたパラメータを付加して、ワーカーアプリケーションサーバにHTTP POSTします。

単一の dispatcher による複数ジョブの同時取得

各ワーカープロセスが1件だけジョブを取得しているから遅いのであって、複数のジョブをまとめて取得して実行すれば効率がよいはずです。 さらに、TheSchwartz のようにキューテーブルでジョブの種類(funcid)を管理しない方針なので、funcid による絞り込みが不要になり、クエリの効率も多少上がると思います。 さらに性能面では、1つのキューに対して1つの dispatcher で十分となるため、基本的にはそもそもマークする必要がないと考えます。 キューのスループットを上げたい場合は、同時に取得するジョブ数を増やします。 実際には、dispatcher の異常終了のために、未処理の行をクリーンアップする必要などがあるため、ジョブ取得時に、owner カラムと status カラムの更新は必要です。 とはいえ、複数の dispatcher が互いにブロックすることがないので、安心感があります。

1つの dispatcher で十分とはいえ、異常終了する場合などを考えて dispatcher を冗長化する必要があります。 下の図に冗長化した場合の構成を示します。

f:id:y_uuki:20141204232909p:plain

マーキングをしているとはいえ、性能面で1つの dispatcher で問題ないなら、なるべく dispatcher 2 には仕事をさせたくありません。 そこで、MySQL の汎用ロックを使って、同時に1つの dispatcher しか動作しないようにします。 具体的には、dispatcher 1 が常にGET_LOCK でロックをとっておき、dispatcher 2 はIS_FREE_LOCK()でロックが解放されてないかポーリングでチェックします。 dispatcher 1 が何かの原因で終了すると、MySQLとのセッションが切れて、ロックが解放されるので、dispatcher 2 がGET_LOCKして仕事をし始めることになります。

イベント通知によるジョブ取得

基本的にポーリングで問題ないと考えていますが、イベント通知したい場合は、さきほどの図のwebdispatcherの部分を1プロセスにまとめて、それぞれ goroutine で実装するというやり方があります。 webがアプリケーションサーバからジョブの投入を受け付けて、MySQL に一旦格納したあと、channel を用いてdispatcherにイベント通知します。 このあたりのプロセス間通信的なやりとりが UNIX ドメインソケットを使う場合などと比べて、Go を使うと比較的簡単になるのがよいですね。

キューの分散によるジョブルーティング

一部の重いジョブによるワーカーのリソース占有という問題に対処するために、複数のキューを用意することを考えました。 複数のキューを用意する場合、いくつかの戦略があり、例えば優先度ごとにキューを用意したり、consistent-hashing でジョブ名に対してハッシュ値を計算して分散させたりなどがあります。

今回は、一部の重いジョブを専用のキューに押し込んで、そのキューを担当する dispatcher の最大ジョブ取得数を絞って、同時に重いジョブを処理するワーカー数を少なくすることができればよいはずです。 あらかじめ、アプリケーションを実装する場合に、どのジョブが問題になるかわかっていればよいですが、実際には動いてみないとわからない事が多いです。 アプリケーション側でジョブの処理優先度を付けるようにするなどの方法だと、問題が起きたときにわざわざアプリケーションをデプロイしなくてはならず面倒です。

そこで、なるべくインフラチームがその場で対応しやすいように、キューの増減およびジョブとキューのルーティング割り当てをAPIで管理出来るようにしたいと考えました。 具体的には、ジョブ名とキューのルーティング割り当てを専用のテーブルで管理しておき、APIで更新できるようにしておきます。 これにより、ジョブごとの処理時間のログを吐かせて、Fluentd & Kibana などで可視化しておけば、重いジョブが何かわかるため、そのときにAPIを叩いて、キューを増やして、重いジョブをそのキューに回すことができます。(一応、キューのスループットを上げるために、ルーティング登録の前段でconsitent-hashingで分散する案も考えてはいます。)

PostgreSQLの場合

実装はMySQLを使っていますが、PostgreSQLをバックエンドとした場合、どのようなメリットがあるかを紹介します。 PostgreSQL は社内では Mackerelで採用している技術一覧とその紹介 - Hatena Developer Blog に書かれているように、Mackerel で本番投入しています。

まず、行のマーキングをするために、Advisory Locks が使えます。 Advisory Locks は、MySQLにおけるGET_LOCK()などと同じ汎用ロックですが、1セッションあたり1つしかロックを作れないGET_LOCK()と違い、1セッションあたり複数の名前でロックをとれるので、MVCC的なロックとは別に各行に対して汎用ロックをとれます。(この辺、理解に少し自信がない) https://github.com/chanks/que/blob/master/lib/que/sql.rb#L6 行の state カラムの更新などがいらず、実際に書き込みを行わないため、マーキングが高速であるというメリットがあります。

Advisory Locks だとマーキングの状態がロックされている or されていないの2種類しかないので、2状態以上の状態がほしい、つまり state カラムや owner カラムが場合もあります。 そのときは、UPDATE文のRETURINGを使うと、更新された行を取得できるので、MySQLのようにマーキングにUPDATESELECT する必要がなく1クエリで済みます。

その他、ジョブのパラメータをJSONで表現しているため、PostgreSQL のJSON型を使うと、JSONの中身に対して条件を付加してクエリを実行できるので、問題調査に役立つかもしれません。

ポーリングとイベント通知の動的なモード変更

思いつき段階ですが、一応書きます。 ただし、仕組みが複雑になりすぎるので、実装が難しいかつメンテナンスが難しいまたは実際には必要ない可能性などを考慮して、実際には実装しないほうがよさそうだと思っています。

ポーリングは効率が悪く、イベント通知が効率がよく思えますが、それはワークロードが小さいときの話です。 今回のシステムではdispatcherが同時に複数のジョブを取得するという前提がありますので、1つ1つのジョブについてイベント通知が発生してしまい、却って性能が低下するということが考えられます。

ポーリングとイベント通知については、ネットワークパケットを受信するためのOSの仕組みに似たような課題があります。 通常、NICで受け取ったパケットは、NICからカーネルに割り込みをかけることにより、カーネルに渡されます。つまり、カーネルはイベントを待ち受けていて、NICがイベントを発生させます。 ところが、高パケットレート環境では、パケットが到着する度に、CPUが割り込みを受けることになるので、CPU利用率がボトルネックになります。 そこで、高パケットレート時にはNAPIという仕組みで、NICからイベントを通知するのではなく、カーネルからNICに対してポーリングをかけて複数のパケットを同時に受信するということをしています。(NICドライバ用対応)

ジョブキューにおいても、ジョブの投入レートをメモリに記憶させておいて、投入レートが閾値以上ならモードを切り替えることはできそうです。

NAPIについては、以下のドキュメントが詳しいです。 napi | The Linux Foundation

また、モード切り替えでなく、ジョブの投入レートから、ポーリング間隔を動的に決定することもできそうです。 今回のジョブキューの場合、ポーリング間隔とdispatcherの最大ジョブ取得数から最大スループットは決まりそうなので、投入レート=最大スループットとすると、ポーリング間隔が決定できそうな気はします。

NICドライバのポーリング頻度調整周りの実装については EC2でSR-IOVを使うときのNICドライバパラメータ検証 - ゆううきブログ に書いています。

参考資料

まとめ

Go と MySQL でジョブキューを実装するときに考えたことを主にサーバ運用視点で紹介しました。 ある程度動くものはできあがっているので、本番に投入してある程度実績を積んだらオープンソースにしたいと思っています。(僕は基本的にアーキテクチャ設計とかいろいろな調査とかベンチマーク環境の準備をしているだけですが) 個人的には、新しいと思っていた課題が、OSカーネルの伝統的な課題に置き換えたりできることがすごくおもしろいと思いました。 今回はパケット受信だけ紹介しましたが、他にスループットとレイテンシの最適化周りで、TCPのスライディングウィンドウとか、OSのプロセススケジューリングや、失敗ジョブの世代間ガベージコレクションなどなんとなく古典的な問題に置き換えられそうな問題がありました。 ジョブキュー/メッセージキューの実装について、よりよいアーキテクチャや最適化方法をご存知の方は教えていただけるとうれしいです。

次の担当は id:nanto_vi さんです。よろしくお願いします!