/usr/share/doc/ruby-amqp/examples/queues/using_explicit_acknowledgements.rb is in ruby-amqp 0.9.5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | #!/usr/bin/env ruby
# encoding: utf-8
require "bundler"
Bundler.setup
$:.unshift(File.expand_path("../../../lib", __FILE__))
require 'amqp'
puts "=> Subscribing for messages using explicit acknowledgements model"
puts
# this example uses Kernel#sleep and thus we must run EventMachine reactor in
# a separate thread, or nothing will be sent/received while we sleep() on the current thread.
t = Thread.new { EventMachine.run }
sleep(0.5)
# open two connections to imitate two apps
connection1 = AMQP.connect
connection2 = AMQP.connect
connection3 = AMQP.connect
channel_exception_handler = Proc.new { |ch, channel_close| EventMachine.stop; raise "channel error: #{channel_close.reply_text}" }
# open two channels
channel1 = AMQP::Channel.new(connection1)
channel1.on_error(&channel_exception_handler)
# first app will be given up to 3 messages at a time. If it doesn't
# ack any messages after it was delivered 3, messages will be routed to
# the app #2.
channel1.prefetch(3)
channel2 = AMQP::Channel.new(connection2)
channel2.on_error(&channel_exception_handler)
# app #2 processes messages one-by-one and has to send and ack every time
channel2.prefetch(1)
# app 3 will just publish messages
channel3 = AMQP::Channel.new(connection3)
channel3.on_error(&channel_exception_handler)
exchange = channel3.direct("amq.direct")
queue1 = channel1.queue("amqpgem.examples.acknowledgements.explicit", :auto_delete => false)
# purge the queue so that we don't get any redeliveries from previous runs
queue1.purge
queue1.bind(exchange).subscribe(:ack => true) do |metadata, payload|
# do some work
sleep(0.2)
# acknowledge some messages, they will be removed from the queue
if rand > 0.5
# FYI: there is a shortcut, metadata.ack
channel1.acknowledge(metadata.delivery_tag, false)
puts "[consumer1] Got message ##{metadata.headers['i']}, ack-ed"
else
# some messages are not ack-ed and will remain in the queue for redelivery
# when app #1 connection is closed (either properly or due to a crash)
puts "[consumer1] Got message ##{metadata.headers['i']}, SKIPPPED"
end
end
queue2 = channel2.queue!("amqpgem.examples.acknowledgements.explicit", :auto_delete => false)
queue2.subscribe(:ack => true) do |metadata, payload|
metadata.ack
# app 2 always acks messages
puts "[consumer2] Received #{payload}, redelivered = #{metadata.redelivered}, ack-ed"
end
# after some time one of the consumers quits/crashes
EventMachine.add_timer(4.0) {
connection1.close
puts "----- Connection 1 is now closed (we pretend that it has crashed) -----"
}
EventMachine.add_timer(10.0) do
# purge the queue so that we don't get any redeliveries on the next run
queue2.purge {
connection2.close {
connection3.close { EventMachine.stop }
}
}
end
i = 0
EventMachine.add_periodic_timer(0.8) {
3.times do
exchange.publish("Message ##{i}", :headers => { :i => i })
i += 1
end
}
t.join
|