/usr/lib/python3/dist-packages/pyutilib/misc/pyyaml_util.py is in python3-pyutilib 5.3.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#
# Utilities for working with YAML and JSON data representations.
#
# NOTE: The core utilities treat the YAML and JSON representations as equivalent. This may
# not be true in general, but it is true for their application within PyUtilib.
#
__all__ = [ 'yaml_fix', 'json_fix', 'load_yaml', 'load_json', 'extract_subtext', 'compare_repn', 'compare_strings', 'compare_yaml_files', 'compare_json_files', 'simple_yaml_parser']
import pprint
import math
import re
import sys
try:
unicode
except:
basestring = str
try:
from StringIO import StringIO
except:
from io import StringIO
try:
import json
json_available=True
except:
json_available=False
try:
import yaml
yaml_available=True #pragma:nocover
except: #pragma:nocover
yaml_available=False #pragma:nocover
from pyutilib.misc.comparison import open_possibly_compressed_file
def yaml_eval(str):
    """Convert the text of a scalar value into a Python object.

    The text is tried, in order, as an ``int``, as a ``float`` and finally
    as a Python expression.  The first conversion that succeeds provides the
    return value; if none succeeds the argument is returned unchanged.

    SECURITY: the last step calls ``eval()`` on the raw text, so this
    function must only be used on trusted input.
    """
    # No extra local names are introduced here on purpose: eval() below sees
    # this function's namespace, and new locals would change what it resolves.
    try:
        return int(str)
    except (ValueError, TypeError, OverflowError):
        pass
    try:
        return float(str)
    except (ValueError, TypeError, OverflowError):
        pass
    try:
        # NOTE(review): eval() executes arbitrary expressions.  It is kept
        # for backward compatibility; consider ast.literal_eval() if callers
        # only ever need literals.
        return eval(str)
    except Exception:
        # Anything that is not a valid expression is treated as plain text.
        # KeyboardInterrupt/SystemExit are deliberately NOT swallowed.
        pass
    return str
def recursive_yaml_parser(stream, _depth=-1):
    """Parse one indentation level of a simple YAML document.

    ``stream`` must be an iterator over lines (for example an open file):
    nested blocks are parsed by recursive calls that keep consuming the
    same iterator.

    Lines starting with ``-`` (after indentation) build a list; any other
    line is split on ``:`` into a key and a value and builds a dict.  A
    non-empty value is converted with ``yaml_eval``; an empty value means
    the value is the nested block that follows.  Blank lines, ``---``,
    ``...`` and lines whose first non-space character is ``#`` are skipped.

    Arguments:
        stream: iterator yielding the lines of the document.
        _depth: indentation (number of leading spaces) of the line that
            opened this block; -1 for the top-level call.

    Returns:
        A tuple ``(data, line)``.  ``data`` is the list or dict parsed at
        this level, or None if the block is empty.  ``line`` is the line
        that terminated the block and still has to be processed by the
        caller, or ``''`` when the stream was exhausted.
    """
    depth = -1      # indentation of this block; unknown until its first line
    data = None
    for line in stream:
        # A recursive call may hand back the line that ended the nested
        # block; that line must be examined again at this level.
        reprocess = True
        while reprocess:
            reprocess = False
            text = line.strip()
            if text == '' or text == '---' or text == '...':
                break
            # Count leading spaces only (tabs are not indentation here).
            d = 0
            while d < len(line) and line[d] == ' ':
                d += 1
            if line[d] == '#':
                break
            if depth == -1:
                if d <= _depth:
                    # Not indented deeper than the parent: the nested block
                    # is empty and this line belongs to the caller.
                    return None, line
                depth = d
            if d < depth:
                # Dedent: this block is finished.
                return data, line
            if line[d] == '-':
                if data is None:
                    data = []
                value = line[d+1:].strip()
                if len(value) > 0:
                    data.append(yaml_eval(value))
                else:
                    value, line = recursive_yaml_parser(stream, d)
                    reprocess = True
                    data.append(value)
            else:
                if data is None:
                    data = {}
                # NOTE(review): only the text between the first and second
                # ':' is used as the value, and a line without ':' raises
                # IndexError.
                tokens = line.split(':')
                key = tokens[0].strip()
                value = tokens[1].strip()
                if len(value) > 0:
                    data[key] = yaml_eval(value)
                else:
                    value, line = recursive_yaml_parser(stream, d)
                    reprocess = True
                    data[key] = value
    # End of stream: there is no pending line.  Returning '' (instead of the
    # last consumed line) keeps the caller from parsing that line a second
    # time, and avoids an unbound variable when the stream is empty.
    return data, ''
