/usr/lib/python2.7/dist-packages/sardana/pool/poolioregister.py is in python-sardana 1.2.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
##############################################################################
##
## This file is part of Sardana
##
## http://www.tango-controls.org/static/sardana/latest/doc/html/index.html
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
## Sardana is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Sardana is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""This module is part of the Python Pool libray. It defines the base classes
for """
# Public names exported by a star-import of this module.
__all__ = [ "PoolIORegister" ]
# Markup language used by the docstrings in this module.
__docformat__ = 'restructuredtext'
import time
from sardana import ElementType
from sardana.sardanaevent import EventType
from sardana.sardanaattribute import SardanaAttribute
from .poolelement import PoolElement
from .poolacquisition import PoolIORAcquisition
class Value(SardanaAttribute):
    """Attribute object that stores the value of an IO register.

    A refresh asks the owning object (``self.obj``) for ``read_value()`` and
    stores the result with ``set_value``."""

    def __init__(self, *args, **kwargs):
        # Nothing is added here: all arguments go straight to the base class.
        super(Value, self).__init__(*args, **kwargs)

    def update(self, cache=True, propagate=1):
        """Refreshes the stored value when it is missing or caching is off.

        :param cache:
            if true and a value is already stored, nothing is read;
            if false, the owning object is always asked for a new value
        :type cache:
            bool
        :param propagate:
            passed unchanged to ``set_value``
        :type propagate:
            int"""
        # Same short-circuit as "not cache or not has_value()": has_value()
        # is only consulted when caching is requested.
        if cache and self.has_value():
            return
        fresh = self.obj.read_value()
        self.set_value(fresh, propagate=propagate)
class PoolIORegister(PoolElement):
    """Pool element of type ``ElementType.IORegister``.

    The register value lives in a :class:`Value` attribute whose events are
    forwarded to this element's listeners. Reads go through the action cache
    (a ``PoolIORAcquisition`` created at construction time) and writes go to
    the controller via ``write_one``."""

    def __init__(self, **kwargs):
        # The element type is fixed by this class, whatever the caller passed.
        kwargs['elem_type'] = ElementType.IORegister
        PoolElement.__init__(self, **kwargs)
        # Value attribute reporting its changes back through on_change.
        self._value = Value(self, listeners=self.on_change)
        self._config = None
        acquisition_name = "%s.Acquisition" % self._name
        acquisition = PoolIORAcquisition(self.pool, name=acquisition_name)
        self.set_action_cache(acquisition)

    def get_value_attribute(self):
        """Gives access to the attribute object holding the register value.

        :return: the value attribute
        :rtype: :class:`~sardana.sardanaattribute.SardanaAttribute`"""
        return self._value

    # --------------------------------------------------------------------------
    # Event forwarding
    # --------------------------------------------------------------------------

    def on_change(self, evt_src, evt_type, evt_value):
        """Listener registered on the value attribute.

        Re-fires the received event type and value on this element; the
        event source argument is not used."""
        self.fire_event(evt_type, evt_value)

    # --------------------------------------------------------------------------
    # default acquisition channel
    # --------------------------------------------------------------------------

    def get_default_attribute(self):
        """Returns the default attribute, which is the value attribute."""
        return self.get_value_attribute()

    # --------------------------------------------------------------------------
    # value
    # --------------------------------------------------------------------------

    def read_value(self):
        """Reads the IO register value through the action cache.

        :return:
            the entry for this element in the result of the action cache's
            ``read_value()``
        :rtype:
            :class:`~sardana.sardanavalue.SardanaValue`"""
        values = self.get_action_cache().read_value()
        return values[self]

    def put_value(self, value, propagate=1):
        """Stores a value in the value attribute.

        :param value:
            the new value
        :type value:
            :class:`~sardana.sardanavalue.SardanaValue`
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :type propagate:
            int
        :return: the value attribute that received the value"""
        attribute = self._value
        attribute.set_value(value, propagate=propagate)
        return attribute

    def get_value(self, cache=True, propagate=1):
        """Updates the value attribute and returns it.

        :param cache:
            passed to the attribute's ``update``
        :type cache:
            bool
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :type propagate:
            int
        :return: the value attribute (not the bare value)"""
        attribute = self.get_value_attribute()
        attribute.update(cache=cache, propagate=propagate)
        return attribute

    def set_value(self, value, timestamp=None):
        """Writes a value to the register; delegates to
        :meth:`write_register`.

        :param value:
            the value to write
        :param timestamp:
            time stamp of the write, or None"""
        self.write_register(value, timestamp=timestamp)

    def set_write_value(self, w_value, timestamp=None, propagate=1):
        """Sets a new write value for the IO register.

        :param w_value:
            the new write value for IO register
        :type w_value:
            :class:`~numbers.Number`
        :param timestamp:
            passed unchanged to the value attribute's ``set_write_value``
        :param propagate:
            0 for not propagating, 1 to propagate, 2 propagate with priority
        :type propagate:
            int"""
        self._value.set_write_value(w_value, timestamp=timestamp,
                                    propagate=propagate)

    value = property(get_value, set_value, doc="ioregister value")

    def write_register(self, value, timestamp=None):
        """Writes a value to the register through the controller.

        The aborted and stopped flags are always cleared. Nothing else
        happens in simulation mode; otherwise the write value is recorded
        without propagation and the controller's ``write_one`` is called
        for this element's axis.

        :param value:
            the value to write
        :param timestamp:
            time stamp recorded with the write value; the current time is
            used when it is None"""
        self._aborted = self._stopped = False
        if self._simulation_mode:
            return
        if timestamp is None:
            timestamp = time.time()
        self.set_write_value(value, timestamp=timestamp, propagate=0)
        self.controller.write_one(self.axis, value)
|