nws.sleigh
index
/usr/lib/python2.4/site-packages/nws/sleigh.py

Python API for parallel programming using NetWorkSpaces.
 
The sleigh module, built on top of NetWorkSpaces (NWS), makes it very
easy to write simple parallel programs.  It contains the Sleigh class,
which provides two basic methods for executing tasks in parallel:
eachElem and eachWorker.
 
eachElem is used to execute a specified function multiple times in
parallel with a varying set of arguments.  eachWorker is used to execute
a function exactly once on every worker in the sleigh with a fixed set
of arguments.
 
Example:
 
First start up the NWS server, using the twistd command:
 
    % twistd -y /etc/nws.tac
 
Now you can create a sleigh to execute python code in parallel:
 
    % python
    >>> from nws.sleigh import Sleigh
    >>> s = Sleigh()
    >>> import math
    >>> result = s.eachElem(math.exp, range(10))
    >>> print "The answer is", result

 
Modules
       
cPickle
copy_reg
new
os
pickle
platform
random
socket
string
subprocess
sys
time
traceback

 
Classes
       
__builtin__.object
Sleigh
SleighPending
SleighResultIterator
exceptions.Exception
SleighException
SleighGatheredException
SleighIllegalValueException
SleighJoinException
SleighNwsException
SleighOccupiedException
SleighScriptException
SleighStoppedException
SleighTaskException

 
class Sleigh(__builtin__.object)
    Represents a collection of python processes used to execute tasks.
 
The sleigh allows python functions, methods, and expressions to be
executed in parallel using the eachElem and eachWorker methods.
 
The sleigh workers are started when the Sleigh object is
constructed.  When tasks are submitted to the sleigh, using the
eachWorker and eachElem methods, the workers will execute the tasks,
and return the results.  When the stop method is called, the workers
are stopped.
 
Note that a given python program can create multiple Sleigh objects,
which will each have it's own set of workers.  This could be useful
if tasks have different requirements.  For example, you could create
a Linux sleigh and a Windows sleigh, and submit Excel tasks only to
your Windows sleigh.
 
  Methods defined here:
__init__(self, *deprecated, **kw)
Start the remote python processes used to execute tasks.
 
Keyword arguments:
 
launch -- Specifies the method of starting workers.  If this
        argument is set to the string 'local', then the workers
        are executed on the local machine.  If it is set to the
        string 'web', then web launching is used.  Otherwise,
        the launch argument must specify a function (such as
        nws.sleigh.sshcmd) that returns a list of command
        arguments use to execute the workers on the machines
        specified by the nodeList argument.  The default value
        is 'local'.
 
workerCount -- Number of workers to start if the launch argument
        is set to 'local' (which is the default value of launch).
        This argument is ignored if launch is not set to
        'local'.  The default value is 3.
 
nodeList -- List of hosts on which to execute workers, if the
        launch argument is set to a function.  This argument is
        ignored if launch is set to 'local' or 'web'.  The
        default value is ['localhost', 'localhost',
        'localhost'].
 
nwsHost -- Host name of the machine where the NWS server is
        executing.
 
nwsPort -- Port to connect to the NWS server.
 
nwsHostRemote -- Host name of the machine that workers should
        use to connect to the NWS server.  This is useful in
        conjunction with the sshforwardcmd function (see the
        description of scriptExec).  The default is the value of
        the nwsHost argument.
 
nwsPortRemote -- Port that workers should use to connect to the
        NWS server.  This is useful in conjunction with the
        sshforwardcmd function (see the description of
        scriptExec).  The default is the value of the nwsPort
        argument.
 
scriptExec -- Python function returning a list of command
        arguments to execute the worker script.  This list of
        command arguments is appended to the list returned by
        the launch function.  The default value is the envcmd
        function (defined in this module), which uses the
        standard 'env' command to execute the script with the
        appropriate environment for the worker.
 
scriptDir -- Directory on the worker that contains the execution
        script.
 
scriptName -- Name of the script that executes the worker on the
        remote machines.  This defaults to
        PythonNWSSleighWorker.sh on Unix, and
        PythonNWSSleighWorker.py on Windows.
 
modulePath -- Directory path to add to sys.path on workers.
        This is often useful for giving the workers access to
        python modules that define the python function to be
        executed by eachWorker or eachElem.  The default value
        is None.
 
workingDir -- Directory path to use as the current working
        directory for the workers.  The default value is the
        current working directory of the sleigh master.
 
logDir -- Directory in which to create the worker log files.
        The default value is None, which leaves the decision
        to the sleigh worker scripts, which generally uses the
        remote system's temporary directory.
 