def simple_yaml_parser(stream):
    """Parse a simple YAML document and return its Python representation.

    ``stream`` is either a filename (a string), which is opened with
    ``open_possibly_compressed_file`` and closed again before returning,
    or an already-open iterator over lines, which is left open.
    """
    if not isinstance(stream, basestring):
        return recursive_yaml_parser(stream)[0]
    _stream = open_possibly_compressed_file(stream)
    try:
        return recursive_yaml_parser(_stream)[0]
    finally:
        # Close the file even if parsing raises.
        _stream.close()
def yaml_fix(val):
    """Escape colons in a string value.

    String values are returned with every ':' replaced by the four
    characters backslash, 'x', '3', 'a'.  Non-string values are returned
    unchanged.
    """
    if isinstance(val, basestring):
        return val.replace(':','\\x3a')
    return val
def json_fix(val):
    """Apply the same escaping as ``yaml_fix``; this simply delegates to it."""
    return yaml_fix(val)
def extract_subtext(stream, begin_str='', end_str=None, comment='#'):
    """Return the text found between two delimiter lines of a stream.

    Arguments:
        stream: a filename (opened with ``open_possibly_compressed_file``
            and closed before returning) or an iterable of lines.
        begin_str: collection starts after the first line that starts with
            this string.  If empty, collection starts at the first line.
        end_str: reading stops at the first line that starts with this
            string (even if collection has not started yet).  None means
            "same as begin_str"; an empty string disables the end check.
        comment: a collected line is dropped when its first
            whitespace-delimited token equals this string.

    Returns:
        The collected lines joined into a single string.
    """
    if isinstance(stream,basestring):
        _stream = open_possibly_compressed_file(stream)
    else:
        _stream = stream
    if end_str is None:
        # The original code used '==' here, which silently did nothing.
        end_str = begin_str
    ans = []
    status = len(begin_str) == 0
    try:
        for line in _stream:
            if not status and line.startswith(begin_str):
                status = True
            elif end_str != '' and line.startswith(end_str):
                break
            elif status:
                # Only split the line when it is a candidate for output.
                tokens = re.split('[\t ]+',line.strip())
                if tokens[0] != comment:
                    ans.append(line)
    finally:
        # Close the file we opened ourselves, even if iteration raises.
        if isinstance(stream,basestring):
            _stream.close()
    return "".join(ans)
def load_yaml(str):
    """Parse YAML text and return the resulting Python object.

    The text is wrapped in a StringIO and handed to ``yaml.load`` with
    ``yaml.SafeLoader``.
    """
    return yaml.load(StringIO(str), Loader=yaml.SafeLoader)
def load_json(str):
    """Parse JSON text and return plain Python data.

    The decoded structure is rebuilt recursively so that every string uses
    the interpreter's native ``str`` type: on Python 2 ``unicode`` objects
    are encoded to UTF-8, on Python 3 any ``bytes`` objects are decoded
    from UTF-8.  Dicts and lists are copied; all other values are returned
    as decoded by ``json.loads``.
    """
    py2 = sys.version_info < (3,0)

    def _to_native(val):
        # Exact type checks (not isinstance) so that subclasses are passed
        # through untouched.
        val_type = type(val)
        if py2:
            if val_type is unicode:
                return val.encode('utf-8')
        elif val_type is bytes:
            # bytes has no .encode(); decoding is what yields a native str.
            return val.decode('utf-8')
        if val_type is dict:
            return dict((_to_native(key), _to_native(item))
                        for key, item in val.items())
        if val_type is list:
            return [_to_native(item) for item in val]
        return val

    # Convert from the top of the structure so that top-level lists and
    # strings are handled as well as nested ones.
    return _to_native(json.loads(str))
