/usr/share/doc/ruby-amqp/examples/legacy/ack.rb is in ruby-amqp 0.9.5-2.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
#!/usr/bin/env ruby
# encoding: utf-8
require "bundler"
Bundler.setup
$:.unshift(File.expand_path("../../../lib", __FILE__))
require 'amqp'
# Example: explicit message acknowledgements.
#
# Opens a channel, binds a queue to a fanout exchange, subscribes with
# :ack => true, publishes ten messages, and then tears everything down
# (unbind, delete exchange and queue, stop AMQP and the EM reactor) either
# after three seconds or when the process receives INT/TERM.
AMQP.start do |connection|
  puts "Connected!"

  channel = AMQP::Channel.new(connection)
  e = channel.fanout("amqp-gem.examples.ack")
  q = channel.queue('amqp-gem.examples.q1').bind(e) { puts "Bound #{e.name} to the queue" }

  q.status do |message_count, consumer_count|
    puts "Queue #{q.name} has #{message_count} messages and #{consumer_count} consumers"
  end

  # Messages are acknowledged one by one. Once AMQP reports that it is
  # closing, incoming messages are printed but deliberately NOT acked, so
  # (per the message printed below) they are left for later redelivery.
  q.subscribe(:ack => true) do |h, m|
    puts "Got a message"
    if AMQP.closing?
      puts "#{m} (ignored, redelivered later)"
    else
      puts m
      h.ack
    end
  end # q.subscribe

  10.times do |i|
    puts "Publishing message ##{i}"
    e.publish("Totally rad #{i}")
  end

  # The shutdown routine is registered three times (timer, INT, TERM), so it
  # may be invoked more than once; the flag makes every call after the first
  # a no-op instead of unbinding/deleting entities that are already gone.
  shutting_down = false

  # Kept as a Proc (not a lambda): the timer calls it without arguments while
  # Signal.trap passes the signal number, and a Proc tolerates both.
  show_stopper = Proc.new do
    next if shutting_down
    shutting_down = true

    q.status do |message_count, consumer_count|
      puts "Queue #{q.name} has #{message_count} messages and #{consumer_count} consumers"
    end

    q.unbind(e) do
      puts "Unbound #{q.name} from #{e.name}"

      e.delete do
        puts "Just deleted #{e.name}"
      end

      # The reactor is stopped only from inside the queue-deletion callback,
      # i.e. after the queue delete has been confirmed.
      q.delete do
        puts "Just deleted #{q.name}"
        AMQP.stop do
          puts "About to stop EM reactor"
          EM.stop
        end
      end
    end
  end

  EM.add_timer(3, show_stopper)

  # For ack to work appropriately you must shutdown AMQP gracefully,
  # otherwise all items in your queue will be returned
  Signal.trap('INT', show_stopper)
  Signal.trap('TERM', show_stopper)
end # AMQP.start
|