This file is indexed.

/usr/lib/logdata-anomaly-miner/aminer/input/ByteStreamLineAtomizer.py is in logdata-anomaly-miner 0.0.7-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from aminer.input import LogAtom
from aminer.input import StreamAtomizer
from aminer.parsing import MatchContext
from aminer.parsing import ParserMatch

class ByteStreamLineAtomizer(StreamAtomizer):
  """This atomizer consumes binary data from a stream to break
  it into lines, removing the line separator at the end. With
  a parsing model, it will also perform line parsing. Failures
  in atomizing or parsing will cause events to be generated and
  sent to event handler.
  Data will be consumed only when there was no downstream handler
  registered (the data will be discarded in that case) or when
  at least one downstream consumed the data."""

  def __init__(self, parsingModel, atomHandlerList, eventHandlerList,
       maxLineLength, defaultTimestampPath):
    """Create the atomizer.
    @param eventHandlerList when not None, send events to those
    handlers. The list might be empty at invocation and populated
    later on.
    @param maxLineLength the maximal line length including the
    final line separator."""
    self.parsingModel=parsingModel
    self.atomHandlerList=atomHandlerList
    self.eventHandlerList=eventHandlerList
    self.maxLineLength=maxLineLength
    self.defaultTimestampPath=defaultTimestampPath

    self.inOverlongLineFlag=False
# If consuming of data was already attempted but the downstream
# handlers refused to handle it, keep the data and the parsed
# object to avoid expensive duplicate parsing operation. The data
# does not include the line separators any more.
    self.lastUnconsumedLogAtom=None

  def consumeData(self, streamData, endOfStreamFlag=False):
    """Consume data from the underlying stream for atomizing.
    @return the number of consumed bytes, 0 if the atomizer would
    need more data for a complete atom or -1 when no data was
    consumed at the moment but data might be consumed later on."""
# Loop until as much streamData as possible was processed and
# then return a result. The correct processing of endOfStreamFlag
# is tricky: by default, even when all data was processed, do
# one more iteration to handle also the flag.
    consumedLength=0
    while True:
      if self.lastUnconsumedLogAtom!=None:
# Keep length before dispatching: dispatch will reset the field.
        dataLength=len(self.lastUnconsumedLogAtom.rawData)
        if self.dispatchAtom(self.lastUnconsumedLogAtom):
          consumedLength+=dataLength+1
          continue
# Nothing consumed, tell upstream to wait if appropriate.
        if consumedLength==0: consumedLength=-1
        break

      lineEnd=streamData.find('\n', consumedLength)
      if self.inOverlongLineFlag:
        if(lineEnd<0):
          consumedLength=length(streamData)
          if endOfStreamFlag:
            self.dispatchEvent('Overlong line terminated by end of stream', streamData)
            self.inOverlongLineFlag=False
          break
        consumedLength=lineEnd+1
        self.inOverlongLineFlag=False
        continue

# This is the valid start of a normal/incomplete/overlong line.
      if lineEnd<0:
        tailLength=len(streamData)-consumedLength
        if tailLength>self.maxLineLength:
          self.dispatchEvent('Start of overlong line detected' ,streamData[consumedLength:])
          self.inOverlongLineFlag=True
          consumedLength=len(streamData)
# Stay in loop to handle also endOfStreamFlag!
          continue
        if endOfStreamFlag and (tailLength!=0):
          self.dispatchEvent('Incomplete last line', streamData[consumedLength:])
          consumedLength=len(streamData)
        break

# This is at least a complete/overlong line.
      lineLength=lineEnd+1-consumedLength
      if lineLength>self.maxLineLength:
        self.dispatchEvent('Overlong line detected', streamData[consumedLength:lineEnd])
        consumedLength=lineEnd+1
        continue

# This is a normal line.
      lineData=streamData[consumedLength:lineEnd]
      logAtom=LogAtom.LogAtom(lineData, None, None, self)
      if self.parsingModel!=None:
        matchContext=MatchContext(lineData)
        matchElement=self.parsingModel.getMatchElement('', matchContext)
        if matchElement!=None:
          logAtom.parserMatch=ParserMatch(matchElement)
          if self.defaultTimestampPath!=None:
            tsMatch=logAtom.parserMatch.getMatchDictionary().get(self.defaultTimestampPath, None)
            if tsMatch!=None:
              logAtom.setTimestamp(tsMatch.matchObject[1])
      if self.dispatchAtom(logAtom):
        consumedLength=lineEnd+1
        continue
      if consumedLength==0:
# Downstream did not want the data, so tell upstream to block
# for a while.
        consumedLength=-1
      break
    return(consumedLength)

  def dispatchAtom(self, logAtom):
    """Dispatch the data using the appropriate handlers. Also clean
    or set lastUnconsumed fields depending on outcome of dispatching."""
    wasConsumedFlag=False
    if len(self.atomHandlerList)==0: wasConsumedFlag=True
    else:
      for handler in self.atomHandlerList:
        if handler.receiveAtom(logAtom): wasConsumedFlag=True

    if wasConsumedFlag:
      self.lastUnconsumedLogAtom=None
    else:
      self.lastUnconsumedLogAtom=logAtom
    return(wasConsumedFlag)

  def dispatchEvent(self, message, lineData):
    if self.eventHandlerList==None: return
    for handler in self.eventHandlerList:
      handler.receiveEvent('Input.%s' % self.__class__.__name__,
          message, [lineData], None, self)