Source code for nutils.parallel

# -*- coding: utf8 -*-
#
# Module PARALLEL
#
# Part of Nutils: open source numerical utilities for Python. Jointly developed
# by HvZ Computational Engineering, TU/e Multiscale Engineering Fluid Dynamics,
# and others. More info at http://nutils.org <info@nutils.org>. (c) 2014

"""
The parallel module provides tools aimed at parallel computing. At this point
all parallel solutions use the ``fork`` system call, which limits the supported
platforms, notably excluding Windows. On unsupported platforms parallel features
are disabled and a warning is printed.
"""

from . import prop, log, numpy, debug
import os, sys, multiprocessing, thread

Lock = multiprocessing.Lock
cpu_count = multiprocessing.cpu_count

class Fork( object ):
  'nested fork context, unwinds at exit'

  def __init__( self, nprocs ):
    'constructor'

    self.nprocs = nprocs

  def __enter__( self ):
    'fork and return iproc'

    for self.iproc in range( self.nprocs-1 ):
      self.child_pid = os.fork()
      if self.child_pid:
        break
    else:
      self.child_pid = None
      self.iproc = self.nprocs-1
    self.logger = log.context( 'proc %d' % ( self.iproc+1 ), depth=2 )
    return self.iproc

  def __exit__( self, exctype, excvalue, tb ):
    'kill all processes but first one'

    status = 0
    try:
      if exctype == KeyboardInterrupt:
        status = 1
      elif exctype == GeneratorExit:
        if self.iproc:
          log.stack( 'generator failed with unknown exception', debug.callstack( depth=2 ) )
        status = 1
      elif exctype:
        if self.iproc:
          log.stack( repr(excvalue), debug.exception() )
        status = 1
      if self.child_pid:
        child_pid, child_status = os.waitpid( self.child_pid, 0 )
        if child_pid != self.child_pid:
          log.error( 'pid failure! got %s, was waiting for %s' % (child_pid,self.child_pid) )
          status = 1
        elif child_status:
          status = 1
      self.logger.disable()
    except: # should not happen.. but just to be sure
      status = 1
    if self.iproc:
      os._exit( status )
    if not exctype:
      assert status == 0, 'one or more subprocesses failed'
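Fork is normally entered indirectly through ``pariter`` below, but the context can also be used on its own. The following is a minimal sketch, not part of the module, that splits a loop over a hypothetical ``work`` helper across processes on a POSIX platform; each process receives its rank as ``iproc`` and the child processes terminate when the context exits:

  from nutils import parallel

  def work( i ):
    pass # placeholder for the per-item computation

  nprocs = 4
  with parallel.Fork( nprocs ) as iproc:
    # process iproc handles items iproc, iproc+nprocs, iproc+2*nprocs, ...
    for i in range( iproc, 100, nprocs ):
      work( i )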
class AlternativeFork( object ):
  'single master, multiple slave fork context, unwinds at exit'

  def __init__( self, nprocs ):
    'constructor'

    self.nprocs = nprocs
    self.children = None

  def __enter__( self ):
    'fork and return iproc'

    children = []
    for self.iproc in range( 1, self.nprocs ):
      child_pid = os.fork()
      if not child_pid:
        break
      children.append( child_pid )
    else:
      self.children = children
      self.iproc = 0
    self.logger = log.context( 'proc %d' % ( self.iproc+1 ), depth=2 )
    return self.iproc

  def __exit__( self, exctype, excvalue, tb ):
    'kill all processes but first one'

    status = 0
    try:
      if exctype:
        log.stack( repr(excvalue), debug.exception() )
        status = 1
      while self.children:
        child_pid, child_status = os.wait()
        self.children.remove( child_pid )
        if child_status:
          status = 1
      self.logger.disable()
    except: # should not happen.. but just to be sure
      status = 1
    if self.iproc:
      os._exit( status )
    if not exctype:
      assert status == 0, 'one or more subprocesses failed'
def waitpid_noerr( pid ):
  try:
    os.waitpid( pid, 0 )
  except:
    pass
def fork( func, nice=19 ):
  'fork and run (return value is lost)'

  if not hasattr( os, 'fork' ):
    log.warning( 'fork does not exist on this platform; running %s in serial' % func.__name__ )
    return func
  def wrapped( *args, **kwargs ):
    pid = os.fork()
    if pid:
      thread.start_new_thread( waitpid_noerr, (pid,) ) # kill the zombies
      # see: http://stackoverflow.com/a/13331632/445031
      # this didn't work: http://stackoverflow.com/a/6718735/445031
      return pid
    try:
      os.nice( nice )
      func( *args, **kwargs )
    except KeyboardInterrupt:
      pass
    except:
      log.stack( repr(sys.exc_value), debug.exception() )
    finally:
      os._exit( 0 )
  return wrapped
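As a usage sketch (the function name and file path are made up), ``fork`` can decorate a function so that it runs in a detached child process; the parent immediately gets back the child's pid and, as the docstring notes, the function's return value is lost:

  from nutils import parallel

  @parallel.fork
  def write_report( path, lines ):
    # runs at lowered priority in a forked child; the return value is discarded
    with open( path, 'w' ) as f:
      f.writelines( lines )

  pid = write_report( 'report.txt', [ 'line 1\n', 'line 2\n' ] )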
def shzeros( shape, dtype=float ):
  'create zero-initialized array in shared memory'

  #return numpy.zeros( shape, dtype=dtype ) # TODO: toggle to numpy for debugging
  if isinstance( shape, int ):
    shape = shape,
  else:
    assert all( isinstance(sh,int) for sh in shape )
  size = numpy.product( shape ) if shape else 1
  if dtype == float:
    typecode = 'd'
  elif dtype == int:
    typecode = 'i'
  else:
    raise Exception, 'invalid dtype: %r' % dtype
  buf = multiprocessing.RawArray( typecode, size )
  return numpy.frombuffer( buf, dtype ).reshape( shape )
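Since the buffer returned by ``shzeros`` is a ``multiprocessing.RawArray``, values written by forked workers remain visible to the parent process. A minimal sketch of the array by itself (sharing across processes is shown with ``pariter`` below):

  from nutils import parallel

  a = parallel.shzeros( (3, 3) ) # float64 zeros backed by shared memory
  a[0, 0] = 1. # behaves like an ordinary numpy array, but survives a fork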
def pariter( iterable ):
  'iterate parallel'

  nprocs = getattr( prop, 'nprocs', 1 )
  return iterable if nprocs <= 1 else _pariter( iterable, nprocs )
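A hedged usage sketch combining ``pariter`` with ``shzeros``, assuming the ``nprocs`` property has been set to a value larger than one (otherwise ``pariter`` simply returns the iterable unchanged and the loop runs serially):

  from nutils import parallel

  squares = parallel.shzeros( 100 ) # shared, so writes by forked workers are kept
  for i in parallel.pariter( range( 100 ) ):
    squares[i] = i**2 # each index is claimed by exactly one process via the shared counter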
def _pariter( iterable, nprocs ):
  'iterate parallel, helper generator'

  shared_iter = multiprocessing.RawValue( 'i', nprocs )
  lock = Lock()
  with Fork( nprocs ) as iproc:
    iiter = iproc
    for n, it in enumerate( iterable ):
      if n < iiter:
        continue
      assert n == iiter
      yield it
      with lock:
        iiter = shared_iter.value
        shared_iter.value = iiter + 1

def parmap( func, iterable, shape=(), dtype=float ):
  n = len(iterable)
  out = shzeros( (n,)+shape, dtype=dtype )
  for i, item in pariter( enumerate(iterable) ):
    out[i] = func( item )
  return out

# vim:shiftwidth=2:foldmethod=indent:foldnestmax=1