/usr/share/doc/ruby-amqp/examples/guides/queues/13_objects_that_consume_messages_take_two.rb is in ruby-amqp 0.9.5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "amqp"
# Message handler whose only job is to print each message it is handed.
# Worker#initialize uses an instance of this class as its default consumer.
class Consumer
  #
  # API
  #

  # Prints one line containing the payload and the content type taken
  # from the metadata.
  #
  # @param metadata [#content_type] only +content_type+ is read from it here
  # @param payload [#to_s] message body, interpolated into the printed line
  # @return [nil] the return value of +puts+
  def handle_message(metadata, payload)
    line = "Received a message: #{payload}, content_type = #{metadata.content_type}"
    puts line
  end
end
# Declares an exclusive queue on a channel and hands every delivery on that
# queue to a consumer object.
class Worker
  #
  # API
  #

  # Stores its collaborators and installs #handle_channel_exception as the
  # channel's error callback.
  #
  # @param channel [#on_error, #queue] channel the queue is later declared on
  # @param queue_name [String] name passed to +channel.queue+; defaults to
  #   AMQ::Protocol::EMPTY_STRING
  # @param consumer [#handle_message] object whose +handle_message+ method
  #   becomes the subscription callback; defaults to a new Consumer
  def initialize(channel, queue_name = AMQ::Protocol::EMPTY_STRING, consumer = Consumer.new)
    @channel    = channel
    @queue_name = queue_name
    @consumer   = consumer

    error_callback = method(:handle_channel_exception)
    @channel.on_error(&error_callback)
  end

  # Declares the queue with the +exclusive+ option set to true and subscribes
  # the consumer's +handle_message+ method to it.
  #
  # @return [Object] whatever +subscribe+ on the declared queue returns
  def start
    @queue = @channel.queue(@queue_name, exclusive: true)

    on_delivery = @consumer.method(:handle_message)
    @queue.subscribe(&on_delivery)
  end

  #
  # Implementation
  #

  # Error callback registered in #initialize; prints the reply code and reply
  # text read from +channel_close+.
  #
  # @param channel [Object] not used by this method
  # @param channel_close [#reply_code, #reply_text] source of the printed details
  # @return [nil] the return value of +puts+
  def handle_channel_exception(channel, channel_close)
    code = channel_close.reply_code
    text = channel_close.reply_text
    puts "Oops... a channel-level exception: code = #{code}, message = #{text}"
  end
end
# Thin wrapper that publishes messages through the exchange it was given.
class Producer
  #
  # API
  #

  # @param channel [Object] stored in @channel; not otherwise used in this class
  # @param exchange [#publish] exchange that #publish delegates to
  def initialize(channel, exchange)
    @channel, @exchange = channel, exchange
  end

  # Forwards the message and options unchanged to the exchange's +publish+.
  #
  # @param message [Object] passed through as the first argument
  # @param options [Hash] passed through as the second argument
  # @return [Object] whatever the exchange's +publish+ returns
  def publish(message, options = {})
    target = @exchange
    target.publish(message, options)
  end

  #
  # Implementation
  #

  # Prints the reply code and reply text read from +channel_close+.
  # Nothing inside this class registers this method as a callback.
  #
  # @param channel [Object] not used by this method
  # @param channel_close [#reply_code, #reply_text] source of the printed details
  # @return [nil] the return value of +puts+
  def handle_channel_exception(channel, channel_close)
    code = channel_close.reply_code
    text = channel_close.reply_text
    puts "Oops... a channel-level exception: code = #{code}, message = #{text}"
  end
end
# Entry point: starts AMQP with the connection URI below. Inside the block, a
# Worker and a Producer share one channel, a single message is published to
# the channel's default exchange, and a timer closes the connection.
AMQP.start("amqp://guest:guest@dev.rabbitmq.com") do |connection, _open_ok|
  channel = AMQP::Channel.new(connection)

  # The worker subscribes before the message is published.
  Worker.new(channel, "amqpgem.objects.integration").start

  producer = Producer.new(channel, channel.default_exchange)
  puts "Publishing..."
  producer.publish("Hello, world", routing_key: "amqpgem.objects.integration")

  # stop in 2 seconds: close the connection, then call EventMachine.stop
  EventMachine.add_timer(2.0) do
    connection.close { EventMachine.stop }
  end
end
|