メドピア開発者ブログ

集合知により医療を再発明しようと邁進しているヘルステックカンパニーのエンジニアブログです。読者に有用な情報発信ができるよう心がけたいので応援のほどよろしくお願いします。

SolidQueue解体新書

こんにちは。サーバーサイドエンジニアの三村(@t_mimura39)です。

さて、Railsエンジニアの皆さんは非同期処理にどのようなライブラリを利用していますか?
ちなみに弊社では Sidekiq を利用するプロジェクトが多いです。

tech.medpeer.co.jp

今回はRailsでの非同期処理ライブラリの新たな選択肢として誕生した「SolidQueue」について解説します。

github.com

目次

🆕 更新履歴 🆕

2024/09/12

SolidQueue v0.9.0のリリースに伴い大きく変更が入ったため以下の点を加筆・修正しました。

  • デフォルトの設定ファイルリネーム config/solid_queue.yml -> config/queue.yml
  • RecurringTaskの設定が config/recurring.yml に切り出された
  • RecurringTaskの設定で command を指定できるようになり、Jobクラスを作成せずとも簡易的なスケジュール実行が実現できるようになった
  • RecurringTaskのスケジューリング管理がDispatcherから新設されたSchedulerに切り出された

🙋 はじめに 🙋

本記事では SolidQueueの内部実装の簡単な解説 と、それらを理解するために必要な SolidQueueの簡単な概要 について記述します。 SolidQueueの使い方や各種設定値についてはREADMEを参照してください。

本記事執筆時点でのSolidQueueの最新バージョンは v0.9.0 です。 現在SolidQueueはv1リリースに向けて活発に追加開発されているため、記述内容の一部が古くなっている可能性がありますがご留意ください。

📝 SolidQueueとは 📝

Railsの非同期処理を実装するためのフレームワークである ActiveJob はバックエンドのライブラリを選択できる仕組みとなっています。

例えば以下はRails 公式ドキュメントで紹介されているバックエンドライブラリです。

ActiveJobの詳細は公式ドキュメント参照してください。 https://guides.rubyonrails.org/active_job_basics.html

SolidQueue はここに新たに仲間入りするActiveJobバックエンドの一つです。 また、次期Railsのバージョン(Rails v8)ではこのSolidQueueがActiveJobのバックエンドライブラリのデフォルトになりました

🚀 SolidQueueの特徴 🚀

引用元 https://github.com/rails/solid_queue/blob/v0.9.0/README.md#solid-queue

Solid Queue is a DB-based queuing backend for Active Job, designed with simplicity and performance in mind.

Solid Queue は DB ベースの Active Job 用キューイングバックエンドで、シンプルさとパフォーマンスを念頭に設計されています。

DBベースのActiveJobバックエンドライブラリはこれまでにもDelayedJobGoodJob などがありました。 GoodJobは近年出てきたバックエンドライブラリとして注目されていますがPostgreSQL専用なので利用シーンが限定されています。

Solid QueueはMySQL, PostgreSQL or SQLiteの全てをサポート ※1 し、尚且つデータベースの比較的新しい機能である 「FOR UPDATE SKIP LOCKED」を活用することで高パフォーマンスを実現 しています。

SolidQueueの誕生経緯はSolidQueueの開発メンバーによるこちらの紹介記事をご参照ください。

dev.37signals.com

🔓 「FOR UPDATE SKIP LOCKED」 とは 🔓

簡単に説明すると「FOR UPDATEロック付きでSELECTをするが、すでにロックがかかっているレコードはスキップする(ロック待ちが発生しない)」という機能です。

SolidQueueではこれを 「複数のWorkerがJobテーブルから安全かつ高速に1件のレコードを取得する」という用途で利用しています。

この機能がSolidQueueの重要なポイントとなりますが、以下のような形でシンプルに実装されているためSolidQueueの内部実装を理解するための必須知識とはならなそうです。

https://github.com/rails/solid_queue/blob/v0.9.0/app/models/solid_queue/record.rb#L9-L15

# SolidQueue::Record (SolidQueueが提供するモデルの基底クラス)
def self.non_blocking_lock
  if SolidQueue.use_skip_locked
    lock(Arel.sql("FOR UPDATE SKIP LOCKED"))
  else
    lock
  end
end

https://github.com/rails/solid_queue/blob/v0.9.0/app/models/solid_queue/ready_execution.rb#L32-L34

