Class: ActiveJob::QueueAdapters::AsyncAdapter
- Inherits:
- 
      AbstractAdapter
      
        - Object
- AbstractAdapter
- ActiveJob::QueueAdapters::AsyncAdapter
 
- Defined in:
- activejob/lib/active_job/queue_adapters/async_adapter.rb
Overview
Active Job Async adapter
The Async adapter runs jobs with an in-process thread pool.
This is the default queue adapter. It’s well-suited for dev/test since it doesn’t need an external infrastructure, but it’s a poor fit for production since it drops pending jobs on restart.
To use this adapter, set queue adapter to :async:
config.active_job.queue_adapter = :async
To configure the adapter’s thread pool, instantiate the adapter and pass your own config:
config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
  min_threads: 1,
  max_threads: 2 * Concurrent.processor_count,
  idletime: 600.seconds
The adapter uses a Concurrent Ruby thread pool to schedule and execute jobs. Since jobs share a single thread pool, long-running jobs will block short-lived jobs. Fine for dev/test; bad for production.
Defined Under Namespace
Classes: JobWrapper, Scheduler
Instance Method Summary collapse
- 
  
    
      #enqueue(job)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    :nodoc:. 
- 
  
    
      #enqueue_at(job, timestamp)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    :nodoc:. 
- 
  
    
      #immediate=(immediate)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Used for our test suite. 
- 
  
    
      #initialize(**executor_options)  ⇒ AsyncAdapter 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    See Concurrent::ThreadPoolExecutor for executor options. 
- 
  
    
      #shutdown(wait: true)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Gracefully stop processing jobs. 
Constructor Details
#initialize(**executor_options) ⇒ AsyncAdapter
See Concurrent::ThreadPoolExecutor for executor options.
| 35 36 37 | # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 35 def initialize(**) @scheduler = Scheduler.new(**) end | 
Instance Method Details
#enqueue(job) ⇒ Object
:nodoc:
| 39 40 41 | # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 39 def enqueue(job) # :nodoc: @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name end | 
#enqueue_at(job, timestamp) ⇒ Object
:nodoc:
| 43 44 45 | # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 43 def enqueue_at(job, ) # :nodoc: @scheduler.enqueue_at JobWrapper.new(job), , queue_name: job.queue_name end | 
#immediate=(immediate) ⇒ Object
Used for our test suite.
| 55 56 57 | # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 55 def immediate=(immediate) # :nodoc: @scheduler.immediate = immediate end | 
#shutdown(wait: true) ⇒ Object
Gracefully stop processing jobs. Finishes in-progress work and handles any new jobs following the executor’s fallback policy (‘caller_runs`). Waits for termination by default. Pass `wait: false` to continue.
| 50 51 52 | # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 50 def shutdown(wait: true) # :nodoc: @scheduler.shutdown wait: wait end |