/usr/lib/python2.7/dist-packages/kombu/tests/transport/test_base.py is in python-kombu 3.0.33-1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | from __future__ import absolute_import
from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.five import text_t
from kombu.message import Message
from kombu.transport.base import StdChannel, Transport, Management
from kombu.tests.case import Case, Mock
class test_StdChannel(Case):
def setUp(self):
self.conn = Connection('memory://')
self.channel = self.conn.channel()
self.channel.queues.clear()
self.conn.connection.state.clear()
def test_Consumer(self):
q = Queue('foo', Exchange('foo'))
print(self.channel.queues)
cons = self.channel.Consumer(q)
self.assertIsInstance(cons, Consumer)
self.assertIs(cons.channel, self.channel)
def test_Producer(self):
prod = self.channel.Producer()
self.assertIsInstance(prod, Producer)
self.assertIs(prod.channel, self.channel)
def test_interface_get_bindings(self):
with self.assertRaises(NotImplementedError):
StdChannel().get_bindings()
def test_interface_after_reply_message_received(self):
self.assertIsNone(
StdChannel().after_reply_message_received(Queue('foo')),
)
class test_Message(Case):
def setUp(self):
self.conn = Connection('memory://')
self.channel = self.conn.channel()
self.message = Message(self.channel, delivery_tag=313)
def test_postencode(self):
m = Message(self.channel, text_t('FOO'), postencode='ccyzz')
with self.assertRaises(LookupError):
m._reraise_error()
m.ack()
def test_ack_respects_no_ack_consumers(self):
self.channel.no_ack_consumers = set(['abc'])
self.message.delivery_info['consumer_tag'] = 'abc'
ack = self.channel.basic_ack = Mock()
self.message.ack()
self.assertNotEqual(self.message._state, 'ACK')
self.assertFalse(ack.called)
def test_ack_missing_consumer_tag(self):
self.channel.no_ack_consumers = set(['abc'])
self.message.delivery_info = {}
ack = self.channel.basic_ack = Mock()
self.message.ack()
ack.assert_called_with(self.message.delivery_tag)
def test_ack_not_no_ack(self):
self.channel.no_ack_consumers = set()
self.message.delivery_info['consumer_tag'] = 'abc'
ack = self.channel.basic_ack = Mock()
self.message.ack()
ack.assert_called_with(self.message.delivery_tag)
def test_ack_log_error_when_no_error(self):
ack = self.message.ack = Mock()
self.message.ack_log_error(Mock(), KeyError)
ack.assert_called_with()
def test_ack_log_error_when_error(self):
ack = self.message.ack = Mock()
ack.side_effect = KeyError('foo')
logger = Mock()
self.message.ack_log_error(logger, KeyError)
ack.assert_called_with()
self.assertTrue(logger.critical.called)
self.assertIn("Couldn't ack", logger.critical.call_args[0][0])
def test_reject_log_error_when_no_error(self):
reject = self.message.reject = Mock()
self.message.reject_log_error(Mock(), KeyError, requeue=True)
reject.assert_called_with(requeue=True)
def test_reject_log_error_when_error(self):
reject = self.message.reject = Mock()
reject.side_effect = KeyError('foo')
logger = Mock()
self.message.reject_log_error(logger, KeyError)
reject.assert_called_with(requeue=False)
self.assertTrue(logger.critical.called)
self.assertIn("Couldn't reject", logger.critical.call_args[0][0])
class test_interface(Case):
def test_establish_connection(self):
with self.assertRaises(NotImplementedError):
Transport(None).establish_connection()
def test_close_connection(self):
with self.assertRaises(NotImplementedError):
Transport(None).close_connection(None)
def test_create_channel(self):
with self.assertRaises(NotImplementedError):
Transport(None).create_channel(None)
def test_close_channel(self):
with self.assertRaises(NotImplementedError):
Transport(None).close_channel(None)
def test_drain_events(self):
with self.assertRaises(NotImplementedError):
Transport(None).drain_events(None)
def test_heartbeat_check(self):
Transport(None).heartbeat_check(Mock(name='connection'))
def test_driver_version(self):
self.assertTrue(Transport(None).driver_version())
def test_register_with_event_loop(self):
Transport(None).register_with_event_loop(Mock(name='loop'))
def test_manager(self):
self.assertTrue(Transport(None).manager)
class test_Management(Case):
def test_get_bindings(self):
m = Management(Mock(name='transport'))
with self.assertRaises(NotImplementedError):
m.get_bindings()
|