# SolidQueue::ReadyExecution.rb 実行可能なJobの取得処理(抜粋)
def select_candidates(queue_relation, limit)
  queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id)
end

📝「FOR UPDATE SKIP LOCKED」の詳細については TechRachoさんの記事がとても詳しいので興味のある方はこちらを参照ください。

techracho.bpsinc.jp

🍀 4種類のアクターについて 🍀

SolidQueueは以下の4種類の「アクター」により構成されています。

  • Dispatcher: Jobの実行タイミング管理や同時実行可能数制限などを担当
  • Worker: 実行待ちJobをポーリング的に取得し処理を実行する
  • Scheduler: 後述する RecurringTask のスケジュール管理を担当
  • Supervisor: 設定内容に基づき上記2つを起動させ稼働状況の監視をする

このようにそれぞれの役割が明確に分かれているため、「比較的処理が軽量なDispatcherは1つだけ立ち上げ、Workerは複数立ち上げる」のような調整が柔軟にできるようになっています。

起動方法

以下のコマンドを実行することで、設定ファイルの内容に基づいた種類・個数の「アクター」を起動することができます。 まず最初に「Supervisor」が1つ起動し、そのSupervisorにぶら下がる形 で「Dispatcher」と「Worker」がN個、必要に応じて「Scheduler」が一つ起動する作りとなっています。

# SolidQueueが提供しているコマンド
bin/jobs start           # Starts Solid Queue supervisor to dispatch and perform enqueued jobs. Default command.
# config/queue.yml
# (設定例) Dispatcherが1つ、Workerが2つでぞれぞれ対象とするキューなどの設定を分けることが可能
production:
  dispatchers:
    - polling_interval: 1
      batch_size: 500
      concurrency_maintenance_interval: 300
  workers:
    - queues: "*"
      threads: 3
      polling_interval: 2
    - queues: [ real_time, background ]
      threads: 5
      polling_interval: 0.1
      processes: 3

(おまけ)二つの起動モードについて

SolidQueueはSupervisorのプロセスフォークとしてDispatcherとWorkerを起動する作りとなっています。
しかし、プロセスフォークではなくスレッドで起動する起動モードが実装予定とされていました。 前者のプロセスフォークの実装は「forkモード」、後者のスレッドの実装は「async」と呼称されていました。

ただし、「asyncモード」は完全に削除され「forkモード」に一本化されることとなりました。 SolidQueueの内部構造を理解する上でこの歴史的経緯について把握する必要はありませんが、現在まさに開発過渡期のためコードリーディング時に本件を頭の片隅に置いておくと混乱を回避できます。

本件についての詳細を知りたい方は以下のPullRequestをご参照ください。

🚶 SolidQueue実装の歩き方 🚶

ここまではSolidQueueのコードを読む上で初めに理解しておいた方が良い前提知識でした。
それでは実際にSolidQueueのコードを読んでいきましょう。

SolidQueueは完全にActiveRecordとActiveJobに依存しています。 そのため素直なRailsアプリケーションを読む感覚で全体を読み解くことが出来ます。

まず大きく分けて以下の二つを理解していきましょう。

モデル

DBの取り扱いを含むビジネスロジックは全てモデル(app/models/solid_queue)として定義されています。 まずはここを読むことで各モデルにどのようなメソッドが定義されているか(≒ どのようなデータ操作があるか)が理解できます。

アクター

前述した4つのアクターの動きは lib/solid_queue に定義されています。 「dispatcher.rb」「worker.rb」「scheduler.rb」「supervisor.rb」のようにアクター毎にファイルが分かれているためその点を念頭に読むと理解が早いです。

コードを読む上での些細な注意事項ですが、37signals ※2が開発するシステムによくある特徴である「ConcernによるModule分割」が各所に多用されています。 慣れない人にとってはメソッドの定義箇所を追うのが大変かもしれませんが、処理を抽象化し綺麗に共通化・分離されている様は参考になると思います。

また、concurrent-ruby が時折利用されているため一定ここの知識がある方がリーディングは捗ります。

github.com

🥞 SolidQueueのモデル(テーブル)🥞

SolidQueueはデータベースをバックエンドとしているため、素直なActiveRecordを用いたモデルが提供されています。

