Integration with Dagster
Dagster is a popular open-source data pipeline orchestrator. Dagster Cloud is a fully managed service for Dagster.
This guide demonstrates how to set up Cube and Dagster to work together so that Dagster can push changes from upstream data sources to Cube via the Orchestration API.
Resources
In Dagster, each workflow is represented by a job, a Python function decorated with the @job decorator. Jobs include calls to ops, Python functions decorated with the @op decorator. Ops represent distinct pieces of work executed within a job. They can perform various tasks: poll for a precondition, perform extract-transform-load (ETL), or trigger an external system like Cube.
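For illustration, here's a minimal sketch of that structure; the op and job names below are hypothetical, not part of the integration. (Note that the configuration example later in this guide uses Dagster's @asset decorator, a related abstraction for materializing data assets.)

```python
from dagster import job, op


@op
def extract_orders():
    # Extract rows from an upstream source (stubbed here for illustration)
    return [{"status": "shipped"}, {"status": "processing"}]


@op
def load_orders(orders):
    # Load the extracted rows downstream (stubbed here for illustration)
    print(f"Loaded {len(orders)} rows")


@job
def orders_pipeline():
    # Ops are composed into a job by calling them in order
    load_orders(extract_orders())
```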
Integration between Cube and Dagster is enabled by the dagster_cube package.
The Cube and Dagster integration package was originally contributed by Olivier Dupuis, founder of discursus.io, for which we're very grateful.
The package provides the CubeResource class:
- For querying Cube via the /v1/load endpoint of the REST API.
- For triggering pre-aggregation builds via the /v1/pre-aggregations/jobs endpoint of the Orchestration API.
Please refer to the package documentation for details and an options reference.
Installation
Install Dagster.
Create a new directory:

```bash
mkdir cube-dagster
cd cube-dagster
```
Install the integration package:

```bash
pip install dagster_cube
```
Configuration
Create a new file named cube.py with the following contents:
```python
from dagster import asset
from dagster_cube.cube_resource import CubeResource


@asset
def cube_query_workflow():
    # Query Cube via the /v1/load endpoint of the REST API
    my_cube_resource = CubeResource(
        instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
        api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
    )

    response = my_cube_resource.make_request(
        method="POST",
        endpoint="load",
        data={
            'query': {
                'measures': ['Orders.count'],
                'dimensions': ['Orders.status']
            }
        }
    )

    return response


@asset
def cube_build_workflow():
    # Trigger pre-aggregation builds via the /v1/pre-aggregations/jobs
    # endpoint of the Orchestration API
    my_cube_resource = CubeResource(
        instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
        api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
    )

    response = my_cube_resource.make_request(
        method="POST",
        endpoint="pre-aggregations/jobs",
        data={
            'action': 'post',
            'selector': {
                'timezones': ['UTC'],
                'contexts': [{'securityContext': {}}]
            }
        }
    )

    return response
```
As you can see, the make_request method for the load endpoint accepts a Cube query via the query option, and the make_request method for the pre-aggregations/jobs endpoint accepts a pre-aggregation selector via the selector option.
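If you also need to track triggered builds, Cube's Orchestration API supports a get action on the same endpoint that returns the status of previously triggered jobs by their tokens. Here's a minimal sketch of such a follow-up request; it assumes tokens holds the list of job tokens returned by the post request above and would live inside an asset like cube_build_workflow:

```python
# A sketch of checking build status, assuming `tokens` holds the job
# tokens returned by the 'post' request above
status_response = my_cube_resource.make_request(
    method="POST",
    endpoint="pre-aggregations/jobs",
    data={
        'action': 'get',
        'resType': 'object',  # return statuses keyed by job token
        'tokens': tokens
    }
)
```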
Running jobs
Now, you can load these jobs to Dagster:

```bash
dagster dev -f cube.py
```
Navigate to the Dagit UI at localhost:3000 and click Materialize all to run both jobs.
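To run these materializations on a schedule rather than manually, you can group the assets into an asset job and attach a schedule. Here's a minimal sketch you could append to cube.py; the job name, schedule, and cron cadence are assumptions for illustration:

```python
from dagster import Definitions, ScheduleDefinition, define_asset_job

# A job that materializes both assets defined above
cube_job = define_asset_job("cube_job", selection="*")

# Run the job every day at midnight (cron syntax)
cube_schedule = ScheduleDefinition(job=cube_job, cron_schedule="0 0 * * *")

# The Definitions object is picked up by `dagster dev -f cube.py`
defs = Definitions(
    assets=[cube_query_workflow, cube_build_workflow],
    jobs=[cube_job],
    schedules=[cube_schedule],
)
```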