# Integration with Apache Airflow
Apache Airflow is a popular open-source workflow scheduler commonly used for data orchestration. Astro is a fully managed service for Airflow by Astronomer.

This guide demonstrates how to set up Cube and Airflow to work together so that Airflow can push changes from upstream data sources to Cube via the Orchestration API.
## Tasks
In Airflow, pipelines are represented by directed acyclic graphs (DAGs): Python functions decorated with the `@dag` decorator. DAGs include calls to tasks, implemented as instances of the `Operator` class. Operators can perform various tasks: poll for some precondition, perform extract-transform-load (ETL), or trigger external systems like Cube.
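To make these concepts concrete, here is a minimal, generic DAG (illustrative only and unrelated to Cube; the task names are made up):

```python
# A minimal, generic Airflow DAG: the @dag-decorated function defines the
# pipeline, while @task functions and Operator instances define its steps.
from pendulum import datetime
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

@dag(start_date=datetime(2023, 6, 1), schedule=None, tags=["example"])
def hello_workflow():
    # An operator-based task
    extract_op = BashOperator(task_id="extract_op", bash_command="echo extracting")

    # A decorated Python task
    @task()
    def transform_op():
        print("transforming")

    # Run the operator first, then the task
    extract_op >> transform_op()

hello_workflow()
```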
Integration between Cube and Airflow is enabled by the `airflow-provider-cube` package, which provides the following operators.
### CubeQueryOperator
`CubeQueryOperator` is used to query Cube via the `/v1/load` endpoint of the REST API.

It supports the following options:
| Option | Type | Default | Description |
|---|---|---|---|
| `cube_conn_id` | string | `cube_default` | Airflow connection name. |
| `headers` | dict | | HTTP headers to be added to the request. |
| `query` | dict | | Cube query object. |
| `timeout` | int | 30 | Response wait timeout in seconds. |
| `wait` | int | 10 | Interval between API calls in seconds. |
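For illustration, here is how a `CubeQueryOperator` might be configured with these options inside a DAG definition (the header and timing values below are placeholders, not recommendations; a complete DAG example follows in the DAGs section):

```python
# Illustrative standalone configuration of CubeQueryOperator; all values
# are placeholders based on the options table above.
from cube_provider.operators.cube import CubeQueryOperator

query_op = CubeQueryOperator(
    task_id="query_op",
    cube_conn_id="cube_default",              # Airflow connection name
    query={
        "measures": ["Orders.count"],
        "dimensions": ["Orders.status"],
    },
    headers={"X-Request-Source": "airflow"},  # extra HTTP headers (hypothetical)
    timeout=30,                               # seconds to wait for a response
    wait=10,                                  # seconds between API calls
)
```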
### CubeBuildOperator

`CubeBuildOperator` is used to trigger pre-aggregation builds and check their status via the `/v1/pre-aggregations/jobs` endpoint of the Orchestration API.

It supports the following options:
| Option | Type | Default | Description |
|---|---|---|---|
| `cube_conn_id` | string | `cube_default` | Airflow connection name. |
| `headers` | dict | | HTTP headers to be added to the request. |
| `selector` | dict | | `/v1/pre-aggregations/jobs` selector. |
| `complete` | bool | False | Whether a task should wait for builds to complete or not. |
| `wait` | int | 10 | Interval between API calls in seconds. |
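Similarly, a `CubeBuildOperator` might be configured like this (again, the values are placeholders; the complete DAG example follows in the DAGs section):

```python
# Illustrative standalone configuration of CubeBuildOperator; all values
# are placeholders based on the options table above.
from cube_provider.operators.cube import CubeBuildOperator

build_op = CubeBuildOperator(
    task_id="build_op",
    cube_conn_id="cube_default",              # Airflow connection name
    selector={
        "contexts": [{"securityContext": {}}],
        "timezones": ["UTC"],
    },
    complete=True,                            # block until builds finish
    wait=10,                                  # seconds between status checks
)
```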
## Installation

Install the Astro CLI.

Create a new directory and initialize a new Astro project:

```bash
mkdir cube-astro
cd cube-astro
astro dev init
```

Add the integration package to `requirements.txt`:

```bash
echo "airflow-provider-cube" >> ./requirements.txt
```

## Configuration
### Connection
Create an Airflow connection via the web console or by adding the following contents to the `airflow_settings.yaml` file:

```yaml
airflow:
  connections:
    - conn_id: cube_default
      conn_type: generic
      conn_host: https://awesome-ecom.gcp-us-central1.cubecloudapp.dev
      conn_schema:
      conn_login:
      conn_password: SECRET
      conn_port:
      conn_extra:
        security_context: {}
```

Let's break the options down:
- By default, Cube operators use `cube_default` as an Airflow connection name.
- The connection should be of the `generic` type.
- `conn_host` should be set to the URL of your Cube deployment.
- `conn_password` should be set to the value of the `CUBEJS_API_SECRET` environment variable.
- `conn_extra` should contain a security context (as `security_context`) that will be sent with API requests (see the sketch below).
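For reference, Cube API tokens are JSON Web Tokens signed with `CUBEJS_API_SECRET`, and their payload is treated as the security context. The sketch below (using the PyJWT package; it is not part of the provider setup) shows how to mint such a token manually, which can help you verify the `conn_password` and `security_context` values:

```python
# Sketch: mint a Cube API token to sanity-check the connection values.
# Assumes the PyJWT package (`pip install PyJWT`); not part of the Airflow setup.
import datetime
import jwt

CUBEJS_API_SECRET = "SECRET"  # the value used for conn_password
security_context = {}         # the value used for security_context in conn_extra

token = jwt.encode(
    {**security_context,
     "exp": datetime.datetime.utcnow() + datetime.timedelta(days=1)},
    CUBEJS_API_SECRET,
    algorithm="HS256",
)
print(token)  # usable in an Authorization header against the Cube REST API
```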
### DAGs
Create a new DAG named `cube_query.py` in the `dags` subdirectory with the following contents. As you can see, the `CubeQueryOperator` accepts a Cube query via the `query` option.
```python
from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeQueryOperator

@dag(
    start_date=datetime(2023, 6, 1),
    schedule='*/1 * * * *',
    max_active_runs=1,
    concurrency=1,
    default_args={"retries": 1, "cube_conn_id": "cube_default"},
    tags=["cube"],
)
def cube_query_workflow():
    query_op = CubeQueryOperator(
        task_id="query_op",
        query={
            "measures": ["Orders.count"],
            "dimensions": ["Orders.status"]
        }
    )

    @task()
    def print_op(data: Any):
        print(f"Result: {data}")

    print_op(query_op.output)

cube_query_workflow()
```

Create a new DAG named `cube_build.py` in the `dags` subdirectory with the following contents. As you can see, the `CubeBuildOperator` accepts a pre-aggregation selector via the `selector` option.
```python
from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeBuildOperator

@dag(
    start_date=datetime(2023, 6, 1),
    schedule='*/1 * * * *',
    max_active_runs=1,
    concurrency=1,
    default_args={"retries": 1, "cube_conn_id": "cube_default"},
    tags=["cube"],
)
def cube_build_workflow():
    build_op = CubeBuildOperator(
        task_id="build_op",
        selector={
            "contexts": [
                {"securityContext": {}}
            ],
            "timezones": ["UTC"]
        },
        complete=True,
        wait=10,
    )

    @task()
    def print_op(data: Any):
        print(f"Result: {data}")

    print_op(build_op.output)

cube_build_workflow()
```

Pay attention to the `complete` option. When it's set to `True`, the operator will wait for pre-aggregation builds to complete before allowing downstream tasks to run.
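Because of this, the two operators can be chained so that queries only run against freshly built pre-aggregations. Here is a sketch that combines the operators from the examples above into a single DAG (the DAG name is made up; it is not part of the original guide):

```python
# Sketch: chain a pre-aggregation build and a query in one DAG, so the
# query task starts only after CubeBuildOperator reports completed builds.
from pendulum import datetime
from airflow.decorators import dag
from cube_provider.operators.cube import CubeBuildOperator, CubeQueryOperator

@dag(
    start_date=datetime(2023, 6, 1),
    schedule=None,
    default_args={"cube_conn_id": "cube_default"},
    tags=["cube"],
)
def cube_build_then_query_workflow():
    build_op = CubeBuildOperator(
        task_id="build_op",
        selector={"contexts": [{"securityContext": {}}], "timezones": ["UTC"]},
        complete=True,
    )
    query_op = CubeQueryOperator(
        task_id="query_op",
        query={"measures": ["Orders.count"], "dimensions": ["Orders.status"]},
    )
    build_op >> query_op  # query runs only after the build task succeeds

cube_build_then_query_workflow()
```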
## Running workflows
Now, you can run these DAGs:

```bash
astro run cube_query_workflow
astro run cube_build_workflow
```

Alternatively, you can run Airflow and navigate to the web console at localhost:8080 (use `admin`/`admin` to authenticate):

```bash
astro dev start
```