本記事執筆時点では以下のテーブル群で構成されています。

  • solid_queue_jobs
  • solid_queue_scheduled_executions
  • solid_queue_ready_executions
  • solid_queue_claimed_executions
  • solid_queue_blocked_executions
  • solid_queue_failed_executions
  • solid_queue_pauses
  • solid_queue_processes
  • solid_queue_semaphores
  • solid_queue_recurring_executions
  • solid_queue_recurring_tasks

最新の情報は db/migrate を参照してください。

Jobの状態遷移

まずはJobにはどのような状態があるのかを確認しましょう。
以下が状態遷移図となります。

SolidQueue Jobの状態遷移(全て)

一見複雑な状態遷移に見えますが、基本的には 「Scheduled -> Ready -> Claimed -> Finished」 の一方通行です。
上記の遷移図から「同時実行数制限」「Job処理失敗・再実行」などを除いたシンプルな遷移図がこちらになります。

SolidQueue Job状態遷移(抜粋)

この基本の状態遷移を頭に入れた上でコードリーディングしていきましょう。

状態の表現方法

Jobモデルに対してhas_one関連のexecutionが紐づく形でJobの状態が管理されています。

※ 「Finished(処理完了)」に関しては「jobs.finished_at が設定されている」又は「Jobレコードが削除されている」という形で表現されています。(この二つの表現方法は preserve_finished_jobs 設定によって切り替わります)

https://github.com/rails/solid_queue/blob/v0.9.0/app/models/solid_queue/job/executable.rb#L86-L109

# SolidQueue::Job::Executable (JobにincludeされるConcernモジュール)
def finished?
  finished_at.present?
end

def status
  if finished?
    :finished
  elsif execution.present?
    execution.type
  end
end

def execution
  %w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
end

モデルの実装例

全てのモデルの実装について解説してしまうと日が暮れてしまうためここでは二つの例のみを挙げます。

まず「実行待ちのJobの中から実行対象のJobをロックしつつ取得し実行中状態に遷移させる SolidQueue::ReadyExecution.claim」 の実装を見てみましょう。 https://github.com/rails/solid_queue/blob/v0.9.0/app/models/solid_queue/ready_execution.rb

module SolidQueue
  class ReadyExecution < Execution # 「Ready(実行待ち状態)」を表現するJobのhas_oneモデル
    class << self
      def claim(queue_list, limit, process_id)
        # QueueSelector.new(queue_list, self).scoped_relations
        # => 指定されたキューに対応するReadyExecutionのRelationを返却
        QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation|
          select_and_lock(queue_relation, process_id, limit).tap do |locked|
            limit -= locked.size
          end
        end
      end

      private
        def select_and_lock(queue_relation, process_id, limit)
          return [] if limit <= 0

          transaction do
            candidates = select_candidates(queue_relation, limit)
            lock_candidates(candidates, process_id)
          end
        end

        # 指定件数分の実行待ち状態のJobを「FOR UPDATE SKIP LOCKED」を指定しつつ取得する
        def select_candidates(queue_relation, limit)
          queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id)
        end

        def lock_candidates(executions, process_id)
          return [] if executions.none?

          # SolidQueue::ClaimedExecution = 実行中状態
          # 実行中状態として登録しJobが重複実行されないようにした上で実行待ち状態を解除する。
          SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed|
            ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id)
            where(id: ids_to_delete).delete_all
          end
        end

次に「Job作成時の初期ステータス決定処理」を見てみましょう。

https://github.com/rails/solid_queue/blob/v0.9.0/app/models/solid_queue/job/executable.rb

# SolidQueue::Job::Executable (JobにincludeされるConcernモジュール)
after_create :prepare_for_execution

def prepare_for_execution
  # 実行予定日時を超過(due?)の場合はディスパッチ処理(後述)を実行
  if due? then dispatch
  else
    # 実行予定日時が未来の場合は「Scheduled(実行予約)」状態に遷移
    schedule
  end
end

def dispatch
  # 同時実行可能数制限ロックを取得できた場合は「Ready(実行待ち)」状態に遷移
  if acquire_concurrency_lock then ready
  else
    # ロックを取得できなかった場合は「Blocked(実行制限中)」状態に遷移
    block
  end
end

いかがでしょうか?
多少コメントで補足をいれていますが、ActiveRecordが使われた素直なRailsアプリケーションとして自然に読めますね。

他のモデル(app/models/solid_queue)にもこのような形で処理が定義されているため自身で読んでみてください。