user -- User name to use for remote execution of worker.  This
        argument may be ignored, depending the specified
        launch function.  The default is the value of the USER
        environment variable, or the USERNAME environment
        variable if USER is not set.
 
wsNameTemplate -- Template for the sleigh workspace name.  This
        must be a legal 'format' string, containing only an
        integer format specifier.  The default is
        'sleigh_ride_%04d'.
 
userWsNameTemplate -- Template for the user's workspace name.
        This must be a legal 'format' string, containing only
        an integer format specifier.  The default is
        'sleigh_user_%04d'.
 
verbose -- Boolean flag used for displaying debug messages.
        Debug messages will be sent to stderr.  This will also
        cause the worker processes to write debug messages to
        files prefixed with 'sleigh_ride' in their current
        working directory (as controled by the workerDir
        argument).  The default value is False.
__str__(self)
eachElem(self, fun, elementArgs=[[]], fixedArgs=[], **kw)
Execute a function, method, or expression for each element in
the specified list(s).
 
s.eachElem(fun, elementArgs[, fixedArgs]) -> list or SleighPending
 
The results are normally returned as a list, unless the blocking
arguments is set to False, in which case, a SleighPending object
is returned.
 
Arguments:
 
fun -- Function, method, or python expression to execute.  To
        execute a method, you must specify a bound method
        object.  If the function or defining class is defined in
        a module, the workers will attempt to import that
        module.  If that module isn't available to the worker
        (because it's a non-standard module, not in the
        PYTHONPATH), then the worker is not be able to execute
        those tasks.
 
        To execute a python expression, you must specify it as a
        string.  Leading whitespace is automatically stripped to
        avoid a common source of python syntax errors.
 
elementArgs -- List of arguments to pass to the function or method
        that need to be different for different tasks.  In
        general, this is a list of iterable objects, such as lists,
        each containing the values to use for a given argument of
        the different tasks.
 
        If your function needs only one varying argument of a
        simple type, you can specify it without the outer list.
 
        Note that for a python expression, the list of arguments
        is passed to the expression as a global variable named
        'SleighArguments'.
 
fixedArgs -- List of additional arguments that are
        fixed/constant for each task.  Normally, they are
        appended to the arguments specified by elementArgs, but
        the order can be altered using the argPermute argument
        described below.
 
        The default value is an empty list, which means that no
        extra arguments are passed to the function.
 
        Note that for a python expression, the list of arguments
        is passed to the expression as a global variable named
        'SleighArguments'.
 
Keyword arguments:
 
type -- Indicates the type of function invocation to perform.
        This can be either 'invoke', 'define', or 'eval'.
        If the fun argument is a function or bound method, then
        the default value is 'invoke'.  If the fun argument is
        a string, then the default value is 'eval' (a value of
        'invoke' or 'define' is illegal for python expressions).
 
blocking -- A boolean value that indicates whether to wait for
        the results, or to return as soon as the tasks have been
        submitted.  If set to False, eachElem will return a
        SleighPending object that is used to monitor the status
        of the tasks, and to eventually retrieve the results.
        You must wait for the results to be complete before
        executing any further tasks on the sleigh, or a
        SleighOccupiedException will be raised.
 
        If blocking is set to False, then the loadFactor
        argument is disabled and ignored.  Note that it's
        unlikely that you'll need to turn off blocking in
        eachElem.  Non-blocking mode is more useful in
        eachWorker.
 
        The default value is True.
 
