/usr/share/pyshared/Scientific/BSP/IO.py is in python-scientific 2.8-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Parallel IO
#
# Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
# last revision: 2007-5-25
#
"""
Parallel access to netCDF files
One netCDF dimension is defined for splitting the data among
processors such that each processor is responsible for one slice of
the file along that dimension.
Since netCDF files can be very big, the distribution algorithm gives
priority to memory efficiency over CPU time efficiency. The processor
that handles the file treats only one slice per superstep, so that no
processor ever needs to hold more than one slice in memory at a time.
"""
from Scientific.IO.NetCDF import NetCDFFile
from Scientific.BSP.core import ParClass, ParBase, ParInvalid, is_invalid
from Scientific import N
class _ParNetCDFFile(ParBase):

    """
    Distributed netCDF file

    A ParNetCDFFile object acts as much as possible like a NetCDFFile object.
    Variables become ParNetCDFVariable objects, which behave like
    distributed sequences. Variables that use the dimension named by
    |split_dimension| are automatically distributed among the processors
    such that each treats only one slice of the whole file.
    """

    def __parinit__(self, pid, nprocs, filename, split_dimension,
                    mode='r', local_access=False):
        """
        @param pid: index of the local processor; processor 0 is the one
                    that accesses the file when local access is off
        @type pid: C{int}
        @param nprocs: number of processors the data is divided among
        @type nprocs: C{int}
        @param filename: the name of the netCDF file
        @type filename: C{str}
        @param split_dimension: the name of the dimension along which the data
                                is distributed over the processors
        @type split_dimension: C{str}
        @param mode: read ('r'), write ('w'), or append ('a')
        @type mode: C{str}
        @param local_access: if C{False}, processor 0 is the only one to
                             access the file, all others communicate with
                             processor 0. If C{True} (only for reading), each
                             processor accesses the file directly. In the
                             latter case, the file must be accessible on all
                             processors under the same name. For any C{mode}
                             other than C{'r'} this argument is ignored and
                             treated as C{False}.
        @type local_access: C{bool} or C{str}

        @note: the value C{'auto'} was intended to select a heuristic
               (check that the file exists everywhere, has the same size,
               and contains the same variables). This implementation
               performs no such check: like any non-empty string, C{'auto'}
               is treated as C{True}.
        """
        if mode != 'r':
            # Writing is always funnelled through processor 0.
            local_access = False
        self.pid = pid
        self.nprocs = nprocs
        self.filename = filename
        self.split = split_dimension
        self.local_access = local_access
        self.read_only = mode == 'r'
        if local_access or pid == 0:
            self.file = NetCDFFile(filename, mode)
            length = self._splitLength(split_dimension)
            variables = {}
            for name, var in self.file.variables.items():
                variables[name] = (name, var.dimensions)
        else:
            self.file = None
            length = None
            variables = None
        if not local_access:
            # Processors other than 0 have no file handle; they receive the
            # split length and the variable table from processor 0.
            length = self.broadcast(length)
            variables = self.broadcast(variables)
        if length is not None:
            self._divideData(length)
        self.variables = {}
        for name, (var_name, dimensions) in variables.items():
            self.variables[name] = _ParNetCDFVariable(self, var_name,
                                                      dimensions,
                                                      split_dimension)

    def _splitLength(self, split_dimension):
        """
        Return the current length of C{split_dimension} in the open file.

        @returns: C{None} if the file has no such dimension. For an unlimited
                  dimension, the length along that axis of the first variable
                  using it, or 0 if no variable uses it yet.
        """
        try:
            length = self.file.dimensions[split_dimension]
        except KeyError:
            return None
        if length is not None:
            return length
        # Unlimited dimension: its size is only known through the variables.
        for var in self.file.variables.values():
            if split_dimension in var.dimensions:
                index = list(var.dimensions).index(split_dimension)
                return var.shape[index]
        # No variable uses it: there are no records yet. Returning a
        # negative sentinel here would produce negative slice bounds.
        return 0

    def __repr__(self):
        return repr(self.filename)

    def close(self):
        """Close the file on every processor that holds a file handle."""
        if self.local_access or self.pid == 0:
            self.file.close()

    def createDimension(self, name, length):
        """
        Create a new dimension.

        Creating the split dimension also (re)computes the data division.
        The split dimension may not be unlimited.

        @raises ValueError: if C{name} is the split dimension and
                            C{length} is C{None}
        """
        if name == self.split:
            if length is None:
                raise ValueError("Split dimension cannot be unlimited")
            self._divideData(length)
        if self.pid == 0:
            self.file.createDimension(name, length)

    def createVariable(self, name, typecode, dimensions):
        """
        Create a new variable and return its distributed wrapper.

        Processor 0 creates the variable in the file; its name and
        dimensions are then broadcast so that every processor registers
        the same wrapper.
        """
        if self.pid == 0:
            var = self.file.createVariable(name, typecode, dimensions)
            dim = var.dimensions
        else:
            dim = None  # placeholder, replaced by processor 0's value
        name, dim = self.broadcast((name, dim))
        self.variables[name] = _ParNetCDFVariable(self, name, dim, self.split)
        return self.variables[name]

    def _divideData(self, length):
        """
        Partition the index range [0, length) of the split dimension.

        Each processor receives a contiguous, half-open range of at most
        ceil(length/nprocs) indices; trailing ranges may be shorter or
        empty. Sets C{self.first}/C{self.last} for this processor. Without
        local access, processor 0 also stores every processor's range in
        C{self.parts} so that it can serve their reads and writes.
        """
        # Floor division: plain '/' yields a float under Python 3.
        chunk = (length + self.nprocs - 1) // self.nprocs

        def bounds(pid):
            # Clamp both ends so that surplus processors get (length, length).
            first = min(pid * chunk, length)
            return first, min(first + chunk, length)

        self.first, self.last = bounds(self.pid)
        if (not self.local_access) and self.pid == 0:
            self.parts = [bounds(pid) for pid in range(self.nprocs)]

    def sync(self):
        """Flush the file to disk (done by processor 0)."""
        if self.pid == 0:
            self.file.sync()

    flush = sync
