Airflow¶
Odahu-flow provides a set of custom operators that allow you to interact with an Odahu cluster using Apache Airflow.
Connections¶
The Airflow plugin must be authorized by Odahu. Authorization is implemented using regular Airflow Connections.
All custom Odahu-flow operators accept api_connection_id as a parameter that refers to an Odahu-flow Connection.
Odahu-flow Connection¶
The Odahu connection provides access to an Odahu cluster for the Odahu custom operators.
Configuring the Connection¶
- Host (required)
- The host to connect to. Usually available at: odahu.<cluster-base-url>
- Type (required)
- HTTP
- Schema (optional)
- https
- Login (not required)
- Leave this field empty
- Password (required)
- The client secret. The client MAY omit the parameter if the client secret is an empty string. See more
- Extra (required)
Specify the extra parameters (as a JSON dictionary) that can be used in the Odahu connection. Because Odahu uses OpenID authorization, additional OpenID/OAuth 2.0 parameters may be supplied here.
The following parameters are supported and must be defined:
- auth_url: URL of the authorization server
- client_id: The client identifier issued to the client during the registration process. See more
- scope: Access Token Scope
Example “extras” field:
{ "auth_url": "https://keycloak.<my-app-domain>", "client_id": "my-app", "scope": "openid profile email offline_access groups", }
Custom operators¶
This chapter describes the custom operators provided by Odahu.
Train, Pack, Deploy operators¶
class TrainingOperator(training=None, api_connection_id=None, *args, **kwargs)¶
The operator that runs the Train phase.
Use args and kwargs to override other operator parameters.
Parameters:
- training (odahuflow.sdk.models.ModelTraining) – describes the Train phase
- api_connection_id (str) – conn_id of the Odahu-flow Connection
class TrainingSensor(training_id=None, api_connection_id=None, *args, **kwargs)¶
The operator that waits until the Train phase is finished.
Use args and kwargs to override other operator parameters.
Parameters:
- training_id (str) – id of the Train to wait for
- api_connection_id (str) – conn_id of the Odahu-flow Connection
class PackagingOperator(packaging=None, api_connection_id=None, trained_task_id: str = "", *args, **kwargs)¶
The operator that runs the Package phase.
Use args and kwargs to override other operator parameters.
Parameters:
- packaging (odahuflow.sdk.models.ModelPackaging) – describes the Package phase
- api_connection_id (str) – conn_id of the Odahu-flow Connection
- trained_task_id (str) – task id of a finished TrainingSensor
class PackagingSensor(packaging_id=None, api_connection_id=None, *args, **kwargs)¶
The operator that waits until the Package phase is finished.
Use args and kwargs to override other operator parameters.
Parameters:
- packaging_id (str) – id of the Package to wait for
- api_connection_id (str) – conn_id of the Odahu-flow Connection
class DeploymentOperator(deployment=None, api_connection_id=None, packaging_task_id: str = "", *args, **kwargs)¶
The operator that runs the Deploy phase.
Use args and kwargs to override other operator parameters.
Parameters:
- deployment (odahuflow.sdk.models.ModelDeployment) – describes the Deploy phase
- api_connection_id (str) – conn_id of the Odahu-flow Connection
- packaging_task_id (str) – task id of a finished PackagingSensor
class DeploymentSensor(deployment_id=None, api_connection_id=None, *args, **kwargs)¶
The operator that waits until the Deploy phase is finished.
Use args and kwargs to override other operator parameters.
Parameters:
- deployment_id (str) – id of the Deploy to wait for
- api_connection_id (str) – conn_id of the Odahu-flow Connection
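As a minimal sketch, an operator and its sensor are typically chained so that the sensor blocks downstream tasks until the phase completes. The connection id and payload below are placeholders; a complete pipeline appears in the DAG example section:
from datetime import datetime

from airflow import DAG
from odahuflow.airflow.training import TrainingOperator, TrainingSensor
from odahuflow.sdk.models import ModelTraining

# Placeholder payload; see "How to describe operators" below.
training = ModelTraining(id="my-training")

with DAG('odahu_training_example',
         default_args={'start_date': datetime(2019, 9, 3)},
         schedule_interval=None) as dag:
    train = TrainingOperator(
        task_id="training",
        api_connection_id="odahuflow_api",
        training=training,
    )
    # The sensor polls the Odahu-flow API until the training finishes.
    wait_for_train = TrainingSensor(
        task_id="wait_for_training",
        training_id=training.id,
        api_connection_id="odahuflow_api",
    )
    train >> wait_for_train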
Model usage operators¶
These operators are used to interact with deployed models.
class ModelInfoRequestOperator(model_deployment_name: str, api_connection_id: str, model_connection_id: str, md_role_name: str = "", *args, **kwargs)¶
The operator that extracts metadata of a deployed model.
Use args and kwargs to override other operator parameters.
Parameters:
- model_deployment_name (str) – Model deployment name
- api_connection_id (str) – conn_id of the Odahu-flow Connection
- model_connection_id (str) – id of the Odahu Connection for deployed model access
- md_role_name (str) – Role name
class ModelPredictRequestOperator(model_deployment_name: str, api_connection_id: str, model_connection_id: str, request_body: typing.Any, md_role_name: str = "", *args, **kwargs)¶
The operator that requests a prediction from a deployed model.
Use args and kwargs to override other operator parameters.
Parameters:
- model_deployment_name (str) – Model deployment name
- api_connection_id (str) – conn_id of the Odahu-flow Connection
- model_connection_id (str) – id of the Odahu Connection for deployed model access
- request_body (dict) – JSON body with model parameters
- md_role_name (str) – Role name
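For illustration, both operators might be declared like this inside a with dag: block, downstream of a DeploymentSensor. The ids below are placeholders; the DAG example at the end of this page shows them in full context:
from odahuflow.airflow.model import ModelInfoRequestOperator, ModelPredictRequestOperator

# Fetch the schema of the deployed model's predict API.
model_info_request = ModelInfoRequestOperator(
    task_id='model_info_request',
    model_deployment_name="airflow-wine",   # placeholder deployment id
    api_connection_id="odahuflow_api",
    model_connection_id="odahuflow_model",
)

# Invoke the prediction API with a JSON body.
model_predict_request = ModelPredictRequestOperator(
    task_id="model_predict_request",
    model_deployment_name="airflow-wine",
    api_connection_id="odahuflow_api",
    model_connection_id="odahuflow_model",
    request_body={"columns": ["alcohol"], "data": [[12.8]]},
)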
Helper operators¶
These operators are helpers to simplify using Odahu-flow.
class GcpConnectionToOdahuConnectionOperator(api_connection_id: str, google_cloud_storage_conn_id: str, conn_template: typing.Any, *args, **kwargs)¶
The operator that creates an Odahu-flow Connection from a GCP Airflow Connection.
Use args and kwargs to override other operator parameters.
Parameters:
- api_connection_id (str) – conn_id of the Odahu-flow Connection
- google_cloud_storage_conn_id (str) – conn_id of the GCP Connection
- conn_template (odahuflow.sdk.models.connection.Connection) – Odahu-flow Connection template
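A sketch of how this helper might be used to mirror a GCP connection into Odahu-flow; the bucket and connection ids are placeholders, and the DAG example below uses the same pattern:
from odahuflow.airflow.connection import GcpConnectionToOdahuConnectionOperator
from odahuflow.sdk.models import Connection, ConnectionSpec

# Template for the resulting Odahu-flow connection; credentials are copied
# from the GCP Airflow connection at runtime.
conn_template = Connection(
    id="wine",
    spec=ConnectionSpec(
        type="gcs",
        uri="gs://my-bucket/data/wine-quality.csv",  # placeholder bucket
    ),
)

odahuflow_conn = GcpConnectionToOdahuConnectionOperator(
    task_id='odahuflow_connection_creation',
    api_connection_id="odahuflow_api",
    google_cloud_storage_conn_id='wine_input',       # placeholder GCP conn_id
    conn_template=conn_template,
)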
How to describe operators¶
When you initialize Odahu custom operators such as TrainingOperator, PackagingOperator, or DeploymentOperator, you should pass an Odahu resource payload as a parameter.
This payload describes a resource that will be created on the Odahu-flow cluster. You should describe such payloads using odahuflow.sdk models:
from odahuflow.sdk.models import ModelTraining, ModelTrainingSpec, ModelIdentity, \
    ResourceRequirements, ResourceList, DataBindingDir

training_id = "airflow-wine"
training = ModelTraining(
id=training_id,
spec=ModelTrainingSpec(
model=ModelIdentity(
name="wine",
version="1.0"
),
toolchain="mlflow",
entrypoint="main",
work_dir="mlflow/sklearn/wine",
hyper_parameters={
"alpha": "1.0"
},
data=[
DataBindingDir(
conn_name='wine',
local_path='mlflow/sklearn/wine/wine-quality.csv'
),
],
resources=ResourceRequirements(
requests=ResourceList(
cpu="2024m",
memory="2024Mi"
),
limits=ResourceList(
cpu="2024m",
memory="2024Mi"
)
),
vcs_name="odahu-flow-examples"
),
)
But if you have done some R&D work with Odahu-flow before, it's likely that you already have YAML/JSON files that describe the same payloads. You can reuse them to create odahuflow.sdk models automatically:
from odahuflow.airflow.resources import resource
packaging_id, packaging = resource("""
id: airflow-wine
kind: ModelPackaging
spec:
artifactName: "<fill-in>"
targets:
- connectionName: docker-ci
name: docker-push
integrationName: docker-rest
""")
Or refer to YAML/JSON files, which must be located in the Airflow DAGs folder or the Airflow Home folder (these folders are configured in the airflow.cfg file):
from odahuflow.airflow.resources import resource
training_id, training = resource('training.odahuflow.yaml')
Here we refer to the file training.odahuflow.yaml, which is located in the Airflow DAGs folder.
For example, if you use Google Cloud Composer, you can put your YAML files inside the DAGs bucket and refer to them by relative path:
gsutil cp ~/training.odahuflow.yaml gs://<your-composer-dags-bucket>/
DAG example¶
An example of a DAG that uses the custom Odahu-flow operators is shown below.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from odahuflow.sdk.models import ModelTraining, ModelTrainingSpec, ModelIdentity, ResourceRequirements, ResourceList, \
    ModelPackaging, ModelPackagingSpec, Target, ModelDeployment, ModelDeploymentSpec, Connection, ConnectionSpec, \
    DataBindingDir

from odahuflow.airflow.connection import GcpConnectionToOdahuConnectionOperator
from odahuflow.airflow.deployment import DeploymentOperator, DeploymentSensor
from odahuflow.airflow.model import ModelPredictRequestOperator, ModelInfoRequestOperator
from odahuflow.airflow.packaging import PackagingOperator, PackagingSensor
from odahuflow.airflow.training import TrainingOperator, TrainingSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'end_date': datetime(2099, 12, 31)
}

api_connection_id = "odahuflow_api"
model_connection_id = "odahuflow_model"

gcp_project = Variable.get("GCP_PROJECT")
wine_bucket = Variable.get("WINE_BUCKET")

wine_conn_id = "wine"
wine = Connection(
    id=wine_conn_id,
    spec=ConnectionSpec(
        type="gcs",
        uri=f'gs://{wine_bucket}/data/wine-quality.csv',
        region=gcp_project,
    )
)

training_id = "airflow-wine"
training = ModelTraining(
    id=training_id,
    spec=ModelTrainingSpec(
        model=ModelIdentity(
            name="wine",
            version="1.0"
        ),
        toolchain="mlflow",
        entrypoint="main",
        work_dir="mlflow/sklearn/wine",
        hyper_parameters={
            "alpha": "1.0"
        },
        data=[
            DataBindingDir(
                conn_name='wine',
                local_path='mlflow/sklearn/wine/wine-quality.csv'
            ),
        ],
        resources=ResourceRequirements(
            requests=ResourceList(
                cpu="2024m",
                memory="2024Mi"
            ),
            limits=ResourceList(
                cpu="2024m",
                memory="2024Mi"
            )
        ),
        vcs_name="odahu-flow-examples"
    ),
)

packaging_id = "airflow-wine"
packaging = ModelPackaging(
    id=packaging_id,
    spec=ModelPackagingSpec(
        targets=[Target(name="docker-push", connection_name="docker-ci")],
        integration_name="docker-rest"
    ),
)

deployment_id = "airflow-wine"
deployment = ModelDeployment(
    id=deployment_id,
    spec=ModelDeploymentSpec(
        min_replicas=1,
    ),
)

model_example_request = {
    "columns": ["alcohol", "chlorides", "citric acid", "density", "fixed acidity", "free sulfur dioxide",
                "pH", "residual sugar", "sulphates", "total sulfur dioxide", "volatile acidity"],
    "data": [[12.8, 0.029, 0.48, 0.98, 6.2, 29, 3.33, 1.2, 0.39, 75, 0.66],
             [12.8, 0.029, 0.48, 0.98, 6.2, 29, 3.33, 1.2, 0.39, 75, 0.66]]
}

dag = DAG(
    'wine_model',
    default_args=default_args,
    schedule_interval=None
)

with dag:
    data_extraction = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='data_extraction',
        google_cloud_storage_conn_id='wine_input',
        source_bucket=wine_bucket,
        destination_bucket=wine_bucket,
        source_object='input/*.csv',
        destination_object='data/',
        project_id=gcp_project,
        default_args=default_args
    )
    data_transformation = BashOperator(
        task_id='data_transformation',
        bash_command='echo "imagine that we transform the data"',
        default_args=default_args
    )
    odahuflow_conn = GcpConnectionToOdahuConnectionOperator(
        task_id='odahuflow_connection_creation',
        google_cloud_storage_conn_id='wine_input',
        api_connection_id=api_connection_id,
        conn_template=wine,
        default_args=default_args
    )

    train = TrainingOperator(
        task_id="training",
        api_connection_id=api_connection_id,
        training=training,
        default_args=default_args
    )
    wait_for_train = TrainingSensor(
        task_id='wait_for_training',
        training_id=training_id,
        api_connection_id=api_connection_id,
        default_args=default_args
    )

    pack = PackagingOperator(
        task_id="packaging",
        api_connection_id=api_connection_id,
        packaging=packaging,
        trained_task_id="wait_for_training",
        default_args=default_args
    )
    wait_for_pack = PackagingSensor(
        task_id='wait_for_packaging',
        packaging_id=packaging_id,
        api_connection_id=api_connection_id,
        default_args=default_args
    )

    dep = DeploymentOperator(
        task_id="deployment",
        api_connection_id=api_connection_id,
        deployment=deployment,
        packaging_task_id="wait_for_packaging",
        default_args=default_args
    )
    wait_for_dep = DeploymentSensor(
        task_id='wait_for_deployment',
        deployment_id=deployment_id,
        api_connection_id=api_connection_id,
        default_args=default_args
    )

    model_predict_request = ModelPredictRequestOperator(
        task_id="model_predict_request",
        model_deployment_name=deployment_id,
        api_connection_id=api_connection_id,
        model_connection_id=model_connection_id,
        request_body=model_example_request,
        default_args=default_args
    )

    model_info_request = ModelInfoRequestOperator(
        task_id='model_info_request',
        model_deployment_name=deployment_id,
        api_connection_id=api_connection_id,
        model_connection_id=model_connection_id,
        default_args=default_args
    )

    data_extraction >> data_transformation >> odahuflow_conn >> train
    train >> wait_for_train >> pack >> wait_for_pack >> dep >> wait_for_dep
    wait_for_dep >> model_info_request
    wait_for_dep >> model_predict_request
At the end of this file, we define four dependency chains:
- the first extracts and transforms the data, creates the Odahu-flow connection, and runs the Train phase
- the second sequentially runs the Train, Package, and Deploy phases
- the third waits until the model is deployed and then extracts the schema of the model predict API
- the fourth waits until the model is deployed and then invokes the model prediction API