def compare_repn(baseline, output, tolerance=0.0, prefix="<root>", exact=True, using_yaml=True):
    """Recursively compare two nested list/dict/scalar structures.

    Arguments:
        baseline: the expected data.
        output: the data to check against the baseline.
        tolerance: maximum allowed absolute difference when at least one of
            two numbers is a float.  None disables that comparison.
        prefix: path of the current item, used in error messages.
        exact: if True, lists must have equal length and dicts an equal
            number of keys.  If False, the baseline list items only have to
            match items of the output list in the same order, and output
            dicts may contain extra keys.
        using_yaml: passed on to recursive calls; not otherwise used here.

    Returns:
        None when the structures match.

    Raises:
        IOError: on structural differences (types, list lengths, keys).
        ValueError: when two scalar values differ.
    """
    base_type = type(baseline)
    out_type = type(output)
    # An int and a float are allowed to be compared with each other.
    both_numeric = base_type in [int,float] and out_type in [int,float]
    if base_type != out_type and not both_numeric:
        raise IOError("(%s) Structural difference:\nbaseline:\n%s\noutput:\n%s" % (prefix, pprint.pformat(baseline), pprint.pformat(output)))
    #
    if base_type is list:
        if not exact and len(baseline) > len(output):
            raise IOError("(%s) Baseline has longer list than output:\nbaseline:\n%s\noutput:\n%s" % (prefix, pprint.pformat(baseline), pprint.pformat(output)))
        if exact and len(baseline) != len(output):
            raise IOError("(%s) Baseline list length does not equal output list:\nbaseline:\n%s\noutput:\n%s" % (prefix, pprint.pformat(baseline), pprint.pformat(output)))
        # Walk the output list; advance in the baseline only when the
        # current baseline item matches the current output item.
        j = 0
        i = 0
        msg = ''
        while j < len(baseline) and i < len(output):
            try:
                compare_repn(baseline[j], output[i], tolerance=tolerance, prefix=prefix+"["+str(i)+"]", exact=exact, using_yaml=using_yaml)
                j += 1
            except Exception as err:
                # Remember (and report) the mismatch, then try the next
                # output item against the same baseline item.
                msg = err
                print(msg)
            i += 1
        if j < len(baseline):
            raise IOError("(%s) Could not find item %d in output list:\nbaseline:\n%s\noutput:\n%s\nERROR: %s" % (prefix, j, pprint.pformat(baseline), pprint.pformat(output), msg))
    #
    elif base_type is dict:
        if exact and len(baseline.keys()) != len(output.keys()):
            raise IOError("(%s) Baseline and output have different keys:\nbaseline:\n%s\noutput:\n%s" % (prefix, pprint.pformat(baseline.keys()), pprint.pformat(output.keys())))
        for key in baseline:
            if key not in output:
                raise IOError("(%s) Baseline key %s that does not exist in output:\nbaseline:\n%s\noutput:\n%s" % (prefix, key, pprint.pformat(baseline.keys()), pprint.pformat(output.keys())))
            compare_repn(baseline[key], output[key], tolerance=tolerance, prefix=prefix+"."+str(key), exact=exact, using_yaml=using_yaml)
    #
    elif (base_type is float or out_type is float) and both_numeric:
        if tolerance is not None and math.fabs(baseline-output) > tolerance:
            raise ValueError("(%s) Floating point values differ: baseline=%.17g and output=%.17g (tolerance=%.17g)" % (prefix, baseline, output, tolerance))
    elif baseline != output:    #pragma:nocover
        raise ValueError("(%s) Values differ:\nbaseline:\n%s\noutput:\n%s" % (prefix, pprint.pformat(baseline), pprint.pformat(output)))