class _ParNetCDFVariable(ParBase):

    """
    Distributed view of one variable of a distributed netCDF file.

    Instances are created by the file object (at open time or by
    createVariable). If the variable uses the split dimension, each
    processor sees only its own slice along that dimension. Otherwise
    every processor sees the whole variable. Unless the file was opened
    with local access, reads, writes and attribute lookups are collective
    operations that every processor must perform together.
    """

    def __init__(self, file, name, dimensions, split_dimension):
        self.file = file
        self.pid = file.pid
        self.nprocs = file.nprocs
        self.name = name
        self.dimensions = dimensions
        # NOTE(review): presumably lets this local object stand in where a
        # ParValue-style .value is expected by Scientific.BSP -- confirm.
        self.value = self
        # Cache of attribute values fetched via processor 0 (read-only files).
        self.attributes = {}
        try:
            # Position of the split dimension, or None if not distributed.
            self.index = list(dimensions).index(split_dimension)
        except ValueError:
            self.index = None

    def __repr__(self):
        return repr(self.name)

    def __getitem__(self, item):
        """
        Read the data selected by C{item}.

        Along the split dimension, C{item} must be a slice (or a parallel
        index object). Without local access, processor 0 reads each other
        processor's part and sends it, one processor per put() round.

        @returns: the local part of the data, or C{ParInvalid} if this
                  processor obtained no data
        """
        item = self._prepareIndices(item)
        data = None
        if self.file.local_access:
            data = self._readData(item, self.file.first, self.file.last)
        elif self.pid == 0:
            for pid in range(1, self.nprocs):
                first, last = self.file.parts[pid]
                self.put(self._readData(item, first, last), [pid])
            data = self._readData(item, self.file.first, self.file.last)
        else:
            # Take part in every put() round issued by processor 0; the
            # message addressed to this processor arrives in one of them.
            # 'data' is pre-initialised so that receiving nothing yields
            # ParInvalid instead of an unbound local variable.
            for _ in range(1, self.nprocs):
                messages = self.put(None, [])
                if messages:
                    data = messages[0]
        if data is None:
            return ParInvalid
        return data

    def __getslice__(self, first, last):
        # Python 2 slice protocol.
        return self.__getitem__(slice(first, last))

    def __setitem__(self, item, value):
        """
        Write C{value} to the part of the variable selected by C{item}.

        A C{ParInvalid} value means "nothing to write" on this processor.
        For distributed variables, every other processor sends its value
        to processor 0, which writes them one per put() round. For
        variables that do not use the split dimension, only processor 0's
        value is written.
        """
        item = self._prepareIndices(item)
        if is_invalid(value):
            value = None
        if self.pid == 0:
            if value is not None:
                self._writeData(item, value, self.file.first, self.file.last)
            if self.index is not None:
                for pid in range(1, self.nprocs):
                    first, last = self.file.parts[pid]
                    data = self.put(None, [])
                    if data and data[0] is not None:
                        self._writeData(item, data[0], first, last)
        elif self.index is not None:
            # Send our value in our own round, stay silent in the others.
            for pid in range(1, self.nprocs):
                if pid == self.pid:
                    self.put(value, [0])
                else:
                    self.put(None, [])

    def __setslice__(self, first, last, value):
        # Python 2 slice protocol.
        self.__setitem__(slice(first, last), value)

    def _prepareIndices(self, item):
        """
        Normalize C{item} to a full-length index tuple.

        Parallel index objects (marked by an C{is_parindex} attribute) are
        returned unchanged. Anything else is wrapped in a tuple and padded
        with C{slice(None)} up to the number of dimensions.
        """
        if not hasattr(item, 'is_parindex'):
            if not isinstance(item, tuple):
                item = (item,)
            item = item + (len(self.dimensions) - len(item)) * (slice(None),)
        return item

    def _readData(self, item, part_first, part_last):
        """Read C{item} restricted to [part_first, part_last), or None."""
        item = self._indices(item, part_first, part_last)
        if item is None:
            return None
        return self.file.file.variables[self.name][item]

    def _writeData(self, item, data, part_first, part_last):
        """Write C{data} to C{item} restricted to [part_first, part_last)."""
        try:
            if len(data) == 0:
                return
        except TypeError:
            pass  # scalars have no length and are written as-is
        item = self._indices(item, part_first, part_last)
        if item is None:
            return
        variable = self.file.file.variables[self.name]
        array = N.array(data)
        try:
            variable[item] = array
        except Exception:
            # Diagnostic output (typically for shape mismatches), then
            # propagate the original error.
            print(variable.shape)
            print(item)
            print(array.shape)
            raise

    def _indices(self, item, part_first, part_last):
        """
        Translate C{item} into an index for the underlying netCDF variable.

        A parallel index object is shifted by C{part_first}. For an ordinary
        index tuple on a distributed variable, the slice along the split
        dimension is clipped to [part_first, part_last) while keeping its
        stride.

        @returns: the translated index, or C{None} for an invalid parallel
                  index
        @raises ValueError: if the split-dimension entry is not a slice
        """
        if hasattr(item, 'is_parindex'):
            if not item.valid:
                return None
            if item.skip == 0:
                return item.start + part_first
            return slice(item.start + part_first, item.stop + part_first,
                         item.skip)
        if self.index is None:
            return item
        split = item[self.index]
        if not isinstance(split, slice):
            raise ValueError("Must use slice along split dimension")
        # NOTE(review): negative start/stop and negative steps are not
        # normalized here and appear to be unsupported.
        first, last, skip = split.start, split.stop, split.step
        if first is None:
            first = 0
        if skip is None:
            skip = 1
        # Advance 'first' by whole strides until it is >= part_first.
        # Floor division keeps the bounds integral under Python 3.
        n1 = max(0, (part_first - first + skip - 1) // skip)
        first = first + n1 * skip
        if last is None:
            last = part_last
        last = min(last, part_last)
        return item[:self.index] + (slice(first, last, skip),) + \
               item[self.index+1:]

    def __getattr__(self, attr):
        """
        Look up C{attr} on the underlying netCDF variable.

        Without local access this is collective: processor 0 performs the
        lookup and broadcasts the outcome, and results are cached for
        read-only files. A missing attribute raises C{AttributeError} on
        every processor.
        """
        # __getattr__ also runs for attributes normally set in __init__
        # (e.g. on an instance made by copy/unpickle); fail instead of
        # recursing forever through self.file.
        if 'file' not in self.__dict__:
            raise AttributeError(attr)
        if self.file.local_access:
            return getattr(self.file.file.variables[self.name], attr)
        try:
            return self.attributes[attr]
        except KeyError:
            pass
        if self.pid == 0:
            # Never raise before the broadcast: the other processors are
            # waiting in it. Send a (found, value) pair instead.
            try:
                outcome = (True,
                           getattr(self.file.file.variables[self.name], attr))
            except AttributeError:
                outcome = (False, None)
        else:
            outcome = None
        found, value = self.broadcast(outcome)
        if not found:
            raise AttributeError(attr)
        if self.file.read_only:
            self.attributes[attr] = value
        return value

    def __len__(self):
        """Return the length of this processor's range of the split dimension."""
        # NOTE(review): this is also returned for variables that do not use
        # the split dimension (self.index is None) -- confirm intended.
        return self.file.last - self.file.first
# Public names for the two local classes, wrapped by ParClass (imported from
# Scientific.BSP.core). Presumably ParClass yields the global parallel class
# that user code instantiates -- see Scientific.BSP.core to confirm.
ParNetCDFVariable = ParClass(_ParNetCDFVariable)
ParNetCDFFile = ParClass(_ParNetCDFFile)
|