みどりのジーパン

Data Science / Data Engineering / MLOpsを勉強中

Azure Data Factoryでのデータ処理結果をSlack通知する

はじめに

本投稿では,Azure Data Factoryのデータ加工処理結果をSlack投稿する方法をまとめます.

と言っても,ここ数週間たまたま業務で触っていくうちに知り得たtipsを書き残すだけなので,機能を最大限活用できていないかもしれない点はご了承ください.

最終的に,以下の画像のようなSlack通知を作成することを目指します.

Image.png

Azure Data Factoryとは

私の認識を一言で言うと,Azure Data FactoryはELTやETL処理過程をGUI上で実装できる,データ加工のためのサービスです,類似サービスにArgo WorkflowsGCPCloud Data Fusionなどがあります.

Image.png

ref: https://docs.microsoft.com/ja-jp/azure/data-factory/introduction

公式サイトには以下のような説明がありました.

追加のコストなしでメンテナンス不要の 90 を超える組み込みのコネクタを使用して、データ ソースを視覚的に統合できます。直感的な環境でコードなしで ETL および ELT プロセスを簡単に構築することも、独自のコードを書くこともできます。

Data Factory - データ統合サービス | Microsoft Azure

名前にAzureと入っているのでAzure系やMicrosoft系のサービスしか利用できないのかと思いきや,PostgreSQLなどのオンプレミス系やAWSなどの他社クラウドサービスも活用したデータ処理過程も実装できるみたいです.1

パイプラインとアクティビティ - Azure Data Factory & Azure Synapse

Azure Data Factory からSlackに通知させる手順

0. 前提条件

このブログのテーマでもあるので言うまでもないですが,以下の前提条件を満たしている環境を仮定します.

  • Azure Data Factoryのインスタンスを作成済み(要はData Factoryを使えればOK)
  • SlackのWorkspaceを作成済み(要はSlackが使えればOK)

インスタンス作成は以下の記事が役に立つかと思います.

Azure Data Factory UI を使用して Azure データ ファクトリを作成する - Azure Data Factory

1. SlackのworkspaceにIncoming WebHooksを入れる

以下のURLに飛んでください.

Incoming WebHooks

するとIncoming WebHooksのinstall画面が出てくるはずです.

右上に使用したいworkspaceが表示されていることを確認して(sign inしていなければしてください),Add to Slackで追加します.

次に出てくる画面で,Azure Data Factory(でなくても良いですが)から処理結果を通知させたいチャンネルを選択します.

Image.png

Webhook URLはコピーしてどこかに保存しておきます.

Image.png

2. Azure Data Factory上でPipelineを作成する

Pipelineを作成すること自体は本題ではないので,今回はSQL DataBaseのサンプルProcedureを実行するという,極めて単純なケースを考えます.2

下図がPipelineの全体像になります.

Image.png

3. Web機能を使ってSlack通知を実装する

本題です.まず,先ほど作成したPipelineにWebという機能を2つ追加します.何故2つかというと,成功した場合の通知と失敗した場合の2種類が必要になるからです.

作成したらPipelineの最後尾とWebを矢印で繋ぎます.最後尾に緑色の突起があるので,それをドラッグして繋ぎます.下図のようなイメージです.

Image.png

緑色の矢印はそれまでの実行が成功した場合の矢印を示します.失敗した場合の通知機能NoticeFailureへの矢印は赤色に変更をします.矢印を選択後,右クリックでパターンを変更することができます.

Image.png

ここまでできたらSlackに通知させる機能を実装していきます. 先ほどのNoticeSuccessを例に中身を見ていくと以下のようになります.映っていない部分は一旦デフォルトのままで特段問題ないと思います.

Image.png

URLの部分には手順1でメモしてあるはずのwebhook URLを入力してください. なんとなく見れば分かりますが,bodyの部分はそれぞれ次のような要素になっております.

{username”: Slackでのユーザー名,
    “text”:本文,
    “icon_emoji”:Slackの絵文字
}

NoticeFailureも同様の形式で実装します.これで通知機能は完成です.

4. Test

実際に動作するのか確かめてみましょう. Azure Data FactoryではDebugというところをクリックすると,パイプラインが実行されるようになっています.

成功時

Image.png

失敗時

Image.png

SlackでAzure Data Factoryの実行状況を通知させることができました.

応用例

以上で本題は終了なのですが,ついでに私がこれまでの業務で獲得したいくつかのtipsを紹介します.

トリガーを設定する

上記の例だと手動で単一のProcedureをわざわざGUI上のサービスで実行し,その結果をわざわざSlackで確認するみたいな流れになっていました.スポットでの実行であれば,わざわざAzure Data Factoryを使用する必要がありません.

しかしこのProcedureが1時間に一度,1日に一度というようにバッチ処理させたい場合もあるでしょう.自動でProcedureを実行させ,その結果を自動で通知することができる機能が欲しくなってきます.

このような場合,トリガーを設定すれば自動実行させることが可能となります. またトリガーとSlack通知を組み合わせることで,基本的には自動でPipelineを実行し,問題が発生した場合はそれをSlackから把握し,原因調査やリカバリー作業につなげることができるようになります, こういった状況では通知機能のありがたみが分かりますね.

設定方法を簡単に説明します.Pipeline設定画面から,Add triggerを選択,新規作成をしようとすると以下のような画面が出てきます.こちらは1時間に一度自動でパイプラインが実行されるように設定したものとなります.

Image.png

トリガーは時間帯,n分に一度という設定方法のほか,ストレージにファイルがアップロードされたら実行が開始されるなどの設定も可能です.

詳しくはドキュメント見てください.

パイプラインの実行とトリガー - Azure Data Factory & Azure Synapse

