/usr/lib/ruby/vendor_ruby/amqp/connection.rb is in ruby-amqp 0.9.5-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 | # encoding: utf-8
require "amqp/client"
# Top-level namespace of amqp gem. Please refer to "See also" section below.
#
# @see AMQP.connect
# @see AMQP.start
# @see AMQP::Channel
# @see AMQP::Exchange
# @see AMQP::Queue
module AMQP
# Starts EventMachine event loop unless it is already running and connects
# to AMQP broker using {AMQP.connect}. It is generally a good idea to
# start EventMachine event loop in a separate thread and use {AMQP.connect}
# (for Web applications that do not use Thin or Goliath, it is the only option).
#
# See {AMQP.connect} for information about arguments this method takes and
# information about relevant topics such as authentication failure handling.
#
# @example Using AMQP.start to connect to AMQP broker, EventMachine loop isn't yet running
# AMQP.start do |connection|
# # default is to connect to localhost:5672, to root ("/") vhost as guest/guest
#
# # this block never exits unless either AMQP.stop or EM.stop
# # is called.
#
# AMQP::Channel(connection) do |channel|
# channel.queue("", :auto_delete => true).bind(channel.fanout("amq.fanout")).subscribe do |headers, payload|
# # handle deliveries here
# end
# end
# end
#
# @api public
def self.start(connection_options_or_string = {}, other_options = {}, &block)
EM.run do
if !@connection || @connection.closed? || @connection.closing?
@connection = connect(connection_options_or_string, other_options, &block)
end
@channel = Channel.new(@connection)
@connection
end
end
# Alias for {AMQP.start}
# @api public
def self.run(*args, &block)
self.start(*args, &block)
end
# Properly closes default AMQP connection and then underlying TCP connection.
# Pass it a block if you want a piece of code to be run once default connection
# is successfully closed.
#
# @note If default connection was never estabilished or is in the closing state already,
# this method has no effect.
# @api public
def self.stop(reply_code = 200, reply_text = "Goodbye", &block)
return if @connection.nil? || self.closing?
EM.next_tick do
if AMQP.channel and AMQP.channel.open? and AMQP.channel.connection.open?
AMQP.channel.close
end
AMQP.channel = nil
shim = Proc.new {
block.call
AMQP.connection = nil
}
@connection.disconnect(reply_code, reply_text, &shim)
end
end
# Indicates that default connection is closing.
#
# @return [Boolean]
# @api public
def self.closing?
@connection.closing?
end
# @return [Boolean] Current global logging value
# @api public
def self.logging
self.settings[:logging]
end
# @return [Boolean] Sets current global logging value
# @api public
def self.logging=(value)
self.settings[:logging] = !!value
end
# Default connection. When you do not pass connection instance to methods like
# {Channel#initialize}, AMQP gem will use this default connection.
#
# @api public
def self.connection
@connection
end
# "Default channel". A placeholder for apps that only want to use one channel. This channel is not global, *not* used
# under the hood by methods like {AMQP::Exchange#initialize} and only shared by exchanges/queues you decide on.
# To reiterate: this is only a conventience accessor, since many apps (especially Web apps) can get by with just one
# connection and one channel.
#
# @api public
def self.channel
@channel
end
# A placeholder for applications that only need one channel. If you use {AMQP.start} to set up default connection,
# {AMQP.channel} is open on that connection, but can be replaced by your application.
#
#
# @see AMQP.channel
# @api public
def self.channel=(value)
@channel = value
end
# Sets global connection object.
# @api public
def self.connection=(value)
@connection = value
end
# Alias for {AMQP.connection}
# @deprecated
# @api public
def self.conn
warn "AMQP.conn will be removed in 1.0. Please use AMQP.connection."
@connection
end
# Alias for {AMQP.connection=}
# @deprecated
# @api public
def self.conn=(value)
warn "AMQP.conn= will be removed in 1.0. Please use AMQP.connection=(connection)."
self.connection = value
end
# Connects to AMQP broker and yields connection object to the block as soon
# as connection is considered open.
#
#
# @example Using AMQP.connect with default connection settings
#
# AMQP.connect do |connection|
# AMQP::Channel.new(connection) do |channel|
# # channel is ready: set up your messaging flow by creating exchanges,
# # queues, binding them together and so on.
# end
# end
#
# @example Using AMQP.connect to connect to a public RabbitMQ instance with connection settings given as a hash
#
# AMQP.connect(:host => "dev.rabbitmq.com", :username => "guest", :password => "guest") do |connection|
# AMQP::Channel.new(connection) do |channel|
# # ...
# end
# end
#
#
# @example Using AMQP.connect to connect to a public RabbitMQ instance with connection settings given as a URI
#
# AMQP.connect "amqp://guest:guest@dev.rabbitmq.com:5672", :on_possible_authentication_failure => Proc.new { puts("Looks like authentication has failed") } do |connection|
# AMQP::Channel.new(connection) do |channel|
# # ...
# end
# end
#
#
# @overload connect(connection_string, options = {})
# Used to pass connection parameters as a connection string
# @param [String] :connection_string AMQP connection URI, à la JDBC connection string. For example: amqp://bus.megacorp.internal:5877/qa
#
#
# @overload connect(connection_options)
# Used to pass connection options as a Hash.
# @param [Hash] :connection_options AMQP connection options (:host, :port, :username, :vhost, :password)
#
# @option connection_options_or_string [String] :host ("localhost") Host to connect to.
# @option connection_options_or_string [Integer] :port (5672) Port to connect to.
# @option connection_options_or_string [String] :vhost ("/") Virtual host to connect to.
# @option connection_options_or_string [String] :username ("guest") Username to use. Also can be specified as :user.
# @option connection_options_or_string [String] :password ("guest") Password to use. Also can be specified as :pass.
# @option connection_options_or_string [Hash] :ssl TLS (SSL) parameters to use.
# @option connection_options_or_string [#call] :on_tcp_connection_failure A callable object that will be run if connection to server fails
# @option connection_options_or_string [#call] :on_possible_authentication_failure A callable object that will be run if authentication fails (see Authentication failure section)
#
#
# h2. Handling authentication failures
#
# AMQP 0.9.1 specification dictates that broker closes TCP connection when it detects that authentication
# has failed. However, broker does exactly the same thing when other connection-level exception occurs
# so there is no way to guarantee that connection was closed because of authentication failure.
#
# Because of that, AMQP gem follows Java client example and hints at _possibility_ of authentication failure.
# To handle it, pass a callable object (a proc, a lambda, an instance of a class that responds to #call)
# with :on_possible_authentication_failure option.
#
# @note This method assumes that EventMachine even loop is already running. If it is not the case or you are not sure, we recommend you use {AMQP.start} instead.
# It takes exactly the same parameters.
# @return [AMQP::Session]
# @api public
def self.connect(connection_options_or_string = {}, other_options = {}, &block)
Client.connect(connection_options_or_string, other_options, &block)
end
# @return [Hash] Default AMQP connection settings. This hash may be modified.
# @api public
def self.settings
@settings ||= AMQ::Client::Settings.default
end
end # AMQP
|