class Resque::Job
A Resque::Job represents a unit of work. Each job lives on a single queue and has an associated payload object. The payload is a hash with two attributes: `class` and `args`. The `class` is the name of the Ruby class which should be used to run the job. The `args` are an array of arguments which should be passed to the Ruby class's `perform` class-level method.
You can manually run a job using this code:
job = Resque::Job.reserve(:high) klass = Resque::Job.constantize(job.payload['class']) klass.perform(*job.payload['args'])
Constants
- DontPerform
Raise Resque::Job::DontPerform from a before_perform hook to abort the job.
Attributes
This job's associated payload object.
The name of the queue from which this job was pulled (or is to be placed)
The worker object which is currently processing this job.
Public Class Methods
Creates a job by placing it on a queue. Expects a string queue name, a string class name, and an optional array of arguments to pass to the class' `perform` method.
Raises an exception if no queue or class is given.
# File lib/resque/job.rb, line 145 def self.create(queue, klass, *args) Resque.validate(klass, queue) if Resque.inline? # Instantiating a Resque::Job and calling perform on it so callbacks run # decode(encode(args)) to ensure that args are normalized in the same manner as a non-inline job new(:inline, {'class' => klass, 'args' => decode(encode(args))}).perform else Resque.push(queue, :class => klass.to_s, :args => args) end end
Given a string, returns a Ruby object.
# File lib/resque/job.rb, line 59 def self.decode(object) return unless object begin if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.load object else MultiJson.decode object end rescue ::MultiJson::DecodeError => e raise DecodeException, e.message, e.backtrace end end
Removes a job from a queue. Expects a string queue name, a string class name, and, optionally, args.
Returns the number of jobs destroyed.
If no args are provided, it will remove all jobs of the class provided.
That is, for these two jobs:
{ 'class' => 'UpdateGraph', 'args' => ['defunkt'] } { 'class' => 'UpdateGraph', 'args' => ['mojombo'] }
The following call will remove both:
Resque::Job.destroy(queue, 'UpdateGraph')
Whereas specifying args will only remove the 2nd job:
Resque::Job.destroy(queue, 'UpdateGraph', 'mojombo')
This method can be potentially very slow and memory intensive, depending on the size of your queue, as it loads all jobs into a Ruby array before processing.
# File lib/resque/job.rb, line 181 def self.destroy(queue, klass, *args) klass = klass.to_s queue = "queue:#{queue}" destroyed = 0 if args.empty? redis.lrange(queue, 0, -1).each do |string| if decode(string)['class'] == klass destroyed += redis.lrem(queue, 0, string).to_i end end else destroyed += redis.lrem(queue, 0, encode(:class => klass, :args => args)) end destroyed end
Given a Ruby object, returns a string suitable for storage in a queue.
# File lib/resque/job.rb, line 50 def self.encode(object) if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.dump object else MultiJson.encode object end end
# File lib/resque/job.rb, line 134 def initialize(queue, payload) @queue = queue @payload = payload @failure_hooks_ran = false end
# File lib/resque/job.rb, line 19 def self.redis Resque.redis end
Given a string queue name, returns an instance of Resque::Job if any jobs are available. If not, returns nil.
# File lib/resque/job.rb, line 201 def self.reserve(queue) return unless payload = Resque.pop(queue) new(queue, payload) end
Public Instance Methods
Equality
# File lib/resque/job.rb, line 313 def ==(other) queue == other.queue && payload_class == other.payload_class && args == other.args end
# File lib/resque/job.rb, line 327 def after_hooks @after_hooks ||= Plugin.after_hooks(payload_class) end
Returns an array of args represented in this job's payload.
# File lib/resque/job.rb, line 285 def args @payload['args'] end
# File lib/resque/job.rb, line 323 def around_hooks @around_hooks ||= Plugin.around_hooks(payload_class) end
# File lib/resque/job.rb, line 319 def before_hooks @before_hooks ||= Plugin.before_hooks(payload_class) end
Given a word with dashes, returns a camel cased version of it.
classify('job-name') # => 'JobName'
# File lib/resque/job.rb, line 76 def classify(dashed_word) dashed_word.split('-').each { |part| part[0] = part[0].chr.upcase }.join end
Tries to find a constant with the name specified in the argument string:
constantize(“Module”) # => Module constantize(“Test::Unit”) # => Test::Unit
The name is assumed to be the one of a top-level constant, no matter whether it starts with “::” or not. No lexical context is taken into account:
C = 'outside' module M
C = 'inside' C # => 'inside' constantize("C") # => 'outside', same as ::C
end
NameError is raised when the constant is unknown.
# File lib/resque/job.rb, line 97 def constantize(camel_cased_word) camel_cased_word = camel_cased_word.to_s if camel_cased_word.include?('-') camel_cased_word = classify(camel_cased_word) end names = camel_cased_word.split('::') names.shift if names.empty? || names.first.empty? constant = Object names.each do |name| args = Module.method(:const_get).arity != 1 ? [false] : [] if constant.const_defined?(name, *args) constant = constant.const_get(name) else constant = constant.const_missing(name) end end constant end
Given a string, returns a Ruby object.
# File lib/resque/job.rb, line 34 def decode(object) return unless object begin if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.load object else MultiJson.decode object end rescue ::MultiJson::DecodeError => e raise DecodeException, e.message, e.backtrace end end
Given a Ruby object, returns a string suitable for storage in a queue.
# File lib/resque/job.rb, line 25 def encode(object) if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.dump object else MultiJson.encode object end end
Given an exception object, hands off the needed parameters to the Failure module.
# File lib/resque/job.rb, line 291 def fail(exception) run_failure_hooks(exception) Failure.create \ :payload => payload, :exception => exception, :worker => worker, :queue => queue end
# File lib/resque/job.rb, line 331 def failure_hooks @failure_hooks ||= Plugin.failure_hooks(payload_class) end
# File lib/resque/job.rb, line 278 def has_payload_class? payload_class != Object rescue NameError false end
String representation
# File lib/resque/job.rb, line 307 def inspect obj = @payload "(Job{%s} | %s | %s)" % [ @queue, obj['class'], obj['args'].inspect ] end
Returns the actual class constant represented in this job's payload.
# File lib/resque/job.rb, line 267 def payload_class @payload_class ||= constantize(@payload['class']) end
Returns the payload class as a string without raising NameError
# File lib/resque/job.rb, line 272 def payload_class_name payload_class.to_s rescue NameError 'No Name' end
Attempts to perform the work represented by this job instance. Calls perform on the class given in the payload with the arguments given in the payload.
# File lib/resque/job.rb, line 209 def perform job = payload_class job_args = args || [] job_was_performed = false begin # Execute before_perform hook. Abort the job gracefully if # Resque::DontPerform is raised. begin before_hooks.each do |hook| job.send(hook, *job_args) end rescue DontPerform return false end # Execute the job. Do it in an around_perform hook if available. if around_hooks.empty? job.perform(*job_args) job_was_performed = true else # We want to nest all around_perform plugins, with the last one # finally calling perform stack = around_hooks.reverse.inject(nil) do |last_hook, hook| if last_hook lambda do job.send(hook, *job_args) { last_hook.call } end else lambda do job.send(hook, *job_args) do result = job.perform(*job_args) job_was_performed = true result end end end end stack.call end # Execute after_perform hook after_hooks.each do |hook| job.send(hook, *job_args) end # Return true if the job was performed return job_was_performed # If an exception occurs during the job execution, look for an # on_failure hook then re-raise. rescue Object => e run_failure_hooks(e) raise e end end
Creates an identical job, essentially placing this job back on the queue.
# File lib/resque/job.rb, line 302 def recreate self.class.create(queue, payload_class, *args) end
# File lib/resque/job.rb, line 15 def redis Resque.redis end
# File lib/resque/job.rb, line 335 def run_failure_hooks(exception) begin job_args = args || [] if has_payload_class? failure_hooks.each { |hook| payload_class.send(hook, exception, *job_args) } unless @failure_hooks_ran end ensure @failure_hooks_ran = true end end