Pipeline Structure

Overview

Conducto uses a tree structure to control how commands behave and to report on their status. This page shows you how to use that structure to control your pipeline's behavior.

Mostly this is about how to organize your pipeline definition, which is the code that creates pipelines. If you already have a launched pipeline and you want to know how to manipulate it, check out the next section.

Nodes as Objects

Conducto invites you to define your pipeline in an imperative context, which sets it apart from other pipeline tools. Rather than using a single structure that defines the whole thing at once (often via YAML), Conducto provides a library that you can call from general purpose programming languages like Python.

import conducto as co

a = co.Serial()
b = co.Exec('echo hello world')

a['child'] = b
# creates a pipeline with two nodes '/' and 'child'
# /
# └─child

Call functions, write loops, or whatever else you like to do with objects--you've got a whole language at your fingertips.

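def make_n_nodes(n):
    # build a Parallel node with n Exec children and hand it back to the caller
    parent = co.Parallel()
    for i in range(n):
        parent["node " + str(i)] = co.Exec("echo I am node " + str(i))
    return parent

root = co.Serial()
root["nodes"] = make_n_nodes(5)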

Node Types

Many of the properties that you can use to control a Conducto pipeline are common to all node types. For a list of these, see the Node base class. Conducto supports four types of nodes:

Exec

Exec nodes create containers that run commands. If you initialize an Exec node object with a string, it will use that string as a shell command. You can also provide an Exec node with a function to call; see native functions for more about this.

Exec nodes cannot have children.

Serial

Serial nodes run their children one after the other. Unlike other node types, they accept the stop_on_error node parameter, which defaults to True. It controls whether the Serial node stops running children after one has failed. Here's a pipeline definition that uses it:

with co.Parallel() as root:
    with co.Serial(name="run until error"):

        # will fail because grep returns nonzero
        co.Exec('echo foo | grep bar', name="fail")

        # will remain pending because the previous node failed
        co.Exec('echo baz', name="succeed")

    with co.Serial(stop_on_error=False, name="run all children"):

        # will fail because grep returns nonzero
        co.Exec('echo wakka | grep bang', name="fail")

        # will run and succeed despite the earlier failure
        co.Exec('echo splat', name="succeed")

Running a pipeline based on the definition above will run only one of the nodes named "succeed". The other stays in the pending state because its predecessor failed and stop_on_error is True for its parent.

The first "succeed" node stays pending, the last one runs despite the error
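If it helps to see the rule in isolation, here is a minimal sketch of the stop_on_error behavior described above. It is a plain-Python toy model, not Conducto code: run_serial and its (name, succeeds) pairs are hypothetical names made up for this illustration, and the state names are borrowed from the ones this page uses.

```python
# Toy model only: this does not import or call Conducto.
def run_serial(children, stop_on_error=True):
    """Simulate a Serial node.

    children: list of (name, succeeds) pairs, in run order.
    Returns a dict mapping each child's name to its final state.
    """
    states = {}
    seen_failure = False
    for name, succeeds in children:
        if seen_failure and stop_on_error:
            states[name] = "pending"  # never started
            continue
        states[name] = "succeeded" if succeeds else "failed"
        seen_failure = seen_failure or not succeeds
    return states

children = [("fail", False), ("succeed", True)]
run_until_error = run_serial(children)
run_all_children = run_serial(children, stop_on_error=False)
print(run_until_error)   # {'fail': 'failed', 'succeed': 'pending'}
print(run_all_children)  # {'fail': 'failed', 'succeed': 'succeeded'}
```

The two calls mirror the "run until error" and "run all children" branches of the definition above.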

Parallel

Parallel nodes are similar to Serial nodes, except that they run all of their children at the same time. They don't use any special parameters.

Lazy

Lazy nodes let you create pipelines that add nodes to themselves based on computation done at runtime. Their use is covered in a later section.

Node Parameters

In the previous section we pointed out that stop_on_error is a node parameter supported by Serial nodes. Most node parameters are applicable to more than one node type. See the Node base class for a list of these.

For now, the only one we need is image, which lets you control the execution context of a node's command. In the following section, image is used to demonstrate how node parameters can be inherited. image is typical of node parameters, so unless otherwise noted you can expect the other node parameters to work the same way.

Tree Syntax

The Conducto Python API provides three ways to express parent/child relationships between nodes:

  • dict
  • path
  • context manager

The subsections below show the same pipeline expressed in each syntax. If you work with complex pipelines, you might want to use more than one of these to maximize your pipeline's readability.

Dict

To use the dict syntax, make a node with a type that accepts children and assign children to it as you would assign values in a dictionary. The key you use becomes the name of the assigned node.

root = co.Serial(image="foo")
root['all together'] = co.Parallel()
root['all together']['a'] = co.Exec("echo step 1, image bar", image="bar")
root['all together']['b'] = co.Exec("echo step 1, image foo")
root['one at a time'] = co.Serial(image="bar")
root['one at a time']['c'] = co.Exec("echo step 2, image bar")
root['one at a time']['d'] = co.Exec("echo step 3, image bar")

Path

If you'd rather encode this sort of thing into a string, you can use '/' to indicate a parent-child relationship.

root = co.Serial(image="foo")
root['all together'] = co.Parallel()
root['all together/a'] = co.Exec("echo step 1, image bar", image="bar")
root['all together/b'] = co.Exec("echo step 1, image foo")
root['one at a time'] = co.Serial(image="bar")
root['one at a time/c'] = co.Exec("echo step 2, image bar")
root['one at a time/d'] = co.Exec("echo step 3, image bar")
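To make the idea behind the path syntax concrete, here is a rough sketch of how a '/'-separated key could be resolved by walking down one level per path segment. This is an assumption-laden toy, not Conducto's implementation: ToyTree is a hypothetical class invented for this example.

```python
# Toy model only: ToyTree is hypothetical and is not part of the Conducto API.
class ToyTree:
    def __init__(self):
        self.children = {}

    def __setitem__(self, path, node):
        head, _, rest = path.partition("/")
        if rest:
            # delegate the remainder of the path to an existing child
            self.children[head][rest] = node
        else:
            self.children[head] = node

    def __getitem__(self, path):
        head, _, rest = path.partition("/")
        child = self.children[head]
        return child[rest] if rest else child

root = ToyTree()
root["all together"] = ToyTree()
root["all together/a"] = ToyTree()

# In this toy, the dict-style lookup and the path-style lookup reach the same node
same_node = root["all together"]["a"] is root["all together/a"]
print(same_node)  # True
```

Note that in this sketch the parent ('all together') has to exist before a path can pass through it, which matches the order of assignments in the example above.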

Context

Or you can use context managers, which let you use whitespace to express node depth.

with co.Serial(image="foo") as root:
    with co.Parallel(name="all together"):
        co.Exec("echo step 1, image bar", name="a", image="bar")
        co.Exec("echo step 1, image foo", name="b")
    with co.Serial(name="one at a time", image="bar"):
        co.Exec("echo step 2, image bar", name="c")
        co.Exec("echo step 3, image bar", name="d")

A word of caution: we recommend that you avoid putting a function call between a with block and the nodes that rely on the name kwarg to attach to it. When node creation is hidden inside a function, the whitespace is no longer a reliable indication of what is going on, and things can get messy.

Printing a Pipeline Definition

Here's one of the examples above, plus the boilerplate necessary to launch it from the command line:

import conducto as co

def context() -> co.Serial:
    with co.Serial(image="foo") as root:
        with co.Parallel(name="all together"):
            co.Exec("echo step 1, image bar", name="a", image="bar")
            co.Exec("echo step 1, image foo", name="b")
        with co.Serial(name="one at a time", image="bar"):
            co.Exec("echo step 2, image bar", name="c")
            co.Exec("echo step 3, image bar", name="d")
    return root

if __name__ == '__main__':
    co.main(default=context)

You can ensure that Conducto understood what you meant by having it print the pipeline:

$ python tree_example.py
/
├─0 all together
│ ├─ a   echo step 1, image bar
│ └─ b   echo step 1, image foo
└─1 one at a time
  ├─0 c   echo step 2, image bar
  └─1 d   echo step 3, image bar

Notice the numbers that appear to the left of some node names. A number indicates that the node has a Serial as its parent, and shows its position in the run order. If there is no number to the left of the node name, then you're looking at a batch of nodes to be run in parallel.
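The numbering rule can be sketched with a small renderer. This is only a toy that imitates the shape of the printout above (without the command column); the dict layout, the render function, and the exec helper are hypothetical and say nothing about how Conducto produces its output.

```python
# Toy model only: imitates the printed layout, not Conducto's own printer.
def render(node, prefix=""):
    """Return the lines for a node's children; Serial children get an index."""
    lines = []
    kids = node["children"]
    for i, (name, child) in enumerate(kids):
        last = i == len(kids) - 1
        branch = "└─" if last else "├─"
        index = str(i) if node["kind"] == "serial" else ""
        lines.append(f"{prefix}{branch}{index} {name}")
        lines.extend(render(child, prefix + ("  " if last else "│ ")))
    return lines

def exec_node():
    return {"kind": "exec", "children": []}

tree = {"kind": "serial", "children": [
    ("all together", {"kind": "parallel",
                      "children": [("a", exec_node()), ("b", exec_node())]}),
    ("one at a time", {"kind": "serial",
                       "children": [("c", exec_node()), ("d", exec_node())]}),
]}

lines = ["/"] + render(tree)
print("\n".join(lines))
```

Children of the Serial nodes come out numbered 0 and 1, while 'a' and 'b' under the Parallel node carry no number.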

Pipeline Definitions vs Pipeline Instances

Pipeline nodes carry state. They can be pending, succeeded, or failed like the ones you saw in the stop_on_error example above. The code we've been looking at so far is pipeline definition code. You might have noticed that it has no state. It's not pending or failed, at least not yet.

The relationship between pipeline definitions and pipeline instances is analogous to the relationship between classes and objects: Given one definition, you can launch multiple pipelines, and those pipelines might change state through their lifetime.
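As a loose illustration of that analogy, the sketch below uses plain Python in place of Conducto. The definition and launch functions are hypothetical stand-ins: the first carries no state, and each call to the second produces an independent set of node states.

```python
# Toy model only: plain Python, not the Conducto API.
def definition():
    """A 'definition' here is just a list of node names; it carries no state."""
    return ["a", "b", "c", "d"]

def launch(define):
    """A 'launch' builds a fresh instance in which every node starts as pending."""
    return {name: "pending" for name in define()}

first = launch(definition)
second = launch(definition)

first["a"] = "succeeded"  # a state change in one instance...
print(second["a"])        # ...leaves the other untouched: prints "pending"
```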

In the previous section, we printed a tree that represents a certain pipeline definition. If you launch a pipeline from that definition, it will look like this:

Node 'd' inherits image parameter from parent
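One way to picture this inheritance is as a lookup that walks up the tree until it finds a node with the parameter set. The sketch below is a mental model under that assumption, consistent with the examples on this page, rather than a description of Conducto's internals; ToyNode and effective_image are hypothetical names.

```python
# Toy model only: ToyNode is hypothetical and is not part of the Conducto API.
class ToyNode:
    def __init__(self, image=None, parent=None):
        self.image = image
        self.parent = parent

    def effective_image(self):
        """Return this node's image, or the nearest ancestor's if unset."""
        node = self
        while node is not None:
            if node.image is not None:
                return node.image
            node = node.parent
        return None

# Same shape as the example tree used throughout this page
root = ToyNode(image="foo")
all_together = ToyNode(parent=root)
a = ToyNode(image="bar", parent=all_together)
b = ToyNode(parent=all_together)
one_at_a_time = ToyNode(image="bar", parent=root)
d = ToyNode(parent=one_at_a_time)

print(a.effective_image(), b.effective_image(), d.effective_image())
# bar foo bar
```

Here 'a' uses its own image, 'b' falls through to the root's "foo", and 'd' picks up "bar" from its parent.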

This article was about using Conducto's tree structure to arrange your commands into a pipeline definition. The next article, controlling a pipeline, will show you what you can do with a pipeline instance.