詳細は割愛しますが Queue のようにデータベースで管理されていない概念についても綺麗にモデリングされている点も参考になりますね。

次は「Supervisor」「Dispatcher」「Worker」の3つのアクターがそれぞれどのような処理をしているかについて見ていきましょう。 ※ Schedulerに関しては、「Recurring tasks」の項で解説します。

😎 Supervisor 😎

Dispatcher・Workerアクターの起動

https://github.com/rails/solid_queue/blob/v0.9.0/lib/solid_queue/supervisor.rb

Supervisorが起動されると、設定ファイルに基づいてDispatcherとWorkerアクターを起動します。

プロセスの死活監視

SolidQueue::Processモデルを活用し稼働しているプロセス(DispatcherとWorker、Scheduler)の死活監視をします。

https://github.com/rails/solid_queue/blob/v0.9.0/app/models/solid_queue/process.rb

# SolidQueue::Process
create_table "solid_queue_processes" do |t|
  t.string "kind", null: false
  t.datetime "last_heartbeat_at", null: false
  t.bigint "supervisor_id"
  t.integer "pid", null: false
  t.string "hostname"
  t.text "metadata"
  t.datetime "created_at", null: false
  t.string "name", null: false
  t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
  t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
  t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
end

各プロセスは一定間隔でハートビートイベントを発火( process.last_heartbeat_at のタイムスタンプ更新)しています。

https://github.com/rails/solid_queue/blob/v0.9.0/lib/solid_queue/processes/registrable.rb#L39-L46

# SolidQueue::Processes::Registrable (3アクターのプロセスクラスにincludeされるConcernモジュール)
def launch_heartbeat
  # 本処理の本質とは関係ありませんが `Concurrent::TimerTask` がとてもうまく活用されていて綺麗ですね。
  @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do
    wrap_in_app_executor { heartbeat }
  end
  # 省略...
end

def heartbeat
  process.heartbeat
end

https://github.com/rails/solid_queue/blob/v0.9.0/app/models/solid_queue/process.rb#L22-L24

class SolidQueue::Process
  def heartbeat
    touch(:last_heartbeat_at)
  end
end

Supervisorはハートビートが一定間隔停止しているプロセスは停止したとみなし事後処理を実行します。

https://github.com/rails/solid_queue/blob/v0.9.0/lib/solid_queue/supervisor/maintenance.rb

# SolidQueue::Supervisor::Maintenance (SupervisorにincludeされるConcernモジュール)
def launch_maintenance_task
  @maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
    prune_dead_processes
  end

  @maintenance_task.execute
end

def prune_dead_processes
  # プロセスの停止処理を実行
  wrap_in_app_executor { SolidQueue::Process.prune }
end

🏹 Dispatcher 🏹

ポーリング

Dispatcherと後述するWorkerはポーリング処理を実行し続けます。
ポーリング処理は以下のように抽象化されたConcernとして実装されています。

https://github.com/rails/solid_queue/blob/v0.9.0/lib/solid_queue/processes/poller.rb#L24-L42

# SolidQueue::Processes::Poller (間接的に後述のDispatcherとWorkerにincludeされるConcernモジュール)
def start_loop
  loop do
    break if shutting_down?

    wrap_in_app_executor do
      unless poll > 0
        interruptible_sleep(polling_interval)
      end
    end
  end
ensure
  SolidQueue.instrument(:shutdown_process, process: self) do
    run_callbacks(:shutdown) { shutdown }
  end
end

def poll
  raise NotImplementedError
end

実行予約中Jobを監視&ディスパッチ(実行待ちに移動)

Dispatcherのメイン作業です。
deliver_laterなどで実行予約されたJobが実行タイミングを迎えたタイミングでディスパッチします。 あくまでディスパッチまでで、実際のJobを処理するのは後述のWorkerです。

https://github.com/rails/solid_queue/blob/v0.9.0/lib/solid_queue/dispatcher.rb#L26-L35

# SolidQueue::Dispatcher
def poll
  batch = dispatch_next_batch
  batch.size
end

def dispatch_next_batch
  with_polling_volume do
    # 実行予定日時(scheduled_at)が過去のJobを抽出しディスパッチする
    ScheduledExecution.dispatch_next_batch(batch_size)
  end
end

