Pipelines
Node Types
A Conducto pipeline is a tree of Nodes. Parallel nodes have children and run them at the same time. Serial nodes have children and run them one after the other, stopping on errors. Exec nodes run a shell command inside a container.
An Exec node is done when its command finishes successfully. Serial and Parallel nodes have children, and are done when all the children are done.
Save this code to a file and run it. It will prompt you for your Conducto credentials, and then you'll be able to view it in our app.
```python
import conducto as co


def run():
    # Use co.Exec() to run a build step.
    build_node = co.Exec("echo docker build, make, npm i, etc.")

    # Use co.Parallel() to run tests in parallel.
    test_node = co.Parallel()
    for name in ["app", "backend", "metrics"]:
        test_node[name] = co.Exec(f"echo run test for {name}")

    # Use co.Serial() to build, then test.
    pipeline = co.Serial()
    pipeline["Build"] = build_node
    pipeline["Test"] = test_node
    return pipeline


if __name__ == "__main__":
    co.main()

# Call `python <file> run` to pretty-print the pipeline:
# /
# ├0 Build echo docker build, make, npm i, etc.
# └1 Test
#   ├ app echo run test for app
#   ├ backend echo run test for backend
#   └ metrics echo run test for metrics
#
# Call `python <file> run --local` to run it locally.
```
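The completion rules above can be made concrete with a small model. The sketch below does not use Conducto. ToyExec, ToySerial and ToyParallel are hypothetical stand-ins, and each "command" is simulated by a Python callable that returns True on success instead of running a shell command in a container. It models only when each node counts as done and when a Serial stops.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


class ToyExec:
    """Hypothetical leaf: done when its simulated command returns True."""

    def __init__(self, command: Callable[[], bool]):
        self.command = command

    def run(self) -> bool:
        return self.command()


class ToySerial:
    """Hypothetical Serial: runs children in order and stops at the first error."""

    def __init__(self):
        self.children: Dict[str, object] = {}

    def __setitem__(self, name: str, node) -> None:
        self.children[name] = node

    def __getitem__(self, name: str):
        return self.children[name]

    def run(self) -> bool:
        for child in self.children.values():
            if not child.run():
                return False  # later children never start
        return True  # done only once every child is done


class ToyParallel(ToySerial):
    """Hypothetical Parallel: starts all children at the same time."""

    def run(self) -> bool:
        with ThreadPoolExecutor() as pool:
            results = list(pool.map(lambda child: child.run(), self.children.values()))
        return all(results)  # done only if every child is done


def make_pipeline(build_ok: bool, log: List[str]) -> ToySerial:
    """Mirror the Build/Test example, recording which simulated commands ran."""

    def step(name: str, ok: bool = True) -> Callable[[], bool]:
        def command() -> bool:
            log.append(name)
            return ok
        return command

    pipeline = ToySerial()
    pipeline["Build"] = ToyExec(step("build", build_ok))
    pipeline["Test"] = ToyParallel()
    for name in ["app", "backend", "metrics"]:
        pipeline["Test"][name] = ToyExec(step(name))
    return pipeline


log: List[str] = []
print(make_pipeline(build_ok=False, log=log).run(), log)  # False ['build']
```

When Build fails, the Serial root stops and none of the tests start. When Build succeeds, all three tests run concurrently. In this sketch, one failing Parallel child does not cancel its siblings.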
Constructing the Tree
Pipelines are constructed by adding children to a Parallel or Serial node, and children can be any kind of Node. Nest them as deeply as you like: Conducto supports pipelines with millions of Nodes.
Write your pipelines in Python, with no YAML or custom languages. From for loops to database queries, use any logic you want when creating your pipeline.
Add children by assigning into a Node like a dict, or use Nodes as context managers with a with statement.
Assign Nodes by direct dict assignment (Node.__setitem__).
```python
import conducto as co

node = co.Parallel()
node["test1"] = co.Exec("echo run test 1")
node["test2"] = co.Exec("echo run test 2")

root = co.Serial()
root["Test"] = node
print(root.pretty())
# /
# └─0 Test
#   ├─ test1 echo run test 1
#   └─ test2 echo run test 2
```
Equivalently, use /-delimited paths to get or assign Nodes deep within a tree.
```python
import conducto as co

root = co.Serial()
root["Test"] = co.Parallel()
root["Test/test1"] = co.Exec("echo run test 1")
root["Test/test2"] = co.Exec("echo run test 2")
```
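One way /-delimited paths can work is to split the path on "/" and walk the tree one segment at a time. The sketch below illustrates that idea with a hypothetical ToyNode class. It is not Conducto's implementation, and Conducto may handle the details differently.

```python
from typing import Dict, List, Optional


class ToyNode:
    """Hypothetical tree node with dict-style and /-delimited path access."""

    def __init__(self, command: Optional[str] = None):
        self.command = command
        self.children: Dict[str, "ToyNode"] = {}

    def __getitem__(self, path: str) -> "ToyNode":
        node = self
        for part in path.strip("/").split("/"):
            node = node.children[part]  # KeyError if a segment is missing
        return node

    def __setitem__(self, path: str, child: "ToyNode") -> None:
        # Everything before the last segment names the parent to attach to.
        *parents, name = path.strip("/").split("/")
        parent = self["/".join(parents)] if parents else self
        parent.children[name] = child

    def paths(self, prefix: str = "") -> List[str]:
        """Every descendant path, in insertion order."""
        out = []
        for name, child in self.children.items():
            full = f"{prefix}/{name}"
            out.append(full)
            out.extend(child.paths(full))
        return out


root = ToyNode()
root["Test"] = ToyNode()
root["Test/test1"] = ToyNode("echo run test 1")
root["Test/test2"] = ToyNode("echo run test 2")
print(root.paths())  # ['/Test', '/Test/test1', '/Test/test2']
```

In this sketch a parent must exist before you assign beneath it, so root["Missing/x"] raises KeyError.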
Functions can return Nodes as well.
```python
import conducto as co


def make_test_node():
    output = co.Parallel()
    output["test1"] = co.Exec("echo run test 1")
    output["test2"] = co.Exec("echo run test 2")
    return output


root = co.Serial()
root["Test"] = make_test_node()
```
Use Nodes as context managers for visual clarity, so your indentation matches the structure.
```python
import conducto as co

with co.Serial() as root:
    with co.Parallel(name="Test"):
        co.Exec("echo run test 1", name="test1")
        co.Exec("echo run test 2", name="test2")
```
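Context-manager construction like this is commonly built on a stack of currently open parents. Entering a with block pushes the node onto the stack, and any node created while a parent is open attaches itself to that parent. The sketch below shows this general pattern. ToyNode and its _open_parents stack are hypothetical; this is not how Conducto is confirmed to implement it.

```python
from typing import Dict, List, Optional


class ToyNode:
    """Hypothetical node that auto-attaches to the innermost open `with` parent."""

    _open_parents: List["ToyNode"] = []  # nodes whose `with` block is active

    def __init__(self, name: Optional[str] = None, command: Optional[str] = None):
        self.name = name
        self.command = command
        self.children: Dict[str, "ToyNode"] = {}
        if ToyNode._open_parents:
            ToyNode._open_parents[-1].children[name] = self

    def __enter__(self) -> "ToyNode":
        ToyNode._open_parents.append(self)
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        ToyNode._open_parents.pop()
        return False  # never swallow exceptions raised inside the block

    def tree(self, indent: int = 0) -> List[str]:
        lines = []
        for name, child in self.children.items():
            suffix = f"  {child.command}" if child.command else ""
            lines.append("  " * indent + name + suffix)
            lines.extend(child.tree(indent + 1))
        return lines


with ToyNode() as root:
    with ToyNode(name="Test"):
        ToyNode(name="test1", command="echo run test 1")
        ToyNode(name="test2", command="echo run test 2")

print("\n".join(root.tree()))
```

The indentation of the with blocks determines which parent each node joins, which is why the code's shape matches the pipeline's shape.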
Resources and Environment
An Exec node runs a shell command inside a container, but there are additional parameters that specify how it runs. You may specify the image used to create the container, how much memory it requires, environment variables, and more. For the full list, see conducto.Node.
All Nodes inherit these attributes from their parents. This simplifies specification, and it also makes the settings easier to visualize and to change manually in the app.
```python
import conducto as co

root = co.Parallel(cpu=2, env={"LOG_LEVEL": "1"})
root["Node1"] = co.Exec("echo I run with 2 cores, and with LOG_LEVEL=$LOG_LEVEL")
```
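Inheritance of this kind can be modeled as a lookup that walks up the tree until it finds a node that set the attribute explicitly. The sketch below is hypothetical: ToyNode and resolve are illustrations, not Conducto's API. Each attribute is resolved as a whole, so the sketch does not model whether Conducto merges env dictionaries between parent and child.

```python
from typing import Any, Dict, Optional


class ToyNode:
    """Hypothetical node whose unset attributes fall back to its ancestors'."""

    def __init__(self, command: Optional[str] = None, **attrs: Any):
        self.command = command
        self.attrs: Dict[str, Any] = attrs  # only explicitly set values
        self.parent: Optional["ToyNode"] = None
        self.children: Dict[str, "ToyNode"] = {}

    def __setitem__(self, name: str, child: "ToyNode") -> None:
        child.parent = self
        self.children[name] = child

    def resolve(self, key: str, default: Any = None) -> Any:
        """Return the nearest explicitly set value, searching self then ancestors."""
        node = self
        while node is not None:
            if key in node.attrs:
                return node.attrs[key]
            node = node.parent
        return default


root = ToyNode(cpu=2, env={"LOG_LEVEL": "1"})
root["Node1"] = ToyNode("echo I run with 2 cores")
root["Node2"] = ToyNode("echo I override cpu", cpu=4)

print(root.children["Node1"].resolve("cpu"))  # 2, inherited from root
print(root.children["Node2"].resolve("cpu"))  # 4, set on the node itself
print(root.children["Node1"].resolve("env"))  # {'LOG_LEVEL': '1'}
```

Because values are looked up rather than copied, changing the parent's setting affects every child that does not override it.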
Calling Python Methods
The Exec node constructor, conducto.Exec, can be used to call plain Python methods.
```python
import conducto as co


def leibniz(n: int) -> float:
    """
    Leibniz formula for π, truncated to n terms:
    π ≈ 4 - 4/3 + 4/5 - 4/7 + ... ± 4/(2*n-1)
    See https://en.wikipedia.org/wiki/Leibniz_formula_for_π
    """
    pi = 0
    sign = 1
    for i in range(n):
        pi += sign * 4 / (2*i+1)
        sign *= -1
    return pi


def compute_pi(n: int) -> co.Serial:
    # The image copies the current directory into the container, which lets
    # the Exec nodes below locate and call leibniz().
    output = co.Serial(image=co.Image(copy_dir="."))
    output["Leibniz1"] = co.Exec(leibniz, n=n)
    output["Leibniz2"] = co.Exec(leibniz, n=n)
    return output


if __name__ == "__main__":
    co.main()
```
Exec nodes with a Python callable are translated to a shell command of the form conducto <file_containing_func> <func> --<arg1>=<val1> --<arg2>=<val2> ...
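As a rough illustration of that translation, the hypothetical helper to_command below builds a command string of the documented form from a file path, a function, and keyword arguments. It is only a sketch. It formats values with str() and quotes them with shlex.quote, which is not necessarily how Conducto serializes or quotes values.

```python
import inspect
import shlex
from typing import Any, Callable


def to_command(func: Callable, file_path: str, **kwargs: Any) -> str:
    """Hypothetical: render 'conducto <file> <func> --arg=val ...' for func.

    Values are formatted with str(); Conducto's real serialization may differ.
    """
    # Fail early, as a real call would, if func does not accept these arguments.
    inspect.signature(func).bind_partial(**kwargs)
    parts = ["conducto", file_path, func.__name__]
    parts += [f"--{name}={value}" for name, value in kwargs.items()]
    return " ".join(shlex.quote(part) for part in parts)


def leibniz(n: int) -> float:
    """Stub with the same signature as the leibniz example; the body is irrelevant here."""
    return 0.0


print(to_command(leibniz, "pi.py", n=1000))  # conducto pi.py leibniz --n=1000
```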
Calling func inside a container can be tricky: its path on your local file system is not the same as its path inside the container in which it will eventually run. An Exec node with a callable therefore requires that a conducto.Image with path translation data be specified or inherited from a parent node. Exec uses the information contained in the image to compute the correct relative path. Whether the conducto.Image includes files from the local file system or from a remote Git repo, Exec will deduce the correct relative path to use.
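The core of this path translation can be sketched in a few lines. Take the file's path relative to the directory that was copied into the image, then join it onto the directory where that copy lives inside the container. The container directory /mnt/src and the helper name container_path are hypothetical assumptions. The sketch uses absolute local paths and does not cover Git-based images or where Conducto actually places files.

```python
from pathlib import PurePosixPath


def container_path(local_file: str, copy_dir: str, container_dir: str = "/mnt/src") -> str:
    """Hypothetical: map a file under copy_dir to where its copy lives in the image.

    container_dir is an assumed location, not Conducto's actual layout.
    """
    # relative_to raises ValueError if local_file is not inside copy_dir,
    # i.e. the file was never copied into the image.
    relative = PurePosixPath(local_file).relative_to(copy_dir)
    return str(PurePosixPath(container_dir) / relative)


print(container_path("/home/me/project/pipelines/pi.py", "/home/me/project"))
# /mnt/src/pipelines/pi.py
```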
The shell command conducto <file> <func> can call any function in <file> whose name does not start with an underscore. See conducto.main() for more details.
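The underscore rule can be illustrated with the standard inspect module. Collect the functions defined in a module whose names do not start with "_", then dispatch to one of them by name. This is a hypothetical sketch (public_functions and dispatch are illustrative names), not conducto.main() itself, and it skips argument parsing.

```python
import inspect
import types
from typing import Any, Callable, Dict


def public_functions(module: types.ModuleType) -> Dict[str, Callable]:
    """Functions defined in module whose names do not start with an underscore."""
    return {
        name: obj
        for name, obj in inspect.getmembers(module, inspect.isfunction)
        # The __module__ check skips functions merely imported into the module.
        if not name.startswith("_") and obj.__module__ == module.__name__
    }


def dispatch(module: types.ModuleType, func_name: str, **kwargs: Any) -> Any:
    """Hypothetical dispatcher: call a public function of module by name."""
    funcs = public_functions(module)
    if func_name not in funcs:
        raise SystemExit(f"unknown or private function: {func_name}")
    return funcs[func_name](**kwargs)


# Build a throwaway module in memory so the example is self-contained.
demo = types.ModuleType("demo")
exec(
    "def run():\n"
    "    return 'pipeline'\n"
    "def _helper():\n"
    "    return 'hidden'\n",
    demo.__dict__,
)
print(sorted(public_functions(demo)))  # ['run']
print(dispatch(demo, "run"))           # pipeline
```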
Exec serializes the arguments according to the types of func's parameters, which it infers from type hints as well as from default values. It uses a simple serialization system that supports basic types (bool, int, float, str) and lists of them. Complex types can be supported, but that feature still takes some work; if you need it, contact us on Slack to talk about how we can help you through it. If no type is given or hinted, str is assumed.
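A minimal sketch of that type-driven parsing is shown below. It assumes that command-line values arrive as strings and that lists are written comma-separated. The list format is an assumption for illustration, not Conducto's documented wire format. The type comes from the annotation, else from the type of the default value, else str. The names parse_args and train are hypothetical.

```python
import inspect
import typing
from typing import Any, Callable, Dict, List


def _parse_scalar(text: str, typ: type) -> Any:
    """Convert one string to bool, int, float, or str."""
    if typ is bool:
        lowered = text.lower()
        if lowered in ("true", "1", "yes"):
            return True
        if lowered in ("false", "0", "no"):
            return False
        raise ValueError(f"not a bool: {text!r}")
    return typ(text)


def parse_args(func: Callable, raw: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical: convert string CLI values using func's hints or defaults."""
    params = inspect.signature(func).parameters
    parsed: Dict[str, Any] = {}
    for name, text in raw.items():
        param = params[name]  # KeyError for an unknown argument
        if param.annotation is not inspect.Parameter.empty:
            typ = param.annotation  # 1. explicit type hint
        elif param.default is not inspect.Parameter.empty and param.default is not None:
            typ = type(param.default)  # 2. type of the default value
        else:
            typ = str  # 3. nothing to go on: assume str
        if typ is list:
            typ = List[str]  # an untyped list default is read as a list of str
        if typing.get_origin(typ) is list:
            (item_type,) = typing.get_args(typ)
            parsed[name] = [_parse_scalar(item, item_type) for item in text.split(",")]
        else:
            parsed[name] = _parse_scalar(text, typ)
    return parsed


def train(epochs: int, rate=0.1, verbose: bool = False, tags: List[str] = None, name=None):
    """Hypothetical function used only to demonstrate parsing."""


print(parse_args(train, {"epochs": "5", "rate": "0.5", "verbose": "true", "tags": "a,b", "name": "run1"}))
# {'epochs': 5, 'rate': 0.5, 'verbose': True, 'tags': ['a', 'b'], 'name': 'run1'}
```

Note that rate has no hint but is parsed as a float because its default is 0.1. By contrast, name falls back to str because its default is None.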
Lazy Pipeline Creation
Many pipelines are hard or impossible to fully specify up front.
A data science pipeline downloads data in one step and processes it in the next. The processing step may ignore data that has already been processed, and may parallelize depending on what data is downloaded. Making these decisions at pipeline generation time can be expensive or impossible.
A CI/CD pipeline deploys a service and later accesses it for testing. In many cases you can predefine your URLs, but more sophisticated setups can spawn services dynamically and autogenerate their URLs. Writing pipelines that reference these URLs up front can require confusing levels of indirection, which are often a source of bugs.
Conducto pipelines can be generated lazily using conducto.Lazy(), which lets you generate your nodes once the necessary data or state is available.
This can be done either with a Python callable or with a shell command, similar to conducto.Exec. Here is an example with a Python callable.
```python
import conducto as co


def pipe() -> co.Serial:
    root = co.Serial(image=co.Image(copy_dir="."))
    root["Deploy"] = co.Exec("echo Deploy service")
    root["Test"] = co.Lazy(test_service, num_tests=5)
    print(root.pretty())
    # /
    # ├─0 Deploy echo Deploy service
    # └─1 Test
    #   ├─0 Generate conducto test.py test_service --num_tests=5
    #   └─1 Execute
    return root


def test_service(url: str = None, num_tests=1) -> co.Parallel:
    if url is None:
        # Some deployment strategies create a new service. In these cases you may
        # not know the URL ahead of time, but it can be determined on the fly. We
        # mock such an example here.
        url = "https://example.com/look_up_url_at_runtime"
    output = co.Parallel()
    for i in range(num_tests):
        # echo makes the mock test a valid shell command.
        output[f"RunTest_{i}"] = co.Exec(f"echo Testing deployment at {url}")
    return output


if __name__ == "__main__":
    co.main()
```
Given a func that is type-hinted to return a co.Serial or a co.Parallel, Lazy returns a co.Serial containing a pair of nodes. The first, Generate, runs func(*args, **kwargs) and prints out the resulting pipeline. The second, Execute, imports that pipeline into the current one and runs it. When the Generate node is Done, a callback is triggered that imports the generated pipeline into the Execute step.
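The two-phase shape can be sketched in plain Python. A Generate step calls the function and serializes the tree it returns, and an Execute step then receives that deserialized tree. This is only an illustration under stated assumptions: the nested-dict tree, the JSON round trip, and the names lazy, run_generate and service_tests (a stand-in for test_service) are hypothetical. Conducto's actual serialization format and callback mechanism are not shown.

```python
import json
from typing import Any, Callable, Dict

Tree = Dict[str, Any]  # hypothetical stand-in for a node tree


def service_tests(url: str = "https://example.com", num_tests: int = 1) -> Tree:
    """Stand-in generator whose output depends on its arguments."""
    return {
        "type": "Parallel",
        "children": {
            f"RunTest_{i}": {"type": "Exec", "command": f"echo Testing deployment at {url}"}
            for i in range(num_tests)
        },
    }


def lazy(func: Callable[..., Tree], **kwargs: Any) -> Tree:
    """Hypothetical: a Serial holding a Generate step and an empty Execute step."""
    return {
        "type": "Serial",
        "children": {
            "Generate": {"type": "Exec", "call": (func, kwargs)},
            "Execute": {"type": "Serial", "children": {}},
        },
    }


def run_generate(node: Tree) -> None:
    """Run Generate, then import its serialized output into Execute (the 'callback')."""
    func, kwargs = node["children"]["Generate"]["call"]
    printed = json.dumps(func(**kwargs))  # Generate prints out the pipeline
    node["children"]["Execute"] = json.loads(printed)  # Execute now holds it


lazy_node = lazy(service_tests, num_tests=2)
print(lazy_node["children"]["Execute"]["children"])  # {}
run_generate(lazy_node)
print(list(lazy_node["children"]["Execute"]["children"]))  # ['RunTest_0', 'RunTest_1']
```

Until Generate runs, Execute is empty. The shape of the final pipeline is decided only at runtime, from whatever arguments and state the generator sees.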
The translation of the Python callable into a shell command for execution in the docker image is the same as described above for Calling Python Methods.
There are cases where a Python callable is inappropriate: the argument serialization may not suffice, or the paths inside the container may not have been detected correctly. In these cases, pass conducto.Lazy a command line as a str. The shell command should use the conducto command-line tool to serialize the returned Node, but aside from that you may run any command you need.
When given a shell command, Lazy runs the user-specified command as-is, with no path inference.
```python
import conducto as co

root = co.Serial()
root["Deploy"] = co.Exec("echo Deploy service")
root["Test"] = co.Lazy("conducto test.py --num-tests=5", node_type=co.Parallel)
print(root.pretty())
# /
# ├─0 Deploy echo Deploy service
# └─1 Test
#   ├─0 Generate conducto test.py --num-tests=5
#   └─1 Execute
```
Launch
conducto.Node.launch_local(self, use_shell=True, retention=7, run=False, sleep_when_done=False)
Launch directly from Python.
Parameters
- use_shell (bool) – If True, connect to the running pipeline using the shell UI. Otherwise, just launch the pipeline and then exit. Default: True
- retention (int) – Once the pipeline is put to sleep, its logs and Data will be deleted after retention days of inactivity. Until then it can be woken up and interacted with. Default: 7
- run (bool) – If True, the pipeline will run immediately upon launching. Otherwise it will stay Pending until the user starts it. Default: False
- sleep_when_done (bool) – If True, the pipeline will sleep (the manager exits with recoverable state) when the root node successfully gets to the Done state. Default: False
Auto-main Convenience
A command-line helper that lets you easily run, from the shell, methods that return Conducto nodes.
conducto.main(variables=None, default=None, argv=None, env=None, cpu=None, gpu=None, mem=None, requires_docker=False, image: Union[None, str, conducto.image.Image] = None, filename=None, printer=<function pprint>)
Parameters
- default (Callable) – Specify a method that is the default to run if the user doesn't specify one on the command line.
- image (Image) – Specify a default Docker image for the pipeline. (See also Image.)
- cpu (float) – If specified, set the cpu for any Node called through co.main.
- mem (float) – If specified, set the mem for any Node called through co.main.
- env (dict) – If specified, set the env for any Node called through co.main.
- requires_docker (bool) – If specified, set requires_docker for any Node called through co.main.
See Node for more details.
```python
import conducto as co


def run() -> co.Serial:
    return co.Serial()


if __name__ == "__main__":
    co.main()

# Call `python <file> run` to pretty-print the Node returned by run().
# Call `python <file> run --local` to launch the pipeline on your local host.
```