This file is indexed.

/usr/lib/ruby/vendor_ruby/octocatalog-diff/util/parallel.rb is in octocatalog-diff 1.5.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# frozen_string_literal: true

# A class to parallelize process executation.
# This is a utility class to execute tasks in parallel, with our own forking implementation
# that passes through logs and reliably handles errors. If parallel processing has been disabled,
# this instead executes the tasks serially, but provides the same API as the parallel tasks.

require 'stringio'
require_relative 'util'

module OctocatalogDiff
  module Util
    class Parallel
      # This exception is called for a task that didn't complete.
      class IncompleteTask < RuntimeError; end

      # --------------------------------------
      # This class represents a parallel task. It requires a method reference, which will be executed with
      # any supplied arguments. It can optionally take a text description and a validator function.
      # --------------------------------------
      class Task
        attr_reader :description
        attr_accessor :args

        def initialize(opts = {})
          @method = opts.fetch(:method)
          @args = opts.fetch(:args, {})
          @description = opts[:description] || @method.name
          @validator = opts[:validator]
          @validator_args = opts[:validator_args] || {}
        end

        def execute(logger = Logger.new(StringIO.new))
          @method.call(@args, logger)
        end

        def validate(result, logger = Logger.new(StringIO.new))
          return true if @validator.nil?
          @validator.call(result, logger, @validator_args)
        end
      end

      # --------------------------------------
      # This class represents the result from a parallel task. The status is set to true (success), false (error),
      # or nil (task was killed before it could complete). The exception (for failure) and output object (for success)
      # are readable attributes. The validity of the results, determined by executing the 'validate' method of the Task,
      # is available to be set and fetched.
      # --------------------------------------
      class Result
        attr_reader :output, :args
        attr_accessor :status, :exception

        def initialize(opts = {})
          @status = opts[:status]
          @exception = opts[:exception]
          @output = opts[:output]
          @args = opts.fetch(:args, {})
        end
      end

      # --------------------------------------
      # Static methods in the class
      # --------------------------------------

      # Entry point for parallel processing. By default this will perform parallel processing,
      # but it will also accept an option to do serial processing instead.
      # @param task_array [Array<Parallel::Task>] Tasks to run
      # @param logger [Logger] Optional logger object
      # @param parallelized [Boolean] True for parallel processing, false for serial processing
      # @param raise_exception [Boolean] True to raise exception immediately if one occurs; false to return exception in results
      # @return [Array<Parallel::Result>] Parallel results (same order as tasks)
      def self.run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false)
        # Create a throwaway logger object if one is not given
        logger ||= Logger.new(StringIO.new)

        # Validate input - we need an array of OctocatalogDiff::Util::Parallel::Task. If the array is empty then
        # return an empty array right away.
        raise ArgumentError, "run_tasks() argument must be array, not #{task_array.class}" unless task_array.is_a?(Array)
        return [] if task_array.empty?

        invalid_inputs = task_array.reject { |task| task.is_a?(OctocatalogDiff::Util::Parallel::Task) }
        if invalid_inputs.any?
          ele = invalid_inputs.first
          raise ArgumentError, "Element #{ele.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{ele.class}"
        end

        # Initialize the result array. For now all entries in the array indicate that the task was killed.
        # Actual statuses will replace this initial status. If the initial status wasn't replaced, then indeed,
        # the task was killed.
        result = task_array.map { |x| Result.new(exception: IncompleteTask.new('Killed'), args: x.args) }
        logger.debug "Initialized parallel task result array: size=#{result.size}"

        # Execute as per the requested method (serial or parallel) and handle results.
        exception = parallelized ? run_tasks_parallel(result, task_array, logger) : run_tasks_serial(result, task_array, logger)
        raise exception if exception && raise_exception
        result
      end

      # Utility method! Not intended to be called from outside this class.
      # ---
      # Use a forking strategy to run tasks in parallel. Each task in the array is forked in a child
      # process, and when that task completes it writes its result (OctocatalogDiff::Util::Parallel::Result)
      # into a serialized data file. Once children are forked this method waits for their return, deserializing
      # the output from each data file and updating the `result` array with actual results.
      # @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
      # @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
      # @param logger [Logger] Logger
      # @return [Exception] First exception encountered by a child process; returns nil if no exceptions encountered.
      def self.run_tasks_parallel(result, task_array, logger)
        pidmap = {}
        ipc_tempdir = OctocatalogDiff::Util::Util.temp_dir('ocd-ipc-')

        # Child process forking
        task_array.each_with_index do |task, index|
          # simplecov doesn't see this because it's forked
          # :nocov:
          this_pid = fork do
            ENV['OCTOCATALOG_DIFF_TEMPDIR'] ||= ipc_tempdir
            task_result = execute_task(task, logger)
            File.open(File.join(ipc_tempdir, "#{Process.pid}.dat"), 'w') { |f| f.write Marshal.dump(task_result) }
            Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
          end
          # :nocov:

          pidmap[this_pid] = { index: index, start_time: Time.now }
          logger.debug "Launched pid=#{this_pid} for index=#{index}"
          logger.reopen if logger.respond_to?(:reopen)
        end

        # Waiting for children and handling results
        while pidmap.any?
          this_pid, exit_obj = Process.wait2(0)
          next unless this_pid && pidmap.key?(this_pid)
          index = pidmap[this_pid][:index]
          exitstatus = exit_obj.exitstatus
          raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
          raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?

          input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
          result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
          time_delta = Time.now - pidmap[this_pid][:start_time]
          pidmap.delete(this_pid)

          logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"

          next if result[index].status
          return result[index].exception
        end

        logger.debug 'All child processes completed with no exceptions raised'

      # Cleanup: Kill any child processes that are still running, and clean the temporary directory
      # where data files were stored.
      ensure
        pidmap.each do |pid, _pid_data|
          begin
            Process.kill('TERM', pid)
          rescue Errno::ESRCH # rubocop:disable Lint/HandleExceptions
            # If the process doesn't exist, that's fine.
          end
        end
      end

      # Utility method! Not intended to be called from outside this class.
      # ---
      # Perform the tasks in serial.
      # @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
      # @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
      # @param logger [Logger] Logger
      def self.run_tasks_serial(result, task_array, logger)
        # Perform the tasks 1 by 1 - each successful task will replace an element in the 'result' array,
        # whereas a failed task will replace the current element with an exception, and all later tasks
        # will not be replaced (thereby being populated with the cancellation error).
        task_array.each_with_index do |ele, task_counter|
          result[task_counter] = execute_task(ele, logger)
          next if result[task_counter].status
          return result[task_counter].exception
        end
        nil
      end

      # Utility method! Not intended to be called from outside this class.
      # ---
      # Process a single task. Called by run_tasks_parallel / run_tasks_serial.
      # This method will report all exceptions in the OctocatalogDiff::Util::Parallel::Result object
      # itself, and not raise them.
      # @param task [OctocatalogDiff::Util::Parallel::Task] Task object
      # @param logger [Logger] Logger
      # @return [OctocatalogDiff::Util::Parallel::Result] Parallel task result
      def self.execute_task(task, logger)
        begin
          logger.debug("Begin #{task.description}")
          output = task.execute(logger)
          result = Result.new(output: output, status: true, args: task.args)
        rescue => exc
          logger.debug("Failed #{task.description}: #{exc.class} #{exc.message}")
          # Immediately return without running the validation, since this already failed.
          return Result.new(exception: exc, status: false, args: task.args)
        end

        begin
          if task.validate(output, logger)
            logger.debug("Success #{task.description}")
          else
            # Preferably the validator method raised its own exception. However if it
            # simply returned false, raise our own exception here.
            raise "Failed #{task.description} validation (unspecified error)"
          end
        rescue => exc
          logger.warn("Failed #{task.description} validation: #{exc.class} #{exc.message}")
          result.status = false
          result.exception = exc
        end

        result
      end
    end
  end
end