他にも「同時実行可能数制限のメンテナンス」もDispatcherの責務ですが、一旦解説を後回しとします。

🕺 Worker 🕺

実行待ち状態のJobをポーリング的に取得しJobを実行(perform)

Workerの責務はこれだけです。シンプルですね。

https://github.com/rails/solid_queue/blob/v0.9.0/lib/solid_queue/worker.rb#L21-L35

# SolidQueue::Worker
def poll
  claim_executions.then do |executions|
    executions.each do |execution|
      # Pool#postの中でJobがperformされる
      pool.post(execution)
    end

    executions.size
  end
end

def claim_executions
  with_polling_volume do
    # 実行待ち状態のJobを取得
    SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id)
  end
end

以上が3つのアクターの主な処理内容です。
アクターとして責務が分割されていることにより、それぞれの処理が単純化されていて理解しやすいですね。

これからはここまで言及しなかったSolidQueueの機能について簡単に紹介します。

🚧 Jobの同時実行可能数制限について 🚧

機能概要

Sidekiq Enterpriseの「Rate Limiting」のようなものです。

引用元 https://github.com/rails/solid_queue/blob/v0.9.0/README.md#concurrency-controls

Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's duration) elapses. Jobs are never discarded or lost, only blocked.

Solid Queue は Active Job に同時実行制御機能を拡張し、特定のタイプや引数を持つジョブを同時に実行できる数を制限することができます。 このように制限された場合、ジョブは実行がブロックされ、他のジョブが終了してブロックが解除されるまで、または設定された有効期限 (同時実行の制限時間) が経過するまでブロックされたままになります。 ジョブが破棄されたり失われたりすることはなく、ブロックされるだけです。

class MyJob < ApplicationJob
  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group

  def perform(arg1, arg2, arg3)
end

# 例: 同一アカウントに対してのお知らせ配信処理を同タイミングで最大2つまでに制限。しかし、Jobの実行が5分を超過するとその制限が解放される。
class DeliverAnnouncementToContactJob < ApplicationJob
  limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes

  def perform(contact)

内部実装

その名の通り「セマフォ」を表現するモデルである SolidQueue::Semaphore(solid_queue_semaphore) というモデル(テーブル)が活用されていいます。

これは同時実行数制限対象(key)に対して、現時点で残り何件(value)のJobが実行可能かを表現しています。

SolidQueue::Semaphore
create_table "solid_queue_semaphores" do |t|
  t.string "key", null: false
  t.integer "value", default: 1, null: false
  t.datetime "expires_at", null: false
  t.datetime "created_at", null: false
  t.datetime "updated_at", null: false
  t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at"
  t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value"
  t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
end

細かな処理内容については Semaphoreモデル に綺麗に定義されているためここを読み解くと良いです。

主に以下のような使われ方をしています。

# 同時実行数制限ロックの取得(booleanを返却)
# DispatcherがJobをReadyに移動するタイミングで呼び出される。
# ロックが取得できない場合はBlocked状態となる。
def acquire_concurrency_lock
  return true unless concurrency_limited?

  Semaphore.wait(job) # 指定したjobが実行可能か確認し、実行可能な場合は同時実行可能数(semaphores.value)を-1します。
end

# 同時実行数制限ロックの解放
# Jobの実行終了時などに呼び出される。
def release_concurrency_lock
  return false unless concurrency_limited?

  Semaphore.signal(job)
end

また、有効期限切れのSemaphoreを解放する処理がDispatcherの責務の一つとして定義されています。 詳細はここでは割愛しますが、興味がある方は SolidQueue::Dispatcher::ConcurrencyMaintenance をご参照ください。

⏰ Recurring tasks (cronでのスケジュール定義) ⏰

機能概要

引用元 https://github.com/rails/solid_queue/blob/v0.9.0/README.md#recurring-tasks

Solid Queue supports defining recurring tasks that run at specific times in the future, on a regular basis like cron jobs. These are managed by the scheduler process and are defined in their own configuration file.

Solid Queue は、cron ジョブのように将来の特定の時間に定期的に実行されるタスクの定義をサポートします。 これらはスケジューラー プロセスによって管理され、独自の設定ファイルで定義されます。

# config/recurring.yml
a_periodic_job:
  class: MyJob
  args: [ 42, { status: "custom_status" } ]
  schedule: every second
a_cleanup_task:
  command: "DeletedStuff.clear_all"
  schedule: every day at 9am

