One of the advantages of using dbt (getdbt.com) is that we can use tags to run groups of models. In this way, we save time by not running all models available.
This implies that some dbt runs take less time than others. How, then, can a pipeline tell that a containerized dbt project has finished running?
The Goal
In the past, we have talked about running dbt in Azure Data Factory (ADF) based on this approach. The solution consists of running dbt in an Azure Container Instance (ACI). A Web activity in ADF starts the container, and another activity waits until the container's provisioning state is successful.
When started, the container group receives the resources to run the container instance. Once this process is successful, the instance starts running. When the instance is done with the dbt run, it is terminated.
The earlier article covers how to start a containerized dbt project in ACI. Our goal now is to create two more activities to make sure that the ACI started and ended as expected.
To achieve this, we will request the ACI’s status via REST API:
https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.ContainerInstance/containerGroups/{containerGroupName}?api-version=2022-10-01-preview
Keep in mind that ADF needs access rights to the ACI to get a response from the state request. The steps to grant such access are described in our previous article.
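To make the placeholders in the URL above concrete, here is a minimal Python sketch that fills them in. The function name aci_state_url and the sample values are hypothetical and only serve as illustration; the URL shape and API version are the ones shown above.

```python
from urllib.parse import quote

# API version taken from the URL shown in the article.
API_VERSION = "2022-10-01-preview"


def aci_state_url(subscription_id: str, resource_group: str,
                  container_group: str, api_version: str = API_VERSION) -> str:
    """Build the management URL that returns the state of a container group."""
    return (
        "https://management.azure.com"
        f"/subscriptions/{quote(subscription_id, safe='')}"
        f"/resourceGroups/{quote(resource_group, safe='')}"
        "/providers/Microsoft.ContainerInstance"
        f"/containerGroups/{quote(container_group, safe='')}"
        f"?api-version={api_version}"
    )


# Hypothetical identifiers, for illustration only.
print(aci_state_url("0000", "rg-dbt", "aci-dbt"))
```

The resulting string is what goes into the URL box of the Web activity.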
Wait Until Success
We begin by creating an Until activity after the container-start activity. Let us call this activity wait_aci_success.
Within wait_aci_success, we create two activities. Firstly, a Web activity named aci_state_if_success. Secondly, a Wait activity named wait_30_seconds_for_success.
As its name implies, we assign the Wait activity a 30-second waiting period.
Within aci_state_if_success, we use the ACI state REST API mentioned above. Remember to replace the placeholders in braces with your own ACI's information.
We then place the above API in the URL box and choose GET as the method. In Authentication, select System Assigned Managed Identity and let https://management.azure.com/ be the Resource.
The goal of these two activities is to repeatedly get the ACI state until it is successful. Remember that aci_state_if_success returns a JSON response containing many properties.
For this step, we are interested in the provisioning state, which is available under the following dynamic value:
@activity('aci_state_if_success').output.properties.provisioningState
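As an illustration of which part of the response this dynamic value points at, the following Python sketch reads the same property from a dictionary. The sample response is a hypothetical, heavily trimmed version of what the API returns, and the "null" fallback for a missing value is an assumption of this sketch.

```python
def provisioning_state(response: dict) -> str:
    """Return properties.provisioningState, or "null" if it is missing."""
    properties = response.get("properties") or {}
    return properties.get("provisioningState") or "null"


# Hypothetical, trimmed response body; a real one holds many more fields.
sample = {"properties": {"provisioningState": "Succeeded"}}

print(provisioning_state(sample))
print(provisioning_state({}))
```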
Now, we step back into the original pipeline. Click on wait_aci_success and go to Settings > Expression > Add dynamic content. To check whether the provisioning state is successful, we evaluate the expression
@equals(
'Succeeded',
coalesce(
activity('aci_state_if_success').output.properties.provisioningState,
'null'))
within the expression builder.
Click OK. At this point, we have a pipeline that starts the ACI and waits until the provisioning is successful. We now create an activity that checks that the instance has ended.
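The logic of the Until activity can be sketched outside ADF as a simple polling loop. The Python below is only an analogy under stated assumptions: wait_until, fetch_state, and max_checks are hypothetical names, the state source is injected so the loop can be tried without calling Azure, and the loop checks the state before waiting, which may differ slightly from the order in which ADF runs the inner activities.

```python
import time
from typing import Callable


def wait_until(fetch_state: Callable[[], str], target: str,
               interval_s: float = 30.0, max_checks: int = 120,
               sleep: Callable[[float], None] = time.sleep) -> int:
    """Poll fetch_state until it returns target; return the number of checks.

    Raises TimeoutError if target is not seen within max_checks checks.
    """
    for attempt in range(1, max_checks + 1):
        if fetch_state() == target:
            return attempt
        # Mirrors the 30-second Wait activity inside the Until loop.
        sleep(interval_s)
    raise TimeoutError(f"state never reached {target!r} after {max_checks} checks")


# Simulated sequence of provisioning states (hypothetical values).
states = iter(["Pending", "Creating", "Succeeded"])
waits = []  # records the requested pauses instead of really sleeping
checks = wait_until(lambda: next(states), "Succeeded", sleep=waits.append)
print(checks, waits)
```

Injecting sleep keeps the example fast to run; with the default time.sleep, the loop would pause 30 seconds between checks, like the Wait activity.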
Wait Until Terminated
Right after wait_aci_success, connect another Until activity on success. Let us call this activity wait_aci_terminate.
This activity follows the same procedure as its predecessor, wait_aci_success. The only difference is that we are now interested in the instance state rather than the provisioning state.
Let us use the name aci_state_if_terminate for the new state-check activity within wait_aci_terminate. Also, let us assume that our container group has only one instance. The instance state would then be found under the following dynamic value:
@activity('aci_state_if_terminate').output.properties.containers[0].properties.instanceView.currentState.State
Within the expression builder of wait_aci_terminate, we evaluate whether the instance state is terminated:
@equals(
'Terminated',
coalesce(
activity('aci_state_if_terminate').output.properties.containers[0].properties.instanceView.currentState.State,
'null'))
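To show where the instance state sits in the response, here is a small Python sketch that walks the same path as the expression above. The sample response is hypothetical and trimmed. The sketch accepts either capitalization of the final key (state or State) because the exact casing in the response is an assumption here, and it falls back to "null" when any level is missing, for example before the instance view is populated.

```python
def instance_state(response: dict, index: int = 0) -> str:
    """Return the current state of one container, or "null" if unavailable."""
    containers = (response.get("properties") or {}).get("containers") or []
    if index >= len(containers):
        return "null"
    container_props = containers[index].get("properties") or {}
    current = (container_props.get("instanceView") or {}).get("currentState") or {}
    # Accept both key casings; which one the API returns is an assumption.
    return current.get("state") or current.get("State") or "null"


# Hypothetical, trimmed response for a container group with one instance.
sample = {
    "properties": {
        "containers": [
            {"properties": {"instanceView": {"currentState": {"state": "Terminated"}}}}
        ]
    }
}

print(instance_state(sample))
```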
The outcome is an ADF pipeline that starts the ACI, checks if the ACI has started, and checks if the ACI has ended. In this way, we can orchestrate our next activity in the pipeline in a timely manner.