require 'active_job/logging'
require 'logstash-event'
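
# Emits each ActiveJob lifecycle notification (enqueue, enqueue_at, perform_start,
# perform) as a single JSON line built with logstash-event and written through the
# Lograge logger.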
class ActiveJobLogSubscriber < ::ActiveJob::Logging::LogSubscriber
  def enqueue(event)
    process_event(event, 'enqueue')
  end

  def enqueue_at(event)
    process_event(event, 'enqueue_at')
  end

  def perform(event)
    process_event(event, 'perform')
  end

  def perform_start(event)
    process_event(event, 'perform_start')
  end
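
  # Wraps the collected fields in a LogStash::Event (the logstash-event gem adds the
  # standard @timestamp/@version fields), sets a human-readable message, and writes
  # the serialized JSON at Lograge's configured log level.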
  def log(data)
    event = LogStash::Event.new(data)
    event['message'] = "#{data[:job_class]}##{data[:job_id]} at #{data[:scheduled_at]}"
    logger.send(Lograge.log_level, event.to_json)
  end
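
  # Prefer the configured Lograge logger; fall back to the logger supplied by
  # ActiveSupport::LogSubscriber when none is set.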
  def logger
    Lograge.logger.presence || super
  end

  private
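
  # Collects job metadata for the notification, merges event-type specific fields,
  # tags the entry, and hands the result to #log.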
  def process_event(event, type)
    data = extract_metadata(event)
    data.merge! extract_exception(event)
    data.merge! extract_scheduled_at(event) if type == 'enqueue_at'
    data.merge! extract_duration(event) if type == 'perform'

    tags = ['job', type]
    tags.push('exception') if data[:exception]
    data[:tags] = tags
    data[:type] = 'tps'
    data[:source] = ENV['SOURCE']

    log(data)
  end
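
  # `queue_name(event)` is a private helper inherited from
  # ActiveJob::Logging::LogSubscriber.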
  def extract_metadata(event)
    {
      job_id: event.payload[:job].job_id,
      queue_name: queue_name(event),
      job_class: event.payload[:job].class.to_s,
      job_args: args_info(event.payload[:job]),
    }
  end

  def extract_duration(event)
    { duration: event.duration.to_f.round(2) }
  end

  def extract_exception(event)
    event.payload.slice(:exception)
  end
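
  # `scheduled_at(event)` is likewise inherited from ActiveJob::Logging::LogSubscriber
  # and returns the time the job is scheduled to run.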
  def extract_scheduled_at(event)
    { scheduled_at: scheduled_at(event) }
  end

  # The default args_info makes a string. We need objects to turn into JSON.
  def args_info(job)
    job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg }
  end
end
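
# Minimal usage sketch: the subscriber still has to be attached to the "active_job"
# notification namespace, typically from an initializer. The initializer path below is
# an assumption, and detaching the stock ActiveJob::Logging::LogSubscriber (to avoid
# duplicate log lines) differs across Rails versions.
#
#   # config/initializers/lograge_active_job.rb
#   ActiveJobLogSubscriber.attach_to :active_job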