上記の通り、専用のJobクラスを定義せずに command を指定するだけで簡易的なスケジュール実行が実現できるのはとても便利そうですね。

内部実装

以下2つのモデルで管理されています。

  • SolidQueue::RecurringExecution
    • RecurringTaskが重複実行されないように排他制御するためのモデル(テーブル)です。
  • SolidQueue::RecurringTask
    • cronで定義されているスケジュールの設定値が格納されています。
    • 本機能を実装する上では本来不要ですが、後述のMissionControlの実装都合で追加されたモデル(テーブル)です。
# SolidQueue::RecurringExecution
create_table "solid_queue_recurring_executions" do |t|
  t.bigint "job_id", null: false
  t.string "task_key", null: false
  t.datetime "run_at", null: false # 実際に実行された日時ではなくcronで算出された日時が設定される
  t.datetime "created_at", null: false
  t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
  t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end

# SolidQueue::RecurringTask
create_table "solid_queue_recurring_tasks" do |t|
  t.string "key", null: false
  t.string "schedule", null: false
  t.string "command", limit: 2048
  t.string "class_name"
  t.text "arguments"
  t.string "queue_name"
  t.integer "priority", default: 0
  t.boolean "static", default: true, null: false
  t.text "description"
  t.datetime "created_at", null: false
  t.datetime "updated_at", null: false
  t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
  t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
end

Scheduler起動時に全てのRecurringTask定義に対して以下の処理が実行されます。 https://github.com/rails/solid_queue/blob/v0.9.0/lib/solid_queue/scheduler/recurring_schedule.rb

# SolidQueue::Scheduler::RecurringSchedule (DuspatcherにincludeされるConcernモジュール)
def schedule(task)
  # Concurrent::ScheduledTaskを利用し、cronに基づき「次回実行日時」に処理をスケジューリング
  scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at|
    # 再帰的に本メソッドを呼び出し次回分のスケジュールを登録
    thread_schedule.schedule_task(thread_task)

    wrap_in_app_executor do
      # Jobを実行待ち状態で登録
      thread_task.enqueue(at: thread_task_run_at)
    end
  end
  # 省略...
end

注意点

以上の通りSolidQueueのRecurringTaskは「cron定義に基づいて次回実行を予約する」といったアプローチです。 そのため、cronによるJob実行予定日時が「SolidQueueが停止状態」となっていた場合にSolidQueue停止中はもちろんの事、次回SolidQueueが起動した際にも遡ってJobを実行することはありません。

(具体例)
cronが「毎朝7時(0 7 * * *) にDummyJob実行」と定義されている。 何かしらの事情により朝の6:55〜7:05の間SolidQueueが停止されていた場合、本日分のDummyJobは実行されない(エラーにもならない)。

Sidekiqのcron実装である SidekiqCron は、Sidekiq起動時に一定期間内であれば過去を遡ってJobを実行してくれる仕組みがあります。(詳細は こちら を参照ください)

どちらも一長一短ですが、Sidekiqなどの他のライブラリからSolidQueueに移行する際にはこういった細かな挙動の違いにも気をつける必要がありますね。

⛔ キューの一時停止 ⛔

他のライブラリと同様にSolidQueueにも特定のキューを停止する機能が備わっています。

