/usr/lib/python2.7/dist-packages/kombu/transport/qpid.py is in python-kombu 3.0.33-1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 | """
kombu.transport.qpid
=======================
`Qpid`_ transport using `qpid-python`_ as the client and `qpid-tools`_ for
broker management.
.. _`Qpid`: http://qpid.apache.org/
.. _`qpid-python`: http://pypi.python.org/pypi/qpid-python/
.. _`qpid-tools`: http://pypi.python.org/pypi/qpid-tools/
The use this transport you must install the necessary dependencies. These
dependencies are available via PyPI and can be installed using the pip
command:
`pip install qpid-tools qpid-python`
.. admonition:: Python 3 and PyPy Limitations
The Qpid transport does not support Python 3 or PyPy environments due
to underlying dependencies not being compatible. This version is
tested and works with with Python 2.7.
Authentication
==============
This transport supports SASL authentication with the Qpid broker. Normally,
SASL mechanisms are negotiated from a client list and a server list of
possible mechanisms, but in practice, different SASL client libraries give
different behaviors. These different behaviors cause the expected SASL
mechanism to not be selected in many cases. As such, this transport restricts
the mechanism types based on Kombu's configuration according to the following
table.
+------------------------------------+--------------------+
| **Broker String** | **SASL Mechanism** |
+------------------------------------+--------------------+
| qpid://hostname/ | ANONYMOUS |
+------------------------------------+--------------------+
| qpid://username:password@hostname/ | PLAIN |
+------------------------------------+--------------------+
| see instructions below | EXTERNAL |
+------------------------------------+--------------------+
The user can override the above SASL selection behaviors and specify the SASL
string using the :attr:`~kombu.Connection.login_method` argument to the
:class:`~kombu.Connection` object. The string can be a single SASL mechanism
or a space separated list of SASL mechanisms. If you are using Celery with
Kombu, this can be accomplished by setting the *BROKER_LOGIN_METHOD* Celery
option.
.. note::
While using SSL, Qpid users may want to override the SASL mechanism to
use *EXTERNAL*. In that case, Qpid requires a username to be presented
that matches the *CN* of the SSL client certificate. Ensure that the
broker string contains the corresponding username. For example, if the
client certificate has *CN=asdf* and the client connects to *example.com*
on port 5671, the broker string should be:
**qpid://asdf@example.com:5671/**
Transport Options
=================
The :attr:`~kombu.Connection.transport_options` argument to the
:class:`~kombu.Connection` object are passed directly to the
:class:`qpid.messaging.endpoints.Connection` as keyword arguments. These
options override and replace any other default or specified values. If using
Celery with Kombu, this can be accomplished by setting the
*BROKER_TRANSPORT_OPTIONS* Celery option.
"""
from __future__ import absolute_import
import os
import select
import socket
import ssl
import sys
import time
from itertools import count
from gettext import gettext as _
import amqp.protocol
try:
import fcntl
except ImportError:
fcntl = None # noqa
try:
import qpidtoollibs
except ImportError: # pragma: no cover
qpidtoollibs = None # noqa
try:
from qpid.messaging.exceptions import ConnectionError, NotFound
from qpid.messaging.exceptions import Empty as QpidEmpty
from qpid.messaging.exceptions import SessionClosed
except ImportError: # pragma: no cover
ConnectionError = None
NotFound = None
QpidEmpty = None
SessionClosed = None
try:
import qpid
except ImportError: # pragma: no cover
qpid = None
from kombu.five import Empty, items
from kombu.log import get_logger
from kombu.transport.virtual import Base64, Message
from kombu.transport import base
from kombu.utils.compat import OrderedDict
logger = get_logger(__name__)
OBJECT_ALREADY_EXISTS_STRING = 'object already exists'
VERSION = (1, 0, 0)
__version__ = '.'.join(map(str, VERSION))
PY3 = sys.version_info[0] == 3
def dependency_is_none(dependency):
"""Return True if the dependency is None, otherwise False. This is done
using a function so that tests can mock this behavior easily.
:param dependency: The module to check if it is None
:return: True if dependency is None otherwise False.
"""
return dependency is None
class AuthenticationFailure(Exception):
pass
class QoS(object):
"""A helper object for message prefetch and ACKing purposes.
NOTE: prefetch_count is currently hard set to 1, and needs to be improved
This object is instantiated 1-for-1 with a :class:`Channel`. QoS
allows prefetch_count to be set to the number of outstanding messages
the corresponding :class:`Channel` should be allowed to prefetch.
Setting prefetch_count to 0 disables prefetch limits, and the object
can hold an arbitrary number of messages.
Messages are added using :meth:`append`, which are held until they are
ACKed asynchronously through a call to :meth:`ack`. Messages that are
received, but not ACKed will not be delivered by the broker to another
consumer until an ACK is received, or the session is closed. Messages
are referred to using delivery_tag integers, which are unique per
:class:`Channel`. Delivery tags are managed outside of this object and
are passed in with a message to :meth:`append`. Un-ACKed messages can
be looked up from QoS using :meth:`get` and can be rejected and
forgotten using :meth:`reject`.
"""
def __init__(self, session, prefetch_count=1):
"""Instantiate a QoS object.
:keyword prefetch_count: Initial prefetch count, hard set to 1.
:type prefetch_count: int
"""
self.session = session
self.prefetch_count = 1
self._not_yet_acked = OrderedDict()
def can_consume(self):
"""Return True if the :class:`Channel` can consume more messages,
else False.
Used to ensure the client adheres to currently active prefetch
limits.
:returns: True, if this QoS object can accept more messages
without violating the prefetch_count. If prefetch_count is 0,
can_consume will always return True.
:rtype: bool
"""
return not self.prefetch_count or len(self._not_yet_acked) < self\
.prefetch_count
def can_consume_max_estimate(self):
"""Return the remaining message capacity for the associated
:class:`Channel`.
Returns an estimated number of outstanding messages that a
:class:`Channel` can accept without exceeding prefetch_count. If
prefetch_count is 0, then this method returns 1.
:returns: The number of estimated messages that can be fetched
without violating the prefetch_count.
:rtype: int
"""
if self.prefetch_count:
return self.prefetch_count - len(self._not_yet_acked)
else:
return 1
def append(self, message, delivery_tag):
"""Append message to the list of un-ACKed messages.
Add a message, referenced by the integer delivery_tag, for ACKing,
rejecting, or getting later. Messages are saved into an
:class:`~kombu.utils.compat.OrderedDict` by delivery_tag.
:param message: A received message that has not yet been ACKed.
:type message: qpid.messaging.Message
:param delivery_tag: An integer number to refer to this message by
upon receipt.
:type delivery_tag: int
"""
self._not_yet_acked[delivery_tag] = message
def get(self, delivery_tag):
"""
Get an un-ACKed message by delivery_tag. If called with an invalid
delivery_tag a KeyError is raised.
:param delivery_tag: The delivery tag associated with the message
to be returned.
:type delivery_tag: int
:return: An un-ACKed message that is looked up by delivery_tag.
:rtype: qpid.messaging.Message
"""
return self._not_yet_acked[delivery_tag]
def ack(self, delivery_tag):
"""Acknowledge a message by delivery_tag.
Called asynchronously once the message has been handled and can be
forgotten by the broker.
:param delivery_tag: the delivery tag associated with the message
to be acknowledged.
:type delivery_tag: int
"""
message = self._not_yet_acked.pop(delivery_tag)
self.session.acknowledge(message=message)
def reject(self, delivery_tag, requeue=False):
"""Reject a message by delivery_tag.
Explicitly notify the broker that the :class:`Channel` associated
with this QoS object is rejecting the message that was previously
delivered.
If requeue is False, then the message is not requeued for delivery
to another consumer. If requeue is True, then the message is
requeued for delivery to another consumer.
:param delivery_tag: The delivery tag associated with the message
to be rejected.
:type delivery_tag: int
:keyword requeue: If True, the broker will be notified to requeue
the message. If False, the broker will be told to drop the
message entirely. In both cases, the message will be removed
from this object.
:type requeue: bool
"""
message = self._not_yet_acked.pop(delivery_tag)
QpidDisposition = qpid.messaging.Disposition
if requeue:
disposition = QpidDisposition(qpid.messaging.RELEASED)
else:
disposition = QpidDisposition(qpid.messaging.REJECTED)
self.session.acknowledge(message=message, disposition=disposition)
class Channel(base.StdChannel):
"""Supports broker configuration and messaging send and receive.
A Channel object is designed to have method-parity with a Channel as
defined in AMQP 0-10 and earlier, which allows for the following broker
actions:
- exchange declare and delete
- queue declare and delete
- queue bind and unbind operations
- queue length and purge operations
- sending/receiving/rejecting messages
- structuring, encoding, and decoding messages
- supports synchronous and asynchronous reads
- reading state about the exchange, queues, and bindings
Channels are designed to all share a single TCP connection with a
broker, but provide a level of isolated communication with the broker
while benefiting from a shared TCP connection. The Channel is given
its :class:`Connection` object by the :class:`Transport` that
instantiates the Channel.
This Channel inherits from :class:`~kombu.transport.base.StdChannel`,
which makes this a 'native' Channel versus a 'virtual' Channel which
would inherit from :class:`kombu.transports.virtual`.
Messages sent using this Channel are assigned a delivery_tag. The
delivery_tag is generated for a message as they are prepared for
sending by :meth:`basic_publish`. The delivery_tag is unique per
Channel instance using :meth:`~itertools.count`. The delivery_tag has
no meaningful context in other objects, and is only maintained in the
memory of this object, and the underlying :class:`QoS` object that
provides support.
Each Channel object instantiates exactly one :class:`QoS` object for
prefetch limiting, and asynchronous ACKing. The :class:`QoS` object is
lazily instantiated through a property method :meth:`qos`. The
:class:`QoS` object is a supporting object that should not be accessed
directly except by the Channel itself.
Synchronous reads on a queue are done using a call to :meth:`basic_get`
which uses :meth:`_get` to perform the reading. These methods read
immediately and do not accept any form of timeout. :meth:`basic_get`
reads synchronously and ACKs messages before returning them. ACKing is
done in all cases, because an application that reads messages using
qpid.messaging, but does not ACK them will experience a memory leak.
The no_ack argument to :meth:`basic_get` does not affect ACKing
functionality.
Asynchronous reads on a queue are done by starting a consumer using
:meth:`basic_consume`. Each call to :meth:`basic_consume` will cause a
:class:`~qpid.messaging.endpoints.Receiver` to be created on the
:class:`~qpid.messaging.endpoints.Session` started by the :class:
`Transport`. The receiver will asynchronously read using
qpid.messaging, and prefetch messages before the call to
:meth:`Transport.basic_drain` occurs. The prefetch_count value of the
:class:`QoS` object is the capacity value of the new receiver. The new
receiver capacity must always be at least 1, otherwise none of the
receivers will appear to be ready for reading, and will never be read
from.
Each call to :meth:`basic_consume` creates a consumer, which is given a
consumer tag that is identified by the caller of :meth:`basic_consume`.
Already started consumers can be cancelled using by their consumer_tag
using :meth:`basic_cancel`. Cancellation of a consumer causes the
:class:`~qpid.messaging.endpoints.Receiver` object to be closed.
Asynchronous message ACKing is supported through :meth:`basic_ack`,
and is referenced by delivery_tag. The Channel object uses its
:class:`QoS` object to perform the message ACKing.
"""
#: A class reference that will be instantiated using the qos property.
QoS = QoS
#: A class reference that identifies
# :class:`~kombu.transport.virtual.Message` as the message class type
Message = Message
#: Default body encoding.
#: NOTE: ``transport_options['body_encoding']`` will override this value.
body_encoding = 'base64'
#: Binary <-> ASCII codecs.
codecs = {'base64': Base64()}
#: counter used to generate delivery tags for this channel.
_delivery_tags = count(1)
def __init__(self, connection, transport):
"""Instantiate a Channel object.
:param connection: A Connection object that this Channel can
reference. Currently only used to access callbacks.
:type connection: Connection
:param transport: The Transport this Channel is associated with.
:type transport: Transport
"""
self.connection = connection
self.transport = transport
qpid_connection = connection.get_qpid_connection()
self._broker = qpidtoollibs.BrokerAgent(qpid_connection)
self.closed = False
self._tag_to_queue = {}
self._receivers = {}
self._qos = None
def _get(self, queue):
"""Non-blocking, single-message read from a queue.
An internal method to perform a non-blocking, single-message read
from a queue by name. This method creates a
:class:`~qpid.messaging.endpoints.Receiver` to read from the queue
using the :class:`~qpid.messaging.endpoints.Session` saved on the
associated :class:`Transport`. The receiver is closed before the
method exits. If a message is available, a
:class:`qpid.messaging.Message` object is returned. If no message is
available, a :class:`qpid.messaging.exceptions.Empty` exception is
raised.
This is an internal method. External calls for get functionality
should be done using :meth:`basic_get`.
:param queue: The queue name to get the message from
:type queue: str
:return: The received message.
:rtype: :class:`qpid.messaging.Message`
:raises: :class:`qpid.messaging.exceptions.Empty` if no
message is available.
"""
rx = self.transport.session.receiver(queue)
try:
message = rx.fetch(timeout=0)
finally:
rx.close()
return message
def _put(self, routing_key, message, exchange=None, **kwargs):
"""Synchronous send of a single message onto a queue or exchange.
An internal method which synchronously sends a single message onto
a given queue or exchange. If exchange is not specified,
the message is sent directly to a queue specified by routing_key.
If no queue is found by the name of routing_key while exchange is
not specified an exception is raised. If an exchange is specified,
then the message is delivered onto the requested
exchange using routing_key. Message sending is synchronous using
sync=True because large messages in kombu funtests were not being
fully sent before the receiver closed.
This method creates a :class:`qpid.messaging.endpoints.Sender` to
send the message to the queue using the
:class:`qpid.messaging.endpoints.Session` created and referenced by
the associated :class:`Transport`. The sender is closed before the
method exits.
External calls for put functionality should be done using
:meth:`basic_publish`.
:param routing_key: If exchange is None, treated as the queue name
to send the message to. If exchange is not None, treated as the
routing_key to use as the message is submitted onto the exchange.
:type routing_key: str
:param message: The message to be sent as prepared by
:meth:`basic_publish`.
:type message: dict
:keyword exchange: keyword parameter of the exchange this message
should be sent on. If no exchange is specified, the message is
sent directly to a queue specified by routing_key.
:type exchange: str
"""
if not exchange:
address = '%s; {assert: always, node: {type: queue}}' % \
routing_key
msg_subject = None
else:
address = '%s/%s; {assert: always, node: {type: topic}}' % (
exchange, routing_key)
msg_subject = str(routing_key)
sender = self.transport.session.sender(address)
qpid_message = qpid.messaging.Message(content=message,
subject=msg_subject)
try:
sender.send(qpid_message, sync=True)
finally:
sender.close()
def _purge(self, queue):
"""Purge all undelivered messages from a queue specified by name.
An internal method to purge all undelivered messages from a queue
specified by name. If the queue does not exist a
:class:`qpid.messaging.exceptions.NotFound` exception is raised.
The queue message depth is first checked, and then the broker is
asked to purge that number of messages. The integer number of
messages requested to be purged is returned. The actual number of
messages purged may be different than the requested number of
messages to purge (see below).
Sometimes delivered messages are asked to be purged, but are not.
This case fails silently, which is the correct behavior when a
message that has been delivered to a different consumer, who has
not ACKed the message, and still has an active session with the
broker. Messages in that case are not safe for purging and will be
retained by the broker. The client is unable to change this
delivery behavior.
This is an internal method. External calls for purge functionality
should be done using :meth:`queue_purge`.
:param queue: the name of the queue to be purged
:type queue: str
:return: The number of messages requested to be purged.
:rtype: int
:raises: :class:`qpid.messaging.exceptions.NotFound` if the queue
being purged cannot be found.
"""
queue_to_purge = self._broker.getQueue(queue)
if queue_to_purge is None:
error_text = "NOT_FOUND - no queue '{0}'".format(queue)
raise NotFound(code=404, text=error_text)
message_count = queue_to_purge.values['msgDepth']
if message_count > 0:
queue_to_purge.purge(message_count)
return message_count
def _size(self, queue):
"""Get the number of messages in a queue specified by name.
An internal method to return the number of messages in a queue
specified by name. It returns an integer count of the number
of messages currently in the queue.
:param queue: The name of the queue to be inspected for the number
of messages
:type queue: str
:return the number of messages in the queue specified by name.
:rtype: int
"""
queue_to_check = self._broker.getQueue(queue)
message_depth = queue_to_check.values['msgDepth']
return message_depth
def _delete(self, queue, *args, **kwargs):
"""Delete a queue and all messages on that queue.
An internal method to delete a queue specified by name and all the
messages on it. First, all messages are purged from a queue using a
call to :meth:`_purge`. Second, the broker is asked to delete the
queue.
This is an internal method. External calls for queue delete
functionality should be done using :meth:`queue_delete`.
:param queue: The name of the queue to be deleted.
:type queue: str
"""
self._purge(queue)
self._broker.delQueue(queue)
def _has_queue(self, queue, **kwargs):
"""Determine if the broker has a queue specified by name.
:param queue: The queue name to check if the queue exists.
:type queue: str
:return: True if a queue exists on the broker, and false
otherwise.
:rtype: bool
"""
if self._broker.getQueue(queue):
return True
else:
return False
def queue_declare(self, queue, passive=False, durable=False,
exclusive=False, auto_delete=True, nowait=False,
arguments=None):
"""Create a new queue specified by name.
If the queue already exists, no change is made to the queue,
and the return value returns information about the existing queue.
The queue name is required and specified as the first argument.
If passive is True, the server will not create the queue. The
client can use this to check whether a queue exists without
modifying the server state. Default is False.
If durable is True, the queue will be durable. Durable queues
remain active when a server restarts. Non-durable queues (
transient queues) are purged if/when a server restarts. Note that
durable queues do not necessarily hold persistent messages,
although it does not make sense to send persistent messages to a
transient queue. Default is False.
If exclusive is True, the queue will be exclusive. Exclusive queues
may only be consumed by the current connection. Setting the
'exclusive' flag always implies 'auto-delete'. Default is False.
If auto_delete is True, the queue is deleted when all consumers
have finished using it. The last consumer can be cancelled either
explicitly or because its channel is closed. If there was no
consumer ever on the queue, it won't be deleted. Default is True.
The nowait parameter is unused. It was part of the 0-9-1 protocol,
but this AMQP client implements 0-10 which removed the nowait option.
The arguments parameter is a set of arguments for the declaration of
the queue. Arguments are passed as a dict or None. This field is
ignored if passive is True. Default is None.
This method returns a :class:`~collections.namedtuple` with the name
'queue_declare_ok_t' and the queue name as 'queue', message count
on the queue as 'message_count', and the number of active consumers
as 'consumer_count'. The named tuple values are ordered as queue,
message_count, and consumer_count respectively.
Due to Celery's non-ACKing of events, a ring policy is set on any
queue that starts with the string 'celeryev' or ends with the string
'pidbox'. These are celery event queues, and Celery does not ack
them, causing the messages to build-up. Eventually Qpid stops serving
messages unless the 'ring' policy is set, at which point the buffer
backing the queue becomes circular.
:param queue: The name of the queue to be created.
:type queue: str
:param passive: If True, the sever will not create the queue.
:type passive: bool
:param durable: If True, the queue will be durable.
:type durable: bool
:param exclusive: If True, the queue will be exclusive.
:type exclusive: bool
:param auto_delete: If True, the queue is deleted when all
consumers have finished using it.
:type auto_delete: bool
:param nowait: This parameter is unused since the 0-10
specification does not include it.
:type nowait: bool
:param arguments: A set of arguments for the declaration of the
queue.
:type arguments: dict or None
:return: A named tuple representing the declared queue as a named
tuple. The tuple values are ordered as queue, message count,
and the active consumer count.
:rtype: :class:`~collections.namedtuple`
"""
options = {'passive': passive,
'durable': durable,
'exclusive': exclusive,
'auto-delete': auto_delete,
'arguments': arguments}
if queue.startswith('celeryev') or queue.endswith('pidbox'):
options['qpid.policy_type'] = 'ring'
try:
self._broker.addQueue(queue, options=options)
except Exception as exc:
if OBJECT_ALREADY_EXISTS_STRING not in str(exc):
raise exc
queue_to_check = self._broker.getQueue(queue)
message_count = queue_to_check.values['msgDepth']
consumer_count = queue_to_check.values['consumerCount']
return amqp.protocol.queue_declare_ok_t(queue, message_count,
consumer_count)
def queue_delete(self, queue, if_unused=False, if_empty=False, **kwargs):
"""Delete a queue by name.
Delete a queue specified by name. Using the if_unused keyword
argument, the delete can only occur if there are 0 consumers bound
to it. Using the if_empty keyword argument, the delete can only
occur if there are 0 messages in the queue.
:param queue: The name of the queue to be deleted.
:type queue: str
:keyword if_unused: If True, delete only if the queue has 0
consumers. If False, delete a queue even with consumers bound
to it.
:type if_unused: bool
:keyword if_empty: If True, only delete the queue if it is empty. If
False, delete the queue if it is empty or not.
:type if_empty: bool
"""
if self._has_queue(queue):
if if_empty and self._size(queue):
return
queue_obj = self._broker.getQueue(queue)
consumer_count = queue_obj.getAttributes()['consumerCount']
if if_unused and consumer_count > 0:
return
self._delete(queue)
def exchange_declare(self, exchange='', type='direct', durable=False,
**kwargs):
"""Create a new exchange.
Create an exchange of a specific type, and optionally have the
exchange be durable. If an exchange of the requested name already
exists, no action is taken and no exceptions are raised. Durable
exchanges will survive a broker restart, non-durable exchanges will
not.
Exchanges provide behaviors based on their type. The expected
behaviors are those defined in the AMQP 0-10 and prior
specifications including 'direct', 'topic', and 'fanout'
functionality.
:keyword type: The exchange type. Valid values include 'direct',
'topic', and 'fanout'.
:type type: str
:keyword exchange: The name of the exchange to be created. If no
exchange is specified, then a blank string will be used as the
name.
:type exchange: str
:keyword durable: True if the exchange should be durable, or False
otherwise.
:type durable: bool
"""
options = {'durable': durable}
try:
self._broker.addExchange(type, exchange, options)
except Exception as exc:
if OBJECT_ALREADY_EXISTS_STRING not in str(exc):
raise exc
def exchange_delete(self, exchange_name, **kwargs):
"""Delete an exchange specified by name
:param exchange_name: The name of the exchange to be deleted.
:type exchange_name: str
"""
self._broker.delExchange(exchange_name)
def queue_bind(self, queue, exchange, routing_key, **kwargs):
"""Bind a queue to an exchange with a bind key.
Bind a queue specified by name, to an exchange specified by name,
with a specific bind key. The queue and exchange must already
exist on the broker for the bind to complete successfully. Queues
may be bound to exchanges multiple times with different keys.
:param queue: The name of the queue to be bound.
:type queue: str
:param exchange: The name of the exchange that the queue should be
bound to.
:type exchange: str
:param routing_key: The bind key that the specified queue should
bind to the specified exchange with.
:type routing_key: str
"""
self._broker.bind(exchange, queue, routing_key)
def queue_unbind(self, queue, exchange, routing_key, **kwargs):
"""Unbind a queue from an exchange with a given bind key.
Unbind a queue specified by name, from an exchange specified by
name, that is already bound with a bind key. The queue and
exchange must already exist on the broker, and bound with the bind
key for the operation to complete successfully. Queues may be
bound to exchanges multiple times with different keys, thus the
bind key is a required field to unbind in an explicit way.
:param queue: The name of the queue to be unbound.
:type queue: str
:param exchange: The name of the exchange that the queue should be
unbound from.
:type exchange: str
:param routing_key: The existing bind key between the specified
queue and a specified exchange that should be unbound.
:type routing_key: str
"""
self._broker.unbind(exchange, queue, routing_key)
def queue_purge(self, queue, **kwargs):
"""Remove all undelivered messages from queue.
Purge all undelivered messages from a queue specified by name. If the
queue does not exist an exception is raised. The queue message
depth is first checked, and then the broker is asked to purge that
number of messages. The integer number of messages requested to be
purged is returned. The actual number of messages purged may be
different than the requested number of messages to purge.
Sometimes delivered messages are asked to be purged, but are not.
This case fails silently, which is the correct behavior when a
message that has been delivered to a different consumer, who has
not ACKed the message, and still has an active session with the
broker. Messages in that case are not safe for purging and will be
retained by the broker. The client is unable to change this
delivery behavior.
Internally, this method relies on :meth:`_purge`.
:param queue: The name of the queue which should have all messages
removed.
:type queue: str
:return: The number of messages requested to be purged.
:rtype: int
:raises: :class:`qpid.messaging.exceptions.NotFound` if the queue
being purged cannot be found.
"""
return self._purge(queue)
def basic_get(self, queue, no_ack=False, **kwargs):
"""Non-blocking single message get and ACK from a queue by name.
Internally this method uses :meth:`_get` to fetch the message. If
an :class:`~qpid.messaging.exceptions.Empty` exception is raised by
:meth:`_get`, this method silences it and returns None. If
:meth:`_get` does return a message, that message is ACKed. The no_ack
parameter has no effect on ACKing behavior, and all messages are
ACKed in all cases. This method never adds fetched Messages to the
internal QoS object for asynchronous ACKing.
This method converts the object type of the method as it passes
through. Fetching from the broker, :meth:`_get` returns a
:class:`qpid.messaging.Message`, but this method takes the payload
of the :class:`qpid.messaging.Message` and instantiates a
:class:`~kombu.transport.virtual.Message` object with the payload
based on the class setting of self.Message.
:param queue: The queue name to fetch a message from.
:type queue: str
:keyword no_ack: The no_ack parameter has no effect on the ACK
behavior of this method. Un-ACKed messages create a memory leak in
qpid.messaging, and need to be ACKed in all cases.
:type noack: bool
:return: The received message.
:rtype: :class:`~kombu.transport.virtual.Message`
"""
try:
qpid_message = self._get(queue)
raw_message = qpid_message.content
message = self.Message(self, raw_message)
self.transport.session.acknowledge(message=qpid_message)
return message
except Empty:
pass
def basic_ack(self, delivery_tag):
"""Acknowledge a message by delivery_tag.
Acknowledges a message referenced by delivery_tag. Messages can
only be ACKed using :meth:`basic_ack` if they were acquired using
:meth:`basic_consume`. This is the ACKing portion of the
asynchronous read behavior.
Internally, this method uses the :class:`QoS` object, which stores
messages and is responsible for the ACKing.
:param delivery_tag: The delivery tag associated with the message
to be acknowledged.
:type delivery_tag: int
"""
self.qos.ack(delivery_tag)
def basic_reject(self, delivery_tag, requeue=False):
"""Reject a message by delivery_tag.
Rejects a message that has been received by the Channel, but not
yet acknowledged. Messages are referenced by their delivery_tag.
If requeue is False, the rejected message will be dropped by the
broker and not delivered to any other consumers. If requeue is
True, then the rejected message will be requeued for delivery to
another consumer, potentially to the same consumer who rejected the
message previously.
:param delivery_tag: The delivery tag associated with the message
to be rejected.
:type delivery_tag: int
:keyword requeue: If False, the rejected message will be dropped by
the broker and not delivered to any other consumers. If True,
then the rejected message will be requeued for delivery to
another consumer, potentially to the same consumer who rejected
the message previously.
:type requeue: bool
"""
self.qos.reject(delivery_tag, requeue=requeue)
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
"""Start an asynchronous consumer that reads from a queue.
This method starts a consumer of type
:class:`~qpid.messaging.endpoints.Receiver` using the
:class:`~qpid.messaging.endpoints.Session` created and referenced by
the :class:`Transport` that reads messages from a queue
specified by name until stopped by a call to :meth:`basic_cancel`.
Messages are available later through a synchronous call to
:meth:`Transport.drain_events`, which will drain from the consumer
started by this method. :meth:`Transport.drain_events` is
synchronous, but the receiving of messages over the network occurs
asynchronously, so it should still perform well.
:meth:`Transport.drain_events` calls the callback provided here with
the Message of type self.Message.
Each consumer is referenced by a consumer_tag, which is provided by
the caller of this method.
This method sets up the callback onto the self.connection object in a
dict keyed by queue name. :meth:`~Transport.drain_events` is
responsible for calling that callback upon message receipt.
All messages that are received are added to the QoS object to be
saved for asynchronous ACKing later after the message has been
handled by the caller of :meth:`~Transport.drain_events`. Messages
can be ACKed after being received through a call to :meth:`basic_ack`.
If no_ack is True, the messages are immediately ACKed to avoid a
memory leak in qpid.messaging when messages go un-ACKed. The no_ack
flag indicates that the receiver of the message does not intent to
call :meth:`basic_ack`.
:meth:`basic_consume` transforms the message object type prior to
calling the callback. Initially the message comes in as a
:class:`qpid.messaging.Message`. This method unpacks the payload
of the :class:`qpid.messaging.Message` and creates a new object of
type self.Message.
This method wraps the user delivered callback in a runtime-built
function which provides the type transformation from
:class:`qpid.messaging.Message` to
:class:`~kombu.transport.virtual.Message`, and adds the message to
the associated :class:`QoS` object for asynchronous ACKing
if necessary.
:param queue: The name of the queue to consume messages from
:type queue: str
:param no_ack: If True, then messages will not be saved for ACKing
later, but will be ACKed immediately. If False, then messages
will be saved for ACKing later with a call to :meth:`basic_ack`.
:type no_ack: bool
:param callback: a callable that will be called when messages
arrive on the queue.
:type callback: a callable object
:param consumer_tag: a tag to reference the created consumer by.
This consumer_tag is needed to cancel the consumer.
:type consumer_tag: an immutable object
"""
self._tag_to_queue[consumer_tag] = queue
def _callback(qpid_message):
raw_message = qpid_message.content
message = self.Message(self, raw_message)
delivery_tag = message.delivery_tag
self.qos.append(qpid_message, delivery_tag)
if no_ack:
# Celery will not ack this message later, so we should to
# avoid a memory leak in qpid.messaging due to un-ACKed
# messages.
self.basic_ack(delivery_tag)
return callback(message)
self.connection._callbacks[queue] = _callback
new_receiver = self.transport.session.receiver(queue)
new_receiver.capacity = self.qos.prefetch_count
self._receivers[consumer_tag] = new_receiver
def basic_cancel(self, consumer_tag):
"""Cancel consumer by consumer tag.
Request the consumer stops reading messages from its queue. The
consumer is a :class:`~qpid.messaging.endpoints.Receiver`, and it is
closed using :meth:`~qpid.messaging.endpoints.Receiver.close`.
This method also cleans up all lingering references of the consumer.
:param consumer_tag: The tag which refers to the consumer to be
cancelled. Originally specified when the consumer was created
as a parameter to :meth:`basic_consume`.
:type consumer_tag: an immutable object
"""
if consumer_tag in self._receivers:
receiver = self._receivers.pop(consumer_tag)
receiver.close()
queue = self._tag_to_queue.pop(consumer_tag, None)
self.connection._callbacks.pop(queue, None)
def close(self):
"""Cancel all associated messages and close the Channel.
This cancels all consumers by calling :meth:`basic_cancel` for each
known consumer_tag. It also closes the self._broker sessions. Closing
the sessions implicitly causes all outstanding, un-ACKed messages to
be considered undelivered by the broker.
"""
if not self.closed:
self.closed = True
for consumer_tag in self._receivers.keys():
self.basic_cancel(consumer_tag)
if self.connection is not None:
self.connection.close_channel(self)
self._broker.close()
@property
def qos(self):
""":class:`QoS` manager for this channel.
Lazily instantiates an object of type :class:`QoS` upon access to
the self.qos attribute.
:return: An already existing, or newly created QoS object
:rtype: :class:`QoS`
"""
if self._qos is None:
self._qos = self.QoS(self.transport.session)
return self._qos
def basic_qos(self, prefetch_count, *args):
"""Change :class:`QoS` settings for this Channel.
Set the number of un-acknowledged messages this Channel can fetch and
hold. The prefetch_value is also used as the capacity for any new
:class:`~qpid.messaging.endpoints.Receiver` objects.
Currently, this value is hard coded to 1.
:param prefetch_count: Not used. This method is hard-coded to 1.
:type prefetch_count: int
"""
self.qos.prefetch_count = 1
def prepare_message(self, body, priority=None, content_type=None,
content_encoding=None, headers=None, properties=None):
"""Prepare message data for sending.
This message is typically called by
:meth:`kombu.messaging.Producer._publish` as a preparation step in
message publication.
:param body: The body of the message
:type body: str
:keyword priority: A number between 0 and 9 that sets the priority of
the message.
:type priority: int
:keyword content_type: The content_type the message body should be
treated as. If this is unset, the
:class:`qpid.messaging.endpoints.Sender` object tries to
autodetect the content_type from the body.
:type content_type: str
:keyword content_encoding: The content_encoding the message body is
encoded as.
:type content_encoding: str
:keyword headers: Additional Message headers that should be set.
Passed in as a key-value pair.
:type headers: dict
:keyword properties: Message properties to be set on the message.
:type properties: dict
:return: Returns a dict object that encapsulates message
attributes. See parameters for more details on attributes that
can be set.
:rtype: dict
"""
properties = properties or {}
info = properties.setdefault('delivery_info', {})
info['priority'] = priority or 0
return {'body': body,
'content-encoding': content_encoding,
'content-type': content_type,
'headers': headers or {},
'properties': properties or {}}
def basic_publish(self, message, exchange, routing_key, **kwargs):
"""Publish message onto an exchange using a routing key.
Publish a message onto an exchange specified by name using a
routing key specified by routing_key. Prepares the message in the
following ways before sending:
- encodes the body using :meth:`encode_body`
- wraps the body as a buffer object, so that
:class:`qpid.messaging.endpoints.Sender` uses a content type
that can support arbitrarily large messages.
- assigns a delivery_tag generated through self._delivery_tags
- sets the exchange and routing_key info as delivery_info
Internally uses :meth:`_put` to send the message synchronously. This
message is typically called by
:class:`kombu.messaging.Producer._publish` as the final step in
message publication.
:param message: A dict containing key value pairs with the message
data. A valid message dict can be generated using the
:meth:`prepare_message` method.
:type message: dict
:param exchange: The name of the exchange to submit this message
onto.
:type exchange: str
:param routing_key: The routing key to be used as the message is
submitted onto the exchange.
:type routing_key: str
"""
message['body'], body_encoding = self.encode_body(
message['body'], self.body_encoding,
)
message['body'] = buffer(message['body'])
props = message['properties']
props.update(
body_encoding=body_encoding,
delivery_tag=next(self._delivery_tags),
)
props['delivery_info'].update(
exchange=exchange,
routing_key=routing_key,
)
self._put(routing_key, message, exchange, **kwargs)
def encode_body(self, body, encoding=None):
"""Encode a body using an optionally specified encoding.
The encoding can be specified by name, and is looked up in
self.codecs. self.codecs uses strings as its keys which specify
the name of the encoding, and then the value is an instantiated
object that can provide encoding/decoding of that type through
encode and decode methods.
:param body: The body to be encoded.
:type body: str
:keyword encoding: The encoding type to be used. Must be a supported
codec listed in self.codecs.
:type encoding: str
:return: If encoding is specified, return a tuple with the first
position being the encoded body, and the second position the
encoding used. If encoding is not specified, the body is passed
through unchanged.
:rtype: tuple
"""
if encoding:
return self.codecs.get(encoding).encode(body), encoding
return body, encoding
def decode_body(self, body, encoding=None):
"""Decode a body using an optionally specified encoding.
The encoding can be specified by name, and is looked up in
self.codecs. self.codecs uses strings as its keys which specify
the name of the encoding, and then the value is an instantiated
object that can provide encoding/decoding of that type through
encode and decode methods.
:param body: The body to be encoded.
:type body: str
:keyword encoding: The encoding type to be used. Must be a supported
codec listed in self.codecs.
:type encoding: str
:return: If encoding is specified, the decoded body is returned.
If encoding is not specified, the body is returned unchanged.
:rtype: str
"""
if encoding:
return self.codecs.get(encoding).decode(body)
return body
def typeof(self, exchange, default='direct'):
"""Get the exchange type.
Lookup and return the exchange type for an exchange specified by
name. Exchange types are expected to be 'direct', 'topic',
and 'fanout', which correspond with exchange functionality as
specified in AMQP 0-10 and earlier. If the exchange cannot be
found, the default exchange type is returned.
:param exchange: The exchange to have its type lookup up.
:type exchange: str
:keyword default: The type of exchange to assume if the exchange does
not exist.
:type default: str
:return: The exchange type either 'direct', 'topic', or 'fanout'.
:rtype: str
"""
qpid_exchange = self._broker.getExchange(exchange)
if qpid_exchange:
qpid_exchange_attributes = qpid_exchange.getAttributes()
return qpid_exchange_attributes['type']
else:
return default
class Connection(object):
"""Encapsulate a connection object for the :class:`Transport`.
A Connection object is created by a :class:`Transport` during a call to
:meth:`Transport.establish_connection`. The :class:`Transport` passes in
connection options as keywords that should be used for any connections
created. Each :class:`Transport` creates exactly one Connection.
A Connection object maintains a reference to a
:class:`~qpid.messaging.endpoints.Connection` which can be accessed
through a bound getter method named :meth:`get_qpid_connection` method.
Each Channel uses a the Connection for each
:class:`~qpidtoollibs.BrokerAgent`, and the Transport maintains a session
for all senders and receivers.
The Connection object is also responsible for maintaining the
dictionary of references to callbacks that should be called when
messages are received. These callbacks are saved in _callbacks,
and keyed on the queue name associated with the received message. The
_callbacks are setup in :meth:`Channel.basic_consume`, removed in
:meth:`Channel.basic_cancel`, and called in
:meth:`Transport.drain_events`.
The following keys are expected to be passed in as keyword arguments
at a minimum:
All keyword arguments are collected into the connection_options dict
and passed directly through to
:meth:`qpid.messaging.endpoints.Connection.establish`.
"""
# A class reference to the :class:`Channel` object
Channel = Channel
def __init__(self, **connection_options):
"""Instantiate a Connection object.
The following parameters are expected:
* host: The host that connections should connect to.
* port: The port that connection should connect to.
* username: The username that connections should connect with.
Optional.
* password: The password that connections should connect with.
Optional but requires a username.
* transport: The transport type that connections should use.
Either 'tcp', or 'ssl' are expected as values.
* timeout: the timeout used when a Connection connects to the
broker.
* sasl_mechanisms: The sasl authentication mechanism type to use.
refer to SASL documentation for an explanation of valid
values.
Creates a :class:`qpid.messaging.endpoints.Connection` object with
the saved parameters, and stores it as _qpid_conn.
qpid.messaging has an AuthenticationFailure exception type, but
instead raises a ConnectionError with a message that indicates an
authentication failure occurred in those situations.
ConnectionError is listed as a recoverable error type, so kombu
will attempt to retry if a ConnectionError is raised. Retrying
the operation without adjusting the credentials is not correct,
so this method specifically checks for a ConnectionError that
indicates an Authentication Failure occurred. In those
situations, the error type is mutated while preserving the
original message and raised so kombu will allow the exception to
not be considered recoverable.
"""
self.connection_options = connection_options
self.channels = []
self._callbacks = {}
self._qpid_conn = None
establish = qpid.messaging.Connection.establish
# There are several inconsistent behaviors in the sasl libraries
# used on different systems. Although qpid.messaging allows
# multiple space separated sasl mechanisms, this implementation
# only advertises one type to the server. These are either
# ANONYMOUS, PLAIN, or an overridden value specified by the user.
sasl_mech = connection_options['sasl_mechanisms']
try:
msg = _('Attempting to connect to qpid with '
'SASL mechanism %s') % sasl_mech
logger.debug(msg)
self._qpid_conn = establish(**self.connection_options)
# connection was successful if we got this far
msg = _('Connected to qpid with SASL '
'mechanism %s') % sasl_mech
logger.info(msg)
except ConnectionError as conn_exc:
# if we get one of these errors, do not raise an exception.
# Raising will cause the connection to be retried. Instead,
# just continue on to the next mech.
coded_as_auth_failure = getattr(conn_exc, 'code', None) == 320
contains_auth_fail_text = \
'Authentication failed' in conn_exc.text
contains_mech_fail_text = \
'sasl negotiation failed: no mechanism agreed' \
in conn_exc.text
contains_mech_unavail_text = 'no mechanism available' \
in conn_exc.text
if coded_as_auth_failure or \
contains_auth_fail_text or contains_mech_fail_text or \
contains_mech_unavail_text:
msg = _('Unable to connect to qpid with SASL '
'mechanism %s') % sasl_mech
logger.error(msg)
raise AuthenticationFailure(sys.exc_info()[1])
raise
def get_qpid_connection(self):
"""Return the existing connection (singleton).
:return: The existing qpid.messaging.Connection
:rtype: :class:`qpid.messaging.endpoints.Connection`
"""
return self._qpid_conn
def close(self):
"""Close the connection
Closing the connection will close all associated session, senders, or
receivers used by the Connection.
"""
self._qpid_conn.close()
def close_channel(self, channel):
"""Close a Channel.
Close a channel specified by a reference to the :class:`Channel`
object.
:param channel: Channel that should be closed.
:type channel: Channel
"""
try:
self.channels.remove(channel)
except ValueError:
pass
finally:
channel.connection = None
class Transport(base.Transport):
"""Kombu native transport for a Qpid broker.
Provide a native transport for Kombu that allows consumers and
producers to read and write messages to/from a broker. This Transport
is capable of supporting both synchronous and asynchronous reading.
All writes are synchronous through the :class:`Channel` objects that
support this Transport.
Asynchronous reads are done using a call to :meth:`drain_events`,
which synchronously reads messages that were fetched asynchronously, and
then handles them through calls to the callback handlers maintained on
the :class:`Connection` object.
The Transport also provides methods to establish and close a connection
to the broker. This Transport establishes a factory-like pattern that
allows for singleton pattern to consolidate all Connections into a single
one.
The Transport can create :class:`Channel` objects to communicate with the
broker with using the :meth:`create_channel` method.
The Transport identifies recoverable connection errors and recoverable
channel errors according to the Kombu 3.0 interface. These exception are
listed as tuples and store in the Transport class attribute
`recoverable_connection_errors` and `recoverable_channel_errors`
respectively. Any exception raised that is not a member of one of these
tuples is considered non-recoverable. This allows Kombu support for
automatic retry of certain operations to function correctly.
For backwards compatibility to the pre Kombu 3.0 exception interface, the
recoverable errors are also listed as `connection_errors` and
`channel_errors`.
"""
# Reference to the class that should be used as the Connection object
Connection = Connection
# This Transport does not specify a polling interval.
polling_interval = None
# This Transport does support the Celery asynchronous event model.
supports_ev = True
# The driver type and name for identification purposes.
driver_type = 'qpid'
driver_name = 'qpid'
# Exceptions that can be recovered from, but where the connection must be
# closed and re-established first.
recoverable_connection_errors = (
ConnectionError,
select.error,
)
# Exceptions that can be automatically recovered from without
# re-establishing the connection.
recoverable_channel_errors = (
NotFound,
)
# Support the pre 3.0 Kombu exception labeling interface which treats
# connection_errors and channel_errors both as recoverable via a
# reconnect.
connection_errors = recoverable_connection_errors
channel_errors = recoverable_channel_errors
def __init__(self, *args, **kwargs):
"""Instantiate a Transport object.
This method creates a pipe, and saves the read and write file
descriptors as attributes. The behavior of the read file descriptor
is modified to be non-blocking using fcntl.fcntl.
"""
self.verify_runtime_environment()
super(Transport, self).__init__(*args, **kwargs)
self.r, self._w = os.pipe()
if fcntl is not None:
fcntl.fcntl(self.r, fcntl.F_SETFL, os.O_NONBLOCK)
def verify_runtime_environment(self):
"""Verify that the runtime environment is acceptable.
This method is called as part of __init__ and raises a RuntimeError
in Python3 or PyPi environments. This module is not compatible with
Python3 or PyPi. The RuntimeError identifies this to the user up
front along with suggesting Python 2.6+ be used instead.
This method also checks that the dependencies qpidtoollibs and
qpid.messaging are installed. If either one is not installed a
RuntimeError is raised.
:raises: RuntimeError if the runtime environment is not acceptable.
"""
if getattr(sys, 'pypy_version_info', None):
raise RuntimeError(
'The Qpid transport for Kombu does not '
'support PyPy. Try using Python 2.6+',
)
if PY3:
raise RuntimeError(
'The Qpid transport for Kombu does not '
'support Python 3. Try using Python 2.6+',
)
if dependency_is_none(qpidtoollibs):
raise RuntimeError(
'The Python package "qpidtoollibs" is missing. Install it '
'with your package manager. You can also try `pip install '
'qpid-tools`.')
if dependency_is_none(qpid):
raise RuntimeError(
'The Python package "qpid.messaging" is missing. Install it '
'with your package manager. You can also try `pip install '
'qpid-python`.')
def _qpid_session_ready(self):
os.write(self._w, '0')
def on_readable(self, connection, loop):
"""Handle any messages associated with this Transport.
This method clears a single message from the externally monitored
file descriptor by issuing a read call to the self.r file descriptor
which removes a single '0' character that was placed into the pipe
by the Qpid session message callback handler. Once a '0' is read,
all available events are drained through a call to
:meth:`drain_events`.
The behavior of self.r is adjusted in __init__ to be non-blocking,
ensuring that an accidental call to this method when no more messages
will arrive will not cause indefinite blocking.
Nothing is expected to be returned from :meth:`drain_events` because
:meth:`drain_events` handles messages by calling callbacks that are
maintained on the :class:`Connection` object. When
:meth:`drain_events` returns, all associated messages have been
handled.
This method calls drain_events() which reads as many messages as are
available for this Transport, and then returns. It blocks in the
sense that reading and handling a large number of messages may take
time, but it does not block waiting for a new message to arrive. When
:meth:`drain_events` is called a timeout is not specified, which
causes this behavior.
One interesting behavior of note is where multiple messages are
ready, and this method removes a single '0' character from
self.r, but :meth:`drain_events` may handle an arbitrary amount of
messages. In that case, extra '0' characters may be left on self.r
to be read, where messages corresponding with those '0' characters
have already been handled. The external epoll loop will incorrectly
think additional data is ready for reading, and will call
on_readable unnecessarily, once for each '0' to be read. Additional
calls to :meth:`on_readable` produce no negative side effects,
and will eventually clear out the symbols from the self.r file
descriptor. If new messages show up during this draining period,
they will also be properly handled.
:param connection: The connection associated with the readable
events, which contains the callbacks that need to be called for
the readable objects.
:type connection: Connection
:param loop: The asynchronous loop object that contains epoll like
functionality.
:type loop: kombu.async.Hub
"""
os.read(self.r, 1)
try:
self.drain_events(connection)
except socket.timeout:
pass
def register_with_event_loop(self, connection, loop):
"""Register a file descriptor and callback with the loop.
Register the callback self.on_readable to be called when an
external epoll loop sees that the file descriptor registered is
ready for reading. The file descriptor is created by this Transport,
and is written to when a message is available.
Because supports_ev == True, Celery expects to call this method to
give the Transport an opportunity to register a read file descriptor
for external monitoring by celery using an Event I/O notification
mechanism such as epoll. A callback is also registered that is to
be called once the external epoll loop is ready to handle the epoll
event associated with messages that are ready to be handled for
this Transport.
The registration call is made exactly once per Transport after the
Transport is instantiated.
:param connection: A reference to the connection associated with
this Transport.
:type connection: Connection
:param loop: A reference to the external loop.
:type loop: kombu.async.hub.Hub
"""
loop.add_reader(self.r, self.on_readable, connection, loop)
def establish_connection(self):
"""Establish a Connection object.
Determines the correct options to use when creating any
connections needed by this Transport, and create a
:class:`Connection` object which saves those values for
connections generated as they are needed. The options are a
mixture of what is passed in through the creator of the
Transport, and the defaults provided by
:meth:`default_connection_params`. Options cover broker network
settings, timeout behaviors, authentication, and identity
verification settings.
This method also creates and stores a
:class:`~qpid.messaging.endpoints.Session` using the
:class:`~qpid.messaging.endpoints.Connection` created by this
method. The Session is stored on self.
:return: The created :class:`Connection` object is returned.
:rtype: :class:`Connection`
"""
conninfo = self.client
for name, default_value in items(self.default_connection_params):
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
if conninfo.ssl:
conninfo.qpid_transport = 'ssl'
conninfo.transport_options['ssl_keyfile'] = conninfo.ssl[
'keyfile']
conninfo.transport_options['ssl_certfile'] = conninfo.ssl[
'certfile']
conninfo.transport_options['ssl_trustfile'] = conninfo.ssl[
'ca_certs']
if conninfo.ssl['cert_reqs'] == ssl.CERT_REQUIRED:
conninfo.transport_options['ssl_skip_hostname_check'] = False
else:
conninfo.transport_options['ssl_skip_hostname_check'] = True
else:
conninfo.qpid_transport = 'tcp'
credentials = {}
if conninfo.login_method is None:
if conninfo.userid is not None and conninfo.password is not None:
sasl_mech = 'PLAIN'
credentials['username'] = conninfo.userid
credentials['password'] = conninfo.password
elif conninfo.userid is None and conninfo.password is not None:
raise Exception(
'Password configured but no username. SASL PLAIN '
'requires a username when using a password.')
elif conninfo.userid is not None and conninfo.password is None:
raise Exception(
'Username configured but no password. SASL PLAIN '
'requires a password when using a username.')
else:
sasl_mech = 'ANONYMOUS'
else:
sasl_mech = conninfo.login_method
if conninfo.userid is not None:
credentials['username'] = conninfo.userid
opts = {
'host': conninfo.hostname,
'port': conninfo.port,
'sasl_mechanisms': sasl_mech,
'timeout': conninfo.connect_timeout,
'transport': conninfo.qpid_transport
}
opts.update(credentials)
opts.update(conninfo.transport_options)
conn = self.Connection(**opts)
conn.client = self.client
self.session = conn.get_qpid_connection().session()
self.session.set_message_received_handler(self._qpid_session_ready)
return conn
def close_connection(self, connection):
"""
Close the :class:`Connection` object.
:param connection: The Connection that should be closed.
:type connection: :class:`kombu.transport.qpid.Connection`
"""
connection.close()
def drain_events(self, connection, timeout=0, **kwargs):
"""Handle and call callbacks for all ready Transport messages.
Drains all events that are ready from all
:class:`~qpid.messaging.endpoints.Receiver` that are asynchronously
fetching messages.
For each drained message, the message is called to the appropriate
callback. Callbacks are organized by queue name.
:param connection: The :class:`Connection` that contains the
callbacks, indexed by queue name, which will be called by this
method.
:type connection: Connection
:keyword timeout: The timeout that limits how long this method will
run for. The timeout could interrupt a blocking read that is
waiting for a new message, or cause this method to return before
all messages are drained. Defaults to 0.
:type timeout: int
"""
start_time = time.time()
elapsed_time = -1
while elapsed_time < timeout:
try:
receiver = self.session.next_receiver(timeout=timeout)
message = receiver.fetch()
queue = receiver.source
except QpidEmpty:
raise socket.timeout()
else:
connection._callbacks[queue](message)
elapsed_time = time.time() - start_time
raise socket.timeout()
def create_channel(self, connection):
"""Create and return a :class:`Channel`.
Creates a new :class:`Channel`, and append the :class:`Channel` to the
list of channels known by the :class:`Connection`. Once the new
:class:`Channel` is created, it is returned.
:param connection: The connection that should support the new
:class:`Channel`.
:type connection: Connection
:return: The new Channel that is made.
:rtype: :class:`Channel`.
"""
channel = connection.Channel(connection, self)
connection.channels.append(channel)
return channel
@property
def default_connection_params(self):
"""Return a dict with default connection parameters.
These connection parameters will be used whenever the creator of
Transport does not specify a required parameter.
:return: A dict containing the default parameters.
:rtype: dict
"""
return {
'hostname': 'localhost',
'port': 5672,
}
def __del__(self):
"""
Ensure file descriptors opened in __init__() are closed.
"""
for fd in (self.r, self._w):
try:
os.close(fd)
except OSError:
# ignored
pass
|