Conducto uses a tree structure to control how commands behave, and to report on their status. This page will show you how to leverage that structure to control your pipeline's behavior.
Mostly this is about how to organize your pipeline definition, which is the code that creates pipelines. If you already have a launched pipeline and you want to know how to manipulate it, check out the next section.
Conducto invites you to define your pipeline in an imperative context, which sets it apart from other pipeline tools. Rather than using a single structure that defines the whole thing at once (often via YAML), Conducto provides a library that you can call from general purpose programming languages like Python.
    import conducto as co

    a = co.Serial()
    b = co.Exec('echo hello world')
    a['child'] = b

    # creates a pipeline with two nodes '/' and 'child'
    # /
    # └─child
Call functions, write loops, or whatever else you like to do with objects--you've got a whole language at your fingertips.
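    def make_n_nodes(n):
        # Build a Parallel node with n Exec children.
        parent = co.Parallel()
        for i in range(n):
            parent["node " + str(i)] = co.Exec("echo I am node " + str(i))
        # Return the node so the caller can attach it to a tree.
        return parent

    root = co.Serial()
    root["concurrent"] = make_n_nodes(5)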
Many of the properties that you can use to control a Conducto pipeline are common to all node types. For a list of these, see the Node base class. Conducto supports four types of nodes:
Exec nodes create containers that run commands. If you initialize an Exec node object with a string, it will use that string as a shell command. You can also provide it with a function to call; see native functions for more about this.
Exec nodes cannot have children.
Serial Nodes run their children one after the other. Unlike other node types, they accept the stop_on_error node parameter, which defaults to True. It controls whether the Serial node continues running children after one has failed.
Here's a pipeline definition which uses it:
    with co.Parallel() as root:
        with co.Serial(name="run until error"):
            # will fail because grep returns nonzero
            co.Exec('echo foo | grep bar', name="fail")
            # will remain pending because the previous node failed
            co.Exec('echo baz', name="succeed")
        with co.Serial(stop_on_error=False, name="run all children"):
            # will fail because grep returns nonzero
            co.Exec('echo wakka | grep bang', name="fail")
            # will run and succeed despite the earlier failure
            co.Exec('echo splat', name="succeed")
Running a pipeline based on the definition above will only run one of the nodes named "succeed". The other stays in the pending state because its predecessor failed and stop_on_error is True.
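The behavior described above can be sketched with a toy model in plain Python. This is not Conducto code and does not import the library: ToyExec and run_serial are hypothetical names invented for this illustration, and the state strings are taken from the wording on this page.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ToyExec:
    """Hypothetical stand-in for an Exec node; `ok` decides whether it succeeds."""
    name: str
    ok: bool
    state: str = "pending"


def run_serial(children: List[ToyExec], stop_on_error: bool = True) -> Dict[str, str]:
    """Run children in order and return each child's final state."""
    for child in children:
        child.state = "succeeded" if child.ok else "failed"
        if not child.ok and stop_on_error:
            break  # children after the failure are never started
    return {child.name: child.state for child in children}


# Mirrors the two Serial nodes in the definition above.
run_until_error = run_serial([ToyExec("fail", False), ToyExec("succeed", True)])
run_all_children = run_serial(
    [ToyExec("fail", False), ToyExec("succeed", True)], stop_on_error=False
)
print(run_until_error)
print(run_all_children)
```

With the default, the second child is left pending; with stop_on_error=False it runs even though its sibling failed.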
Parallel Nodes are similar to Serial nodes, except that they run all of their children at the same time. They don't use any special parameters.
Lazy Nodes let you create pipelines that add nodes to themselves based on computation done at runtime.
In the previous section we pointed out that stop_on_error is a node parameter supported by Serial nodes. Most node parameters are applicable to more than one node type. See the Node base class for a list of these.
For now, the only one we need is image, which lets you control the execution context of a node's command. In the following section, image is used to demonstrate how node parameters can be inherited. image is typical of node parameters, so unless otherwise noted you can expect the other node parameters to work the same way.
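As a rough mental model of inheritance, a node that does not set a parameter can be thought of as using the value from its nearest ancestor that does. The sketch below is plain Python, not Conducto's implementation: ToyNode and effective_image are hypothetical names, and the nearest-ancestor rule is an assumption inferred from the examples on this page.

```python
from typing import Optional


class ToyNode:
    """Hypothetical node that remembers its parent and an optional image."""

    def __init__(self, name: str, image: Optional[str] = None,
                 parent: Optional["ToyNode"] = None):
        self.name = name
        self.image = image
        self.parent = parent

    def effective_image(self) -> Optional[str]:
        """Walk up the tree until some node sets an image."""
        node: Optional["ToyNode"] = self
        while node is not None:
            if node.image is not None:
                return node.image
            node = node.parent
        return None


root = ToyNode("/", image="foo")
group = ToyNode("all together", parent=root)
a = ToyNode("a", image="bar", parent=group)  # overrides the inherited value
b = ToyNode("b", parent=group)               # inherits from root via group

print(a.effective_image(), b.effective_image())
```

Here a uses its own image while b falls back to the one set on the root.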
The Conducto Python API provides three ways to express parent/child relationships between nodes:
- dict syntax
- dict syntax with '/'-separated path strings
- context manager
The subsections below show the same pipeline expressed in each syntax. If you work with complex pipelines, you might want to use more than one of these to maximize your pipeline's readability.
To use the dict syntax, make a node with a type that accepts children and assign them like you would values in a dictionary. The keys used here will name the assigned node.
    root = co.Serial(image="foo")
    root['all together'] = co.Parallel()
    root['all together']['a'] = co.Exec("echo step 1, image bar", image="bar")
    root['all together']['b'] = co.Exec("echo step 1, image foo")
    root['one at a time'] = co.Serial(image="bar")
    root['one at a time']['c'] = co.Exec("echo step 2, image bar")
    root['one at a time']['d'] = co.Exec("echo step 3, image bar")
If you'd rather encode this sort of thing into a string, you can use '/' to indicate a parent-child relationship.
    root = co.Serial(image="foo")
    root['all together'] = co.Parallel()
    root['all together/a'] = co.Exec("echo step 1, image bar", image="bar")
    root['all together/b'] = co.Exec("echo step 1, image foo")
    root['one at a time'] = co.Serial(image="bar")
    root['one at a time/c'] = co.Exec("echo step 2, image bar")
    root['one at a time/d'] = co.Exec("echo step 3, image bar")
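To see why the two dict-style spellings describe the same tree, here is a minimal sketch of how '/'-separated keys could be resolved into nested lookups. It is an illustration only and says nothing about how Conducto implements this internally; ToyNode is a hypothetical class.

```python
class ToyNode:
    """Hypothetical node whose children can be addressed by name or by path."""

    def __init__(self):
        self.children = {}

    def __setitem__(self, path, child):
        # Everything before the last '/' names existing ancestors.
        *parents, leaf = path.split("/")
        node = self
        for name in parents:
            node = node.children[name]  # KeyError if an ancestor is missing
        node.children[leaf] = child

    def __getitem__(self, path):
        node = self
        for name in path.split("/"):
            node = node.children[name]
        return node


root = ToyNode()
root["all together"] = ToyNode()
leaf = ToyNode()
root["all together/a"] = leaf

# Both spellings reach the same object.
print(root["all together"]["a"] is root["all together/a"])
```

In this sketch the parent must already exist before a path that passes through it is assigned, which matches the order used in the example above.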
Or you can use context managers, which let you use whitespace to express node depth.
    with co.Serial(image="foo") as root:
        with co.Parallel(name="all together"):
            co.Exec("echo step 1, image bar", name="a", image="bar")
            co.Exec("echo step 1, image foo", name="b")
        with co.Serial(name="one at a time", image="bar"):
            co.Exec("echo step 2, image bar", name="c")
            co.Exec("echo step 3, image bar", name="d")
A word of caution: We recommend that you avoid using a function call to separate usage of the name kwarg from its enclosing with block. Doing so prevents the whitespace from being a reliable indication of what is going on, and can get messy.
Here's one of the examples above, plus the boilerplate necessary to launch it from the command line:
    import conducto as co

    def context() -> co.Serial:
        with co.Serial(image="foo") as root:
            with co.Parallel(name="all together"):
                co.Exec("echo step 1, image bar", name="a", image="bar")
                co.Exec("echo step 1, image foo", name="b")
            with co.Serial(name="one at a time", image="bar") as two:
                co.Exec("echo step 2, image bar", name="c")
                co.Exec("echo step 3, image bar", name="d")
        return root

    if __name__ == '__main__':
        co.main(default=context)
You can ensure that Conducto understood what you meant by having it print the pipeline:
    $ python tree_example.py
    /
    ├─0 all together
    │ ├─ a   echo step 1, image bar
    │ └─ b   echo step 1, image foo
    └─1 one at a time
      ├─0 c   echo step 2, image bar
      └─1 d   echo step 3, image bar
Notice the numbers that appear to the left of some node names. These indicate that the node has a Serial as its parent. If there is no number to the left of the node name, then you're looking at a batch of nodes to be run in parallel.
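The numbering convention can be reproduced with a small renderer. The sketch below is plain Python written for this page and is not how Conducto prints trees: ToyNode and render are hypothetical, the exact spacing is a guess, and the commands are omitted for brevity.

```python
class ToyNode:
    """Hypothetical node: kind is 'serial', 'parallel', or 'exec'."""

    def __init__(self, kind, children=None):
        self.kind = kind
        self.children = children or {}  # name -> ToyNode, in insertion order


def render(node, prefix=""):
    """Return the lines of the subtree below `node`."""
    lines = []
    names = list(node.children)
    for i, name in enumerate(names):
        last = i == len(names) - 1
        branch = "└─" if last else "├─"
        # Only children of a serial parent get an index: their order matters.
        number = str(i) if node.kind == "serial" else ""
        lines.append(f"{prefix}{branch}{number} {name}")
        child_prefix = prefix + ("  " if last else "│ ")
        lines.extend(render(node.children[name], child_prefix))
    return lines


tree = ToyNode("serial", {
    "all together": ToyNode("parallel", {"a": ToyNode("exec"), "b": ToyNode("exec")}),
    "one at a time": ToyNode("serial", {"c": ToyNode("exec"), "d": ToyNode("exec")}),
})
output = ["/"] + render(tree)
print("\n".join(output))
```

The children of "all together" print without indices because their parent is a Parallel node, while every child of a Serial node is numbered in run order.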
Pipeline nodes carry state. They can be pending, succeeded, or failed like the ones you saw in the stop_on_error example above.
The code we've been looking at so far is pipeline definition code. You might have noticed that it has no state. It's not pending or failed, at least not yet.
The relationship between pipeline definitions and pipeline instances is analogous to the relationship between classes and objects: Given one definition, you can launch multiple pipelines, and those pipelines might change state through their lifetime.
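The analogy can be made concrete with a few lines of plain Python. This is only an illustration of the idea, not of Conducto's launch mechanism: the dictionary-based definition and the launch function are hypothetical.

```python
# A stateless "definition": node names mapped to the commands they would run.
definition = {
    "fail": "echo foo | grep bar",
    "succeed": "echo baz",
}


def launch(definition):
    """Create a fresh instance in which every node starts out pending."""
    return {
        name: {"command": command, "state": "pending"}
        for name, command in definition.items()
    }


first = launch(definition)
second = launch(definition)

# State changes in one instance do not touch the other, or the definition.
first["fail"]["state"] = "failed"
print(first["fail"]["state"], second["fail"]["state"])
```

Each call to launch plays the role of instantiating a class: the definition stays unchanged while every instance tracks its own state.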
In the previous section, we printed a tree that represents a certain pipeline definition. If you launch a pipeline from that definition, it will look like this:
This article was about using Conducto's tree structure to arrange your commands into a pipeline definition. The next article, controlling a pipeline, will show you what you can do with a pipeline instance.
- Features: Stop on error: Serial(stop_on_error=True/False) gives you additional control for handling errors.
- Features: Tree syntax: Shows the three different ways to assemble your tree, with different levels of readability vs reusability.