Integration with Apache Airflow
Apache Airflow is a popular open-source workflow scheduler commonly used for data orchestration. Astro is a fully managed service for Airflow by Astronomer.
This guide demonstrates how to set up Cube and Airflow to work together so that Airflow can push changes from upstream data sources to Cube via the Orchestration API.
Tasks
In Airflow, pipelines are represented by directed acyclic graphs (DAGs): Python functions decorated with the `@dag` decorator. DAGs include calls to tasks, implemented as instances of the `Operator` class. Operators can perform various kinds of work: poll for some precondition, perform extract-transform-load (ETL), or trigger external systems like Cube.
Integration between Cube and Airflow is enabled by the `airflow-provider-cube` package, which provides the following operators.
CubeQueryOperator
`CubeQueryOperator` is used to query Cube via the `/v1/load` endpoint of the REST API. It supports the following options:
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `cube_conn_id` | `string` | `cube_default` | Airflow connection name. |
| `headers` | `dict` | | HTTP headers to be added to the request. |
| `query` | `dict` | | Cube query object. |
| `timeout` | `int` | `30` | Response wait timeout in seconds. |
| `wait` | `int` | `10` | Interval between API calls in seconds. |
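For illustration, here's a minimal sketch of how these options map to the operator's keyword arguments. It reuses the `Orders` query from the DAG example later in this guide; the extra header is a hypothetical value, and `timeout` and `wait` simply restate the defaults from the table:

```python
from cube_provider.operators.cube import CubeQueryOperator

# A sketch only, not a complete DAG: shows every option from the table above.
query_op = CubeQueryOperator(
    task_id="orders_by_status",
    cube_conn_id="cube_default",              # Airflow connection name
    headers={"X-Request-Source": "airflow"},  # hypothetical extra HTTP header
    query={                                   # Cube query object sent to /v1/load
        "measures": ["Orders.count"],
        "dimensions": ["Orders.status"],
    },
    timeout=30,                               # response wait timeout, seconds
    wait=10,                                  # interval between API calls, seconds
)
```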
CubeBuildOperator
`CubeBuildOperator` is used to trigger pre-aggregation builds and check their status via the `/v1/pre-aggregations/jobs` endpoint of the Orchestration API. It supports the following options:
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `cube_conn_id` | `string` | `cube_default` | Airflow connection name. |
| `headers` | `dict` | | HTTP headers to be added to the request. |
| `selector` | `dict` | | `/v1/pre-aggregations/jobs` selector. |
| `complete` | `bool` | `False` | Whether the task should wait for builds to complete or not. |
| `wait` | `int` | `10` | Interval between API calls in seconds. |
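As a sketch of the default behavior: leaving `complete` at `False` makes the operator fire-and-forget, so the task only triggers builds via `/v1/pre-aggregations/jobs` and doesn't poll for their completion. The selector below mirrors the one used in the DAG example later in this guide:

```python
from cube_provider.operators.cube import CubeBuildOperator

# A sketch only, not a complete DAG: trigger builds without waiting for them.
trigger_builds_op = CubeBuildOperator(
    task_id="trigger_builds",
    cube_conn_id="cube_default",
    selector={
        "contexts": [{"securityContext": {}}],  # security contexts to build for
        "timezones": ["UTC"],                   # timezones to build for
    },
    complete=False,  # default: don't wait for builds to complete
)
```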
Installation
Install the Astro CLI.
Create a new directory and initialize a new Astro project:

```bash
mkdir cube-astro
cd cube-astro
astro dev init
```

Add the integration package to `requirements.txt`:

```bash
echo "airflow-provider-cube" >> ./requirements.txt
```
Configuration
Connection
Create an Airflow connection via the web console or by adding the following contents to the `airflow_settings.yaml` file:

```yaml
airflow:
  connections:
    - conn_id: cube_default
      conn_type: generic
      conn_host: https://awesome-ecom.gcp-us-central1.cubecloudapp.dev
      conn_schema:
      conn_login:
      conn_password: SECRET
      conn_port:
      conn_extra:
        security_context: {}
```
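Alternatively, Airflow can read connections from environment variables. Here's a sketch of the same connection defined that way, assuming Airflow 2.3+ (which supports the JSON connection format) and reusing the placeholder host and secret from above:

```bash
# The variable name follows Airflow's AIRFLOW_CONN_<CONN_ID> convention.
export AIRFLOW_CONN_CUBE_DEFAULT='{
  "conn_type": "generic",
  "host": "https://awesome-ecom.gcp-us-central1.cubecloudapp.dev",
  "password": "SECRET",
  "extra": {"security_context": {}}
}'
```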
Let's break the options down:
- By default, Cube operators use `cube_default` as the Airflow connection name.
- The connection should be of the `generic` type.
- `conn_host` should be set to the URL of your Cube deployment.
- `conn_password` should be set to the value of the `CUBEJS_API_SECRET` environment variable.
- `conn_extra` should contain a security context (as `security_context`) that will be sent with API requests.
DAGs
Create a new DAG named `cube_query.py` in the `dags` subdirectory with the following contents. As you can see, the `CubeQueryOperator` accepts a Cube query via the `query` option.
```python
from typing import Any

from pendulum import datetime

from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeQueryOperator


@dag(
    start_date=datetime(2023, 6, 1),
    schedule='*/1 * * * *',
    max_active_runs=1,
    concurrency=1,
    default_args={"retries": 1, "cube_conn_id": "cube_default"},
    tags=["cube"],
)
def cube_query_workflow():
    query_op = CubeQueryOperator(
        task_id="query_op",
        query={
            "measures": ["Orders.count"],
            "dimensions": ["Orders.status"]
        }
    )

    @task()
    def print_op(data: Any):
        print(f"Result: {data}")

    print_op(query_op.output)

cube_query_workflow()
```
Create a new DAG named `cube_build.py` in the `dags` subdirectory with the following contents. As you can see, the `CubeBuildOperator` accepts a pre-aggregation selector via the `selector` option.
```python
from typing import Any

from pendulum import datetime

from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeBuildOperator


@dag(
    start_date=datetime(2023, 6, 1),
    schedule='*/1 * * * *',
    max_active_runs=1,
    concurrency=1,
    default_args={"retries": 1, "cube_conn_id": "cube_default"},
    tags=["cube"],
)
def cube_build_workflow():
    build_op = CubeBuildOperator(
        task_id="build_op",
        selector={
            "contexts": [
                {"securityContext": {}}
            ],
            "timezones": ["UTC"]
        },
        complete=True,
        wait=10,
    )

    @task()
    def print_op(data: Any):
        print(f"Result: {data}")

    print_op(build_op.output)

cube_build_workflow()
```
Pay attention to the `complete` option. When it's set to `True`, the operator will wait for pre-aggregation builds to complete before allowing downstream tasks to run.
Running workflows
Now, you can run these DAGs:
```bash
astro run cube_query_workflow
astro run cube_build_workflow
```
Alternatively, you can run Airflow and navigate to the web console at localhost:8080 (use `admin`/`admin` to authenticate):

```bash
astro dev start
```
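Once the local environment is up, you can verify that both DAGs were parsed. The commands below are a sketch that assumes the Astro CLI's `astro dev run` passthrough to the Airflow CLI inside the local containers:

```bash
# List the DAGs known to the local scheduler; cube_query_workflow and
# cube_build_workflow should appear.
astro dev run dags list

# Trigger a single run of the query workflow on demand.
astro dev run dags trigger cube_query_workflow
```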