Node API

Node Base Class

class conducto.Node(**kwargs)

The node classes Exec, Serial and Parallel all derive from this class. The parameters below apply directly to Exec nodes; on Serial and Parallel nodes they act as defaults for the sub-nodes.

Parameters
  • cpu – float, default 1, Number of CPUs to allocate to the Node. Must be >0 if assigned.

  • mem – float, default 2, GB of memory to allocate to the Node. Must be >0 if assigned.

  • requires_docker – bool, default False, If True, enable the Node to use

  • env – dict with keys environment variables and the values

  • image – conducto.Image or str, Run Node in container using the given conducto.Image or image identified by name in Docker.

  • image_name – str, Reference a conducto.Image by name instead of passing it explicitly. The Image must have been registered with conducto.Node.register_image().

  • container_reuse_context – See Running Exec Nodes for details. Note this has special inheritance rules when propagating to child nodes.

  • skip – bool, default False, If False the Node will be run normally. If True execution will pass over it and it will not be run.

  • suppress_errors – bool, default False, If True the Node will go to the Done state when finished, even if some children have failed. If False, any failed children will cause it to go to the Error state.

  • max_time – Union[int, float, str], default ‘4h’, An int or float value of seconds, or a duration string, representing the maximum time a Node may take to complete successfully. If a Node exceeds this time, it will be killed. The duration string must be a positive decimal with a suffix of ‘s’, ‘m’, ‘h’, or ‘d’, indicating seconds, minutes, hours, or days respectively.

  • name – If creating a Node inside a context manager, you may pass name=… instead of using normal dict assignment.

All of these arguments, except for name, may be set in the Node constructor or later. For example, n = co.Parallel(cpu=2) and

n = co.Parallel()
n.cpu = 2

are equivalent.
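
The max_time format described in the parameter list can be illustrated with a small converter. This is only a sketch: max_time_to_seconds is a hypothetical helper, not part of conducto, and it assumes that plain numeric values must also be positive, which the text above states only for duration strings.

```python
import re
from typing import Union

# Seconds per unit for the suffixes named in the max_time description.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

# A decimal number followed by exactly one of the four suffixes.
_DURATION = re.compile(r"(\d+(?:\.\d+)?)([smhd])")


def max_time_to_seconds(value: Union[int, float, str]) -> float:
    """Hypothetical helper: convert a max_time-style value to seconds."""
    if isinstance(value, bool):
        raise TypeError("max_time must be an int, float, or str")
    if isinstance(value, (int, float)):
        seconds = float(value)
    elif isinstance(value, str):
        match = _DURATION.fullmatch(value.strip())
        if match is None:
            raise ValueError(f"expected a decimal with suffix s, m, h, or d: {value!r}")
        seconds = float(match.group(1)) * _UNIT_SECONDS[match.group(2)]
    else:
        raise TypeError("max_time must be an int, float, or str")
    # Assumption: non-positive limits are rejected for numeric input too.
    if not seconds > 0:
        raise ValueError(f"max_time must be positive: {value!r}")
    return seconds


print(max_time_to_seconds("4h"))    # the documented default
print(max_time_to_seconds("1.5m"))
print(max_time_to_seconds(30))
```

Under this reading, the default of ‘4h’ corresponds to 14400 seconds.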

Variables

name – Immutable. The name of this Node must be unique among sibling Nodes. It is most commonly set through dict assignment with parent['nodename'] = co.Parallel(). It may also be set in the constructor with co.Parallel(name='nodename') if you’re using another Node as a context manager. It may not contain a /, as / is reserved as the path separator.

__str__()

The full path of the Node, computed by joining the names of this Node’s ancestry with /.

import conducto as co
x = co.Parallel()
x["foo"] = y = co.Parallel()
x["foo/bar"] = z = co.Exec("echo foobar")

print(f"x.name={x.name}  str(x)={x}")
# x.name=/  str(x)=/
print(f"y.name={y.name}  str(y)={y}")
# y.name=foo  str(y)=/foo
print(f"z.name={z.name}  str(z)={z}")
# z.name=bar  str(z)=/foo/bar
for node in x.stream():
    print(str(node))
# /
# /foo
# /foo/bar

stream(reverse=False)

Iterate through this Node and its descendants, as in the x.stream() loop in the example above.
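
To make the relationship between name, the path returned by __str__(), and stream() more concrete, here is a toy model in plain Python. ToyNode is a hypothetical stand-in and not conducto's implementation. It assumes that intermediate nodes must already exist when assigning by path, that reusing a sibling name is an error, and that stream() visits a parent before its children, which matches the order shown in the example above. The reverse argument is not modeled.

```python
from typing import Dict, Iterator, List, Optional


class ToyNode:
    """Hypothetical stand-in for a pipeline node; not conducto's implementation."""

    def __init__(self) -> None:
        self.name = "/"  # a root node reports "/" as its name
        self.parent: Optional["ToyNode"] = None
        self.children: Dict[str, "ToyNode"] = {}

    def __setitem__(self, path: str, child: "ToyNode") -> None:
        # "foo/bar" means: attach the child as "bar" under the existing node "foo".
        *ancestors, name = path.split("/")
        node = self
        for part in ancestors:
            node = node.children[part]  # assumption: ancestors already exist
        if not name:
            raise ValueError("a node name may not be empty")
        if name in node.children:
            raise ValueError("names must be unique among sibling nodes")
        child.name = name
        child.parent = node
        node.children[name] = child

    def __str__(self) -> str:
        # Join the names of the ancestry with "/", starting from the root.
        names: List[str] = []
        node = self
        while node.parent is not None:
            names.append(node.name)
            node = node.parent
        return "/" + "/".join(reversed(names))

    def stream(self) -> Iterator["ToyNode"]:
        # Pre-order traversal: this node first, then each subtree in insertion order.
        yield self
        for child in self.children.values():
            yield from child.stream()


root = ToyNode()
root["foo"] = foo = ToyNode()
root["foo/bar"] = bar = ToyNode()

paths = [str(node) for node in root.stream()]
print(paths)  # ['/', '/foo', '/foo/bar']
```

The point of the sketch is that a name is a single path component, while str(node) is derived from the whole ancestry, so a / inside a name would make the path ambiguous.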

Specific Node Types

conducto.Exec(command, *args, **kwargs)

A node that contains an executable command.

Parameters

command – A shell command to execute or a Python callable.

If a Python callable is specified for the command, the args and kwargs are serialized and a conducto command line is constructed to launch the function for that node in the pipeline.

conducto.Serial(**kwargs)

Node that has child Nodes and runs them one after another. Same interface as conducto.Node(), plus the following:

Parameters

stop_on_error – bool, default True, If True, the Serial will Error when one of its children Errors, leaving subsequent children Pending. If False and a child Errors, the Serial will still run the rest of its children and then Error.
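
The effect of stop_on_error can be traced with a small simulation. This is a sketch of the behavior described above, not conducto code: simulate_serial is a hypothetical function, each child is reduced to a single success flag, and other parameters such as skip and suppress_errors are ignored.

```python
from typing import List, Tuple


def simulate_serial(child_ok: List[bool], stop_on_error: bool = True) -> Tuple[List[str], str]:
    """Hypothetical model: return (child states, Serial state) for a run.

    child_ok[i] is True if the i-th child would succeed when it runs.
    """
    states: List[str] = []
    failed = False
    for ok in child_ok:
        if failed and stop_on_error:
            # An earlier child errored, so this child is never started.
            states.append("Pending")
            continue
        states.append("Done" if ok else "Error")
        failed = failed or not ok
    # In both modes, any child error puts the Serial itself in Error.
    return states, ("Error" if failed else "Done")


print(simulate_serial([True, False, True]))
# (['Done', 'Error', 'Pending'], 'Error')
print(simulate_serial([True, False, True], stop_on_error=False))
# (['Done', 'Error', 'Done'], 'Error')
```

In both modes the Serial ends in Error; the setting only controls whether the children after the failure still run.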

conducto.Parallel(**kwargs)

Node that has child Nodes and runs them at the same time. Same interface as conducto.Node().

conducto.Lazy(command_or_func, *args, **kwargs) → conducto.pipeline.Node

This node constructor returns a co.Serial containing a pair of nodes. The first, `Generate`, runs func(*args, **kwargs) and prints out the resulting pipeline. The second, `Execute`, imports that pipeline into the current one and runs it.

Parameters

command_or_func – A shell command to execute or a Python callable.

If a Python callable is specified for the command, the args and kwargs are serialized and a conducto command line is constructed to launch the function for that node in the pipeline.
