MS Teams with Cloud Composer for Failure Notifications

Nilesh Khandalkar
3 min read · Mar 16, 2022

This article explains how to send failure notifications from Cloud Composer to an MS Teams channel.

In Cloud Composer (Airflow) we run DAGs; my earlier blog post on Cloud Composer covers DAGs in more detail.

While running DAGs, we may encounter failures in specific tasks or in steps within those tasks. A simple way to find out about them is an MS Teams channel: a message is posted to the channel whenever a failure or error occurs. Let’s see how to set this up.

First, create a channel in MS Teams: click the three dots next to your team, select Add channel, and provide a channel name and, if required, a description.

Once the channel is created, click the three dots beside the channel and select Connectors. Next to the “Incoming Webhook” option, click Configure, provide a name, and click Create. A URL is generated; copy it to the clipboard.
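Before wiring the webhook into Airflow, it can help to confirm that the URL actually delivers messages. The snippet below is a minimal sketch using only the Python standard library. It assumes the webhook accepts a simple JSON body with a `text` field; `build_payload` and `post_to_teams` are hypothetical helper names chosen for this illustration, not part of any Teams SDK.

```python
import json
import urllib.request


def build_payload(text):
    """Return the JSON body (as bytes) for a simple text message."""
    return json.dumps({"text": text}).encode("utf-8")


def post_to_teams(webhook_url, text, timeout=10):
    """POST a text message to an Incoming Webhook URL and return the HTTP status."""
    request = urllib.request.Request(
        webhook_url,
        data=build_payload(text),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Example (replace the placeholder with the URL copied from Teams):
# post_to_teams("<your-webhook-url>", "Test message from Cloud Composer setup")
```

If the test message shows up in the channel, the webhook is ready to be stored in Airflow.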

Once the Teams channel is set up, go to the Composer environment > Airflow > Admin > Variables. In the key field, enter a name (e.g. webhook) that you will use in the DAG code, and in the value field paste the URL copied in the previous step.

Now, in the DAG code, do the following. First, read the webhook URL from the Airflow variable:

from airflow import models
webhook = models.Variable.get("webhook")

Then create a callback function:

import pymsteams

def ms_teams_alert(context, teams_webhook=webhook):
    # Identify the failed DAG and task from the callback context.
    dag_id = context["dag_run"].dag_id
    task_id = context["task_instance"].task_id
    context["task_instance"].xcom_push(key=dag_id, value=True)
    logs_url = context["task_instance"].log_url
    # Build the Teams card and send it to the webhook.
    teams_notification = pymsteams.connectorcard(teams_webhook)
    message = "`{}` has failed on task: `{}`".format(dag_id, task_id)
    teams_notification.title("ERROR ALERT !!")
    teams_notification.color("red")
    teams_notification.text("ERROR MESSAGE: " + message)
    teams_notification.addLinkButton("LOG URL", logs_url)
    teams_notification.send()

The notification itself (title, color, text and button) can be customized as required.
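As one possible customization, the message text can carry more detail than the DAG and task names. The sketch below builds such a message from the callback context. It assumes the context includes an `exception` entry and that the task instance exposes `try_number`; `build_failure_message` is a hypothetical helper, and the `SimpleNamespace` objects only stand in for the real Airflow objects so the function can be tried outside Airflow.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Build the alert text from an Airflow-style callback context."""
    task_instance = context["task_instance"]
    dag_id = context["dag_run"].dag_id
    lines = [
        "`{}` has failed on task: `{}`".format(dag_id, task_instance.task_id),
        "Attempt: {}".format(getattr(task_instance, "try_number", "unknown")),
    ]
    # Assumption: failure callbacks receive the raised exception in the
    # context; skip the line if the key is absent.
    exception = context.get("exception")
    if exception is not None:
        lines.append("Exception: {}".format(exception))
    return "\n\n".join(lines)


# Stand-in context for trying the function outside Airflow.
fake_context = {
    "dag_run": SimpleNamespace(dag_id="daily_load"),
    "task_instance": SimpleNamespace(task_id="load_to_bq", try_number=2),
    "exception": ValueError("table not found"),
}
print(build_failure_message(fake_context))
```

The returned string could then be passed to `teams_notification.text(...)` in place of the shorter message.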

Now, in the default arguments of the DAG, register this function as the failure callback:

"on_failure_callback": ms_teams_alert

That’s it. If any task in the DAG now fails, a failure notification is sent to the Teams channel created earlier. This is a simple way to get notified of failures during Cloud Composer DAG execution.

The other option is to use MSTeamsWebhookOperator together with a connection ID. The import below assumes the ms_teams_webhook_operator module is available to your DAG code.

from datetime import datetime
from ms_teams_webhook_operator import MSTeamsWebhookOperator

def on_failure(context):
    dag_id = context['dag_run'].dag_id
    task_id = context['task_instance'].task_id
    context['task_instance'].xcom_push(key=dag_id, value=True)
    # Replace "myairflow" with the host of your Airflow web server.
    logs_url = "https://myairflow/admin/airflow/log?dag_id={}&task_id={}&execution_date={}".format(
        dag_id, task_id, context['ts'])
    teams_notification = MSTeamsWebhookOperator(
        task_id="msteams_notify_failure", trigger_rule="all_done",
        message="`{}` has failed on task: `{}`".format(dag_id, task_id),
        button_text="View log", button_url=logs_url,
        theme_color="FF0000", http_conn_id='msteams_webhook_url')
    teams_notification.execute(context)

default_args = {
    'owner': 'airflow',
    'description': 'a test dag',
    'start_date': datetime(2019, 8, 8),
    'on_failure_callback': on_failure
}
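The log URL above is built with plain string formatting, so characters such as `:` and `+` in the timestamp go into the query string unescaped. A minimal sketch of a safer variant, using the standard library's `urlencode`, is shown below. `build_log_url` is a hypothetical helper, and the `/admin/airflow/log` path and the `https://myairflow` host are simply carried over from the example above; adjust both to match your Airflow web server.

```python
from urllib.parse import urlencode


def build_log_url(base_url, dag_id, task_id, execution_date):
    """Return a log URL with all query parameters percent-encoded."""
    query = urlencode(
        {"dag_id": dag_id, "task_id": task_id, "execution_date": execution_date}
    )
    return "{}/admin/airflow/log?{}".format(base_url.rstrip("/"), query)


print(build_log_url("https://myairflow", "my_dag", "my_task", "2019-08-08T00:00:00+00:00"))
# https://myairflow/admin/airflow/log?dag_id=my_dag&task_id=my_task&execution_date=2019-08-08T00%3A00%3A00%2B00%3A00
```

Inside the callback, the call would take `dag_id`, `task_id` and `context['ts']` as its last three arguments.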

Hope this was helpful!

Nilesh Khandalkar

Passionate about Data and Cloud, working as Data Engineering Manager at Capgemini UK. GCP Professional Data Engineering Certified, Airflow Fundamentals Certified.