カスタム絵文字を設定する

通知するアイコンはデフォルトで登録されている絵文字でも問題はないですが,ついでに遊び心もしのばせておきたい時があると思います.

後からユーザーが独自に追加するカスタム絵文字でも問題なく通知に組み込むことができます.

試しにこちらのサイトから画像を拝借し,アイコンの部分だけ変えて通知してみます.

SPECIAL|TVアニメ『SPY×FAMILY』

Image.png

成功通知:「アーニャのおかげ!」って声が聞こえてきそうです

Image.png

失敗通知:ショックの度合いがひしと伝わってきます3

Image.png

ここで注意事項なのですが,Slackの絵文字を日本語文字列で入力するとうまくいきません. 例えば,文字列で:やっほー:という絵文字がありますが,これではAzure Data Factoryは読み取ってくれないので,英語での文字列の:hugging_face:と入力する必要があります. カスタム絵文字の場合は絵文字の名前を日本語で登録しないよう気をつけてください.

話は逸れますが,絵文字以外もカスタマイズして,よりリッチな通知を実装させることも可能です. 以下の記事が参考になるかと思います.

SlackのIncoming Webhooksを使い倒す - Qiita

上記の記事に載っていて大変便利だと感じたのですが,通知画面のUIを見比べながらjsonの実装内容を生成できる優れものもあるみたいです.

https://api.slack.com/docs/messages/builder

詳細な実行結果を取得する

通知内容が「成功した」「失敗した」でも最低限の状況は分かりますが,「成功した」と通知されていても,実は不具合が発生している場合も考えられます.

例えば,平常時なら100レコードほどがInsertされるPipelineで,10レコード程度しかInsertされなかった場合,そのPipelineかその前の段階での不具合を疑う必要があります.

しかしこれまでの「成功した」通知だと,平常時よりも極端に少ないレコード数しかInsertされていることに気がつくことができません. データに不具合があることを見落としてしまうと,後続のダッシュボード可視化や機械学習モデルに正しくないデータを流し込んでしまいます.

ここで,実行時にどのくらいのレコード数がInsertされたのかも併せて通知させれば,極端な実行結果になっていること把握することができます. 即ち予期せぬ不備やエラーが発生しているなどの現象を疑うことが可能になります.例として今回はこれを実装しましょう.

Pipelineの流れとして,こちらを仮定します.

  1. とあるProcedureを実行する
  2. テーブルに国ごとにどのくらいのレコードがあるか確認する
  3. 国ごとのレコード数を通知する

詳細は省力しますが,これを実装すると以下のようなフローになります.

全体像がこちら

Image.png

ForEachCountryArrayの中身

Image.png

  • getCountriesでテーブルにある国のリストを抽出
  • SetCountryArrayjson形式の結果をArray型に変換してCountryArrayに保持
  • ForEachCountryArrayCountryArrayを繰り返し処理,即ち要素ごとの処理を実行
    • SetCountryでその繰り返しターンで処理する国名を保持
    • HowManyRecordsで国に該当するレコード数をカウント
    • SetCountで上記結果を変数として保持
    • NoticeSuccessでSlack通知

国ごとにどのくらいのレコードがあるかを通知させるには,ForEachCountryArrayの内部の結果を通知文に掲載した状態にする必要があります.ここが,これまでの固定されたjson形式で記述していた方法とは異なる部分になります.

実行結果などを通知に組み込むためには,NoticeSuccessのbody部分を以下のように記述するのがコツです.

@json(

    concat(

        '{"username": "NoticeBot(Success)",',

        '"icon_emoji": ":anya_good:",',

        '"text": "',

        variables('Country'), -- 国を表現する変数

        'に関するレコード数は',

        variables('Count'), -- レコード数を表現する変数

        '行です"}'

    )

)

一旦String型として結合させてから,再度json 形式に修復させるという形をとっています.単純に面倒なのと,,の位置など間違えやすいので気をつけてください.私は{を忘れておりしばらく詰まってしまいました.

追記(2022/08/17)

上記のようにconcatで頑張らなくても、Pythonのf-stringのような書き方も可能なようです。

{
    "username": "NoticeBot(Success)",

    "icon_emoji": ":anya_good:",

    "text": "@{variables('Country')} に関するレコード数は@{variables('Count')}行です"
}

Azure Data Factory でパラメーターと式を使用する方法 - Azure Data Factory | Microsoft Docs

これで実行すると次のような通知がSlackに届きます.これで急にレコード数が増加/減少したなどを把握することができるようになりました.

Image.png

終わりに

今回この記事を作成するにあたって初めてAzureを個人アカウントで使用しました.Pythonの挙動や機械学習についての記事を書く場合は,google colaboratoryなどでサッと実装できたのですが,今回は環境作りに苦労しました.聞き慣れない言葉の並ぶ画面でのサービス登録,データベース準備,触ったことのないデータを使った例題考案…といった部分で,無駄に時間を溶かしてしまいました.

参考(本文中に掲載したものを除く)

  • Azure Data FactoryのScript4の実装例が参考になります.

How To Use Script Activity in Azure Data Factory

  • サンプルのデータベースを作成する際に参考にしました.

AdventureWorks サンプル データベース - SQL Server

  • Pipeline作成の大枠を掴むのであれば以下の記事がおすすめです.

Azure Data Factoryを触ってみる #Azureリレー | cloud.config Tech Blog

  • 色々なデータ処理を実装して見たくなったら以下の動画もおすすめです.

How to use Azure Data factory expressions (with examples!)


  1. 私の周りはAzureでガチガチに固めるという謎戦略なのでやったことはないですが

  2. わざわざAzure Data Factoryでやる必要皆無です

  3. 地味に気に入っている

  4. UXがマジでアレ