Rails Consoleによる手作業( SolidQueue::Queue#pause )または、後述のUIダッシュボードでの操作によりキューを停止することが可能です。
停止されたキューは以下のテーブルに格納されます。

# SolidQueue::Pause
create_table "solid_queue_pauses" do |t|
  t.string "queue_name", null: false
  t.datetime "created_at", null: false
  t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true
end

このPauseはWorkerが処理対象Jobを抽出する際に参照され、当該キューに属するJobは処理対象から除外されます。

本記事では紹介していませんが、処理対象のキューを選択する処理もモデルとして表現されており「staging* のようなワイルドカード指定によるキュー名の前方一致」や「キューの優先順位」がシンプルに実装されているため読んでいて面白いです。
こちらも興味があれば覗いてみてください。 SolidQueue::QueueSelector

🔁 処理失敗したJobの再実行 🔁

Sidekiqのような「Job実行時に例外が発生し捕捉されなかった場合は自動再実行」のような仕組みはSolidQueue本体にはありません。

Jobを再実行するには以下のいずれかを選択する必要があります。

  • ActiveJobのretry_onを利用
    • ApplicationJobのような基底クラスでStandardErrorをretry_on対象とすることで実質的にSidekiqのような自動リトライを再現します。が、リトライ対象の例外は明示的に尚且つ限定的に指定することをオススメします。
  • 自前で特定の例外を捕捉し再度キューイング
  • 後述のUIダッシュボードで手動再実行

🖼️ UIダッシュボード 🖼️

SolidQueueと同様にRails公式ツールとして Mission Control — Jobs が開発されています。

github.com

※ 以下は本記事執筆時点最新バージョン v0.3.1 時点の内容です。

キューやジョブの一覧管理、失敗したジョブの再実行など基本的な機能は揃っています。 SolidQueue専用ではなく Resque にも対応しているようです。

SolidQueueと同様にこちらも絶賛開発が進行しているので今後の追加機能に期待です。

missin-control

導入時の注意点

Mission Controlは以下のGemが依存として定義されています。
rails/propshaft のようにかなり新しいアセット関連のGemが依存として定義されているため、プロジェクトによっては導入に一定のハードルがあるかもしれません。

https://github.com/rails/mission_control-jobs/blob/v0.3.1/mission_control-jobs.gemspec#L20-L23

# mission_control-jobs.gemspec
spec.add_dependency "rails", ">= 7.1"
spec.add_dependency "propshaft"
spec.add_dependency "importmap-rails"
spec.add_dependency "turbo-rails"
spec.add_dependency "stimulus-rails"
spec.add_dependency "irb", "~> 1.13"

🔥 パフォーマンスやDB負荷 🔥

Sidekiqを多用しているプロジェクトは「パフォーマンス(≒ Jobを捌く速度)」と「DB負荷」が気になるのではないでしょうか。
スタートアップな小さなサービスならまだしも、大量なJobを処理するようなサービスでSolidQueueを安全に利用できるのかは一定の不安があります。

一方、HEY ※3 では非同期処理の全てでSolidQueueが利用されているようです。

Solid Queueは現在、HEYのためだけに毎日2000万近くのジョブを実行しています。 すべてのアプリケーションの移行が完了したら、Solid Queue で 1 日あたり約 1 億件のジョブを処理することになります。 とてもSOLIDな1.0になるでしょう。 rosapolisによる素晴らしい仕事

ソリッド・キューがついに全HEYを制覇。 Resqueにさようなら! Resqueは長年にわたって私たちによく貢献してくれましたが、蓄積された複雑さをすべて圧縮できるクリーンシートの実装が必要な時期が来ていました。 何千万もの毎日のHEYジョブは、今ではSQだけで実行されている!

データベースのスペックやWorker数などは不明ですが、かなりの規模のプロジェクトでも十分に使えるポテンシャルは持っていそうです。

👋 まとめ 👋

簡単ではありますがSolidQueueの内部構造の解説でした。 普段から馴染みのあるActiveRecordで実装されているためとても理解がしやすかったです。

SolidQueueを採用する予定がない方にとっても、「Railsで開発されたジョブキューシステム」として捉えると良い学習対象となるのではないでしょうか。

本記事がきっかけで少しでもSolidQueueに興味を持っていただけたら幸いです。

👍 参考資料 👍

SolidQueueはまだまだ若いライブラリのため参考にできる資料が少ない印象です。
そんな中でもTechRachoさんの翻訳ブログはとても参考にさせていただきました。ありがとうございます。


是非読者になってください!


メドピアでは一緒に働く仲間を募集しています。
ご応募をお待ちしております!

■募集ポジションはこちら medpeer.co.jp

■エンジニア紹介ページはこちら engineer.medpeer.co.jp

■メドピア公式YouTube  www.youtube.com

■メドピア公式note
style.medpeer.co.jp


  1. 「FOR UPDATE SKIP LOCKED」 をサポートしているデータベース(MySQL 8以上、またはPostgreSQL 9.5以上)で利用することが推奨されています。 これは「FOR UPDATE SKIP LOCKED」を利用できないと複数のWorkerが実行された際にロック待ちが発生しパフォーマンスが落ちてしまうためです。
  2. Ruby on Railsの作者であるDHHがCTOを務めるシステム開発会社。SolidQueueに関しても37signalsのメンバーが中心に開発をしている。
  3. 37signalsが提供しているメール・カレンダーサービス