def compare_strings(baseline, output, tolerance=0.0, exact=True, using_yaml=True):
    """Parse two YAML (or JSON) strings and compare the resulting data.

    Both strings are loaded with ``load_yaml`` when ``using_yaml`` is true
    and with ``load_json`` otherwise, then compared with ``compare_repn``
    using the given ``tolerance`` and ``exact`` settings.

    Raises IOError if the required parser module is not available.  If a
    JSON string cannot be parsed, a diagnostic and the offending text are
    printed and the parser's exception is re-raised.
    """
    if using_yaml:
        if not yaml_available:          #pragma:nocover
            raise IOError("Cannot compare YAML strings because YAML is not available")
        baseline_repn = load_yaml(baseline)
        output_repn = load_yaml(output)
    else:
        if not json_available:
            raise IOError("Cannot compare JSON strings because JSON is not available")
        try:
            baseline_repn = load_json(baseline)
        except Exception:
            print("Problem parsing JSON baseline")
            print(baseline)
            raise
        try:
            output_repn = load_json(output)
        except Exception:
            print("Problem parsing JSON output")
            print(output)
            raise
    compare_repn(
        baseline_repn,
        output_repn,
        tolerance=tolerance,
        exact=exact,
        using_yaml=using_yaml)
def compare_files(baseline_fname, output_fname, tolerance=0.0, baseline_begin='', baseline_end='', output_begin='', output_end=None, exact=True, using_yaml=True):
    """Compare the YAML (or JSON) data stored in two files.

    Each file is opened with ``open_possibly_compressed_file``; the text
    between its begin/end markers is extracted with ``extract_subtext``
    and the two extracts are compared with ``compare_strings``.

    Arguments:
        baseline_fname, output_fname: the files to compare.
        tolerance, exact, using_yaml: forwarded to ``compare_strings``.
        baseline_begin, baseline_end: markers delimiting the baseline text.
        output_begin, output_end: markers delimiting the output text.
    """
    INPUT = open_possibly_compressed_file(baseline_fname)
    try:
        baseline = extract_subtext(INPUT, begin_str=baseline_begin, end_str=baseline_end)
    finally:
        # Release the handle even if extraction fails.
        INPUT.close()
    INPUT = open_possibly_compressed_file(output_fname)
    try:
        output = extract_subtext(INPUT, begin_str=output_begin, end_str=output_end)
    finally:
        INPUT.close()
    compare_strings(baseline, output, tolerance=tolerance, exact=exact, using_yaml=using_yaml)
def compare_json_files(baseline_fname, output_fname, tolerance=0.0, baseline_begin='', baseline_end='', output_begin='', output_end=None, exact=True):
    """Compare two files as JSON.

    Forwards every argument to ``compare_files`` with ``using_yaml=False``
    and returns its result.
    """
    return compare_files(
        baseline_fname,
        output_fname,
        tolerance=tolerance,
        baseline_begin=baseline_begin,
        baseline_end=baseline_end,
        output_begin=output_begin,
        output_end=output_end,
        exact=exact,
        using_yaml=False)
def compare_yaml_files(baseline_fname, output_fname, tolerance=0.0, baseline_begin='', baseline_end='', output_begin='', output_end=None, exact=True):
    """Compare two files as YAML.

    Forwards every argument to ``compare_files`` with ``using_yaml=True``
    and returns its result.
    """
    return compare_files(
        baseline_fname,
        output_fname,
        tolerance=tolerance,
        baseline_begin=baseline_begin,
        baseline_end=baseline_end,
        output_begin=output_begin,
        output_end=output_end,
        exact=exact,
        using_yaml=True)
|