Webサービスの処理に時間のかかる部分がある場合、それをQueueに入れて別プロセスのWorkerに処理をさせたいという部分があります。
例えば、メールを送信する処理であったり、画像の変換だったりですね。
Go言語においては最初はGo-Routineを利用すれば、非同期に処理ができたり、responseを返した後に処理を継続するということができるかなと思っていたのですが、responseが終了した時点でContextが閉じられてしまうせいなのかうまく動作せず、このあたりの方法を探っていたところでした。
Google App Engineには標準でQueueとWorkerを実行するための仕組みが備わっています。今日はその辺りをまとめていきます。
TaskQueue, PushQueue, PullQueueの違い
私は最初この三つは異なるものだと思っていましたが、TaskQueueの種類としてPushQueueとPullQueueがあるという説明が載っていました。ということでPushQueueとPullQueueの違いを見ていきましょう。
PushQueue
PushQueueは以下のような仕組みを備えています。
- ProcessingRateという処理速度を用いてTaskを処理する。
- App Engineが処理容量に応じて自動的にスケールする。
- 処理が終わったら自動的にqueueを削除する。
特徴的なところは、
色々な値をデフォルトで設定し(queue.yamlで設定可能)App Engineが自動的に実行し掃除までしてくれる
というところです。
ちなみにApp Engine外のサービスから触ることはできません。
PullQueue
PullQueueは以下のような仕組みを備えています。
- アプリケーション外のコードや他のアプリケーションがTaskを処理することが可能。
- 処理時間やタイムフレームを明確に決めることができる。
- 処理のボリュームにより自分でスケールさせたり、Queueの削除をする必要がある。
一般的なキューイングのシステムはこちらのイメージが近いです。単純なStackのイメージになります。
PushQueueと違い自動で実行はされません。
Go言語でPushQueueを使ってみる
現在開発しているサービスにてPushQueueを使う部分があったので実装例と詰まったところとかを記載していきます。
今回はApp Engineのdelayパッケージを利用しました。
サンプルコード
package main
import (
"golang.org/x/net/context"
"google.golang.org/appengine"
"google.golang.org/appengine/datastore"
"google.golang.org/appengine/delay"
"net/http"
)
func init() {
http.HandleFunc("/", main)
}
var delayPut = delay.Func("delayPut", func(c context.Context, data *Data) {
keys := Put(c, validData)
})
var Put = func(c context.Context, data *data) []datastore.Key {
// PutEntity
keys, err := data.put(c)
return keys
}
func main(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
delayPut.Call(c, data)
}
こんな感じになりました。
実装上の注意
さて上のサンプルでも利用しているdelayパッケージに関してですが、少し注意点があります。
Func
はTop-Levelのコンテキストで呼ぶこと。
もし仮に、上記main
の関数内でdelayPut
を定義したりすると、Task実行時にこのファイルがロードされても、main
の中身までロードする訳ではないため、delayPutが見つからず失敗するためだと思われます。Func
の第二引数の返り値に意味があること。
Func
の第二引数funcの返り値は複数返すことができますが、最後の値がerror typeであり、かつnilではない場合はもう一度そのメソッドを呼ぶという意味になります。実は、PushQueueは失敗したら設定した回数だけリトライされます。
テストが少し実装しにくいこと。
delayFunc
を呼んでいるmain()
のテストが予想通りになりませんでした。time.Sleep
しても、StronglyConsistentDatastore: true
にしても、datastoreへの更新がうまく動かなかったためです。そのため、Put()
のように別なメソッドへ切り出して、その部分に対するテストを実装しています。default queueにしかqueueを入れられないこと。
調べたのですが出てきませんでした。メソッド内に特別入れられるような箇所もなく、delayパッケージではdefault queueにしか積むことが出来ないのかもしれません。おそらくTask Queue APIを利用すれば出来るかと思いますが、今回はそこまで触れませんでした。
queue.yaml
PushQueueの設定をするときは、queue.yamlを作成します。(以下はdefault queueに対する設定を行っています。)
細かい設定内容や詳細はGo Task Queue Configurationに載っています。
queue:
- name: default
rate: 500/s
bucket_size: 100
max_concurrent_requests: 1000
今回はリトライに関するパラメータを一切設定していません。
設定を反映するときは
appcfg.py update_queues myapp/
を実行します。
rate, bucket_size, max_concurrent_requests
設定内容でとても分かりにくかった項目があります。少し私なりの補足を書いていきます。
- bucket_size
- これから理解するのが一番早いです。bucketとはバケツの意味であり、一度に運べる量みたいなものだと思ってください。(リボルバーの銃弾の装填数の最大数もイメージとして近いです。)
- そのため、
bucket_size=5
のところにqueueが8個きた場合には、5個はすぐに処理用のマシン(プロセッサ)へ渡されますが、残りの3個に関してはbucketに空きが出るまで処理されません。(弾を5発詰めて発射します。残りは3個ですね。) - この仕組みは大量データ(大量queue)に対して、処理量を一様にするためにこのようになっています。もし仮にこの仕組みを採用しないとすると、処理をするマシン(プロセッサ)の数を都度増減させるか、データが最も来るときに合わせてそれを処理できるくらいの数に設定しておかなくてはいけません。
- rate
bucket_size
の空きをどれくらいの頻度・個数空けるかという設定値になります。書き方としては、5/s、10/m、3/hなど個数/[s,m,h]を指定できます。(上のリボルバーでいうと装填速度みたいなものです。)どういった設定値なのかというと、例えばbucket_size=5
のところにqueueが8個到達するという場合を考えてみましょう。-
rate=1/sの場合 1. bucket = 5, bucket空き = 0, 残りqueue = 3 2. 1s後、bucket = 0, bucket空き = 1, 残りqueue = 3 3. 直後に、bucket = 1, bucket空き = 0, 残りqueue = 2 4. 1s後、bucket = 0, bucket空き = 1, 残りqueue = 2 5. 直後に、bucket = 1, bucket空き = 0, 残りqueue = 1 6. 1s後、bucket = 0, bucket空き = 1, 残りqueue = 1 7. 直後に、bucket = 1, bucket空き = 0, 残りqueue = 0
-
rate=2/sの場合 1. bucket = 5, bucket空き = 0, 残りqueue = 3 2. 0.5s後、bucket = 0, bucket空き = 1, 残りqueue = 3 3. 直後に、bucket = 1, bucket空き = 0, 残りqueue = 2 4. 0.5s後、bucket = 0, bucket空き = 1, 残りqueue = 2 5. 直後に、bucket = 1, bucket空き = 0, 残りqueue = 1 6. 0.5s後、bucket = 0, bucket空き = 1, 残りqueue = 1 7. 直後に、bucket = 1, bucket空き = 0, 残りqueue = 0
-
このような流れとなります。bucketの回復速度みたいな感じですね。
- 公式の説明のキャプチャが分かりやすかったので貼っておきます。
- max_concurrent_requests
- 単純に並列処理をする最大数です。datastoreがたくさんのrequestを受けて競合するのを避けるために使われます。
queue:
- name: optimize-queue
rate: 20/s
bucket_size: 40
max_concurrent_requests: 10
この場合は並列で10ずつしか処理しないので、0.5sに10処理が完了すればqueueが詰まることはありませんね。かつ、並列処理を10に抑えているのでdatastoreの競合が起きにくいです。(多分それくらいなら発生しないかと)
さて、PushQueue, PullQueueに関しては以上です。
PullQueueを利用する機会があれば、そちらの方に突っ込んだ内容を書いてみようかと思います。
では良いインプットと良いプログラミングを。
0 件のコメント:
コメントを投稿