FrontPage  Index  Search  Changes  RSS  Login

[ruby] delayed_job ちょっとだけ詳しく

概要

delayed_job はRails用のジョブキューシステム。

扱うバージョンは 3.0.3。

使い方を簡単に説明

基本的なことは https://github.com/collectiveidea/delayed_job 見たら分かる。

Rails3のGemfileに以下を追記してbundle installする。

gem 'delayed_job_active_record'
gem 'daemons'

ジェネレータが用意されているので実行して、できあがったマイグレーションファイルを適用する。

$ rails generate delayed_job:active_record
$ rake db:migrate

後は適当に遅延処理したい場面で使用するだけ。

@user.delay.activate!(@device)

delay メソッド(プロキシ)を挟む感じ。

キューの構造: ActiveRecord の場合

create_table :delayed_jobs, :force => true do |table|
  table.integer  :priority, :default => 0      # Allows some jobs to jump to the front of the queue
  table.integer  :attempts, :default => 0      # Provides for retries, but still fail eventually.
  table.text     :handler                      # YAML-encoded string of the object that will do work
  table.text     :last_error                   # reason for last failure (See Note below)
  table.datetime :run_at                       # When to run. Could be Time.zone.now for immediately, or sometime in the future.
  table.datetime :locked_at                    # Set when a client is working on this object
  table.datetime :failed_at                    # Set when all retries have failed (actually, by default, the record is deleted instead)
  table.string   :locked_by                    # Who is working on this object (if locked)
  table.string   :queue                        # The name of the queue this job is in
  table.timestamps
end

add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'

queue

バージョン3からジョブに queue という属性を持たせられる様になった。これを利用してジョブをグループ分けする事が出来る。

priority

ワーカーがジョブを処理する際の優先順位。ワーカーには取り扱うプライオリティの範囲(最大、最小)を指定する事ができる。 プライオリティの小さい方から順に実行される。

run_at

これに指定した日時以降にジョブが実行される。

locked_at

ジョブの実行予約(ロック)を行った日時。

locked_by

ロックを行ったワーカーの名前。

handler

ペイロードオブジェクトを YAML 形式でシリアライズしたもの。

ジョブをキューに登録する

delay

delayed_jobはObjectとModuleを拡張する。

Object.send(:include, Delayed::MessageSending)
Module.send(:include, Delayed::MessageSending::ClassMethods)

追加されるdelayメソッドを利用する事で、任意のメソッドをジョブとしてキューに登録する事が出来る。

delayメソッドはDelayed::DelayProxyのインスタンスを作成して返すメソッドで、Delayed::DelayProxyで実装されているmethod_missingを通じて遅延処理が実現される。

Delayed::DelayProxy越しにメソッドを実行すると、Delayed::Job.enqueueが呼ばれてジョブがキューに登録される。


enqueue

Delayed::Job.enqueueにペイロードオブジェクトを含むオプションを渡してジョブをキューに登録する。

キューに登録する際に、ワーカーのenqueueイベントのコールバックも実行される。更に、ペイロードオブジェクトのenqueueフックもジョブ登録前に実行される。

ペイロードオブジェクト

プロキシ越しに受け取ったメッセージを遅延処理として実行させるためのオブジェクト。

  • Delayed::PerformableMethod.new(object, method_name, args)

performメソッドでジョブを実行する。また、methodメソッドをobjectにdelegateしている。

※ データベーステーブルのカラム名は handler で、オブジェクトをto_yamlした結果を保存する。

オプション

ペイロードオブジェクトは:payload_objectという名前で渡すが、その他のオプションはキューのカラム名である:run_at, :priority, :queue等を渡せる。

実行

実行するジョブの予約

以下のスコープにあるものを、実行準備が出来ているジョブの候補と考える。

(
     run_at <= {現在日時}
 AND (
         locked_at IS NULL
      OR locked_at < {現在日時から最大実行時間を引いたもの}
     )
  OR locked_by = {ワーカー名}
)
AND failed_at IS NULL

このスコープを更にワーカーに指定されているプライオリティとキュー名で絞り込む。

  • priority >= {最小プライオリティ}
  • priority <= {最大プライオリティ}
  • queue IN ({キュー名})