argPermute -- List that maps the specified arguments to the
        actual arguments of the execution function.  By
        "specified arguments", I mean the items extracted from
        elementArgs, followed by fixedArgs.  (Note that unless
        you are specifying both elementArgs and fixedArgs, you
        probably don't need to use argPermute.)  The items in
        the argPermute list are used as indexes into the
        "specified arguments" list.  The length of argPermute
        determines the number of arguments passed to the
        execution function, which would normally be the length
        of the specified arguments list, but this is not
        required.  For example, setting argPermute to an empty
        list would cause the execution function to be called
        without any arguments (although elementArgs would still
        be required, and would be used to determine the number
        of tasks to execute).
 
        The default behaviour is to pass the execution function
        the arguments specified by elementArgs, followed by the
        arguments from fixedArgs, which is equivalent to setting
        argPermute to:
 
            n = len(elementArgs) + len(fixedArgs)
            argPermute = range(n)
 
        If you wished to reverse the order of the arguments, you
        could then modify argPermute:
 
            argPermute.reverse()
 
        But, more realistically, you need to interleave the
        fixed arguments with the varying arguments.  For
        example, your execution function takes on fixed
        argument, followed by two that vary, you would set
        argPermute to:
 
            argPermute=[1,2,0]
 
loadFactor -- Maximum number of tasks per worker to put into the
        workspace at one time.  This can become important if you
        are executing a very large number of tasks.  Setting
        loadFactor to 3 will probably keep enough tasks
        available in the workspace to keep the workers busy,
        without flooding the workspace and bogging down the NWS
        server.
 
        The default behaviour is to submit all of the tasks
        to the sleigh workspace immediately.
 
accumulator -- A function or callable object that will be called
        for each result as they arrive.  The first argument to
        the function is a list of result values, and the second
        argument is a list of indexes, which identifies which task.
 
        The arguments to the accumulator function are lists since
        in the future, we plan to allow tasks to be "chunked" to
        improve performance of small tasks.
 
        Note that bound methods can be very useful accumulators.
eachWorker(self, fun, *workerArgs, **kw)
Execute a function, method, or expression on each worker of sleigh.
 
s.eachWorker(fun[, ...]) -> list or SleighPending
 
The results are normally returned as a list, unless
the blocking arguments is set to False, in which case, a
SleighPending object is returned.
 
Arguments:
 
fun -- Function, method, or python expression to execute.  To
        execute a method, you must specify a bound method
        object.  If the function or defining class is defined in
        a module, the workers will attempt to import that
        module.  If that module isn't available to the worker
        (because it's a non-standard module, not in the
        PYTHONPATH), then the worker is not be able to execute
        those tasks.
 
        To execute a python expression, you must specify it as a
        string.  Leading whitespace is automatically stripped to
        avoid a common source of python syntax errors.
 
Optional arguments:
 
*workerArgs -- Arguments to pass to the function or method.
        Specify whatever arguments the function requires,
        including no arguments.  The exact same set of arguments
        will be used for each worker (unlike eachElem).  For a
        python expression, these arguments are passed to the
        expression as a global variable named 'SleighArguments'.
 
Keyword arguments:
 
type -- Indicates the type of function invocation to perform.
        This can be either 'invoke', 'define', or 'eval'.
        If the fun argument is a function or bound method, then
        the default value is 'invoke'.  If the fun argument is
        a string, then the default value is 'eval' (a value of
        'invoke' or 'define' is illegal for python expressions).
 
blocking -- A boolean value that indicates whether to wait for
        the results, or to return as soon as the tasks have been
        submitted.  If set to False, eachWorker will return a
        SleighPending object that is used to monitor the status
        of the tasks, and to eventually retrieve the results.
        You must wait for the results to be complete before
        executing any further tasks on the sleigh, or a
        SleighOccupiedException will be raised.
 
        This argument is important if you want the master to be
        able to interact/communicate with the workers, via NWS
        operations, for example.  This allows you to implement
        more complex parallel or distributed programs.
 
        The default value is True.
 
accumulator -- A function or callable object that will be called
        for each result as they arrive.  The first argument to
        the function is a list of result values, and the second
        argument is a list of indexes, which identifies which task.
 
        The arguments to the accumulator function are lists since
        in the future, we plan to allow tasks to be "chunked" to
        improve performance of small tasks.
 
        Note that bound methods can be very useful accumulators.
imap(self, fun, *iterables, **kw)
Execute a function, method, or expression for each element in
the specified list(s).
 
s.imap(fun, *iterables) -> iterator
starmap(self, fun, iterable, **kw)
Execute a function, method, or expression for each element in
the specified list(s).
 
s.starmap(fun, iterable) -> iterator
status(self, closeGroup=False, timeout=0.0)
Return the status of the worker group.
 
s.status(closeGroup, timeout) -> numworkers, closed
 
The status includes the number of workers that have joined the
group so far, and a flag that indicates whether the group has
been closed (meaning that no more workers can join).  Normally,
the group is automatically closed when all the workers that were
listed in the constructor have joined.  However, this method
allows you to force the group to close after the timeout
expires.  This can be particularly useful if you are running
on a large number of nodes, and some of the nodes are slow or
unreliable.  If some of the workers are never started, the group
will never close, and no tasks will ever execute.
 
Arguments:
 
closeGroup -- Boolean flag indicating whether to close the
        group.  If True, the group will be closed, after the
        specified timeout.  The default value is False.
 
timeout -- Number of seconds to wait for the group to close
        before returning.  The default value is 0.0.
stop(self)
Stop the remote processes and delete the sleigh workspace.
 
s.stop()

Data and other attributes defined here:
__dict__ = <dictproxy object>
dictionary for instance variables (if defined)
__weakref__ = <attribute '__weakref__' of 'Sleigh' objects>
list of weak references to the object (if defined)

 
class SleighException(exceptions.Exception)
    Base class for all exceptions raised by this module.
 
  Methods defined here:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighGatheredException(SleighException)
    Results already gathered.
 
 
Method resolution order:
SleighGatheredException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighIllegalValueException(SleighException)
    Illegal value specified.
 
 
Method resolution order:
SleighIllegalValueException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighJoinException(SleighException)
    Too late to join worker group.
 
 
Method resolution order:
SleighJoinException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighNwsException(SleighException)
    Error performing NWS operation.
 
 
Method resolution order:
SleighNwsException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighOccupiedException(SleighException)
    Sleigh is occupied.
 
 
Method resolution order:
SleighOccupiedException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighPending(__builtin__.object)
    Represents a sleigh eachWorker/eachElem invocation in progress.
 
This is returned from the eachElem and eachWorker operations
when executed asynchronously.  It allows you to check for
the number of tasks left to be executed, and to wait for
the results of the operation to be returned.
 
  Methods defined here:
__init__(self, nws, numTasks, barrierName, sleighState, accumulator)
Create an object that represents the pending sleigh operation.
 
This constructor is intended for internal use only.
 
Arguments:
 
nws -- Sleigh NetWorkSpace object.
 
numTasks -- Number of tasks that were submitted.
 
barrierName -- Name of the barrier to wait at when complete.
 
sleighState -- Object representing the current state of the
        sleigh.
 
accumulator -- Function to call with results as they arrive.
__str__(self)
check(self)
Return the number of tasks still outstanding.
 
p.check() -> integer
wait(self)
Wait for and return the list of results.
 
p.wait() -> list

Data and other attributes defined here:
__dict__ = <dictproxy object>
dictionary for instance variables (if defined)
__weakref__ = <attribute '__weakref__' of 'SleighPending' objects>
list of weak references to the object (if defined)

 
class SleighResultIterator(__builtin__.object)
    Returns results from tasks submitted to the sleigh.
 
Instances of this class are returned from the Sleigh imap, and
starmap methods.
 
  Methods defined here:
__init__(self, task, taskIter, nws, numSubmitted, sleighState)
Create an iterator over the task results.
 
This constructor is intended for internal use only.
 
Arguments:
 
task -- Partially initialized Task object.
 
taskIter -- Iterator over the task arguments.
 
nws -- Sleigh NetWorkSpace object.
 
numSubmitted -- Number of tasks already submitted.
 
sleighState -- Part of the Sleigh objects internal state.
__iter__(self)
next(self)
shutdown(self)
Stop submitting tasks from the iterator.
 
This method is a less drastic version of "stop".  It is expected
that you will keep retrieving results that have already been
submitted, but no new tasks will be submitted, regardless of
what tasks were originally specified to imap or starmap.  The
sleigh object will continue to be "occupied" until all results of
the pending tasks have been retreived.
stop(self)
Stop the iterator, flushing any pending results.
 
This method is useful if you're done with the iterator, and
don't want to retrieve anymore results.  After calling stop,
you can submit more tasks to the sleigh (that is, it will
no longer be "occupied".

Properties defined here:
buffered
Number of buffered task results.
get = buffered(self)
returned
Number of task results returned.
get = returned(self)
stopped
Is the iterator stopped?
get = stopped(self)
submitted
Number of submitted tasks.
get = submitted(self)

Data and other attributes defined here:
__dict__ = <dictproxy object>
dictionary for instance variables (if defined)
__weakref__ = <attribute '__weakref__' of 'SleighResultIterator' objects>
list of weak references to the object (if defined)

 
class SleighScriptException(SleighException)
    Unable to find sleigh worker script.
 
 
Method resolution order:
SleighScriptException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighStoppedException(SleighException)
    Sleigh is stopped.
 
 
Method resolution order:
SleighStoppedException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
class SleighTaskException(SleighException)
    Error executing a task.
 
 
Method resolution order:
SleighTaskException
SleighException
exceptions.Exception

Methods inherited from SleighException:
__repr__(self)
__str__(self)

Methods inherited from exceptions.Exception:
__getitem__(...)
__init__(...)

 
Data
        __all__ = ['Sleigh', 'SleighPending', 'SleighResultIterator', 'SleighException', 'SleighNwsException', 'SleighGatheredException', 'SleighStoppedException', 'SleighOccupiedException', 'SleighIllegalValueException', 'SleighScriptException', 'SleighTaskException', 'SleighJoinException']