/usr/include/vspline/multithread.h is in vspline-dev 0.3.1-1.
/************************************************************************/
/* */
/* vspline - a set of generic tools for creation and evaluation */
/* of uniform b-splines */
/* */
/* Copyright 2015 - 2017 by Kay F. Jahnke */
/* */
/* The git repository for this software is at */
/* */
/* https://bitbucket.org/kfj/vspline */
/* */
/* Please direct questions, bug reports, and contributions to */
/* */
/* kfjahnke+vspline@gmail.com */
/* */
/* Permission is hereby granted, free of charge, to any person */
/* obtaining a copy of this software and associated documentation */
/* files (the "Software"), to deal in the Software without */
/* restriction, including without limitation the rights to use, */
/* copy, modify, merge, publish, distribute, sublicense, and/or */
/* sell copies of the Software, and to permit persons to whom the */
/* Software is furnished to do so, subject to the following */
/* conditions: */
/* */
/* The above copyright notice and this permission notice shall be */
/* included in all copies or substantial portions of the */
/* Software. */
/* */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
/* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
/* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
/* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
/* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
/* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
/* OTHER DEALINGS IN THE SOFTWARE. */
/* */
/************************************************************************/
/// \file multithread.h
///
/// \brief code to distribute the processing of bulk data to several threads
///
/// The code in this header provides a reasonably general method to perform
/// processing of manifolds of data with several threads in parallel. In vspline,
/// there are several areas where potentially large numbers of individual values
/// have to be processed independently of each other or in a dependence which
/// can be preserved in partitioning. To process such 'bulk' data effectively,
/// vspline employs two strategies: multithreading and vectorization.
/// This file handles the multithreading.
///
/// To produce generic code for the purpose, we first introduce a model of what
/// we intend to do. This model looks at the data as occupying a 'range' having
/// a defined starting point and end point. We keep with the convention of defining
/// ranges so that the start point is inside and the end point outside the data
/// set described by the range, just like iterators obtained by begin() and end().
/// This range is made explicit, even if it is implicit in the data which we want to
/// submit to multithreading, and there is a type for the purpose: struct range_type.
/// range_type merely captures the concept of a range, taking 'limit_type' as its
/// template parameter, so that any type of range can be accommodated. A range is
/// defined by its lower and upper limit.
///
/// Next we define an object holding a set of ranges, modeling a partitioning of
/// an original/whole range into subranges, which, within the context of this code,
/// are disjoint and in sequence. This object is modeled as struct partition_type,
/// taking a range_type as its template argument.
///
/// With these types, we model concrete ranges and partitionings. The most important
/// one is dealing with multidimensional shapes, where a range extends from a 'lower'
/// coordinate to just below a 'higher' coordinate. These two coordinates can be
/// used directly to call vigra's 'subarray' function.
///
/// Next we provide code to partition ranges into sets of subranges.
///
/// Finally we can express a generalized multithreading routine. This routine takes
/// a functor capable of processing a range specification and a parameter pack of
/// arbitrary further parameters, of which some will usually refer to manifolds
/// of data for which the given range makes sense. We call this routine with a
/// partitioning of the original range and the same parameter pack that is to be passed
/// to the functor. The multithreading routine proceeds to set up 'tasks' as needed,
/// providing each with the functor as its functional, a subrange from
/// the partitioning, and the parameter pack as arguments. The routine to be used
/// to partition the 'whole' range is passed in.
///
/// The tasks, once prepared, are handed over to a 'joint_task' object which handles
/// the interaction with the thread pool (in thread_pool.h). While my initial code
/// used one thread per task, this turned out to be inefficient, because it was not
/// granular
/// enough: the slowest thread became the limiting factor. Now the job at hand is split
/// into more individual tasks (something like 8 times the number of cores), resulting
/// in a fair compromise concerning granularity. multithread() waits for all tasks to
/// terminate and returns when it's certain that the job is complete.
///
/// With this method, we assure that on return of multithread() we can safely access
/// whatever results we anticipate. While it might be useful to launch the tasks and
/// return to continue the main thread, picking up the result later when it becomes
/// ready, I chose to suspend the calling thread until the result arrives. This makes
/// the logic simpler, and should be what most use cases need: there is often little else
/// to do but to wait for the result anyway. If asynchronous operation is needed, a thread
/// can be launched to initiate and collect from the multithreading. It's safe to have
/// several threads using this multithreading code, since each task is linked to a
/// 'coordinator', see struct joint_task below.
#ifndef VSPLINE_MULTITHREAD_H
#define VSPLINE_MULTITHREAD_H
#include <assert.h>
#include <vigra/tinyvector.hxx>
#include <vigra/multi_array.hxx>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <vspline/common.h>
#include <vspline/thread_pool.h>
namespace vspline
{
/// number of CPU cores in the system.
const int ncores = std::thread::hardware_concurrency() ;
/// when multithreading, use this number of jobs by default. This is
/// an attempt at a compromise: too many jobs will produce too much overhead,
/// too few will not distribute the load well and make the system vulnerable
/// to 'straggling' threads
const int default_njobs = 8 * ncores ;
/// given limit_type, we define range_type as a TinyVector of two limit_types,
/// the first denoting the beginning of the range and the second its end, with
/// end being outside of the range.
template < class limit_type >
using range_type = vigra::TinyVector < limit_type , 2 > ;
/// given range_type, we define partition_type as a std::vector of range_type.
/// This data type is used to hold the partitioning of a range into subranges.
template < class range_type >
using partition_type = std::vector < range_type > ;
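// a quick sketch (not part of the library) of how these two templates
// compose: a range over ints from 0 to 100, and a manual partitioning
// of it into two disjoint, consecutive subranges:
//
// range_type < int > whole ( 0 , 100 ) ;
// partition_type < range_type < int > > parts ;
// parts.push_back ( range_type < int > ( 0 , 50 ) ) ;
// parts.push_back ( range_type < int > ( 50 , 100 ) ) ;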
/// given a dimension, we define a shape_type as a TinyVector of
/// vigra::MultiArrayIndex of this dimension.
/// This is equivalent to vigra's shape type.
// TODO: might instead define as: vigra::MultiArrayShape<dimension>
template < int dimension >
using shape_type = vigra::TinyVector < vigra::MultiArrayIndex , dimension > ;
/// given a dimension, we define shape_range_type as a range defined by
/// two shapes of the given dimension. This definition allows us to directly
/// pass the two shapes as arguments to a call of subarray() on a MultiArrayView
/// of the given dimension. Note the subarray semantics: if the range is
/// [2,2] to [4,4], it refers to elements [2,2], [3,2], [2,3], [3,3].
template < int dimension >
using shape_range_type = range_type < shape_type < dimension > > ;
template < int dimension >
using shape_partition_type = partition_type < shape_range_type < dimension > > ;
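// usage sketch (with a hypothetical array 'img', purely for illustration):
// the two limits of a shape_range_type can be passed straight to a call
// of subarray():
//
// vigra::MultiArray < 2 , float > img ( vigra::Shape2 ( 640 , 480 ) ) ;
// shape_range_type < 2 > r ( shape_type < 2 > ( 0 , 0 ) ,
//                            shape_type < 2 > ( 320 , 240 ) ) ;
// auto window = img.subarray ( r[0] , r[1] ) ; // upper left quadrant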
// currently unused
// // iterator_splitter will try to set up n ranges from a range. the partial
// // ranges are stored in a std::vector. The split may succeed producing n
// // or fewer ranges, and if iter_range can't be split at all, a single range
// // encompassing the whole of iter_range will be returned in the result vector.
//
// template < class _iterator_type >
// struct iterator_splitter
// {
// typedef _iterator_type iterator_type ;
// typedef vigra::TinyVector < iterator_type , 2 > range_type ;
// typedef std::vector < range_type > partition_type ;
//
// static partition_type part ( const range_type & iter_range ,
// int n )
// {
// std::vector < range_type > res ;
// assert ( n > 0 ) ;
//
// iterator_type start = iter_range [ 0 ] ;
// iterator_type end = iter_range [ 1 ] ;
// int size = end - start ;
// if ( n > size )
// n = size ;
//
// int chunk_size = size / n ; // will be at least 1
//
// for ( int i = 0 ; i < n - 1 ; i++ )
// {
// res.push_back ( range_type ( start , start + chunk_size ) ) ;
// start += chunk_size ;
// }
// res.push_back ( range_type ( start , end ) ) ;
// return res ;
// }
// } ;
/// shape_splitter will try to split a shape into n ranges by 'chopping' it
/// along the outermost axis that can be split n-ways. The additional parameter
/// 'forbid' prevents a particular axis from being split. The split may succeed
/// producing n or fewer ranges, and if 'shape' can't be split at all, a single range
/// encompassing the whole of 'shape' will be returned in the result vector. This
/// object is used for partitioning when one axis has to be preserved intact, like
/// for b-spline prefiltering, but it's not used by default for all shape splitting,
/// since the resulting partitioning does not perform well in certain situations
/// (see the partitioning into tiles below for a better general-purpose splitter)
// TODO: with some shapes, splitting will result in subranges which aren't optimal
// for b-spline prefiltering (these are fastest with extents which are a multiple of
// the simdized data type), so we might add code to preferably use cut locations
// coinciding with those extents. And with small extents being split, the result
// becomes very inefficient for filtering.
template < int dim >
struct shape_splitter
{
typedef shape_type < dim > shape_t ;
typedef range_type < shape_t > range_t ;
typedef partition_type < range_t > partition_t ;
static partition_t part ( const shape_t & shape , ///< shape to be split n-ways
int n = default_njobs , ///< intended number of chunks
int forbid = -1 ) ///< axis which shouldn't be split
{
partition_t res ;
// find the outermost dimension that can be split n ways, and its extent
int split_dim = -1 ;
int max_extent = -1 ;
for ( int md = dim - 1 ; md >= 0 ; md-- )
{
if ( md != forbid
&& shape[md] > max_extent
&& shape[md] >= n )
{
max_extent = shape[md] ;
split_dim = md ;
break ;
}
}
// if the search did not yet succeed:
if ( max_extent == -1 )
{
// repeat the process with relaxed conditions: now the search will also succeed
// if there is an axis which can be split less than n ways
for ( int md = dim - 1 ; md >= 0 ; md-- )
{
if ( md != forbid
&& shape[md] > 1 )
{
max_extent = shape[md] ;
split_dim = md ;
break ;
}
}
}
if ( split_dim == -1 )
{
// we have not found a dimension for splitting. We pass back res with
// a range over the whole initial shape as its sole member
res.push_back ( range_t ( shape_t() , shape ) ) ;
}
else
{
// we can split the shape along split_dim
int w = shape [ split_dim ] ; // extent of the dimension we can split
n = std::min ( n , w ) ; // just in case w is smaller than n
int * cut = new int [ n ] ; // where to chop up this dimension
for ( int i = 0 ; i < n ; i++ )
cut[i] = ( (i+1) * w ) / n ; // roughly equal chunks; the last cut lands exactly at w
shape_t start , end = shape ;
for ( int i = 0 ; i < n ; i++ )
{
end [ split_dim ] = cut [ i ]; // apply the cut locations
res.push_back ( range_t ( start , end ) ) ;
start [ split_dim ] = end [ split_dim ] ;
}
delete[] cut ; // clean up
}
return res ;
}
} ;
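// a brief sketch of using shape_splitter directly (illustrative only):
// split a 2D shape into at most four chunks while keeping axis 0 intact,
// as would be needed for prefiltering along that axis. Each resulting
// subrange then covers the full extent of axis 0:
//
// shape_type < 2 > shape ( 1000 , 1000 ) ;
// auto parts = shape_splitter < 2 > :: part ( shape , 4 , 0 ) ;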
/// partition a shape range into 'stripes'. This uses shape_splitter with
/// 'forbid' left at the default of -1, resulting in a split along the
/// outermost dimension that can be split n ways or the next best thing
/// shape_splitter can come up with. If the intended split is merely to
/// distribute the work load without locality considerations, this should
/// be the split to use. When locality is an issue, consider the next variant.
template < int d >
partition_type < shape_range_type<d> >
partition_to_stripes ( shape_range_type<d> range , int nparts )
{
if ( range[0].any() )
{
// the lower limit of the range is not at the origin, so get the shape
// of the region between range[0] and range[1], call shape_splitter with
// this shape, and add the offset to the lower limit of the original range
// to the partial ranges in the result
auto shape = range[1] - range[0] ;
auto res = shape_splitter < d > :: part ( shape , nparts ) ;
for ( auto & r : res )
{
r[0] += range[0] ;
r[1] += range[0] ;
}
return res ;
}
// if range[0] is at the origin, we don't have to use an offset
return shape_splitter < d > :: part ( range[1] , nparts ) ;
}
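// usage sketch (illustrative only): partition the index range of a whole
// 2D array into stripes, one per intended job:
//
// shape_range_type < 2 > range ( shape_type < 2 > () ,              // origin
//                                shape_type < 2 > ( 768 , 512 ) ) ;
// auto stripes = partition_to_stripes ( range , default_njobs ) ;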
/// alternative partitioning into tiles. For the optimal situation, where
/// the view isn't rotated or pitched much, the partitioning into bunches
/// of lines (above) seems to perform slightly better, but with more difficult
/// transformations (like 90 degree rotation), performance suffers (by roughly 20%),
/// whereas with this tiled partitioning it is roughly the same, supposedly due
/// to identical locality in both cases. So currently I am using this partitioning.
/// note that the current implementation ignores the argument 'nparts' and
/// produces 160x160 tiles.
// TODO code is a bit clumsy...
// TODO it may be a good idea to have smaller portions towards the end
// of the partitioning, since they will be processed last, and if the
// last few single-threaded operations are short, they may result in less
// situations where a long single-threaded operation has just started when
// all other tasks are already done, causing the system to idle on the other
// cores. or at least the problem would not persist for so long.
template < int d >
partition_type < shape_range_type<d> >
partition_to_tiles ( shape_range_type<d> range ,
int nparts = default_njobs )
{
// To help with the dilemma that this function is really quite specific
// to images, for the time being I delegate to partition_to_stripes()
// for dimensions != 2
if ( d != 2 )
return partition_to_stripes ( range , nparts ) ;
auto shape = range[1] - range[0] ;
// currently disregarding incoming nparts parameter:
// int nelements = prod ( shape ) ;
// int ntile = nelements / nparts ;
// int nedge = pow ( ntile , ( 1.0 / d ) ) ;
// TODO fixing this size is system-specific!
int nedge = 160 ; // heuristic, fixed size tiles
auto tiled_shape = shape / nedge ;
typedef std::vector < int > stopv ;
stopv stops [ d ] ;
for ( int a = 0 ; a < d ; a++ )
{
stops[a].push_back ( 0 ) ;
for ( int k = 1 ; k < tiled_shape[a] ; k++ )
stops[a].push_back ( k * nedge ) ;
stops[a].push_back ( shape[a] ) ;
}
for ( int a = 0 ; a < d ; a++ )
tiled_shape[a] = stops[a].size() - 1 ;
int k = prod ( tiled_shape ) ;
// If this partitioning scheme fails to produce a partitioning with
// at least nparts components, fall back to using partition_to_stripes()
if ( k < nparts )
return partition_to_stripes ( range , nparts ) ;
nparts = k ;
partition_type < shape_range_type<d> > res ( nparts ) ;
for ( int a = 0 ; a < d ; a++ )
{
int j0 = 1 ;
for ( int h = 0 ; h < a ; h++ )
j0 *= tiled_shape[h] ;
int i = 0 ;
int j = 0 ;
for ( int k = 0 ; k < nparts ; k++ )
{
res[k][0][a] = stops[a][i] ;
res[k][1][a] = stops[a][i+1] ;
++j ;
if ( j == j0 )
{
j = 0 ;
++i ;
if ( i >= tiled_shape[a] )
i = 0 ;
}
}
}
for ( auto & e : res )
{
e[0] += range[0] ;
e[1] += range[0] ;
// std::cout << "tile: " << e[0] << e[1] << std::endl ;
}
return res ;
}
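// usage sketch (illustrative only). For a 2D range this produces 160x160
// tiles, plus narrower tiles at the edges where the extents aren't
// multiples of 160; other dimensions, or a partitioning with fewer than
// nparts components, fall back to partition_to_stripes(), as coded above:
//
// shape_range_type < 2 > range ( shape_type < 2 > () ,
//                                shape_type < 2 > ( 1920 , 1080 ) ) ;
// auto tiles = partition_to_tiles ( range ) ;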
// /// specialization for 1D shape range. Obviously we can't make tiles
// /// from 1D data...
//
// template<>
// partition_type < shape_range_type<1> >
// partition_to_tiles ( shape_range_type<1> range ,
// int nparts )
// {
// auto size = range[1][0] - range[0][0] ;
// auto part_size = size / nparts ;
// if ( part_size < 1 )
// part_size = size ;
//
// nparts = int ( size / part_size ) ;
// if ( nparts * part_size < size )
// nparts++ ;
//
// partition_type < shape_range_type<1> > res ( nparts ) ;
//
// auto start = range[0] ;
// auto stop = start + part_size ;
// for ( auto & e : res )
// {
// e[0] = start ;
// e[1] = stop ;
// start = stop ;
// stop = start + part_size ;
// }
// res[nparts-1][1] = size ;
// return res ;
// }
/// action_wrapper wraps a functional into an outer function which
/// first calls the functional and then checks if this was the last
/// of a bunch of actions to complete, by incrementing the counter
/// p_done points to and comparing the result to 'nparts'. If the
/// test succeeds, the caller is notified via the condition variable
/// p_pool_cv points to, under the mutex p_pool_mutex points to.
static void action_wrapper ( std::function < void() > payload ,
int nparts ,
std::mutex * p_pool_mutex ,
std::condition_variable * p_pool_cv ,
int * p_done )
{
// execute the 'payload'
payload() ;
// under the coordinator's pool mutex, increase the caller's
// 'done' counter and test if it's now equal to 'nparts', the total
// number of actions in this bunch
// TODO initially I had the notify_all call after closing the scope of
// the lock guard, but I had random crashes. Changing the code to call
// notify_all with the lock guard still in effect seemed to remove the
// problem, but made me unsure of my logic.
// 2017-06-23 after removing a misplaced semicolon after the conditional
// below I recoded to perform the notification after closing the lock_guard's
// scope, and now there doesn't seem to be any problem any more. I leave
// these comments in for reference in case things go wrong
// TODO remove this and previous comment if all is well
// 2017-10-12 when stress-testing with restore_test, I had random crashes
// and failure to join again, so I've taken the notify call into the lock
// guard's scope again to see if that fixes it which seems to be the case.
{
std::lock_guard<std::mutex> lk ( * p_pool_mutex ) ;
if ( ++ ( * p_done ) == nparts )
{
// this was the last action originating from the coordinator
// notify the coordinator that the joint task is now complete
p_pool_cv->notify_one() ;
}
}
}
// with this collateral code at hand, we can now implement multithread().
/// multithread uses a thread pool of worker threads to perform
/// a multithreaded operation. It receives a functor (a single-threaded
/// function used for all individual tasks), a partitioning, which contains
/// information on which part of the data each task should work, and
/// a set of additional parameters to pass on to the functor.
/// The individual 'payload' tasks are created by binding the functor with
///
/// - a range from the partitioning, describing its share of the data
///
/// - the remaining parameters
///
/// These tasks are bound to a wrapper routine which takes care of
/// signalling when the last task has completed.
static thread_pool common_thread_pool ; // keep a thread pool only for multithread()
template < class range_type , class ...Types >
int multithread ( void (*pfunc) ( range_type , Types... ) ,
partition_type < range_type > partitioning ,
Types ...args )
{
// get the number of ranges in the partitioning
int nparts = partitioning.size() ;
// guard against empty or wrong partitioning
if ( nparts <= 0 )
{
return 0 ;
}
if ( nparts == 1 )
{
// if only one part is in the partitioning, we take a shortcut
// and execute the function right here:
(*pfunc) ( partitioning[0] , args... ) ;
return 1 ;
}
// alternatively, 'done' can be coded as std::atomic<int>. I tried
// but couldn't detect any performance benefit, even though allegedly
// atomics are faster than using mutexes... so I'm leaving the code
// as it was, using an int and a mutex.
int done = 0 ; // number of completed tasks
std::mutex pool_mutex ; // mutex to guard access to done and pool_cv
std::condition_variable pool_cv ; // for signalling completion
{
// under the thread pool's task_mutex, fill tasks into task queue
std::lock_guard<std::mutex> lk ( common_thread_pool.task_mutex ) ;
for ( int i = 0 ; i < nparts ; i++ )
{
// first create the 'payload' function
std::function < void() > payload
= std::bind ( pfunc , partitioning[i] , args... ) ;
// now bind it to the action wrapper and enqueue it
std::function < void() > action
= std::bind ( action_wrapper ,
payload ,
nparts ,
&pool_mutex ,
&pool_cv ,
&done
) ;
common_thread_pool.task_queue.push ( action ) ;
}
}
// alert all worker threads
common_thread_pool.task_cv.notify_all() ;
{
// now wait for the last task to complete. This is signalled by
// action_wrapper by notifying on pool_cv and double-checked
// by testing for done == nparts
std::unique_lock<std::mutex> lk ( pool_mutex ) ;
// the predicate done == nparts rejects spurious wakes
pool_cv.wait ( lk , [&] { return done == nparts ; } ) ;
}
// all jobs are done
return nparts ;
}
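// usage sketch (illustrative only, with a hypothetical worker function
// 'fill_value'): any function taking a range as its first parameter can
// serve as the functor; the remaining arguments are passed on to every
// task unchanged. Here, each task fills its tile of a 2D view:
//
// void fill_value ( shape_range_type < 2 > r ,
//                   vigra::MultiArrayView < 2 , float > * p_view ,
//                   float value )
// {
//   // restrict the view to this task's subrange and fill it
//   p_view->subarray ( r[0] , r[1] ) . init ( value ) ;
// }
// ...
// auto partitioning = partition_to_tiles ( range ) ; // 'range' as above
// multithread ( &fill_value , partitioning , &view , 1.0f ) ;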
/// This variant of multithread() takes a pointer to a function performing
/// the partitioning of the incoming range. The partitioning function is
/// invoked on the incoming range (provided nparts is greater than 1) and
/// the resulting partitioning is used as an argument to the first variant
/// of multithread().
// TODO It might be better to code this using std::function objects.
// TODO may use move semantics for forwarding instead of relying on the
// optimizer to figure this out
template < class range_type , class ...Types >
int multithread ( void (*pfunc) ( range_type , Types... ) ,
partition_type < range_type > (*partition) ( range_type , int ) ,
int nparts ,
range_type range ,
Types ...args )
{
if ( nparts <= 1 )
{
// if only one part is requested, we take a shortcut and execute
// the function right here:
(*pfunc) ( range , args... ) ;
return 1 ;
}
// partition the range using the function pointed to by 'partition'
auto partitioning = (*partition) ( range , nparts ) ;
// then pass pfunc, the partitioning and the remaining arguments
// to the variant of multithread() accepting a partitioning
return multithread ( pfunc , partitioning , args... ) ;
}
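// putting it all together (sketch only, 'fill_value' as above): pass the
// worker function, a partitioning function, the intended number of parts,
// the whole range, and the extra arguments for the worker:
//
// vigra::MultiArray < 2 , float > data ( vigra::Shape2 ( 1024 , 1024 ) ) ;
// vigra::MultiArrayView < 2 , float > view ( data ) ;
// shape_range_type < 2 > range ( shape_type < 2 > () , data.shape() ) ;
// multithread ( &fill_value , &partition_to_tiles < 2 > ,
//               default_njobs , range , &view , 1.0f ) ;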
} ; // end of namespace vspline
#endif // #ifndef VSPLINE_MULTITHREAD_H