There is a gap between what data scientists do while experimenting and what is required for a machine learning product to run in production.
Every company uses different tooling, but at Earnest we looked at what this gap meant internally:
- On the experimentation side, data scientists at Earnest run experiments in Python, creating or extending packages in their IDE of choice, but also heavily leveraging Jupyter notebooks, a web-based interactive computing platform. While iterating on these experiments, code is frequently changed and updated, and results are saved in the notebook itself.
- In production ETL, Earnest uses a scheduler (mostly Airflow) to orchestrate pipelines. These pipelines are usually defined as a series of steps, each running in an isolated KubernetesPodOperator container.
How do we get from experiment to production? This is the gap the Data Science Enablement team aimed to close, in order to make Earnest data scientists more productive and able to ship and update machine learning products rapidly and confidently.
In the existing world, we identified 6 error-prone steps a data scientist (assisted by engineering) had to follow to go from experiment to production pipeline:
- Write some chained transformations in Python
- Test locally
- Refactor the chained transformations into CLI apps
- For each transformation, write a KubernetesPodOperator wrapping the CLI app
- Test by running Airflow locally, then deploy and test on a remote instance that pulls code from a git repo
- If everything works, success, otherwise GOTO 2
From experience, step 4 is the most likely to go wrong. It is quite hard to pass information between Airflow and the CLI app: you have to make sure the CLI arguments match the right types and keep working when the container gets updated. People familiar with Airflow know that there are alternatives to the KubernetesPodOperator, for example the plain PythonOperator, but machine learning work with many different libraries frequently leads to dependency conflicts, which makes having isolated environments, at least at the pipeline level, essential.
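To make the pain of step 4 concrete, here is a hedged sketch of what wrapping a CLI app in a KubernetesPodOperator tends to look like (not one of our actual DAGs: the image name, entrypoint, and arguments are made up, and the exact import path depends on your Airflow provider version):

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(
    dag_id="iris_training",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    split_data = KubernetesPodOperator(
        task_id="split_data",
        name="split-data",
        # Hypothetical image and CLI entrypoint, for illustration only.
        image="registry.example.com/iris:latest",
        cmds=["python", "-m", "iris_jobs.split_data"],
        # Everything crossing the Airflow/container boundary is a plain string:
        # nothing checks that these flags still match what the image expects
        # after the container gets updated.
        arguments=["--input", "s3://bucket/iris.csv", "--test-size", "0.3"],
    )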
This way of working proved really hard and frustrating, and far from the experience we wanted for a data scientist or engineer working at Earnest.
Data scientists should be able to work on Jupyter notebooks, and have a way to translate their code into production or close-to-production pipelines with little to no engineering effort required. These pipelines should still be robust enough that the company can rely upon them, and should be easy to update and change. Analysts should also be able to have some interaction with pre-defined pipelines, e.g. triggering them with custom parameters.
So, how to achieve this?
Trying to improve our Airflow usage didn’t cut it. We found Airflow to be a better fit for engineering teams that mainly need to schedule and orchestrate their jobs. In particular, it’s hard to execute Airflow DAGs interactively the way you would in a Jupyter notebook or from code, there is no input/output checking between tasks, and the programming model is, by design, task-centric, with a strong tradeoff between the PythonOperator (easier code execution) and the KubernetesPodOperator (isolated environments). While Airflow 2.0 improves on some of these issues, we still found it too complex for the quick experiment-to-pipeline workflow we had in mind.
We then started looking at other tools:
- Kubeflow, a machine learning toolkit leveraging Kubernetes, started by Google
- TFX, Google’s framework for machine learning based on Tensorflow
- Dagster, an open source “data orchestrator” developed by Elementl
For Kubeflow, the team had run two lengthy trials in the past, and we felt that while the tooling has grown a lot and the ecosystem has improved, it still didn’t do enough to improve our workflow. For example, running a pipeline locally now required at least having a Kubernetes cluster. And while it provided some basic type checking, it didn’t completely scratch our “fearlessly move to the next step” itch.
TFX looked really good: the framework is complete and backed by Google, it provides a way of running pipelines locally (by letting users swap orchestrators interchangeably), and it has a decent basic type system. Unfortunately, despite looking great on paper, we found the API difficult to learn and use effectively, and data handling still happens as a “side effect”, making the model not that far from Airflow’s, even if supported by better tooling and classes. We also found it too tied to TensorFlow, which made it hard to port some of the team’s work that uses PyTorch underneath.
Finally, we landed on Dagster, a relatively young open source project with a radically different approach: compared to the other solutions we explored, Dagster is more “data centric”. Pipelines are defined by the data inputs and outputs between steps (at the time called “Solids”, now “Ops”), letting users design their business logic independently of how the data is serialized and deserialized. We were also impressed with the type system, which lets developers validate both the data passed between steps and the pipeline configuration. The UI looked slick and the API intuitive (usually just requiring decorating regular Python functions). This, combined with the ability to execute pipelines in different environments, including as plain Python code in Jupyter notebooks, was the killer feature.
How does Dagster work?
This section provides a short example of how Dagster works. If you are already familiar with it, feel free to skip ahead.
In Dagster, you normally define a pipeline by decorating your Python functions with Dagster annotations, chaining them based on the data they produce and consume. For example, a machine learning pipeline can look like this:
import dagster
import dagster_pandas as dagster_pd
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC


@dagster.op(
    ins={"input_data": dagster.In(dagster_pd.DataFrame)},
    out={
        "train": dagster.Out(dagster_pd.DataFrame),
        "test": dagster.Out(dagster_pd.DataFrame),
    },
)
def split_data(input_data: pd.DataFrame):
    # Split the raw dataset into a training set and a test set.
    train, test = train_test_split(input_data, test_size=0.3)
    return train, test


@dagster.op(
    ins={"training_set": dagster.In(dagster_pd.DataFrame)},
    out={"result": dagster.Out(SVC)},
)
def train_model(training_set: pd.DataFrame) -> SVC:
    # Fit a support vector classifier on the iris features.
    svc = SVC()
    iris_data = training_set[
        ["sepal_length", "sepal_width", "petal_length", "petal_width"]
    ]
    iris_results = training_set["species"]
    svc.fit(iris_data, iris_results)
    return svc


@dagster.op(
    ins={"model": dagster.In(SVC), "test_set": dagster.In(dagster_pd.DataFrame)},
    out={"result": dagster.Out(dagster_pd.DataFrame)},
)
def run_inference(model: SVC, test_set: pd.DataFrame) -> pd.DataFrame:
    # Predict a species for every row of the test set.
    result = test_set[
        ["sepal_length", "sepal_width", "petal_length", "petal_width"]
    ].copy()
    result["result"] = result.apply(
        lambda row: model.predict(row.to_numpy().reshape(1, -1))[0], axis=1
    )
    return result


@dagster.job
def iris_pipeline():
    # Wire the ops together: Dagster infers the execution order from the data flow.
    train, test = split_data()
    model = train_model(train)
    result = run_inference(model, test)
There are a few things happening in this sample Python code.
Starting from the bottom, we define a job called iris_pipeline. In the job, the first function we call, split_data, yields two outputs, train and test, which we assign to the corresponding variables. The train variable is passed to the train_model function, which computes the model. Finally, the model, together with the test set, is passed to the run_inference function, which computes the result. You may notice that split_data is invoked without passing the input_data argument: Dagster knows it will need to look into the run configuration to understand how to fetch that data.
In each Op’s decorator you can see the ins and out arguments describing the inputs and outputs of the function. These are normally optional, but we include them here for clarity and to make it easier to run the pipeline (for example, the dagster_pd.DataFrame type helps Dagster understand how to read a dataframe from a local file).
How does Dagster “run” all of this? There are a few options.
In Python
You can run the pipeline above from python by invoking
iris_pipeline.execute_in_process(run_config={…insert config here…})
This makes it very fast and easy to iterate on notebooks and write unit tests for our pipelines, keeping data scientists in their environment without needing to resort to other tooling.
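For example, using the same run configuration we show in the Dagit section below, an in-process run could look like this (a minimal sketch; the exact config shape can vary between Dagster versions):

# Run the job in a single process, reading iris.csv and writing result.csv.
result = iris_pipeline.execute_in_process(
    run_config={
        "ops": {
            "split_data": {
                "inputs": {"input_data": {"csv": {"path": "iris.csv"}}}
            },
            "run_inference": {
                "outputs": [{"result": {"csv": {"path": "result.csv"}}}]
            },
        }
    }
)
assert result.success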
In Dagit
Alternatively, you can also spin up a local “dagit” instance, or deploy it to a remote one. Dagit is a web UI that provides a way of displaying the pipeline and running it. The pipeline we defined above will look similar to this:
Without having to manually specify the sequence in which operations have to run, Dagster built the pipeline simply from the inputs and outputs. This avoids a whole class of problems around sequencing steps in the right order in more complex pipelines. You can also notice the type annotations next to our input and output names. Dagster performs a type check between the steps to make sure the types match, and fails the pipeline if they don’t, avoiding wasted computational resources. Having pipelines expressed this way also pushes developers to always separate data input and output from the business logic of the pipeline, making it easy to swap data sources at configuration time.
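As a small aside illustrating those type checks (a hedged sketch, separate from the iris example): if an op returns something that doesn’t match its declared output type, the run fails at that step boundary instead of letting the bad data propagate downstream.

import dagster

@dagster.op(out=dagster.Out(int))
def produce_number():
    return "not a number"  # violates the declared int output type

@dagster.op
def add_one(value: int) -> int:
    return value + 1

@dagster.job
def broken_job():
    add_one(produce_number())

# The type check on produce_number's output fails, so add_one never runs.
run = broken_job.execute_in_process(raise_on_error=False)
assert not run.success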
To run the pipeline in Dagit, switch to the “launchpad” tab and pass the required configuration. In our case this looks like this:
ops:
  split_data:
    inputs:
      input_data:
        csv:
          path: iris.csv
  run_inference:
    outputs:
      - result:
          csv:
            path: result.csv
More advanced pipelines will have more configuration options, but for the purpose of this post we are keeping things simple.
You can also see how you define where the initial data comes from and, optionally, where to save the results. This also makes it easy to embed pipelines into other pipelines (using @graph, which we are not covering here).
If you click the “launch execution” button, the run starts, and you can follow the job with a Gantt chart showing execution times and a rich log describing what’s happening, along with information about the data.
At the execution layer, Dagster allows you to run pipelines in different environments: we rely on a single process for unit tests, multiple processes for local manual tests, and Kubernetes for production. Changing the execution environment is just a matter of updating the configuration, without touching the pipeline’s logic.
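As a rough illustration (a sketch, not our actual configuration, and the exact keys depend on the Dagster version and the executors you have installed), the execution environment is selected in the run configuration rather than in the job code:

# With Dagster's default executor, the same job can run in a single process
# (handy for unit tests) or across multiple processes, based purely on config.
single_process = {"execution": {"config": {"in_process": {}}}}
multi_process = {"execution": {"config": {"multiprocess": {"max_concurrent": 4}}}}

# For production, a Kubernetes-based executor (for example dagster-k8s's
# k8s_job_executor) can be attached to the job instead, again without
# touching the ops themselves.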
How does Earnest work with Dagster?
Even before adopting Dagster, the team had developed a library of utilities and integrations with the Earnest cloud ecosystem. We call this library the Data Science Development Kit (DSDK for short).
The DSDK lets us interact with our infrastructure and provides a standard way of developing machine learning products. It aims to structure code so that you can go from experiments to products seamlessly, and Dagster became a core component of how we made this possible.
Our data scientists implement their machine learning components using pre-defined, opinionated DSDK classes for data sourcing, transformation, training, and inference. These classes all provide methods to convert them into Dagster pipeline components, leveraging Dagster’s type system and the DSDK’s internal types to create robust pipelines from the get-go.
We also provide types that describe different ways of fetching the data through a custom IO manager, a class Dagster provides to handle inputs and outputs between steps. By using our custom classes and IO manager, developers can swap a local stub in a unit test for a table in BigQuery by acting only at the config level.
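To give an idea of the mechanism (a hedged sketch using Dagster’s public IOManager API, not the DSDK’s actual implementation), an IO manager is simply a class that knows how to store an op’s output and load it back as the next op’s input:

import os

import pandas as pd
from dagster import IOManager, io_manager


class LocalParquetIOManager(IOManager):
    """Stores every op output as a parquet file named after the step and output."""

    def __init__(self, base_dir="/tmp/pipeline-data"):
        self._base_dir = base_dir

    def _path(self, context):
        return os.path.join(self._base_dir, f"{context.step_key}_{context.name}.parquet")

    def handle_output(self, context, obj: pd.DataFrame):
        os.makedirs(self._base_dir, exist_ok=True)
        obj.to_parquet(self._path(context))

    def load_input(self, context):
        # context.upstream_output points at the output this input depends on.
        return pd.read_parquet(self._path(context.upstream_output))


@io_manager
def local_parquet_io_manager(_):
    return LocalParquetIOManager()

A BigQuery-backed IO manager exposes the same two methods, which is what makes swapping one for the other a matter of wiring and configuration rather than pipeline changes.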
Writing an experiment and converting it to a production pipeline then involves simply using these classes and composing them together, without worrying about packaging them into individual containers.
Conclusion and next steps
In conclusion, Dagster greatly helps us move from ML experiment to production, significantly reducing friction. Its type system lets us separate business logic from computation and data serialization/deserialization, and the integrations we built through the DSDK allow our pipelines to interact freely with the rest of our cloud stack.
We are currently developing tooling for our data analysts to interact with our ML platform. We want to enable them to use our machine learning products and collaborate. For this reason we are creating “clients” for our pipelines that run on Google Colab. These clients are able to talk to our Dagster instances via the GraphQL API, and trigger pipelines running on Kubernetes on demand, notifying the user when a run’s results are ready to inspect.
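As a sketch of what such a client can look like (using Dagster’s GraphQL Python client; the hostname and job name here are hypothetical, and the clients we are building also handle things like notifying users when results are ready, which we do not show here):

from dagster_graphql import DagsterGraphQLClient

# Connect to a (hypothetical) remote Dagster instance exposing the GraphQL API.
client = DagsterGraphQLClient("dagster.internal.example.com", port_number=3000)

# Trigger a run of the iris job with custom parameters and get back its run id.
run_id = client.submit_job_execution(
    "iris_pipeline",
    run_config={
        "ops": {
            "split_data": {
                "inputs": {"input_data": {"csv": {"path": "iris.csv"}}}
            }
        }
    },
)
print(f"Started run {run_id}")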
You can learn more about how we use Dagster by watching Alessandro’s presentation at the Dagster Community Meeting: