This file is indexed.

/usr/lib/python2.7/dist-packages/sardana/macroserver/msdoor.py is in python-sardana 1.2.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
#!/usr/bin/env python

##############################################################################
##
## This file is part of Sardana
##
## http://www.tango-controls.org/static/sardana/latest/doc/html/index.html
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
## Sardana is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Sardana is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module contains the class definition for the macro server door"""

__all__ = ["MacroProxy", "BaseInputHandler", "MSDoor"]

__docformat__ = 'restructuredtext'

import collections
import weakref

from taurus.core.util import Logger

from sardana import ElementType
from sardana.sardanaevent import EventType

from msbase import MSObject
from msparameter import Type


class MacroProxy(object):

    def __init__(self, door, macro_meta):
        self._door = weakref.ref(door)
        self._macro_meta = weakref.ref(macro_meta)

    @property
    def door(self):
        return self._door()

    @property
    def macro_info(self):
        return self._macro_meta()

    def __call__(self, *args, **kwargs):
        door = self.door
        parent_macro = door.get_running_macro()
        parent_macro.syncLog()
        executor = parent_macro.executor
        opts=dict(parent_macro=parent_macro, executor=executor)
        kwargs.update(opts)
        eargs = [self.macro_info.name]
        eargs.extend(args)
        return parent_macro.execMacro(*eargs, **kwargs)


class MacroProxyCache(dict):

    def __init__(self, door):
        self._door = weakref.ref(door)
        self.rebuild()

    @property
    def door(self):
        return self._door()

    def rebuild(self):
        self.clear()
        door = self.door
        macros = self.door.get_macros()
        for macro_name, macro_meta in macros.items():
            self[macro_name] = MacroProxy(door, macro_meta)


class BaseInputHandler(object):

    def __init__(self):
        try:
            self._input = raw_input
        except NameError:
            self._input = input

    def input(self, input_data=None):
        if input_data is None:
            input_data = {}
        prompt = input_data.get('prompt')
        if prompt is None:
            return self._input()
        else:
            return self._input(prompt)


class MSDoor(MSObject):
    """Sardana door object"""

    def __init__(self, **kwargs):
        self._state = None
        self._status = None
        self._result = None
        self._macro_status = None
        self._record_data = None
        self._macro_proxy_cache = None
        self._input_handler = BaseInputHandler()
        self._pylab_handler = None
        kwargs['elem_type'] = ElementType.Door
        MSObject.__init__(self, **kwargs)

    def get_macro_executor(self):
        return self.macro_server.macro_manager.getMacroExecutor(self)

    macro_executor = property(get_macro_executor)

    def get_running_macro(self):
        return self.macro_executor.getRunningMacro()

    running_macro = property(get_running_macro)

    def get_macro_data(self):
        macro = self.running_macro
        if macro is None:
            raise Exception("No macro has run so far!")
        data = macro.data
        return data

    def set_pylab_handler(self, ph):
        self._pylab_handler = ph
    
    def get_pylab_handler(self):
        return self._pylab_handler

    pylab_handler = property(get_pylab_handler, set_pylab_handler)
    
    def get_pylab(self):
        ph = self.pylab_handler
        if ph is None:
            import matplotlib.pylab
            ph = matplotlib.pylab
        return ph

    pylab = property(get_pylab)

    def set_pyplot_handler(self, ph):
        self._pyplot_handler = ph
    
    def get_pyplot_handler(self):
        return self._pyplot_handler

    pyplot_handler = property(get_pyplot_handler, set_pyplot_handler)
    
    def get_pyplot(self):
        ph = self.pyplot_handler
        if ph is None:
            import matplotlib.pyplot
            ph = matplotlib.pyplot
        return ph

    pyplot = property(get_pyplot)

    def set_input_handler(self, ih):
        self._input_handler = ih

    def get_input_handler(self):
        return self._input_handler

    input_handler = property(get_input_handler, set_input_handler)

    def append_prompt(self, prompt, msg):
        if '?' in prompt:
            prefix, suffix = prompt.rsplit('?', 1)
            if not prefix.endswith(' '):
                prefix += ' '
            prompt = prefix + msg + '?' + suffix
        else:
            prompt += msg + ' '
        return prompt

    def input(self, msg, *args, **kwargs):
        kwargs['data_type'] = kwargs.get('data_type', Type.String)
        kwargs['allow_multiple'] = kwargs.get('allow_multiple', False)
        
        if args:
            msg = msg % args
        if not msg.endswith(' '):
            msg += ' '
        dv = kwargs.get('default_value')
        if dv is not None:
            dv = '[' + str(dv) + ']'
            msg = self.append_prompt(msg, dv)

        macro = kwargs.pop('macro', self.macro_executor.getRunningMacro())
        if macro is None:
            macro = self

        input_data = dict(prompt=msg, type='input')
        input_data.update(kwargs)
        data_type = kwargs['data_type']
        is_seq = not isinstance(data_type, (str, unicode)) and \
                 isinstance(data_type, collections.Sequence)
        if is_seq:
            handle = self._handle_seq_input
        else:
            handle = self._handle_type_input
        
        return handle(macro, input_data, data_type)

    def _handle_seq_input(self, obj, input_data, data_type):
        valid = False
        allow_multiple = input_data['allow_multiple']
        while not valid:
            result = self.input_handler.input(input_data)
            if allow_multiple:
                r, dt = set(result), set(data_type)
                if r.issubset(dt):
                    break
            else:
                if result in data_type:
                    break
            obj.warning("Please give a valid option")
        return result
    
    def _handle_type_input(self, obj, input_data, data_type):
        type_obj = self.type_manager.getTypeObj(data_type)

        valid = False
        while not valid:
            result = self.input_handler.input(input_data)
            try:
                result_type = type_obj.getObj(result)
                if result_type is None:
                    raise Exception("Must give a value")
                valid = True
                return result_type
            except:
                dtype = str(data_type).lower()
                obj.warning("Please give a valid %s.", dtype)

    def get_report_logger(self):
        return self.macro_server.report_logger

    report_logger = property(get_report_logger)

    def report(self, msg, *args, **kwargs):
        """
        Record a log message in the sardana report (if enabled) with default
        level **INFO**. The msg is the message format string, and the args are
        the arguments which are merged into msg using the string formatting
        operator. (Note that this means that you can use keywords in the
        format string, together with a single dictionary argument.)

        *kwargs* are the same as :meth:`logging.Logger.debug` plus an optional
        level kwargs which has default value **INFO**

        Example::

            self.report("this is an official report!")

        :param msg: the message to be recorded
        :type msg: :obj:`str`
        :param args: list of arguments
        :param kwargs: list of keyword arguments"""
        return self.macro_server.report(msg, *args, **kwargs)

    def get_state(self):
        return self._state

    def set_state(self, state, propagate=1):
        self._state = state
        self.fire_event(EventType("state", priority=propagate), state)

    state = property(get_state, set_state)

    def get_status(self):
        return self._status

    def set_status(self, status, propagate=1):
        self._status = status
        self.fire_event(EventType("status", priority=propagate), status)

    status = property(get_status, set_status)

    def get_result(self):
        return self._result

    def set_result(self, result, propagate=1):
        self._result = result
        self.fire_event(EventType("result", priority=propagate), result)

    result = property(get_result, set_result)

    def get_macro_status(self):
        return self._macro_status

    def set_macro_status(self, macro_status, propagate=1):
        self._macro_status = macro_status
        self.fire_event(EventType("macrostatus", priority=propagate),
                        macro_status)

    result = property(get_result, set_result)

    def get_record_data(self):
        return self._record_data

    def set_record_data(self, record_data, codec=None, propagate=1):
        self._record_data = record_data
        self.fire_event(EventType("recorddata", priority=propagate),
                        (codec, record_data))

    record_data = property(get_record_data, set_record_data)

    def get_env(self, key=None, macro_name=None):
        """Gets the environment with the context for this door matching the
        given parameters:

        - macro_name defines the context where to look for the environment. If
          None, the global environment is used. If macro name is given the
          environment in the context of that macro is given
        - If key is None it returns the complete environment, otherwise
          key must be a string containing the environment variable name.

        :param key:
            environment variable name [default: None, meaning all environment]
        :type key: str
        :param macro_name:
            local context for a given macro [default: None, meaning no macro
            context is used]
        :type macro_name: str

        :raises: UnknownEnv"""
        return self.macro_server.environment_manager.getAllDoorEnv(self.name)

    def set_env(self, key, value):
        return self.macro_server.set_env(key, value)

    def _build_macro_proxy_cache(self):
        self._macro_proxy_cache = MacroProxyCache(self)

    def get_macro_proxies(self):
        if self._macro_proxy_cache is None:
            self._macro_proxy_cache = MacroProxyCache(self)
        return self._macro_proxy_cache

    def run_macro(self, par_str_list, asynch=False):
        if isinstance(par_str_list, (str, unicode)):
            par_str_list = par_str_list,

        if not hasattr(self, "Output"):
            import sys
            import logging
            Logger.addLevelName(15, "OUTPUT")

            def output(loggable, msg, *args, **kw):
                loggable.getLogObj().log(Logger.Output, msg, *args, **kw)
            Logger.output = output

            Logger.disableLogOutput()
            Logger.setLogLevel(Logger.Output)
            #filter = taurus.core.util.LogFilter(level=Logger.Output)
            formatter = logging.Formatter(fmt="%(message)s")
            Logger.setLogFormat("%(message)s")
            handler = logging.StreamHandler(stream=sys.stdout)
            #handler.addFilter(filter)
            Logger.addRootLogHandler(handler)
            #handler.setFormatter(formatter)
            #logger.addHandler(handler)
            #logger.addFilter(filter)
            self.__logging_info = handler, filter, formatter

            # result of a macro
            #Logger.addLevelName(18, "RESULT")

        return self.macro_executor.run(par_str_list, asynch=asynch)

    def __getattr__(self, name):
        """Get methods from macro server"""
        return getattr(self.macro_server, name)