/usr/lib/python2.7/dist-packages/kombu/async/debug.py is in python-kombu 3.0.33-1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
from kombu.five import items
from kombu.utils import reprcall
from kombu.utils.eventio import READ, WRITE, ERR
def repr_flag(flag):
    """Render an event flag mask as a compact string.

    The result contains ``'R'`` when the READ bit is set, ``'W'`` when the
    WRITE bit is set and ``'!'`` when the ERR bit is set, always in that
    order.  A mask with none of these bits yields the empty string.
    """
    # Order matters: the output is always read, then write, then error.
    symbols = ((READ, 'R'), (WRITE, 'W'), (ERR, '!'))
    return ''.join(symbol for mask, symbol in symbols if flag & mask)
def _rcb(obj):
    """Return a short, human-readable description of a callback entry.

    Accepted forms of *obj*:

    * ``None``               -> the literal ``'<missing>'``
    * a ``str``              -> returned unchanged (already a label)
    * a ``(cb, args)`` tuple -> ``reprcall(<name of cb>, args=args)``
    * any other object       -> its ``__name__``

    Callables that carry no ``__name__`` attribute (for example
    ``functools.partial`` objects or instances defining ``__call__``) are
    described by their ``repr()`` so that this debugging helper does not
    itself raise ``AttributeError``.
    """
    def name_of(callback):
        # Prefer the plain name; fall back to repr() for nameless callables.
        try:
            return callback.__name__
        except AttributeError:
            return repr(callback)

    if obj is None:
        return '<missing>'
    if isinstance(obj, str):
        return obj
    if isinstance(obj, tuple):
        cb, args = obj
        return reprcall(name_of(cb), args=args)
    return name_of(obj)
def repr_active(h):
    """Describe every registered reader and writer of *h* in one string.

    The reader descriptions come first, followed by the writer
    descriptions; all entries are separated by ``', '``.
    """
    reader_entries = repr_readers(h)
    writer_entries = repr_writers(h)
    return ', '.join(reader_entries + writer_entries)
def repr_events(h, events):
    """Describe a sequence of ``(fd, flag)`` events as one string.

    Each event is rendered as ``callback(fd)->flags`` and the entries are
    joined with ``', '``.  The callback is looked up on *h* with
    ``callback_for``; ``'(GONE)'`` is passed as the fallback used when the
    lookup raises ``KeyError``.
    """
    described = []
    for fd, fl in events:
        callback = callback_for(h, fd, fl, '(GONE)')
        described.append(
            '{0}({1})->{2}'.format(_rcb(callback), fd, repr_flag(fl))
        )
    return ', '.join(described)
def repr_readers(h):
    """Return one description string per entry in ``h.readers``.

    Each entry has the form ``(fd)callback->flags`` where the flags part
    is the rendering of ``READ | ERR``.
    """
    entries = []
    for fd, cb in items(h.readers):
        entry = '({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
        entries.append(entry)
    return entries
def repr_writers(h):
    """Return one description string per entry in ``h.writers``.

    Each entry has the form ``(fd)callback->flags`` where the flags part
    is the rendering of ``WRITE``.
    """
    entries = []
    for fd, cb in items(h.writers):
        entry = '({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
        entries.append(entry)
    return entries
def callback_for(h, fd, flag, *default):
    """Look up the callback registered on *h* for *fd* and *flag*.

    * If the READ bit is set, the entry from ``h.readers`` is returned.
    * Otherwise, if the WRITE bit is set, ``h.consolidate_callback`` is
      returned when *fd* is in ``h.consolidate``, else the entry from
      ``h.writers``.
    * If neither bit is set the result is ``None`` (the default is not
      consulted in that case).

    When the lookup raises ``KeyError``, the first extra positional
    argument is returned as a fallback if one was given; without a
    fallback the ``KeyError`` propagates to the caller.
    """
    try:
        if flag & READ:
            callback = h.readers[fd]
        elif flag & WRITE:
            callback = (h.consolidate_callback if fd in h.consolidate
                        else h.writers[fd])
        else:
            # Neither readable nor writable: nothing to look up.
            callback = None
    except KeyError:
        if not default:
            raise
        callback = default[0]
    return callback
|