そして、以下でソートする。

priority ASC, run_at ASC

上記スコープから read_ahead 分だけジョブを取り出して、最初にロックできたジョブの実行を予約する。

ロック

取り出したジョブの locked_by に記録されているワーカー名がジョブを取り出したワーカーの名前と異なる場合、以下に該当するジョブの locked_at(現在日時) と locked_by(ワーカー名) を更新する。

    id = {取り出したジョブのID}
AND (
        locked_at is null
     OR locked_at < {現在日時から最大実行時間を引いたもの}
    )
AND (
     run_at <= {現在日時}
    )

ワーカー名が同じ場合、以下に該当するジョブの locked_at(現在日時) を更新する。

id = {取り出したジョブのID}
AND locked_by = {ワーカー名}

更新が成功した場合に、ロック成功とみなす。

予約したジョブを実行する

ジョブを予約できた後にジョブを実行する。

Delayed::Worker.lifecycle のコールバック機構を利用してジョブを実行しているため、ワーカーのperformイベントにコールバックを登録する事でジョブを拡張する事ができる。

  • Delayed::Lifecycle
  • Delayed::Callback

タイムアウト

ジョブは invoke_job の実行時間が max_run_time を超えるとタイムアウトする。タイムアウトせずに正常終了したジョブは destroy で削除される。

DeserializationError

DeserializationError 例外が発生した場合、Delayed::Job#failed(job) を実行する。

ワーカーの failure イベントのコールバック付きでペイロードオブジェクトの failure メソッドを実行し、destroy_failed_jobs が真ならジョブを削除する。 偽ならジョブの fail! を実行する(failed_at を更新する)。

Exception

DeserializationError 以外の例外が発生した場合、error イベントのコールバック付きで handle_failed_job(job, error) を実行する。

ジョブの last_error にメッセージをセットし、reschedule でジョブをリスケする。

ジョブのリスケジュール

ジョブの試行回数 attempts を 1 増やし、試行回数の上限に達していたら failed を実行して終わる。

再実行可能なら、次回の実行予定時間をジョブの run_at にセットしてロックを解除する。

実行予定時間は、ペイロードオブジェクトの reschedule_at メソッドがあればその実行結果を使い、なければ現在日時に試行回数^4と5を足した結果を使う。

カスタムジョブ

インスタンスメソッドとしてperformを実装していれば、そのクラスのインスタンスをペイロードオブジェクト(ハンドラ)としてDelayed::Job.enqueueに渡すことが出来る。

class NewsletterJob < Struct.new(:text, :emails)

def perform
  emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) }
end

end

フック

以下のフックが利用できる。

  • enqueue
  • before(job)
  • after(job)
  • success(job)
  • error(job, exception)
  • failure

デーモン

ジェネレータで script/delayed_job というデーモン管理スクリプトが作られる。

Usage: delayed_job [options] start|stop|restart|run
    -h, --help                       Show this message
    -e, --environment=NAME           Specifies the environment to run this delayed jobs under (test/development/production).
        --min-priority N             Minimum priority of jobs to run.
        --max-priority N             Maximum priority of jobs to run.
    -n, --number_of_workers=workers  Number of unique workers to spawn
        --pid-dir=DIR                Specifies an alternate directory in which to store the process ids.
    -i, --identifier=n               A numeric identifier for the worker.
    -m, --monitor                    Start monitor process.
        --sleep-delay N              Amount of time to sleep when no jobs are found
        --read-ahead N               Number of jobs from the queue to consider
    -p, --prefix NAME                String to be prefixed to worker process names
        --queues=queues              Specify which queue DJ must look up for jobs
        --queue=queue                Specify which queue DJ must look up for jobs

--queue, --queues

対象とするキューを指定出来る。カンマ区切りで複数のキューを指定することも出来る。

--min-priority, --max-priority

取り扱うジョブのプライオリティの範囲を指定出来る。

--monitor

daemons の monitor 機能を利用してモニタ用プロセスを立ち上げる。 ワーカーがクラッシュした際に自動的に再起動させるなどしてくれる。

Last modified:2012/07/14 22:14:24
Keyword(s):[ruby]
References: