/usr/lib/ruby/vendor_ruby/amqp/consumer.rb is in ruby-amqp 0.9.5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# encoding: utf-8
require "amq/client/async/consumer"
module AMQP
# AMQP consumers are entities that handle messages delivered to them ("push API" as opposed to "pull API") by the AMQP broker.
# Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue)
# or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in a round-robin
# manner (with respect to the channel-level prefetch setting).
#
# @see AMQP::Queue
# @see AMQP::Queue#subscribe
# @see AMQP::Queue#cancel
class Consumer < AMQ::Client::Async::Consumer
#
# API
#
# @return [AMQP::Channel] Channel this consumer uses. It is also handed to the
#   Header object built for each delivery in #on_delivery.
attr_reader :channel
# @return [AMQP::Queue] Queue messages are consumed from
attr_reader :queue
# @return [String] Consumer tag, unique consumer identifier. Generated by the
#   class-level tag generator when none is passed to the constructor, and
#   replaced with a freshly generated one by #resubscribe.
attr_reader :consumer_tag
# @return [Hash] Custom subscription metadata
attr_reader :arguments
# Consumer tag generator used by instances of this class. A default
# AMQ::Client::ConsumerTagGenerator is created the first time it is
# requested and reused afterwards.
#
# @return [AMQ::Client::ConsumerTagGenerator] Consumer tag generator
def self.tag_generator
  return @tag_generator if @tag_generator

  @tag_generator = AMQ::Client::ConsumerTagGenerator.new
end # self.tag_generator
# Assigns the consumer tag generator that will be used by consumer instances
# (the constructor and #resubscribe both read it through .tag_generator).
#
# @param [AMQ::Client::ConsumerTagGenerator] generator Consumer tag generator to use
# @return [AMQ::Client::ConsumerTagGenerator] Provided argument
def self.tag_generator=(generator)
@tag_generator = generator
end
# Builds a consumer for the given channel and queue. When no consumer tag is
# supplied, one is generated for the queue by the class-level tag generator.
# Every argument is then forwarded to the parent constructor.
#
# @param [AMQP::Channel] channel      Channel this consumer uses
# @param [AMQP::Queue]   queue        Queue messages are consumed from
# @param [String, nil]   consumer_tag Consumer tag; generated when nil
# @param [Boolean]       exclusive    Forwarded to the parent constructor
# @param [Boolean]       no_ack       Forwarded to the parent constructor
# @param [Hash]          arguments    Custom subscription metadata
# @param [Boolean]       no_local     Forwarded to the parent constructor
def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false)
  tag = consumer_tag || self.class.tag_generator.generate_for(queue)

  super(channel, queue, tag, exclusive, no_ack, arguments, no_local)
end # initialize
# Reports whether this consumer is exclusive. The answer comes straight from
# the parent implementation.
#
# @return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)
def exclusive?
  super()
end # exclusive?
# Begin consuming messages from the queue. The parent implementation is
# invoked from inside the channel's +once_open+ and the queue's
# +once_declared+ callbacks.
#
# @param [Boolean] nowait Forwarded to the parent implementation
# @return [AMQP::Consumer] self
def consume(nowait = false, &block)
  @channel.once_open { @queue.once_declared { super(nowait, &block) } }

  self
end # consume(nowait = false, &block)
# Used by automatic recovery code. From inside the channel's +once_open+ and
# the queue's +once_declared+ callbacks, the consumer is unregistered from the
# channel, given a freshly generated consumer tag, registered again under the
# new tag, and only then handed to the parent implementation.
#
# @api plugin
# @return [AMQP::Consumer] self
def resubscribe(&block)
  @channel.once_open do
    @queue.once_declared do
      # Order matters: drop the old registration before the tag changes,
      # then register again so the channel sees the new tag.
      unregister_with_channel
      @consumer_tag = self.class.tag_generator.generate_for(@queue)
      register_with_channel

      super(&block)
    end
  end

  self
end # resubscribe(&block)
# Cancels this consumer. The parent implementation is invoked from inside the
# channel's +once_open+ and the queue's +once_declared+ callbacks.
#
# @param [Boolean] nowait Forwarded to the parent implementation
# @return [AMQP::Consumer] self
def cancel(nowait = false, &block)
  @channel.once_open { @queue.once_declared { super(nowait, &block) } }

  self
end # cancel(nowait = false, &block)
# {AMQP::Queue} API compatibility.
#
# A consumer counts as subscribed when at least one delivery callback is
# registered. A missing +:delivery+ entry (nothing registered yet) is treated
# the same as an empty list instead of raising NoMethodError on nil, matching
# the nil guard in #callback.
#
# @return [Boolean] true if this consumer is active (subscribed for message delivery)
# @api public
def subscribed?
  handlers = @callbacks[:delivery]

  !(handlers.nil? || handlers.empty?)
end # subscribed?
# Legacy {AMQP::Queue} API compatibility.
#
# @return [Object, nil] First registered delivery callback, or nil when there
#   is no +:delivery+ entry or the list is empty
# @private
# @deprecated
def callback
  handlers = @callbacks[:delivery]

  handlers.first if handlers
end # callback
# Register a block that will be used to handle delivered messages.
#
# The block is wrapped in a shim that adapts the three values the shim
# receives (basic_deliver, headers, payload) to the block's arity:
#
# * arity 1: block is called with the payload only
# * arity 2: block is called with a Header and the payload
# * anything else: block is called with a Header, the payload, and then
#   consumer tag, delivery tag, redelivered flag, exchange and routing key
#   taken from basic_deliver
#
# When no block is given the call is forwarded to the parent implementation
# as is. Wrapping a missing block would register a shim that fails with
# NoMethodError (+nil.arity+) on the first delivery.
#
# @return [AMQP::Consumer] self
# @see AMQP::Queue#subscribe
def on_delivery(&block)
  return super(&block) if block.nil?

  # We have to maintain this multiple arities jazz
  # because older versions of this gem are used in examples in at least 3
  # books published by O'Reilly :(. MK.
  delivery_shim = Proc.new { |basic_deliver, headers, payload|
    case block.arity
    when 1 then
      block.call(payload)
    when 2 then
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload)
    else
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
    end
  }

  super(&delivery_shim)
end # on_delivery(&block)
# @group Acknowledging & Rejecting Messages
# Acknowledge a delivery tag. The call is handed to the parent implementation
# with the same argument.
#
# @param delivery_tag Delivery tag to acknowledge
# @return [Consumer] self
#
# @api public
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
def acknowledge(delivery_tag)
  super
end # acknowledge(delivery_tag)
# Reject a delivery tag. The call is handed to the parent implementation with
# the same arguments.
#
# @param delivery_tag Delivery tag to reject
# @param [Boolean] requeue Forwarded to the parent implementation; defaults to true
# @return [Consumer] self
#
# @api public
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
def reject(delivery_tag, requeue = true)
  super
end # reject(delivery_tag, requeue = true)
# @endgroup
# @group Error Handling & Recovery
# Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
# Only one callback can be defined (the one defined last replaces previously added ones).
# The block is passed on to the parent implementation unchanged.
# Also available as #after_connection_interruption.
#
# @api public
def on_connection_interruption(&block)
  super
end # on_connection_interruption(&block)
alias after_connection_interruption on_connection_interruption
# Defines a callback that will be executed after TCP connection is recovered after a network failure
# but before AMQP connection is re-opened.
# Only one callback can be defined (the one defined last replaces previously added ones).
# The block is passed on to the parent implementation unchanged.
#
# @api public
def before_recovery(&block)
  super
end # before_recovery(&block)
# Defines a callback that will be executed when AMQP connection is recovered after a network failure.
# Only one callback can be defined (the one defined last replaces previously added ones).
# The block is passed on to the parent implementation unchanged.
# Also available as #after_recovery.
#
# @api public
def on_recovery(&block)
  super
end # on_recovery(&block)
alias after_recovery on_recovery
# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure). All work is done by the parent
# implementation.
#
# @api plugin
def auto_recover
  super()
end # auto_recover
# @endgroup
# Summarises the consumer tag, the queue name, the channel id and the
# registered callbacks in a single line.
#
# @return [String] Readable representation of relevant object state.
def inspect
  queue_name = @queue.name
  channel_id = @channel.id
  callbacks  = @callbacks.inspect

  "#<AMQP::Consumer:#{@consumer_tag}> queue=#{queue_name} channel=#{channel_id} callbacks=#{callbacks}"
end # inspect
end # Consumer
end # AMQP
|