- __builtin__.object
  - Sleigh
  - SleighPending
  - SleighResultIterator
- exceptions.Exception
  - SleighException
    - SleighGatheredException
    - SleighIllegalValueException
    - SleighJoinException
    - SleighNwsException
    - SleighOccupiedException
    - SleighScriptException
    - SleighStoppedException
    - SleighTaskException
class Sleigh(__builtin__.object)
Represents a collection of python processes used to execute tasks.
The sleigh allows python functions, methods, and expressions to be
executed in parallel using the eachElem and eachWorker methods.
The sleigh workers are started when the Sleigh object is
constructed. When tasks are submitted to the sleigh, using the
eachWorker and eachElem methods, the workers will execute the tasks,
and return the results. When the stop method is called, the workers
are stopped.
Note that a given python program can create multiple Sleigh objects,
each of which will have its own set of workers. This could be useful
if tasks have different requirements. For example, you could create
a Linux sleigh and a Windows sleigh, and submit Excel tasks only to
your Windows sleigh.
Methods defined here:
- __init__(self, *deprecated, **kw)
- Start the remote python processes used to execute tasks.
Keyword arguments:
launch -- Specifies the method of starting workers. If this
argument is set to the string 'local', then the workers
are executed on the local machine. If it is set to the
string 'web', then web launching is used. Otherwise,
the launch argument must specify a function (such as
nws.sleigh.sshcmd) that returns a list of command
arguments used to execute the workers on the machines
specified by the nodeList argument. The default value
is 'local'.
workerCount -- Number of workers to start if the launch argument
is set to 'local' (which is the default value of launch).
This argument is ignored if launch is not set to
'local'. The default value is 3.
nodeList -- List of hosts on which to execute workers, if the
launch argument is set to a function. This argument is
ignored if launch is set to 'local' or 'web'. The
default value is ['localhost', 'localhost',
'localhost'].
nwsHost -- Host name of the machine where the NWS server is
executing.
nwsPort -- Port to connect to the NWS server.
nwsHostRemote -- Host name of the machine that workers should
use to connect to the NWS server. This is useful in
conjunction with the sshforwardcmd function (see the
description of scriptExec). The default is the value of
the nwsHost argument.
nwsPortRemote -- Port that workers should use to connect to the
NWS server. This is useful in conjunction with the
sshforwardcmd function (see the description of
scriptExec). The default is the value of the nwsPort
argument.
scriptExec -- Python function returning a list of command
arguments to execute the worker script. This list of
command arguments is appended to the list returned by
the launch function. The default value is the envcmd
function (defined in this module), which uses the
standard 'env' command to execute the script with the
appropriate environment for the worker.
scriptDir -- Directory on the worker that contains the execution
script.
scriptName -- Name of the script that executes the worker on the
remote machines. This defaults to
PythonNWSSleighWorker.sh on Unix, and
PythonNWSSleighWorker.py on Windows.
modulePath -- Directory path to add to sys.path on workers.
This is often useful for giving the workers access to
python modules that define the python function to be
executed by eachWorker or eachElem. The default value
is None.
workingDir -- Directory path to use as the current working
directory for the workers. The default value is the
current working directory of the sleigh master.
logDir -- Directory in which to create the worker log files.
The default value is None, which leaves the decision
to the sleigh worker scripts, which generally use the
remote system's temporary directory.
user -- User name to use for remote execution of worker. This
argument may be ignored, depending on the specified
launch function. The default is the value of the USER
environment variable, or the USERNAME environment
variable if USER is not set.
wsNameTemplate -- Template for the sleigh workspace name. This
must be a legal 'format' string, containing only an
integer format specifier. The default is
'sleigh_ride_%04d'.
userWsNameTemplate -- Template for the user's workspace name.
This must be a legal 'format' string, containing only
an integer format specifier. The default is
'sleigh_user_%04d'.
verbose -- Boolean flag used for displaying debug messages.
Debug messages will be sent to stderr. This will also
cause the worker processes to write debug messages to
files prefixed with 'sleigh_ride' in their current
working directory (as controlled by the workingDir
argument). The default value is False.
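A minimal sketch of how the keyword arguments above might be combined. The helper names `local_options`, `ssh_options`, and `start_sleigh` are hypothetical and are not part of this module. The `Sleigh` import is deferred inside `start_sleigh`, so the option-building helpers run without the nws package or an NWS server.

```python
def local_options(worker_count=3, verbose=False):
    """Keyword arguments for a sleigh whose workers run on this machine."""
    return {"launch": "local", "workerCount": worker_count, "verbose": verbose}


def ssh_options(launch_function, hosts, module_path=None):
    """Keyword arguments for a sleigh started by a launch function.

    launch_function is expected to be something like nws.sleigh.sshcmd.
    workerCount is omitted because it is ignored unless launch is 'local'.
    """
    options = {"launch": launch_function, "nodeList": list(hosts)}
    if module_path is not None:
        options["modulePath"] = module_path
    return options


def start_sleigh(options):
    """Construct the Sleigh (needs the nws package and a running NWS server)."""
    from nws.sleigh import Sleigh  # deferred: only needed when actually launching
    return Sleigh(**options)


# The workspace name templates take a single integer, as in the defaults.
workspace_name = "sleigh_ride_%04d" % 7
```

With the package installed, `start_sleigh(local_options(worker_count=4))` would be one way to start four local workers under these assumptions.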
- __str__(self)
- eachElem(self, fun, elementArgs=[[]], fixedArgs=[], **kw)
- Execute a function, method, or expression for each element in
the specified list(s).
s.eachElem(fun, elementArgs[, fixedArgs]) -> list or SleighPending
The results are normally returned as a list, unless the blocking
argument is set to False, in which case a SleighPending object
is returned.
Arguments:
fun -- Function, method, or python expression to execute. To
execute a method, you must specify a bound method
object. If the function or defining class is defined in
a module, the workers will attempt to import that
module. If that module isn't available to the worker
(because it's a non-standard module, not in the
PYTHONPATH), then the worker will not be able to execute
those tasks.
To execute a python expression, you must specify it as a
string. Leading whitespace is automatically stripped to
avoid a common source of python syntax errors.
elementArgs -- List of arguments to pass to the function or method
that need to be different for different tasks. In
general, this is a list of iterable objects, such as lists,
each containing the values to use for a given argument of
the different tasks.
If your function needs only one varying argument of a
simple type, you can specify it without the outer list.
Note that for a python expression, the list of arguments
is passed to the expression as a global variable named
'SleighArguments'.
fixedArgs -- List of additional arguments that are
fixed/constant for each task. Normally, they are
appended to the arguments specified by elementArgs, but
the order can be altered using the argPermute argument
described below.
The default value is an empty list, which means that no
extra arguments are passed to the function.
Note that for a python expression, the list of arguments
is passed to the expression as a global variable named
'SleighArguments'.
Keyword arguments:
type -- Indicates the type of function invocation to perform.
This can be either 'invoke', 'define', or 'eval'.
If the fun argument is a function or bound method, then
the default value is 'invoke'. If the fun argument is
a string, then the default value is 'eval' (a value of
'invoke' or 'define' is illegal for python expressions).
blocking -- A boolean value that indicates whether to wait for
the results, or to return as soon as the tasks have been
submitted. If set to False, eachElem will return a
SleighPending object that is used to monitor the status
of the tasks, and to eventually retrieve the results.
You must wait for the results to be complete before
executing any further tasks on the sleigh, or a
SleighOccupiedException will be raised.
If blocking is set to False, then the loadFactor
argument is disabled and ignored. Note that it's
unlikely that you'll need to turn off blocking in
eachElem. Non-blocking mode is more useful in
eachWorker.
The default value is True.
argPermute -- List that maps the specified arguments to the
actual arguments of the execution function. By
"specified arguments", I mean the items extracted from
elementArgs, followed by fixedArgs. (Note that unless
you are specifying both elementArgs and fixedArgs, you
probably don't need to use argPermute.) The items in
the argPermute list are used as indexes into the
"specified arguments" list. The length of argPermute
determines the number of arguments passed to the
execution function, which would normally be the length
of the specified arguments list, but this is not
required. For example, setting argPermute to an empty
list would cause the execution function to be called
without any arguments (although elementArgs would still
be required, and would be used to determine the number
of tasks to execute).
The default behaviour is to pass the execution function
the arguments specified by elementArgs, followed by the
arguments from fixedArgs, which is equivalent to setting
argPermute to:
n = len(elementArgs) + len(fixedArgs)
argPermute = range(n)
If you wished to reverse the order of the arguments, you
could then modify argPermute:
argPermute.reverse()
But, more realistically, you need to interleave the
fixed arguments with the varying arguments. For
example, if your execution function takes one fixed
argument, followed by two that vary, you would set
argPermute to:
argPermute=[2,0,1]
loadFactor -- Maximum number of tasks per worker to put into the
workspace at one time. This can become important if you
are executing a very large number of tasks. Setting
loadFactor to 3 will probably keep enough tasks
available in the workspace to keep the workers busy,
without flooding the workspace and bogging down the NWS
server.
The default behaviour is to submit all of the tasks
to the sleigh workspace immediately.
accumulator -- A function or callable object that will be called
for each result as it arrives. The first argument to
the function is a list of result values, and the second
argument is a list of indexes identifying the tasks that
produced them.
The arguments to the accumulator function are lists since
in the future, we plan to allow tasks to be "chunked" to
improve performance of small tasks.
Note that bound methods can be very useful accumulators.
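The argument mapping described for elementArgs, fixedArgs, and argPermute can be modelled locally. The sketch below is a serial stand-in, not the library's code: `build_task_args`, `each_elem_serial`, and `power` are hypothetical names, and it assumes elementArgs is always given as a list of lists (the single-argument shorthand is not handled).

```python
def build_task_args(elementArgs, fixedArgs=(), argPermute=None):
    """Return one argument list per task, following the rules described above."""
    tasks = []
    for varying in zip(*elementArgs):
        # "Specified arguments": items from elementArgs, followed by fixedArgs.
        specified = list(varying) + list(fixedArgs)
        if argPermute is None:
            tasks.append(specified)
        else:
            # Each argPermute item is an index into the specified arguments.
            tasks.append([specified[i] for i in argPermute])
    return tasks


def each_elem_serial(fun, elementArgs, fixedArgs=(), argPermute=None):
    """Serial stand-in for a blocking eachElem call (no workers involved)."""
    return [fun(*args) for args in build_task_args(elementArgs, fixedArgs, argPermute)]


def power(exponent, base):
    """Example function: the fixed argument comes first, the varying one second."""
    return base ** exponent


# One varying argument (index 0) and one fixed argument (index 1), swapped.
results = each_elem_serial(power, elementArgs=[[1, 2, 3]], fixedArgs=[2],
                           argPermute=[1, 0])

# An empty argPermute calls the function with no arguments, once per element.
no_arg_results = each_elem_serial(lambda: "tick", elementArgs=[[1, 2, 3]],
                                  argPermute=[])
```

Here `results` holds the squares of 1, 2, and 3, and `no_arg_results` holds three copies of the string, since elementArgs still determines the number of tasks.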
- eachWorker(self, fun, *workerArgs, **kw)
- Execute a function, method, or expression on each worker of sleigh.
s.eachWorker(fun[, ...]) -> list or SleighPending
The results are normally returned as a list, unless
the blocking argument is set to False, in which case a
SleighPending object is returned.
Arguments:
fun -- Function, method, or python expression to execute. To
execute a method, you must specify a bound method
object. If the function or defining class is defined in
a module, the workers will attempt to import that
module. If that module isn't available to the worker
(because it's a non-standard module, not in the
PYTHONPATH), then the worker will not be able to execute
those tasks.
To execute a python expression, you must specify it as a
string. Leading whitespace is automatically stripped to
avoid a common source of python syntax errors.
Optional arguments:
*workerArgs -- Arguments to pass to the function or method.
Specify whatever arguments the function requires,
including no arguments. The exact same set of arguments
will be used for each worker (unlike eachElem). For a
python expression, these arguments are passed to the
expression as a global variable named 'SleighArguments'.
Keyword arguments:
type -- Indicates the type of function invocation to perform.
This can be either 'invoke', 'define', or 'eval'.
If the fun argument is a function or bound method, then
the default value is 'invoke'. If the fun argument is
a string, then the default value is 'eval' (a value of
'invoke' or 'define' is illegal for python expressions).
blocking -- A boolean value that indicates whether to wait for
the results, or to return as soon as the tasks have been
submitted. If set to False, eachWorker will return a
SleighPending object that is used to monitor the status
of the tasks, and to eventually retrieve the results.
You must wait for the results to be complete before
executing any further tasks on the sleigh, or a
SleighOccupiedException will be raised.
This argument is important if you want the master to be
able to interact/communicate with the workers, via NWS
operations, for example. This allows you to implement
more complex parallel or distributed programs.
The default value is True.
accumulator -- A function or callable object that will be called
for each result as it arrives. The first argument to
the function is a list of result values, and the second
argument is a list of indexes identifying the tasks that
produced them.
The arguments to the accumulator function are lists since
in the future, we plan to allow tasks to be "chunked" to
improve performance of small tasks.
Note that bound methods can be very useful accumulators.
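A sketch of the accumulator calling convention, using a bound method to keep state between calls. `RunningTotal` and `deliver` are hypothetical. `deliver` only imitates the sleigh handing over results, and the arrival order shown is made up for illustration.

```python
class RunningTotal(object):
    """Collects results through a bound method."""

    def __init__(self):
        self.total = 0
        self.seen = []

    def accumulate(self, results, indexes):
        # Both arguments are lists, even when a single result is delivered.
        for value, index in zip(results, indexes):
            self.total += value
            self.seen.append(index)


def deliver(accumulator, arrivals):
    """Hypothetical stand-in for the sleigh: passes on (index, value) pairs."""
    for index, value in arrivals:
        accumulator([value], [index])


tally = RunningTotal()
deliver(tally.accumulate, [(2, 30), (0, 10), (1, 20)])
```

With a real sleigh, the same bound method would be passed as `accumulator=tally.accumulate` to eachElem or eachWorker.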
- imap(self, fun, *iterables, **kw)
- Execute a function, method, or expression for each element in
the specified list(s).
s.imap(fun, *iterables) -> iterator
- starmap(self, fun, iterable, **kw)
- Execute a function, method, or expression for each element in
the specified list(s).
s.starmap(fun, iterable) -> iterator
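The signatures suggest, though the text does not state, that imap and starmap follow the argument conventions of the standard library functions of the same names. Under that assumption, the difference between the two call styles can be seen locally with the built-in `map` and `itertools.starmap`; no sleigh is involved in this sketch.

```python
from itertools import starmap


def add(x, y):
    return x + y


# imap-style call: one iterable per function argument.
imap_style = list(map(add, [1, 2, 3], [10, 20, 30]))

# starmap-style call: a single iterable of argument tuples.
starmap_style = list(starmap(add, [(1, 10), (2, 20), (3, 30)]))
```

Both lists contain the same three sums; only the way the arguments are packaged differs.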
- status(self, closeGroup=False, timeout=0.0)
- Return the status of the worker group.
s.status(closeGroup, timeout) -> numworkers, closed
The status includes the number of workers that have joined the
group so far, and a flag that indicates whether the group has
been closed (meaning that no more workers can join). Normally,
the group is automatically closed when all the workers that were
listed in the constructor have joined. However, this method
allows you to force the group to close after the timeout
expires. This can be particularly useful if you are running
on a large number of nodes, and some of the nodes are slow or
unreliable. If some of the workers are never started, the group
will never close, and no tasks will ever execute.
Arguments:
closeGroup -- Boolean flag indicating whether to close the
group. If True, the group will be closed, after the
specified timeout. The default value is False.
timeout -- Number of seconds to wait for the group to close
before returning. The default value is 0.0.
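A sketch of forcing the worker group closed, relying only on the documented `status(closeGroup, timeout) -> numworkers, closed` signature. `close_worker_group` and `FakeSleigh` are hypothetical. The fake object exists so the sketch runs without an NWS server, and its behaviour is an assumption about what a real sleigh would report.

```python
def close_worker_group(sleigh, timeout=10.0):
    """Close the worker group after `timeout` seconds and report who joined."""
    num_workers, closed = sleigh.status(True, timeout)
    if not closed:
        raise RuntimeError("worker group still open after %.1f seconds" % timeout)
    if num_workers == 0:
        raise RuntimeError("no workers joined the group")
    return num_workers


class FakeSleigh(object):
    """Hypothetical stand-in for a Sleigh object."""

    def __init__(self, joined):
        self.joined = joined

    def status(self, closeGroup=False, timeout=0.0):
        # Pretend the group closes exactly when asked to.
        return self.joined, bool(closeGroup)


workers = close_worker_group(FakeSleigh(joined=2), timeout=0.5)
```

Checking the worker count after closing gives an early failure when slow or unreliable nodes never start.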
- stop(self)
- Stop the remote processes and delete the sleigh workspace.
s.stop()
Data and other attributes defined here:
- __dict__ = <dictproxy object>
- dictionary for instance variables (if defined)
- __weakref__ = <attribute '__weakref__' of 'Sleigh' objects>
- list of weak references to the object (if defined)
class SleighPending(__builtin__.object)
Represents a sleigh eachWorker/eachElem invocation in progress.
This is returned from the eachElem and eachWorker operations
when executed asynchronously. It allows you to check for
the number of tasks left to be executed, and to wait for
the results of the operation to be returned.
Methods defined here:
- __init__(self, nws, numTasks, barrierName, sleighState, accumulator)
- Create an object that represents the pending sleigh operation.
This constructor is intended for internal use only.
Arguments:
nws -- Sleigh NetWorkSpace object.
numTasks -- Number of tasks that were submitted.
barrierName -- Name of the barrier to wait at when complete.
sleighState -- Object representing the current state of the
sleigh.
accumulator -- Function to call with results as they arrive.
- __str__(self)
- check(self)
- Return the number of tasks still outstanding.
p.check() -> integer
- wait(self)
- Wait for and return the list of results.
p.wait() -> list
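A sketch of polling a pending operation using only `check()` and `wait()`. `wait_with_progress` and `FakePending` are hypothetical. The fake object pretends that one task finishes per `check()` call so that the sketch runs without a sleigh.

```python
import time


def wait_with_progress(pending, report, poll_interval=0.5):
    """Poll check() until no tasks remain, then collect the results with wait()."""
    remaining = pending.check()
    while remaining > 0:
        report(remaining)
        time.sleep(poll_interval)
        remaining = pending.check()
    return pending.wait()


class FakePending(object):
    """Hypothetical stand-in: one task 'finishes' per check() call."""

    def __init__(self, results):
        self._results = list(results)
        self._outstanding = len(self._results)

    def check(self):
        current = self._outstanding
        self._outstanding = max(0, current - 1)
        return current

    def wait(self):
        return self._results


progress = []
results = wait_with_progress(FakePending(["a", "b"]), progress.append,
                             poll_interval=0.0)
```

With a real sleigh, the pending object would come from a call such as `s.eachWorker(fun, blocking=False)`, and the master could perform other work between polls.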
Data and other attributes defined here:
- __dict__ = <dictproxy object>
- dictionary for instance variables (if defined)
- __weakref__ = <attribute '__weakref__' of 'SleighPending' objects>
- list of weak references to the object (if defined)
class SleighResultIterator(__builtin__.object)
Returns results from tasks submitted to the sleigh.
Instances of this class are returned from the Sleigh imap and
starmap methods.
Methods defined here:
- __init__(self, task, taskIter, nws, numSubmitted, sleighState)
- Create an iterator over the task results.
This constructor is intended for internal use only.
Arguments:
task -- Partially initialized Task object.
taskIter -- Iterator over the task arguments.
nws -- Sleigh NetWorkSpace object.
numSubmitted -- Number of tasks already submitted.
sleighState -- Part of the Sleigh object's internal state.
- __iter__(self)
- next(self)
- shutdown(self)
- Stop submitting tasks from the iterator.
This method is a less drastic version of "stop". It is expected
that you will keep retrieving results that have already been
submitted, but no new tasks will be submitted, regardless of
what tasks were originally specified to imap or starmap. The
sleigh object will continue to be "occupied" until all results of
the pending tasks have been retrieved.
- stop(self)
- Stop the iterator, flushing any pending results.
This method is useful if you're done with the iterator, and
don't want to retrieve any more results. After calling stop,
you can submit more tasks to the sleigh (that is, it will
no longer be "occupied").
Properties defined here:
- buffered
- Number of buffered task results.
- get = buffered(self)
- returned
- Number of task results returned.
- get = returned(self)
- stopped
- Is the iterator stopped?
- get = stopped(self)
- submitted
- Number of submitted tasks.
- get = submitted(self)
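A sketch of ending an iteration early with `shutdown`, then draining the results of tasks that were already submitted so the sleigh stops being "occupied". `first_match` and `FakeResultIterator` are hypothetical. The fake iterator models only the behaviour described above, and its `in_flight` count is an invented parameter.

```python
def first_match(results, predicate):
    """Return the first result satisfying predicate, draining what remains."""
    found = None
    for value in results:
        if found is None and predicate(value):
            found = value
            results.shutdown()  # no new tasks; keep retrieving submitted ones
    return found


class FakeResultIterator(object):
    """Hypothetical stand-in: submitted tasks still yield results after shutdown."""

    def __init__(self, values, in_flight=2):
        self._values = list(values)
        self._position = 0
        self._in_flight = in_flight
        self._limit = len(self._values)

    def __iter__(self):
        return self

    def __next__(self):
        if self._position >= self._limit:
            raise StopIteration
        value = self._values[self._position]
        self._position += 1
        return value

    next = __next__  # Python 2 spelling of the iterator protocol

    def shutdown(self):
        # Only results for tasks that were already submitted remain retrievable.
        self._limit = min(self._limit, self._position + self._in_flight)

    @property
    def returned(self):
        return self._position


iterator = FakeResultIterator([1, 4, 9, 16, 25, 36], in_flight=2)
match = first_match(iterator, lambda value: value > 3)
```

Calling `stop` instead of `shutdown` would discard the pending results rather than retrieve them.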
Data and other attributes defined here:
- __dict__ = <dictproxy object>
- dictionary for instance variables (if defined)
- __weakref__ = <attribute '__weakref__' of 'SleighResultIterator' objects>
- list of weak references to the object (if defined)