Node API

Node Base Class

class conducto.Node(**kwargs)

The node classes Exec, Serial and Parallel all derive from this class. The parameters here apply directly to Exec nodes; on Serial and Parallel nodes they act as defaults for the sub-nodes.
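To make the defaulting rule concrete, here is a toy model in plain Python. It is not Conducto's implementation. It assumes that a parameter left unset on a Node takes the value from its nearest ancestor that sets it, and otherwise falls back to the documented default. `ToyNode` and `effective()` are hypothetical names, and the special inheritance rules for container_reuse_context are not modeled.

```python
from typing import Any, Dict, Optional

# Documented defaults for a few Node parameters.
DEFAULTS: Dict[str, Any] = {"cpu": 1, "mem": 2, "requires_docker": False}


class ToyNode:
    """Hypothetical stand-in for a Node: stores only explicitly set parameters."""

    def __init__(self, parent: Optional["ToyNode"] = None, **params: Any) -> None:
        self.parent = parent
        self.params = params

    def effective(self, key: str) -> Any:
        # Walk up the ancestry (assumed rule): the nearest explicitly set value wins.
        node: Optional[ToyNode] = self
        while node is not None:
            if key in node.params:
                return node.params[key]
            node = node.parent
        # Nothing set anywhere: fall back to the documented default.
        return DEFAULTS[key]


root = ToyNode(cpu=2, mem=4)          # e.g. a Serial with cpu=2, mem=4
child = ToyNode(parent=root, cpu=8)   # e.g. an Exec that overrides cpu
print(child.effective("cpu"), child.effective("mem"), child.effective("requires_docker"))
# 8 4 False
```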

Parameters
  • cpu (float) – Number of CPUs to allocate to the Node. Must be >0 if assigned. Default: 1

  • mem (float) – GB of memory to allocate to the Node. Must be >0 if assigned. Default: 2

  • requires_docker (bool) – If True, enable the Node to use Docker. Default: False

  • env (dict) – Mapping of environment variable names to their values. Default: {}

  • image (Image or str) – Run the Node in a container using the given Image, or the Docker image identified by name.

  • image_name (str) – Reference an Image by name instead of passing it explicitly. The Image must have been registered with Node.register_image().

  • container_reuse_context – See Running Exec Nodes for details. Note this has special inheritance rules when propagating to child nodes.

  • skip (bool) – If False, the Node will be run normally. If True, execution will pass over it and it will not be run. Default: False

  • suppress_errors (bool) – If True the Node will go to the Done state when finished, even if some children have failed. If False, any failed children will cause it to go to the Error state. Default: False

  • max_time (int or float or str) – An int or float value of seconds, or a duration string, representing the maximum time a Node may take to complete successfully. If a Node exceeds this time, it will be killed. The duration string must be a positive decimal with a suffix of ‘s’, ‘m’, ‘h’, or ‘d’, indicating seconds, minutes, hours, or days respectively. Default: '4h'

  • max_concurrent (int) – If set, it limits the number of descendant Exec nodes that can run concurrently. Only applies to Serial and Parallel nodes. Default: None

  • docker_run_args (str or List[str]) –

    Additional arguments to pass to docker run when starting the container that runs the Exec node. Default: None

    Caution: the arguments specified here are passed directly to docker run. They may cause the command to fail, in which case affected Exec nodes will not run. If you create orphaned Docker containers, clean them up with docker container list and docker kill. Use with care.

    Only allowed in local mode.

  • name (str) – If creating a Node inside a context manager, you may pass name=... instead of using normal dict assignment. Default: None

All of these arguments, except for name, may be set in the Node constructor or later. For example, n = co.Parallel(cpu=2) and

n = co.Parallel()
n.cpu = 2

are equivalent.
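The max_time rules in the parameter list (seconds as an int or float, or a positive decimal followed by s, m, h or d) can be checked with a small helper. `parse_max_time` is a hypothetical function written for illustration, not part of the conducto package; its regex is one reading of "positive decimal", and rejecting non-positive numeric values is an assumption.

```python
import re
from typing import Union

# Seconds per documented suffix.
_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
# A positive decimal (e.g. "4", "1.5", ".5") followed by exactly one suffix.
_DURATION = re.compile(r"^(\d+(?:\.\d*)?|\.\d+)([smhd])$")


def parse_max_time(value: Union[int, float, str]) -> float:
    """Convert a max_time value to seconds, rejecting malformed or non-positive values."""
    if isinstance(value, bool):
        # bool is a subclass of int, so exclude it explicitly.
        raise TypeError("max_time must be a number or a duration string")
    if isinstance(value, (int, float)):
        seconds = float(value)
    else:
        match = _DURATION.match(value.strip())
        if match is None:
            raise ValueError(f"invalid duration string: {value!r}")
        number, suffix = match.groups()
        seconds = float(number) * _UNITS[suffix]
    if seconds <= 0:
        raise ValueError("max_time must be positive")
    return seconds


print(parse_max_time("4h"))    # 14400.0, the documented default
print(parse_max_time("1.5m"))  # 90.0
print(parse_max_time(30))      # 30.0
```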

Variables

name – Immutable. The name of this Node must be unique among sibling Nodes. It is most commonly set through dict assignment with parent['nodename'] = co.Parallel(). It may also be set in the constructor with co.Parallel(name='nodename') if you’re using another Node as a context manager. It may not contain a /, as / is reserved as the path separator.
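The two naming rules (no "/" and unique among siblings) can be written as a check a parent might run before attaching a child. `check_child_name` is hypothetical, and Conducto's actual error types and messages are not described here, so this sketch simply raises ValueError. When assigning with a path such as parent['foo/bar'], the name being checked is the last component, bar, as the __str__ example below shows.

```python
from typing import Iterable


def check_child_name(name: str, sibling_names: Iterable[str]) -> None:
    """Raise ValueError if `name` breaks the documented Node naming rules."""
    if "/" in name:
        # "/" is reserved as the path separator.
        raise ValueError(f"node name {name!r} may not contain '/'")
    if name in set(sibling_names):
        # Names must be unique among sibling Nodes.
        raise ValueError(f"a sibling named {name!r} already exists")


check_child_name("build", ["test", "deploy"])  # passes silently
```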

__str__()

The full path of the Node, computed by joining the names of this Node’s ancestry with /.

import conducto as co
x = co.Parallel()
x["foo"] = y = co.Parallel()
x["foo/bar"] = z = co.Exec("echo foobar")

print(f"x.name={x.name}  str(x)={x}")
# x.name=/  str(x)=/
print(f"y.name={y.name}  str(y)={y}")
# y.name=foo  str(y)=/foo
print(f"z.name={z.name}  str(z)={z}")
# z.name=bar  str(z)=/foo/bar
for node in x.stream():
    print(str(node))
# /
# /foo
# /foo/bar

stream(reverse=False)

Iterate through this Node and all of its descendants.
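The __str__ and stream() behaviour shown in the example above can be modeled with a small tree class. `PathNode` is hypothetical; it assumes the root is named "/" and that stream() yields a node before its children, in insertion order, which matches the example's output. The reverse flag is not modeled because its exact ordering is not specified on this page.

```python
from typing import Dict, Iterator, List, Optional


class PathNode:
    """Hypothetical tree node mimicking Node.__str__ and Node.stream()."""

    def __init__(self, name: str = "/", parent: Optional["PathNode"] = None) -> None:
        self.name = name
        self.parent = parent
        self.children: Dict[str, "PathNode"] = {}

    def add(self, name: str) -> "PathNode":
        child = PathNode(name, self)
        self.children[name] = child
        return child

    def __str__(self) -> str:
        # Collect names from this node up to, but excluding, the root.
        parts: List[str] = []
        node: Optional[PathNode] = self
        while node is not None and node.parent is not None:
            parts.append(node.name)
            node = node.parent
        return "/" + "/".join(reversed(parts))

    def stream(self) -> Iterator["PathNode"]:
        # Pre-order: yield this node, then each child's subtree in insertion order.
        yield self
        for child in self.children.values():
            yield from child.stream()


x = PathNode()
y = x.add("foo")
z = y.add("bar")
print([str(n) for n in x.stream()])  # ['/', '/foo', '/foo/bar']
```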

Specific Node Types

conducto.Exec(command, *args, **kwargs)

A node that contains an executable command.

Parameters

command (Union[str, Callable]) – A shell command to execute or a Python callable.

If a Python callable is specified for the command, the args and kwargs are serialized and a conducto command line is constructed to launch the function for that node in the pipeline.
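The exact command line Conducto builds for a callable is not documented on this page. The sketch below only illustrates the general idea of turning a callable plus arguments into a shell command: a hypothetical `build_command` records the callable's module and qualified name and serializes the arguments as JSON (an assumption). The `python -m hypothetical_runner` prefix is made up and is not a real Conducto entry point.

```python
import json
import shlex
from typing import Any, Callable


def build_command(func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
    """Hypothetical: encode a callable and its arguments as one shell command."""
    payload = json.dumps({"args": list(args), "kwargs": kwargs}, sort_keys=True)
    target = f"{func.__module__}:{func.__qualname__}"
    # shlex.quote keeps the JSON payload intact as a single shell word.
    return " ".join(["python", "-m", "hypothetical_runner", target, shlex.quote(payload)])


def greet(name: str, punctuation: str = "!") -> None:
    print(f"Hello, {name}{punctuation}")


cmd = build_command(greet, "world", punctuation="?")
print(shlex.split(cmd)[-1])  # {"args": ["world"], "kwargs": {"punctuation": "?"}}
```

Serializing to a self-describing format and quoting it for the shell is what lets a separate process reconstruct the call later.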

conducto.Serial(**kwargs)

Node that has child Nodes and runs them one after another. Same interface as Node, plus the following:

Parameters

stop_on_error (bool) – If True, the Serial will Error when one of its children Errors, leaving subsequent children Pending. If False and a child Errors, the Serial will still run the rest of its children and then Error. Default: True
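A toy simulation of the stop_on_error rule, treating each child as a function that returns True on success. `run_serial` and the state strings are hypothetical; real Serial nodes also interact with settings such as suppress_errors and skip, which this sketch ignores.

```python
from typing import Callable, Dict, List, Tuple


def run_serial(children: List[Tuple[str, Callable[[], bool]]],
               stop_on_error: bool = True) -> Tuple[str, Dict[str, str]]:
    """Return the Serial's final state and each child's state."""
    states = {name: "Pending" for name, _ in children}
    failed = False
    for name, run in children:
        if failed and stop_on_error:
            break  # leave the remaining children Pending
        ok = run()
        states[name] = "Done" if ok else "Error"
        failed = failed or not ok
    # Either way, one failed child makes the Serial end in Error.
    return ("Error" if failed else "Done"), states


kids = [("a", lambda: True), ("b", lambda: False), ("c", lambda: True)]
print(run_serial(kids, stop_on_error=True))
# ('Error', {'a': 'Done', 'b': 'Error', 'c': 'Pending'})
print(run_serial(kids, stop_on_error=False))
# ('Error', {'a': 'Done', 'b': 'Error', 'c': 'Done'})
```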

conducto.Parallel(**kwargs)

Node that has child Nodes and runs them at the same time. Same interface as Node.
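Parallel children run at the same time, and the max_concurrent parameter caps how many descendant Exec nodes run at once. The thread-based sketch below imitates that cap with concurrent.futures. `run_parallel` is hypothetical, and Conducto schedules Exec nodes as containers, not Python threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def run_parallel(tasks: List[Callable[[], None]], max_concurrent: Optional[int] = None) -> int:
    """Run tasks concurrently and return the peak number running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def wrapped(task: Callable[[], None]) -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        try:
            task()
        finally:
            with lock:
                running -= 1

    # None means "no limit": give every task its own worker.
    workers = max_concurrent or max(len(tasks), 1)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(wrapped, tasks))  # list() re-raises any task exception
    return peak


tasks = [lambda: time.sleep(0.05) for _ in range(6)]
print(run_parallel(tasks, max_concurrent=2) <= 2)  # True
```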

conducto.Lazy(command_or_func, *args, **kwargs) → conducto.pipeline.Serial

This node constructor returns a Serial containing a pair of nodes.

The first, Generate, runs func(*args, **kwargs) and prints out the resulting pipeline.

The second, Execute, imports that pipeline into the current one and runs it.

Parameters

command_or_func (str or Callable) – A shell command to execute or a Python callable. If a Python callable is specified for the command, the args and kwargs are serialized and a conducto command line is constructed to launch the function for that node in the pipeline.
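The Generate/Execute split can be pictured as serialize-then-rebuild. In this toy sketch a pipeline is a nested dict, `generate` plays the role of Generate by running the function and printing the result as JSON, and `execute` plays Execute by parsing that text back and walking it. The functions and the dict layout are hypothetical; Conducto uses its own serialization format.

```python
import json
from typing import Any, Callable, Dict, List

Pipeline = Dict[str, Any]


def make_pipeline(n: int) -> Pipeline:
    # The tree's shape depends on a runtime argument, which is why it is built lazily.
    return {"type": "Parallel",
            "children": {f"task{i}": {"type": "Exec", "command": f"echo {i}"} for i in range(n)}}


def generate(func: Callable[..., Pipeline], *args: Any, **kwargs: Any) -> str:
    """Stand-in for Generate: run func and print the resulting pipeline."""
    text = json.dumps(func(*args, **kwargs))
    print(text)
    return text


def execute(serialized: str) -> List[str]:
    """Stand-in for Execute: rebuild the pipeline and list its commands in order."""
    def walk(node: Pipeline) -> List[str]:
        if node["type"] == "Exec":
            return [node["command"]]
        return [cmd for child in node["children"].values() for cmd in walk(child)]
    return walk(json.loads(serialized))


print(execute(generate(make_pipeline, 3)))  # ['echo 0', 'echo 1', 'echo 2']
```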

Events

To perform additional actions when a node changes state, use the functions below to associate a callback with the state change that should trigger it.

conducto.Node.on_done(self, callback)

Register a callback for when this node enters the “done” state.

conducto.Node.on_error(self, callback)

Register a callback for when this node enters the “error” state.

conducto.Node.on_queued(self, callback)

Register a callback for when this node enters the “queued” state.

conducto.Node.on_running(self, callback)

Register a callback for when this node enters the “running” state.

conducto.Node.on_killed(self, callback)

Register a callback for when this node enters the “killed” state.

conducto.Node.on_state_change(self, callback)

Register a callback for when this node changes state.
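These hooks follow the observer pattern. The toy class below shows one way such a registry could dispatch: state-specific callbacks fire when the node enters that state, and on_state_change callbacks fire on every transition. `EventedNode`, `set_state`, the initial "pending" state, and the (old_state, new_state) arguments passed to callbacks are all hypothetical; this page does not say what kind of callback object Conducto accepts or when it invokes it. on_queued, on_running and on_killed would follow the same pattern as on_done.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Callback = Callable[[str, str], None]  # (old_state, new_state): an assumption


class EventedNode:
    """Hypothetical node that dispatches state-change callbacks."""

    def __init__(self) -> None:
        self.state = "pending"
        self._by_state: DefaultDict[str, List[Callback]] = defaultdict(list)
        self._any: List[Callback] = []

    def on_done(self, callback: Callback) -> None:
        self._by_state["done"].append(callback)

    def on_error(self, callback: Callback) -> None:
        self._by_state["error"].append(callback)

    def on_state_change(self, callback: Callback) -> None:
        self._any.append(callback)

    def set_state(self, new_state: str) -> None:
        old, self.state = self.state, new_state
        # State-specific callbacks first, then the catch-all ones.
        for cb in self._by_state[new_state] + self._any:
            cb(old, new_state)


log: List[str] = []
node = EventedNode()
node.on_done(lambda old, new: log.append(f"done from {old}"))
node.on_state_change(lambda old, new: log.append(f"{old}->{new}"))
for state in ["queued", "running", "done"]:
    node.set_state(state)
print(log)
# ['pending->queued', 'queued->running', 'done from running', 'running->done']
```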