/usr/lib/ruby/vendor_ruby/amqp/deprecated/rpc.rb is in ruby-amqp 0.9.5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | # encoding: utf-8
module AMQP
if defined?(BasicObject)
# @private
class BlankSlate < BasicObject; end
else
# @private
class BlankSlate
instance_methods.each { |m| undef_method m unless m =~ /^__/ }
end
end
# Basic RPC (remote procedure call) facility.
#
# Needs more detail and explanation.
#
# EM.run do
# server = AMQP::Channel.new.rpc('hash table node', Hash)
#
# client = AMQP::Channel.new.rpc('hash table node')
# client[:now] = Time.now
# client[:one] = 1
#
# client.values do |res|
# p 'client', :values => res
# end
#
# client.keys do |res|
# p 'client', :keys => res
# EM.stop_event_loop
# end
# end
#
#
# @note This class will be removed before 1.0 release.
# @deprecated
# @private
class RPC < ::AMQP::BlankSlate
#
# API
#
attr_reader :name
# Takes a channel, queue and optional object.
#
# The optional object may be a class name, module name or object
# instance. When given a class or module name, the object is instantiated
# during this setup. The passed queue is automatically subscribed to so
# it passes all messages (and their arguments) to the object.
#
# Marshalling and unmarshalling the objects is handled internally. This
# marshalling is subject to the same restrictions as defined in the
# {http://ruby-doc.org/core/classes/Marshal.html Marshal} standard
# library. See that documentation for further reference.
#
# When the optional object is not passed, the returned rpc reference is
# used to send messages and arguments to the queue. See #method_missing
# which does all of the heavy lifting with the proxy. Some client
# elsewhere must call this method *with* the optional block so that
# there is a valid destination. Failure to do so will just enqueue
# marshalled messages that are never consumed.
#
def initialize(channel, queue, obj = nil)
@name = queue
@channel = channel
@channel.register_rpc(self)
if @obj = normalize(obj)
@delegate = Server.new(channel, queue, @obj)
else
@delegate = Client.new(channel, queue)
end
end
def client?
@obj.nil?
end
def server?
!client?
end
def method_missing(selector, *args, &block)
@delegate.__send__(selector, *args, &block)
end
# @private
class Client
attr_accessor :identifier
def initialize(channel, server_queue_name)
@channel = channel
@exchange = AMQP::Exchange.default(@channel)
@server_queue_name = server_queue_name
@handlers = Hash.new
@queue = channel.queue("__amqp_gem_rpc_client_#{rand(1_000_000)}", :auto_delete => true)
@queue.subscribe do |header, payload|
*response_args = Marshal.load(payload)
handler = @handlers[header.message_id]
handler.call(*response_args)
end
end
def method_missing(selector, *args, &block)
@channel.once_open do
message_id = "message_identifier_#{rand(1_000_000)}"
if block
@handlers[message_id] = block
@exchange.publish(Marshal.dump([selector, *args]), :routing_key => @server_queue_name, :reply_to => @queue.name, :message_id => message_id)
else
@exchange.publish(Marshal.dump([selector, *args]), :routing_key => @server_queue_name, :message_id => message_id)
end
end
end
end # Client
# @private
class Server
def initialize(channel, queue_name, impl)
@channel = channel
@exchange = AMQP::Exchange.default(@channel)
@queue = @channel.queue(queue_name)
@impl = impl
@handlers = Hash.new
@id = "client_identifier_#{rand(1_000_000)}"
@queue.subscribe(:ack => true) do |header, payload|
selector, *args = Marshal.load(payload)
result = @impl.__send__(selector, *args)
respond_to(header, result) if header.to_hash[:reply_to]
header.ack
end
end
def respond_to(header, result)
@exchange.publish(Marshal.dump(result), :message_id => header.message_id, :routing_key => header.reply_to)
end
end # Server
protected
def normalize(input)
case input
when ::Class
input.new
when ::Module
(::Class.new do include(obj) end).new
else
input
end
end
end # RPC
end # AMQP
|