This file is indexed.

/usr/lib/ruby/vendor_ruby/amqp/channel.rb is in ruby-amqp 0.9.5-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
# encoding: utf-8

require "amqp/int_allocator"
require "amqp/exchange"
require "amqp/queue"

module AMQP
  # h2. What are AMQP channels
  #
  # To quote {http://bit.ly/hw2ELX AMQP 0.9.1 specification}:
  #
  # AMQP is a multi-channelled protocol. Channels provide a way to multiplex
  # a heavyweight TCP/IP connection into several light weight connections.
  # This makes the protocol more “firewall friendly” since port usage is predictable.
  # It also means that traffic shaping and other network QoS features can be easily employed.
  # Channels are independent of each other and can perform different functions simultaneously
  # with other channels, the available bandwidth being shared between the concurrent activities.
  #
  # h2. Opening a channel
  #
  # *Channels are opened asynchronously*. There are two ways to do it: using a callback or pseudo-synchronous mode.
  #
  # @example Opening a channel with a callback
  #   # this assumes EventMachine reactor is running
  #   AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  #     AMQP::Channel.new(client) do |channel, open_ok|
  #       # when this block is executed, channel is open and ready for use
  #     end
  #   end
  #
  # <script src="https://gist.github.com/939480.js?file=gistfile1.rb"></script>
  #
  # Unless your application needs multiple channels, this approach is recommended. Alternatively,
  # AMQP::Channel can be instantiated without a block. Then returned channel is not immediately open,
  # however, it can be used as if it was a synchronous, blocking method:
  #
  # @example Instantiating a channel that will be open eventually
  #   # this assumes EventMachine reactor is running
  #   AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  #     channel  = AMQP::Channel.new(client)
  #     exchange = channel.default_exchange
  #
  #     # ...
  #   end
  #
  # <script src="https://gist.github.com/939482.js?file=gistfile1.rb"></script>
  #
  # Even though in the example above channel isn't immediately open, it is safe to declare exchanges using
  # it. Exchange declaration will be delayed until after channel is open. Same applies to queue declaration
  # and other operations on exchanges and queues. Library methods that rely on channel being open will be
  # enqueued and executed in a FIFO manner when broker confirms channel opening.
  # Note, however, that *this "pseudo-synchronous mode" is easy to abuse and introduce race conditions AMQP gem
  # cannot resolve for you*. AMQP is an inherently asynchronous protocol and AMQP gem embraces this fact.
  #
  #
  # h2. Key methods
  #
  # Key methods of Channel class are
  #
  # * {Channel#queue}
  # * {Channel#default_exchange}
  # * {Channel#direct}
  # * {Channel#fanout}
  # * {Channel#topic}
  # * {Channel#close}
  #
  # refer to documentation for those methods for usage examples.
  #
  # Channel provides a number of convenience methods that instantiate queues and exchanges
  # of various types associated with this channel:
  #
  # * {Channel#queue}
  # * {Channel#default_exchange}
  # * {Channel#direct}
  # * {Channel#fanout}
  # * {Channel#topic}
  #
  #
  # h2. Error handling
  #
  # It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error:
  #
  # @example Queue declaration with incompatible attributes results in a channel-level exception
  #   AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672") do |connection, open_ok|
  #     AMQP::Channel.new do |channel, open_ok|
  #       puts "Channel ##{channel.id} is now open!"
  #
  #       channel.on_error do |ch, close|
  #         puts "Handling channel-level exception"
  #
  #         connection.close {
  #           EM.stop { exit }
  #         }
  #       end
  #
  #       EventMachine.add_timer(0.4) do
  #         # these two definitions result in a race condition. For sake of this example,
  #         # however, it does not matter. Whatever definition succeeds first, 2nd one will
  #         # cause a channel-level exception (because attributes are not identical)
  #         AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue|
  #           puts "#{queue.name} is ready to go"
  #         end
  #
  #         AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue|
  #           puts "#{queue.name} is ready to go"
  #         end
  #       end
  #     end
  #   end
  #
  # <script src="https://gist.github.com/939490.js?file=gistfile1.rb"></script>
  #
  # When channel-level exception is indicated by the broker and errback defined using #on_error is run, channel is already
  # closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from
  # channel-level exceptions is to open a new channel and re-instantiate queues, exchanges and bindings your application
  # needs.
  #
  #
  #
  # h2. Closing a channel
  #
  # A channel is opened when the object is instantiated and is closed using the {#close} method when the
  # application no longer needs it.
  #
  # @example Closing a channel your application no longer needs
  #   # this assumes EventMachine reactor is running
  #   AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  #     AMQP::Channel.new(client) do |channel, open_ok|
  #       channel.close do |close_ok|
  #         # when this block is executed, channel is successfully closed
  #       end
  #     end
  #   end
  #
  # <script src="https://gist.github.com/939483.js?file=gistfile1.rb"></script>
  #
  #
  # h2. RabbitMQ extensions.
  #
  # AMQP gem supports several RabbitMQ extensions that extend Channel functionality.
  # Learn more in {file:docs/VendorSpecificExtensions.textile}
  #
  # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.2.5)
  class Channel < AMQ::Client::Channel

    #
    # API
    #

    # AMQP connection this channel is part of
    # @return [Connection]
    attr_reader :connection
    alias :conn :connection

    # Status of this channel (one of: :opening, :closing, :open, :closed)
    # @return [Symbol]
    attr_reader :status


    # @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP
    #                                   connection (accessible via {AMQP.connection}) will be used.
    # @param [Integer]       id         Channel id. Must not be greater than max channel id client and broker
    #                                   negotiated on during connection setup. Almost always the right thing to do
    #                                   is to let AMQP gem pick channel identifier for you. If you want to get next
    #                                   channel id, use {AMQP::Channel.next_channel_id} (it is thread-safe).
    # @param [Hash]          options    A hash of options
    #
    # @example Instantiating a channel for default connection (accessible as AMQP.connection)
    #
    #   AMQP.connect do |connection|
    #     AMQP::Channel.new(connection) do |channel, open_ok|
    #       # channel is ready: set up your messaging flow by creating exchanges,
    #       # queues, binding them together and so on.
    #     end
    #   end
    #
    # @example Instantiating a channel for explicitly given connection
    #
    #   AMQP.connect do |connection|
    #     AMQP::Channel.new(connection) do |channel, open_ok|
    #       # ...
    #     end
    #   end
    #
    # @example Instantiating a channel with a :prefetch option
    #
    #   AMQP.connect do |connection|
    #     AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :prefetch => 5) do |channel, open_ok|
    #       # ...
    #     end
    #   end
    #
    #
    # @option options [Integer] :prefetch (nil)  Specifies number of messages to prefetch. Channel-specific. See {AMQP::Channel#prefetch}.
    # @option options [Boolean] :auto_recovery (nil)  Turns on automatic network failure recovery mode for this channel.
    #
    # @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.
    # @yieldparam [Channel] channel Channel that is successfully open
    # @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP method (channel.open-ok) instance
    #
    #
    # @see AMQP::Channel#prefetch
    # @api public
    def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
      # Channels can only be created while the EventMachine reactor is running.
      unless EM.reactor_running?
        raise 'AMQP can only be used from within EM.run {}'
      end

      # Fall back to the default connection, starting one if none exists yet.
      @connection = connection || AMQP.connection || AMQP.start

      # A Hash in the id position is really the options argument: fold it into
      # options and allocate a channel id ourselves.
      if id.kind_of?(Hash)
        options = options.merge(id)
        id      = self.class.next_channel_id
      end

      super(@connection, id, options)

      @rpcs = Hash.new

      # This deferrable mimics AMQP gem 0.7 behavior and enables the
      # pseudo-synchronous programming style found in existing codebases:
      #
      #   connection = AMQP.connect
      #   channel    = AMQP::Channel.new(connection)
      #   queue      = AMQP::Queue.new(channel)
      #
      # See EM::Deferrable#callback in the EventMachine documentation.
      @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

      # Send channel.open only once the connection is actually open, so that
      # `c = AMQP.connect; AMQP::Channel.new(c)` keeps working.
      @connection.on_connection do
        self.open do |ch, open_ok|
          @channel_is_open_deferrable.succeed

          # The user callback may take just the channel, or the channel plus
          # the channel.open-ok method.
          if block
            if block.arity == 1
              block.call(ch)
            else
              block.call(ch, open_ok)
            end
          end

          # Apply the :prefetch option (if given) after the user callback ran.
          prefetch_count = options[:prefetch]
          self.prefetch(prefetch_count, false) if prefetch_count
        end # self.open
      end # @connection.on_connection
    end

    # @return [Boolean] true if this channel is in automatic recovery mode
    # @see #auto_recovering?
    attr_accessor :auto_recovery

    # @return [Boolean] true if this channel uses automatic recovery mode
    def auto_recovering?
      # Coerce to a strict boolean: @auto_recovery may be nil (never set) or any
      # truthy value assigned through the accessor, while this predicate is
      # documented to return true or false.
      !!@auto_recovery
    end # auto_recovering?

    # Called by associated connection object when AMQP connection has been re-established
    # (for example, after a network failure).
    #
    # @api plugin
    def auto_recover
      # Nothing to do (returns nil) unless automatic recovery mode is on.
      if auto_recovering?
        self.open do
          @channel_is_open_deferrable.succeed

          # Restore the prefetch setting this channel was configured with.
          prefetch_count = @options[:prefetch]
          self.prefetch(prefetch_count, false) if prefetch_count

          # Exchanges go first: queue recovery also recovers bindings.
          @exchanges.each do |_name, exchange|
            exchange.auto_recover
          end
          @queues.each do |_name, queue|
            queue.auto_recover
          end
        end
      end
    end # auto_recover

    # Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens
    # itself with this new id, releasing the old id after the new one is allocated.
    #
    # @api public
    def reuse
      previous_id = @id
      # Allocate the new id before releasing the old one; releasing first
      # would hand the very same value back to us.
      @id = self.class.next_channel_id
      self.class.release_channel_id(previous_id)

      self.open do
        @channel_is_open_deferrable.succeed

        # Restore the prefetch setting this channel was configured with.
        prefetch_count = @options[:prefetch]
        self.prefetch(prefetch_count, false) if prefetch_count

        # Exchanges go first: queue recovery also recovers bindings.
        @exchanges.each do |_name, exchange|
          exchange.auto_recover
        end
        @queues.each do |_name, queue|
          queue.auto_recover
        end
      end
    end # reuse



    # @group Declaring exchanges

    # Defines, initializes and returns a direct Exchange instance.
    #
    # Learn more about direct exchanges in {Exchange Exchange class documentation}.
    #
    #
    # @param [String] name (amq.direct) Exchange name.
    #
    # @option opts [Boolean] :passive (false)  If set, the server will not create the exchange if it does not
    #                                          already exist. The client can use this to check whether an exchange
    #                                          exists without modifying the server state.
    #
    # @option opts [Boolean] :durable (false)  If set when creating a new exchange, the exchange will be marked as
    #                                          durable. Durable exchanges and their bindings are recreated upon a server
    #                                          restart (information about them is persisted). Non-durable (transient) exchanges
    #                                          do not survive if/when a server restarts (information about them is stored exclusively
    #                                          in RAM).
    #
    #
    # @option opts [Boolean] :auto_delete  (false)  If set, the exchange is deleted when all queues have finished
    #                                               using it. The server waits for a short period of time before
    #                                               determining the exchange is unused to give time to the client code
    #                                               to bind a queue to it.
    #
    # @option opts [Boolean] :internal (default false)   If set, the exchange may not be used directly by publishers, but
    #                                                    only when bound to other exchanges. Internal exchanges are used to
    #                                                    construct wiring that is not visible to applications. This is a RabbitMQ-specific
    #                                                    extension.
    #
    # @option opts [Boolean] :nowait (true)              If set, the server will not respond to the method. The client should
    #                                                    not wait for a reply method.  If the server could not complete the
    #                                                    method it will raise a channel or connection exception.
    #
    #
    # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
    # @raise [AMQP::Error] Raised when exchange is declared with  :passive => true and the exchange does not exist.
    #
    #
    # @example Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style)
    #
    #    # an exchange application A will be using to publish updates
    #    # to some search index
    #    exchange = channel.direct("index.updates")
    #
    #    # In the same (or different) process declare a queue that broker will
    #    # generate name for, bind it to aforementioned exchange using method chaining
    #    queue    = channel.queue("").
    #                       # queue will be receiving messages that were published with
    #                       # :routing_key attribute value of "search.index.updates"
    #                       bind(exchange, :routing_key => "search.index.updates").
    #                       # register a callback that will be run when messages arrive
    #                       subscribe { |header, message| puts("Received #{message}") }
    #
    #    # now publish a new document contents for indexing,
    #    # message will be delivered to the queue we declared and bound on the line above
    #    exchange.publish(document.content, :routing_key => "search.index.updates")
    #
    #
    # @example Instantiating a direct exchange using {Channel#direct} with a callback
    #
    #   AMQP.connect do |connection|
    #     AMQP::Channel.new(connection) do |channel|
    #       channel.direct("email.replies_listener") do |exchange, declare_ok|
    #         # by now exchange is ready and waiting
    #       end
    #     end
    #   end
    #
    #
    # @see Channel#default_exchange
    # @see Exchange
    # @see Exchange#initialize
    # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.1)
    #
    # @return [Exchange]
    # @api public
    def direct(name = 'amq.direct', opts = {}, &block)
      known = find_exchange(name)

      # First declaration on this channel: build the exchange and register it.
      return register_exchange(Exchange.new(self, :direct, name, opts, &block)) unless known

      # Redeclaration: the requested attributes (with defaults applied) must
      # match the exchange we already have.
      requested = Exchange.add_default_options(:direct, name, opts, block)
      validate_parameters_match!(known, requested)

      # The cached exchange is passed to the callback on its own.
      block.call(known) if block
      known
    end

    # Returns exchange object with the same name as default (aka unnamed) exchange.
    # Default exchange is a direct exchange and automatically routes messages to
    # queues when routing key matches queue name exactly. This feature is known as
    # "automatic binding" (of queues to default exchange).
    #
    # *Use default exchange when you want to route messages directly to specific queues*
    # (queue names are known, you don't mind this kind of coupling between applications).
    #
    #
    # @example Using default exchange to publish messages to queues with known names
    #   AMQP.start(:host => 'localhost') do |connection|
    #     ch        = AMQP::Channel.new(connection)
    #
    #     queue1    = ch.queue("queue1").subscribe do |payload|
    #       puts "[#{queue1.name}] => #{payload}"
    #     end
    #     queue2    = ch.queue("queue2").subscribe do |payload|
    #       puts "[#{queue2.name}] => #{payload}"
    #     end
    #     queue3    = ch.queue("queue3").subscribe do |payload|
    #       puts "[#{queue3.name}] => #{payload}"
    #     end
    #     queues    = [queue1, queue2, queue3]
    #
    #     # Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec.
    #     exchange = AMQP::Exchange.default
    #     EM.add_periodic_timer(1) do
    #       q = queues.sample
    #
    #       exchange.publish "Some payload from #{Time.now.to_i}", :routing_key => q.name
    #     end
    #   end
    #
    #
    #
    # @see Exchange
    # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.2.4)
    #
    # @return [Exchange]
    # @api public
    def default_exchange
      # Reuse the memoized instance when one has already been built.
      return @default_exchange if @default_exchange

      @default_exchange = Exchange.default(self)
    end

    # Defines, initializes and returns a fanout Exchange instance.
    #
    # Learn more about fanout exchanges in {Exchange Exchange class documentation}.
    #
    #
    # @param [String] name (amq.fanout) Exchange name.
    #
    # @option opts [Boolean] :passive (false)  If set, the server will not create the exchange if it does not
    #                                          already exist. The client can use this to check whether an exchange
    #                                          exists without modifying the server state.
    #
    # @option opts [Boolean] :durable (false)  If set when creating a new exchange, the exchange will be marked as
    #                                          durable. Durable exchanges and their bindings are recreated upon a server
    #                                          restart (information about them is persisted). Non-durable (transient) exchanges
    #                                          do not survive if/when a server restarts (information about them is stored exclusively
    #                                          in RAM).
    #
    #
    # @option opts [Boolean] :auto_delete  (false)  If set, the exchange is deleted when all queues have finished
    #                                               using it. The server waits for a short period of time before
    #                                               determining the exchange is unused to give time to the client code
    #                                               to bind a queue to it.
    #
    # @option opts [Boolean] :internal (default false)   If set, the exchange may not be used directly by publishers, but
    #                                                    only when bound to other exchanges. Internal exchanges are used to
    #                                                    construct wiring that is not visible to applications. This is a RabbitMQ-specific
    #                                                    extension.
    #
    # @option opts [Boolean] :nowait (true)              If set, the server will not respond to the method. The client should
    #                                                    not wait for a reply method.  If the server could not complete the
    #                                                    method it will raise a channel or connection exception.
    #
    #
    # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
    # @raise [AMQP::Error] Raised when exchange is declared with  :passive => true and the exchange does not exist.
    #
    #
    # @example Using fanout exchange to deliver messages to multiple consumers
    #
    #   # open up a channel
    #   # declare a fanout exchange
    #   # declare 3 queues, binds them
    #   # publish a message
    #
    # @see Exchange
    # @see Exchange#initialize
    # @see Channel#default_exchange
    # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.2)
    #
    # @return [Exchange]
    # @api public
    def fanout(name = 'amq.fanout', opts = {}, &block)
      known = find_exchange(name)

      # First declaration on this channel: build the exchange and register it.
      return register_exchange(Exchange.new(self, :fanout, name, opts, &block)) unless known

      # Redeclaration: the requested attributes (with defaults applied) must
      # match the exchange we already have.
      requested = Exchange.add_default_options(:fanout, name, opts, block)
      validate_parameters_match!(known, requested)

      # The cached exchange is passed to the callback on its own.
      block.call(known) if block
      known
    end


    # Defines, initializes and returns a topic Exchange instance.
    #
    # Learn more about topic exchanges in {Exchange Exchange class documentation}.
    #
    # @param [String] name (amq.topic) Exchange name.
    #
    #
    # @option opts [Boolean] :passive (false)  If set, the server will not create the exchange if it does not
    #                                          already exist. The client can use this to check whether an exchange
    #                                          exists without modifying the server state.
    #
    # @option opts [Boolean] :durable (false)  If set when creating a new exchange, the exchange will be marked as
    #                                          durable. Durable exchanges and their bindings are recreated upon a server
    #                                          restart (information about them is persisted). Non-durable (transient) exchanges
    #                                          do not survive if/when a server restarts (information about them is stored exclusively
    #                                          in RAM).
    #
    #
    # @option opts [Boolean] :auto_delete  (false)  If set, the exchange is deleted when all queues have finished
    #                                               using it. The server waits for a short period of time before
    #                                               determining the exchange is unused to give time to the client code
    #                                               to bind a queue to it.
    #
    # @option opts [Boolean] :internal (default false)   If set, the exchange may not be used directly by publishers, but
    #                                                    only when bound to other exchanges. Internal exchanges are used to
    #                                                    construct wiring that is not visible to applications. This is a RabbitMQ-specific
    #                                                    extension.
    #
    # @option opts [Boolean] :nowait (true)              If set, the server will not respond to the method. The client should
    #                                                    not wait for a reply method.  If the server could not complete the
    #                                                    method it will raise a channel or connection exception.
    #
    #
    # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
    # @raise [AMQP::Error] Raised when exchange is declared with  :passive => true and the exchange does not exist.
    #
    #
    # @example Using topic exchange to deliver relevant news updates
    #   AMQP.connect do |connection|
    #     channel  = AMQP::Channel.new(connection)
    #     exchange = channel.topic("pub/sub")
    #
    #     # Subscribers.
    #     channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload|
    #       puts "A new dev post: '#{payload}'"
    #     end
    #     channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload|
    #       puts "A new post about Ruby: '#{payload}'"
    #     end
    #
    #     # Let's publish some data.
    #     exchange.publish "Ruby post",     :routing_key => "technology.dev.ruby"
    #     exchange.publish "Erlang post",   :routing_key => "technology.dev.erlang"
    #     exchange.publish "Sinatra post",  :routing_key => "technology.web.ruby"
    #     exchange.publish "Jewelery post", :routing_key => "jewelery.ruby"
    #   end
    #
    #
    # @example Using topic exchange to deliver geographically-relevant data
    #   AMQP.connect do |connection|
    #     channel  = AMQP::Channel.new(connection)
    #     exchange = channel.topic("pub/sub")
    #
    #     # Subscribers.
    #     channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
    #       puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
    #     end
    #     channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
    #       puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
    #     end
    #     channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
    #       puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
    #     end
    #     channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
    #       puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
    #     end
    #     channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
    #       puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
    #     end
    #     channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
    #       puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
    #     end
    #
    #     exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
    #       publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
    #       publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
    #       publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
    #       publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
    #       publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
    #       publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
    #       publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
    #       publish("Rome update",             :routing_key => "europe.italy.roma").
    #       publish("Paris update",            :routing_key => "europe.france.paris")
    #   end
    #
    # @see Exchange
    # @see Exchange#initialize
    # @see http://www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards
    # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3)
    #
    # @return [Exchange]
    # @api public
    def topic(name = 'amq.topic', opts = {}, &block)
      cached = find_exchange(name)

      # Nothing returned by find_exchange: build a new topic exchange and pass it
      # through register_exchange, whose result is returned.
      return register_exchange(Exchange.new(self, :topic, name, opts, &block)) unless cached

      # Re-declaration: the requested options, with defaults filled in, are checked
      # against the exchange we already have (validate_parameters_match! raises on mismatch).
      validate_parameters_match!(cached, Exchange.add_default_options(:topic, name, opts, block))

      block.call(cached) if block
      cached
    end


    # Defines, initializes and returns a headers Exchange instance.
    #
    # Learn more about headers exchanges in {Exchange Exchange class documentation}.
    #
    # @param [String] name (amq.match) Exchange name.
    #
    # @option opts [Boolean] :passive (false)  If set, the server will not create the exchange if it does not
    #                                          already exist. The client can use this to check whether an exchange
    #                                          exists without modifying the server state.
    #
    # @option opts [Boolean] :durable (false)  If set when creating a new exchange, the exchange will be marked as
    #                                          durable. Durable exchanges and their bindings are recreated upon a server
    #                                          restart (information about them is persisted). Non-durable (transient) exchanges
    #                                          do not survive if/when a server restarts (information about them is stored exclusively
    #                                          in RAM).
    #
    #
    # @option opts [Boolean] :auto_delete  (false)  If set, the exchange is deleted when all queues have finished
    #                                               using it. The server waits for a short period of time before
    #                                               determining the exchange is unused to give time to the client code
    #                                               to bind a queue to it.
    #
    # @option opts [Boolean] :internal (default false)   If set, the exchange may not be used directly by publishers, but
    #                                                    only when bound to other exchanges. Internal exchanges are used to
    #                                                    construct wiring that is not visible to applications. This is a RabbitMQ-specific
    #                                                    extension.
    #
    # @option opts [Boolean] :nowait (true)              If set, the server will not respond to the method. The client should
    #                                                    not wait for a reply method.  If the server could not complete the
    #                                                    method it will raise a channel or connection exception.
    #
    #
    # @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
    # @raise [AMQP::Error] Raised when exchange is declared with  :passive => true and the exchange does not exist.
    #
    #
    # @example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)
    #
    #   puts "=> Headers routing example"
    #   puts
    #   AMQP.start do |connection|
    #     channel   = AMQP::Channel.new(connection)
    #     channel.on_error do |ch, channel_close|
    #       puts "A channel-level exception: #{channel_close.inspect}"
    #     end
    #
    #     exchange = channel.headers("amq.match", :durable => true)
    #
    #     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |metadata, payload|
    #       puts "[linux/x64] Got a message: #{payload}"
    #     end
    #     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |metadata, payload|
    #       puts "[linux/x32] Got a message: #{payload}"
    #     end
    #     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |metadata, payload|
    #       puts "[linux] Got a message: #{payload}"
    #     end
    #     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |metadata, payload|
    #       puts "[macosx|octocore] Got a message: #{payload}"
    #     end
    #
    #
    #     EventMachine.add_timer(0.5) do
    #       exchange.publish "For linux/x64",   :headers => { :arch => "x64", :os => 'linux' }
    #       exchange.publish "For linux/x32",   :headers => { :arch => "x32", :os => 'linux' }
    #       exchange.publish "For linux",       :headers => { :os => 'linux'  }
    #       exchange.publish "For OS X",        :headers => { :os => 'macosx' }
    #       exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' }
    #       exchange.publish "For ocotocore",   :headers => { :cores => 8  }
    #     end
    #
    #
    #     show_stopper = Proc.new do
    #       $stdout.puts "Stopping..."
    #       connection.close {
    #         EventMachine.stop { exit }
    #       }
    #     end
    #
    #     Signal.trap "INT", show_stopper
    #     EventMachine.add_timer(2, show_stopper)
    #   end
    #
    #
    #
    # @see Exchange
    # @see Exchange#initialize
    # @see Channel#default_exchange
    # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3.3)
    #
    # @return [Exchange]
    # @api public
    def headers(name = 'amq.match', opts = {}, &block)
      cached = find_exchange(name)

      # Nothing returned by find_exchange: build a new headers exchange and pass it
      # through register_exchange, whose result is returned.
      return register_exchange(Exchange.new(self, :headers, name, opts, &block)) unless cached

      # Re-declaration: the requested options, with defaults filled in, are checked
      # against the exchange we already have (validate_parameters_match! raises on mismatch).
      validate_parameters_match!(cached, Exchange.add_default_options(:headers, name, opts, block))

      block.call(cached) if block
      cached
    end

    # @endgroup


    # @group Declaring queues


    # Declares and returns a Queue instance associated with this channel. See {Queue Queue class documentation} for
    # more information about queues.
    #
    # To make broker generate queue name for you (a classic example is exclusive
    # queues that are only used for a short period of time), pass empty string
    # as name value. Then queue will get its name as soon as broker's response
    # (queue.declare-ok) arrives. Note that in this case, block is required.
    #
    #
    # Like for exchanges, queue names starting with 'amq.' cannot be modified and
    # should not be used by applications.
    #
    # @example Declaring a queue in a mail delivery app using Channel#queue without a block
    #   AMQP.connect do |connection|
    #     AMQP::Channel.new(connection) do |ch|
    #       # message producers will be able to send messages to this queue
    #       # using direct exchange and routing key = "mail.delivery"
    #       queue = ch.queue("mail.delivery", :durable => true)
    #       queue.subscribe do |headers, payload|
    #         # ...
    #       end
    #     end
    #   end
    #
    # @example Declaring a server-named exclusive queue that receives all messages related to events, using a block.
    #   AMQP.connect do |connection|
    #     AMQP::Channel.new(connection) do |ch|
    #       # message producers will be able to send messages to this queue
    #       # using amq.topic exchange with routing keys that begin with "events"
    #       ch.queue("", :exclusive => true) do |queue|
    #         queue.bind(ch.exchange("amq.topic"), :routing_key => "events.#").subscribe do |headers, payload|
    #           # ...
    #         end
    #       end
    #     end
    #   end
    #
    # @param [String] name Queue name. If you want a server-named queue, you can omit the name (note that in this case, using block is mandatory).
    #                                  See {Queue Queue class documentation} for discussion of queue lifecycles and when use of server-named queues
    #                                  is optimal.
    #
    # @option opts [Boolean] :passive (false)  If set, the server will not create the queue if it does not
    #                                          already exist. The client can use this to check whether a queue
    #                                          exists without modifying the server state.
    #
    # @option opts [Boolean] :durable (false)  If set when creating a new queue, the queue will be marked as
    #                                          durable. Durable queues and their bindings are recreated upon a server
    #                                          restart (information about them is persisted). Non-durable (transient) queues
    #                                          do not survive if/when a server restarts (information about them is stored exclusively
    #                                          in RAM). Any remaining messages in the queue will be purged when the queue
    #                                          is deleted regardless of the message's persistence setting.
    #
    #
    # @option opts [Boolean] :auto_delete  (false)  If set, the queue is deleted when all consumers have finished
    #                                               using it. The server waits for a short period of time before
    #                                               determining the queue is unused to give time to the client code
    #                                               to start consuming from it.
    #
    # @option opts [Boolean] :exclusive (false)  Exclusive queues may only be used by a single connection.
    #                                                    Exclusivity also implies that queue is automatically deleted when connection
    #                                                    is closed. Only one consumer is allowed to remove messages from exclusive queue.
    #
    # @option opts [Boolean] :nowait (true)              If set, the server will not respond to the method. The client should
    #                                                    not wait for a reply method.  If the server could not complete the
    #                                                    method it will raise a channel or connection exception.
    #
    #
    # @raise [AMQP::Error] Raised when queue is redeclared with parameters different from original declaration.
    # @raise [AMQP::Error] Raised when queue is declared with :passive => true and the queue does not exist.
    # @raise [AMQP::Error] Raised when queue is declared with :exclusive => true and queue with that name already exist.
    #
    #
    # @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.
    # @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.
    # @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP method (queue.declare-ok) instance.
    #
    # @see Queue
    # @see Queue#initialize
    # @see http://bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.4)
    #
    # @return [Queue]
    # @api public
    def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
      if name.nil?
        raise ArgumentError, "queue name must not be nil; if you want broker to generate queue name for you, pass an empty string"
      end

      # Only non-empty names are looked up; an empty name always goes to queue!.
      cached = find_queue(name) if name && !name.empty?
      return queue!(name, opts, &block) unless cached

      # Re-declaration: the requested options, with defaults filled in, are checked
      # against the queue we already have (validate_parameters_match! raises on mismatch).
      validate_parameters_match!(cached, Queue.add_default_options(name, opts, block))

      # On this path the block receives the queue only.
      block.call(cached) if block
      cached
    end

    # Same as {Channel#queue} but when queue with the same name already exists in this channel
    # object's cache, this method will replace existing queue with a newly defined one. Consider
    # using {Channel#queue} instead.
    #
    # @see Channel#queue
    #
    # @return [Queue]
    # @api public
    def queue!(name, opts = {}, &block)
      # Without a block there is nothing to adapt: build and register right away.
      return register_queue(Queue.new(self, name, opts)) unless block

      # Adapter between the two arguments Queue hands to its callback and the caller's block:
      # a one-argument block gets the queue only; any other block gets
      # (queue, consumer_count, message_count), with the queue looked up again by the
      # name carried in the second callback argument.
      on_declare = Proc.new do |declared_queue, declare_ok|
        if block.arity == 1
          block.call(declared_queue)
        else
          block.call(find_queue(declare_ok.queue), declare_ok.consumer_count, declare_ok.message_count)
        end
      end

      register_queue(Queue.new(self, name, opts, &on_declare))
    end

    # @return [Array<AMQP::Queue>] Queues cache for this channel
    # @api plugin
    # @private
    def queues
      # Plain reader: returns the @queues instance variable itself, not a copy.
      @queues
    end # queues

    # @endgroup



    # Instantiates and returns an RPC instance associated with this channel.
    #
    # The optional object may be a class name, module name or object
    # instance. When given a class or module name, the object is instantiated
    # during this setup. The passed queue is automatically subscribed to so
    # it passes all messages (and their arguments) to the object.
    #
    # Marshalling and unmarshalling the objects is handled internally. This
    # marshalling is subject to the same restrictions as defined in the
    # {http://ruby-doc.org/core/classes/Marshal.html Marshal module} in the Ruby standard
    # library.
    #
    # When the optional object is not passed, the returned rpc reference is
    # used to send messages and arguments to the queue. See {AMQP::RPC#method_missing}
    # which does all of the heavy lifting with the proxy. Some client
    # elsewhere must call this method *with* the optional block so that
    # there is a valid destination. Failure to do so will just enqueue
    # marshalled messages that are never consumed.
    #
    # @example Use of RPC
    #
    #   # TODO
    #
    #
    # @param [String, Queue] name Queue to be used by RPC server.
    # @return [RPC]
    # @api public
    def rpc(name, obj = nil)
      # Builds a new RPC object on every call; this channel is passed as its first argument,
      # followed by name and obj exactly as received.
      RPC.new(self, name, obj)
    end



    # Returns an array of all rpc proxy objects.
    #
    # Most of the time, this method is not
    # called by application code.
    # @api plugin
    def rpcs
      # @rpcs is a Hash keyed by rpc name (see #register_rpc); only its values are
      # returned, as a new Array.
      @rpcs.values
    end



    # @group Channel lifecycle

    # Opens AMQP channel.
    #
    # @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
    # @api public
    def open(&block)
      # Pure delegation: the block is forwarded unchanged to the superclass implementation
      # and its result is returned.
      super(&block)
    end

    # @return [Boolean] true if channel status is :opened or :opening.
    # @api public
    def open?
      # A channel that is still in the process of opening also counts as open here.
      current = status
      current == :opened || current == :opening
    end # open?

    # Takes a block that will be deferred till the moment when channel is considered open
    # (channel.open-ok is received from the broker). If you need to delay an operation
    # till the moment channel is open, this method is what you are looking for.
    #
    # Multiple callbacks are supported. If the channel is already open when this method
    # is called, the block is executed immediately.
    #
    # @api public
    def once_open(&block)
      # Registers the block as a callback on the deferrable held in
      # @channel_is_open_deferrable; when it runs is decided by that object.
      @channel_is_open_deferrable.callback(&block)
    end # once_open(&block)
    # Alternative spelling of #once_open.
    alias once_opened once_open

    # Closes AMQP channel.
    #
    # @api public
    def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
      # Implicit-argument super forwards all four arguments and the block as received;
      # whatever the superclass implementation returns is the return value.
      super
    end

    # @endgroup




    # @group QoS and flow handling

    # Asks the peer to pause or restart the flow of content data sent to a consumer.
    # This is a simple flow-control mechanism that a peer can use to avoid overflowing its
    # queues or otherwise finding itself receiving more messages than it can process. Note that
    # this method is not intended for window control. It does not affect contents returned to
    # Queue#get callers.
    #
    # @param [Boolean] active Desired flow state.
    #
    # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
    # @api public
    def flow(active = false, &block)
      # Pass-through: `active` (false when omitted) and the block go to the superclass
      # implementation unchanged.
      super(active, &block)
    end

    # @return [Boolean]  True if flow in this channel is active (messages will be delivered to consumers that use this channel).
    #
    # @api public
    def flow_is_active?
      # Returns @flow_is_active as-is; nothing in this method normalizes it to true/false.
      @flow_is_active
    end # flow_is_active?



    # @param [Fixnum] count Message count
    # @param [Boolean] global (false)
    #
    # @return [Channel] self
    #
    # @api public
    def prefetch(count, global = false, &block)
      # The qos request is deferred through once_open rather than sent directly.
      # RabbitMQ as of 2.3.1 does not support prefetch_size, hence the fixed 0.
      once_open { qos(0, count, global, &block) }

      # Returns the channel itself, not the result of the qos call.
      self
    end

    # @endgroup



    # @group Message acknowledgements

    # Acknowledge one or all messages on the channel.
    #
    # @api public
    # @see #reject
    # @see #recover
    # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
    def acknowledge(delivery_tag, multiple = false)
      # Pass-through: both arguments go to the superclass implementation unchanged
      # (multiple is false when omitted).
      super(delivery_tag, multiple)
    end # acknowledge(delivery_tag, multiple = false)

    # Reject a message with given delivery tag.
    #
    # @api public
    # @see #acknowledge
    # @see #recover
    # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
    def reject(delivery_tag, requeue = true)
      # Pass-through: both arguments go to the superclass implementation unchanged
      # (requeue is true when omitted).
      super(delivery_tag, requeue)
    end # reject(delivery_tag, requeue = true)

    # Notifies AMQ broker that consumer has recovered and unacknowledged messages need
    # to be redelivered.
    #
    # @return [Channel]  self
    #
    # @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
    # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
    # @see #acknowledge
    # @api public
    def recover(requeue = true, &block)
      # Pass-through: requeue (true when omitted) and the block go to the superclass
      # implementation unchanged; its result is returned.
      super(requeue, &block)
    end # recover(requeue = true, &block)

    # @endgroup




    # @group Transactions

    # Sets the channel to use standard transactions. One must use this method at least
    # once on a channel before using #tx_commit or #tx_rollback methods.
    #
    # @api public
    def tx_select(&block)
      # Pure delegation: the block is forwarded unchanged to the superclass implementation.
      super(&block)
    end # tx_select(&block)

    # Commits AMQP transaction.
    #
    # @api public
    def tx_commit(&block)
      # Pure delegation: the block is forwarded unchanged to the superclass implementation.
      super(&block)
    end # tx_commit(&block)

    # Rolls AMQP transaction back.
    #
    # @api public
    def tx_rollback(&block)
      # Pure delegation: the block is forwarded unchanged to the superclass implementation.
      super(&block)
    end # tx_rollback(&block)


    # @endgroup





    # @group Error handling

    # Defines a callback that will be executed when channel is closed after
    # channel-level exception.
    #
    # @api public
    def on_error(&block)
      # Per-channel handler: the block is forwarded unchanged to the superclass implementation.
      # (The class-level Channel.on_error is a separate, global handler.)
      super(&block)
    end


    # Defines a global callback to be run on channel-level exception across
    # all channels. Consider using Channel#on_error instead. This method is here for sake
    # of backwards compatibility with 0.6.x and 0.7.x releases.
    # @see AMQP::Channel#on_error
    # @deprecated
    # @api public
    def self.on_error(&block)
      # Same as calling Channel.error with a block: all work is done there.
      error(&block)
    end # self.on_error(&block)

    # @endgroup




    #
    # Implementation
    #


    # Defines a global callback to be run on channel-level exception across
    # all channels. Consider using Channel#on_error instead. This method is here for sake
    # of backwards compatibility with 0.6.x and 0.7.x releases.
    #
    # @param [String] msg Error message that is passed to the previously defined handler
    #
    # @deprecated
    # @api public
    # @private
    def self.error(msg = nil, &block)
      # With a block: install (or replace) the global handler; msg is ignored in that case.
      return @global_error_handler = block if block

      # Without a block: report msg to the handler, provided both are present.
      handler = @global_error_handler
      handler.call(msg) if handler && msg
    end


    # Overrides AMQ::Client::Channel version to also call global callback
    # (if defined) for backwards compatibility.
    #
    # @private
    # @api private
    def handle_close(method)
      # Regular close handling first (implicit-argument super passes `method` along).
      super

      # Backwards compatibility: the reply text is also reported through the
      # class-level Channel.error entry point.
      reply_text = method.reply_text
      self.class.error(reply_text)
    end
    
    # Overrides AMQ::Client::Channel version to also release the channel id
    #
    # @private
    # @api private
    def handle_close_ok(method)
      # Regular close-ok handling first (implicit-argument super passes `method` along).
      super

      # Then hand this channel's id back to the class-level id allocator.
      self.class.release_channel_id(@id)
    end
    # Resets channel state (for example, list of registered queue objects and so on).
    #
    # Most of the time, this method is not
    # called by application code.
    #
    # @private
    # @api plugin
    def reset(&block)
      # See AMQ::Client::Channel
      reset_state!

      # there is no way to reset a deferrable; we have to use a new instance. MK.
      @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
      @channel_is_open_deferrable.callback(&block)

      @connection.on_connection do
        @channel_is_open_deferrable.succeed

        # Re-apply the prefetch setting only when one was given in the channel options.
        prefetch_count = @options[:prefetch]
        prefetch(prefetch_count, false) if prefetch_count
      end
    end

    # @private
    # @api plugin
    def reset_state!
      super()

      # Start over with an empty name => rpc registry.
      @rpcs = {}
    end # reset_state!


    # Overrides superclass method to also re-create @channel_is_open_deferrable
    #
    # @api plugin
    # @private
    def handle_connection_interruption(reason = nil)
      # Implicit-argument super passes `reason` along unchanged.
      super

      # The id is handed back to the allocator only when the channel is not auto-recovering.
      self.class.release_channel_id(@id) unless auto_recovering?

      # A fresh deferrable replaces the old one, as the superclass method does not re-create it.
      @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
    end


    # @private
    # @api private
    def self.channel_id_mutex
      # Created on first use and then reused for every later call.
      return @channel_id_mutex if @channel_id_mutex

      @channel_id_mutex = Mutex.new
    end

    # Returns next available channel id. This method is thread safe.
    #
    # @return [Fixnum]
    # @api public
    # @see Channel.release_channel_id
    # @see Channel.reset_channel_id_allocator
    def self.next_channel_id
      channel_id_mutex.synchronize do
        # Make sure the shared allocator exists before asking it for an id.
        initialize_channel_id_allocator
        @int_allocator.allocate
      end
    end

    # Releases previously allocated channel id. This method is thread safe.
    #
    # @param [Fixnum] i Channel id to release
    # @api public
    # @see Channel.next_channel_id
    # @see Channel.reset_channel_id_allocator
    def self.release_channel_id(i)
      channel_id_mutex.synchronize do
        # Make sure the shared allocator exists before handing the id back.
        initialize_channel_id_allocator
        @int_allocator.release(i)
      end
    end # self.release_channel_id(i)

    # Resets channel allocator. This method is thread safe.
    # @api public
    # @see Channel.next_channel_id
    # @see Channel.release_channel_id
    def self.reset_channel_id_allocator
      channel_id_mutex.synchronize do
        # Make sure the shared allocator exists, then reset it.
        self.initialize_channel_id_allocator
        @int_allocator.reset
      end
    end # self.reset_channel_id_allocator


    # @private
    def self.initialize_channel_id_allocator
      # TODO: ideally, the upper bound should agree with the max number of channels negotiated
      #       for the connection, but that value may not be available yet. MK.
      # Until then ids range from 1 to 65535; the allocator is built once and reused afterwards.
      @int_allocator ||= IntAllocator.new(1, (1 << 16) - 1)
    end # self.initialize_channel_id_allocator

    # @private
    # @api plugin
    def register_rpc(rpc)
      if rpc
        # Stored under the rpc's own name; an existing entry with that name is replaced.
        @rpcs[rpc.name] = rpc
      else
        raise ArgumentError, "argument is nil!"
      end
    end # register_rpc(rpc)

    # @private
    # @api plugin
    def find_rpc(name)
      # Plain Hash lookup in the registry filled by #register_rpc.
      @rpcs[name]
    end


    #
    # Backwards compatibility with 0.6.x
    #

    # unique identifier of the default thread-local channel
    # @deprecated
    # @private
    def self.id
      # Computed once per thread and cached in thread-local storage under :mq_id.
      Thread.current[:mq_id] ||= begin
        # Ask the OS for the host name instead of shelling out to the `hostname`
        # binary: no child process is spawned and the call does not fail on systems
        # where that executable is missing. Loaded lazily because only this
        # deprecated method needs it.
        require "socket"

        "#{Socket.gethostname.strip}-#{Process.pid}-#{Thread.current.object_id}"
      end
    end

    # @private
    # @deprecated
    def self.default
      # TODO: clear this when connection is closed
      # One channel per thread: created on first use and cached under :mq.
      thread = Thread.current
      thread[:mq] ||= AMQP::Channel.new
    end

    # Allows for calls to all MQ instance methods. This implicitly calls
    # AMQP::Channel.new so that a new channel is allocated for subsequent operations.
    # @deprecated
    def self.method_missing(meth, *args, &blk)
      # Any class-level call that is not otherwise defined ends up here and is replayed,
      # with the same arguments and block, on the thread-local default channel
      # (self.default creates that channel on first use).
      # __send__ is used, so the target method's visibility is not checked.
      # NOTE(review): no matching respond_to_missing? is visible in this part of the file,
      # so respond_to? presumably does not reflect methods reachable this way — confirm.
      self.default.__send__(meth, *args, &blk)
    end



    protected

    # @private
    # Checks that a re-declaration agrees with the options of an already known entity.
    #
    # @param [#name, #opts] entity  Previously declared exchange or queue.
    # @param [Hash] parameters      Options of the new declaration. The hash is not modified.
    #
    # @raise [AMQP::IncompatibleOptionsError] When the options differ and :passive is not set.
    # @return [nil]
    def validate_parameters_match!(entity, parameters)
      # :no_declare is left out of the comparison. This works on a filtered copy so the
      # caller's hash stays intact (and frozen hashes are accepted).
      requested = parameters.reject { |key, _value| key == :no_declare }

      # A passive declaration is never treated as a conflict.
      return if requested[:passive] || entity.opts == requested

      raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, requested)
    end # validate_parameters_match!(entity, parameters)
  end # Channel
end # AMQP