Class: ActiveJob::QueueAdapters::AsyncAdapter::Scheduler
- Defined in:
- activejob/lib/active_job/queue_adapters/async_adapter.rb
Overview
:nodoc:
Constant Summary
- DEFAULT_EXECUTOR_OPTIONS =
    {
      min_threads:     0,
      max_threads:     ENV.fetch("RAILS_MAX_THREADS", 5).to_i,
      auto_terminate:  true,
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
    }.freeze
Instance Attribute Summary
- #immediate ⇒ Object: Returns the value of attribute immediate.
Instance Method Summary
- #enqueue(job, queue_name:) ⇒ Object
- #enqueue_at(job, timestamp, queue_name:) ⇒ Object
- #executor ⇒ Object
- #initialize(**options) ⇒ Scheduler (constructor): A new instance of Scheduler.
- #shutdown(wait: true) ⇒ Object
Constructor Details
#initialize(**options) ⇒ Scheduler
Returns a new instance of Scheduler.
    # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 86
    def initialize(**options)
      self.immediate = false
      @immediate_executor = Concurrent::ImmediateExecutor.new
      @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
    end
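The constructor merges the caller's options over DEFAULT_EXECUTOR_OPTIONS, so any key passed to #initialize replaces the matching default and every other default is kept. A minimal sketch of that merge in plain Ruby follows. The defaults are copied from the constant above; the DEFAULTS name, the executor_options helper, and the override values are hypothetical and exist only for illustration.

```ruby
# Copy of DEFAULT_EXECUTOR_OPTIONS, renamed so the sketch is self-contained.
DEFAULTS = {
  min_threads:     0,
  max_threads:     ENV.fetch("RAILS_MAX_THREADS", 5).to_i,
  auto_terminate:  true,
  idletime:        60, # 1 minute
  max_queue:       0,  # unlimited
  fallback_policy: :caller_runs
}.freeze

# Hypothetical helper mirroring DEFAULT_EXECUTOR_OPTIONS.merge(options).
# Hash#merge returns a new hash, so the frozen defaults are never mutated.
def executor_options(**options)
  DEFAULTS.merge(options)
end

# Override two keys; the remaining keys fall back to the defaults.
merged = executor_options(max_threads: 2, idletime: 10)
puts merged.inspect
```

Because the defaults are frozen and merge returns a fresh hash, each Scheduler instance gets its own option set without affecting the others.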
Instance Attribute Details
#immediate ⇒ Object
Returns the value of attribute immediate.
    # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 84
    def immediate
      @immediate
    end
Instance Method Details
#enqueue(job, queue_name:) ⇒ Object
    # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 92
    def enqueue(job, queue_name:)
      executor.post(job, &:perform)
    end
#enqueue_at(job, timestamp, queue_name:) ⇒ Object
    # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 96
    def enqueue_at(job, timestamp, queue_name:)
      delay = timestamp - Time.current.to_f
      if !immediate && delay > 0
        Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
      else
        enqueue(job, queue_name: queue_name)
      end
    end
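The branch in #enqueue_at depends on two inputs: whether the scheduler is in immediate mode, and whether the timestamp still lies in the future. The sketch below isolates that decision in plain Ruby. The dispatch_plan helper and its return values are hypothetical, and it uses an injectable now value in place of Time.current (which comes from Active Support) so the result does not depend on the wall clock.

```ruby
# Hypothetical helper (not part of Active Job) mirroring the branch in #enqueue_at.
# timestamp and now are epoch seconds as Floats.
def dispatch_plan(timestamp, immediate: false, now: Time.now.to_f)
  delay = timestamp - now
  if !immediate && delay > 0
    [:scheduled, delay]   # would go through Concurrent::ScheduledTask
  else
    [:enqueue_now, 0.0]   # would fall through to #enqueue
  end
end

now = 1_000.0
p dispatch_plan(now + 30, now: now)                  # future timestamp, async mode
p dispatch_plan(now - 5, now: now)                   # timestamp already in the past
p dispatch_plan(now + 30, immediate: true, now: now) # immediate mode ignores the delay
```

Only the first call is deferred. A past timestamp or immediate mode both fall through to a plain enqueue, which is why a job scheduled for the future still runs at once when immediate is true.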
#executor ⇒ Object
    # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 110
    def executor
      immediate ? @immediate_executor : @async_executor
    end
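The #executor method is a switch between two executors: one that runs the task on the calling thread and one that hands it to a thread pool. The sketch below reproduces the switch with standard-library stand-ins rather than concurrent-ruby. InlineExecutor, ThreadedExecutor, and MiniScheduler are hypothetical names, and ThreadedExecutor spawns one thread per task instead of pooling, so it illustrates the selection logic only.

```ruby
# Stand-in for Concurrent::ImmediateExecutor (hypothetical, stdlib only).
class InlineExecutor
  # Runs the block on the calling thread before returning.
  def post(*args)
    yield(*args)
    true
  end
end

# Stand-in for the thread pool (hypothetical; one thread per task, no pooling).
class ThreadedExecutor
  def initialize
    @threads = []
  end

  # Hands the block to a new thread and returns immediately.
  def post(*args, &block)
    @threads << Thread.new(*args, &block)
    true
  end

  # Blocks until every posted task has finished.
  def wait
    @threads.each(&:join)
  end
end

class MiniScheduler
  attr_accessor :immediate

  def initialize
    self.immediate = false
    @immediate_executor = InlineExecutor.new
    @async_executor = ThreadedExecutor.new
  end

  # Same ternary as Scheduler#executor.
  def executor
    immediate ? @immediate_executor : @async_executor
  end
end

scheduler = MiniScheduler.new
puts scheduler.executor.class   # async stand-in while immediate is false
scheduler.immediate = true
puts scheduler.executor.class   # inline stand-in once immediate is true
```

Since the executor is looked up on every call, flipping immediate takes effect for the next job without rebuilding the scheduler.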
#shutdown(wait: true) ⇒ Object
    # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 105
    def shutdown(wait: true)
      @async_executor.shutdown
      @async_executor.wait_for_termination if wait
    end