Azure Data Lake と Azure Data Factory を使用してビッグ データ パイプラインを作成する

このポストは、10 月 29 日に投稿された Creating big data pipelines using Azure Data Lake and Azure Data Factory の翻訳です。

このたびマイクロソフトは、新しい Azure Data Lake のパブリック プレビューを発表しました (英語)。Azure Data Lake Store、Azure Data Lake Analytics、Azure HDInsight を含む拡張版 Azure Data Lake では、ビッグ データの処理と分析がより簡単になり、アクセス性が向上しています。

Azure Data Lake Store は、サイズや種類を問わずさまざまなデータを簡単に取得できる単一のリポジトリで、どのようなスケールのデータでもアプリケーションを変更することなく高速に処理します。Azure Data Lake Analytics は Apache YARN を基盤とし、U-SQL 言語に対応した新しいサービスで、お客様の強力なコードと SQL のメリットを統合することができます。さらに、動的なスケーリングや、Azure Active Directory の企業レベルのセキュリティが確保されたあらゆる種類のデータ分析がサービスによって実施されるため、お客様はビジネス目標の達成に注力することができます。

10 月初旬に、従来からサポートされていた Azure HDInsight に加えて、Azure Data Lake および Azure Data Factory (ADF) でビッグ データ パイプライン (ワークフロー) の作成と運用準備が可能になることをお知らせしましたが、今回、この機能がパブリック プレビューに追加されました。この記事では、Azure Data Lake と Azure Data Factory の統合により実現される内容をご紹介します。

Azure Data Lake Store へのデータの移動が簡単に

現時点で、Azure Data Factory は下記のソースから Azure Data Lake Store へのデータの移動をサポートしています。

  • Azure Blob
  • Azure SQL Database
  • Azure Table
  • オンプレミスの SQL Server データベース
  • Azure DocumentDB
  • Azure SQL DW
  • オンプレミスのファイル システム
  • オンプレミスの Oracle Database
  • オンプレミスの MYSQL データベース
  • オンプレミスの DB2 データベース
  • オンプレミスの Teradata データベース
  • オンプレミスの Sybase データベース
  • オンプレミスの PostgreSQL データベース
  • オンプレミスの HDFS
  • 一般的な OData (近日追加予定)
  • 一般的な ODBC (近日追加予定)

また、Azure Blob、Azure SQL Database、オンプレミスのファイル システムなど、Azure Data Lake Store からさまざまな送信先にデータを移動できます。データを Azure Blob ストレージから Azure Data Lake Store に移動する場合は、次の手順を実施します。

注: Azure Data Lake Store の有効なアカウントが必要です。アカウントをお持ちでない方は、こちらのページ (英語) からアカウントを新規作成してください。

Azure Data Factory を作成する

Azure ポータルにログインして Azure Data Factory に移動します。名前を入力し、サブスクリプション、リソース グループ名、リージョン名を選択します。ここでは、"AzureDataLakeStoreAnalyticsSample" という名前を使用します。

作成が完了したら、Data Factory に移動して [Author and deploy] をクリックします。

ADF とリンクされたサービスを作成する

Azure Storage とリンクされたサービスを作成する: Azure Blob ストレージ (送信元) からデータを移動する場合について説明します。

[New Data Store]、[Azure Storage] の順にクリックします。<AccountName> および <AccountKey> の各パラメーターに値を入力し、[Deploy] をクリックします。

Azure Data Lake Store とリンクされたサービスを作成する: Azure Data Lake Store (送信先) へデータを移動する場合について説明します。

[New Data Store]、[Azure Data Lake Store] の順にクリックします。

Azure Data Lake Store とリンクされたサービスで必要なパラメーターを入力します。

DataLakeUri: 上記の手順で作成するか、既存のものを使用します (例: https://<adlstoreaccountname>.azuredatalakestore.net/webhdfs/v1 の<adlstoreaccountname> の部分を Azure Data Lake Store のアカウント名に変更します)。

Authorization: このパラメーターを指定するには、[Authorize] をクリックします。ポップアップ ウィンドウが開くので、資格情報を入力します。

Azure Data Lake Store アカウントが異なるサブスクリプションに存在し、現在の Data Factory とリソース グループ名が異なる場合、下記のパラメーターも指定する必要があります。

  • AccountName
  • SubscriptionID
  • ResourceGroupName

[Deploy] をクリックすると、Azure Data Lake Store とリンクされたサービスが作成されます。

注: JSON で [Optional] とされている行のうち、値を指定しない行は、[Deploy] をクリックする前に削除する必要があります。

ADF のデータセットを作成する

Azure Blob ストレージの送信元データセットを作成する:

[New Dataset]、[Azure Blob storage] の順にクリックします。

Azure Blob ストレージ データセットのテンプレートが用意されるので、すべての値を入力します。下記は Azure Blob ストレージのデータセットの例です。わかりやすくするために、句による時間ごとのパーティション分割や static フォルダーは使用していません。下記データセットでは、コピー元のデータ (SearchLog.tsv) が Azure Storage 内の rawdatasample/data/ フォルダーに存在することを指定しています。

 {
    "name": "RawBlobDemoTable",
    "properties": {
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "StorageLinkedService",
        "typeProperties": {
            "fileName": "SearchLog.tsv",
            "folderPath": "rawdatasample/data/",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "\t"
            }
        },
        "availability": {
            "frequency": "Day",
            "interval": 1,
            "style": "StartOfInterval"
        },
        "external": true,
        "policy": {
            "validation": {
                "minimumSizeMB": 0.00001
            }
        }
    }
}

Azure Data Lake Store の送信先データセットを作成する:

[New Dataset]、[Azure Data Lake Store] の順にクリックします。

Azure Data Lake Store のデータセットのテンプレートが用意されるので、すべての値を入力します。下記は Azure Data Lake Store のデータセットの例です。わかりやすくするために、句による時間ごとのパーティション分割、および static フォルダーは使用していません。下記のデータセットでは、データを Azure Data Lake 内の datalake/input/ フォルダーにコピーするように指定しています。

 {
    "name": "DataLakeTable",
    "properties": {
        "published": false,
        "type": "AzureDataLakeStore",
        "linkedServiceName": "AzureDataLakeStoreLinkedService",
        "typeProperties": {
            "folderPath": "datalake/input/",
            "fileName": "SearchLog.tsv",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "\t"
            }
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

ADF パイプラインを作成する

ADF のコピー パイプラインを作成する: Azure Blob ストレージから Azure Data Lake にデータをコピーする場合について説明します。

[New Pipeline] をクリックすると、パイプラインのテンプレートのサンプルが開きます。下記のパイプラインの例では、前のセクションで作成したサンプル データセットを使用して、Azure Blob ストレージから Azure Data Lake にデータをコピーします。

パイプラインの定義

 {
    "name": "EgressBlobToDataLakePipeline",
    "properties": {
        "description": "Egress data from blob to azure data lake",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "treatEmptyAsNull": true
                    },
                    "sink": {
                        "type": "AzureDataLakeStoreSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "RawBlobDemoTable"
                    }
                ],
                "outputs": [
                    {
                        "name": "DataLakeTable"
                    }
                ],
                "policy": {
                    "timeout": "10:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "EgressDataLake",
                "description": "Move data from blob to azure data lake"
            }
        ],
        "start": "2015-08-08T00:00:00Z",
        "end": "2015-08-08T01:00:00Z",
        "isPaused": false
    }
}

ADF パイプラインを監視する

前のセクションで作成した ADF のコピー パイプラインは、実行頻度が 1 日 1 回、start と end が 2015 年 8 月 8 日と定義されているため、1 回だけコピーが実行されます。ADF パイプラインのスケジュール設定の詳細については、こちらのページを参照してください。

ADF の [Diagram] ビューに移動して、Data Factory の動作フローを表示します。ここでは、Azure Blob ストレージと Azure Data Lake Store のデータセットと、Blob ストレージから Data Lake Store にデータを移動する際に使用されるパイプラインが表示されます。

[Diagram] ビューで [DataLakeTable] をクリックすると、実行されるアクティビティとそのステータスが表示されます。

上記では、ADF の EgressBlobToDataLakePipeline のコピー アクティビティが正常に実行され、Azure Blob ストレージから Azure Data Lake Store に 3.08 KB のデータがコピーされたことが確認できます。また、Microsoft Azure ポータルにログインして Azure Data Lake データ エクスプローラーを使用しても、Azure Data Lake Store にコピーされたデータを表示することができます。

Azure Data Factory でのデータ移動アクティビティの詳細については、こちらのページを参照してください。また、ADF の AzureDataLakeStore コネクタの詳細なドキュメントは、こちらから参照してください (英語)

ADF で E2E のビッグ データ パイプラインを作成し Azure Data Lake Analytics サービスで U-SQL スクリプトを実行

ログの処理は、さまざまな業種 (小売、金融、ゲームなど) で一般的に行われています。

注: Azure Data Lake Analytics の有効なアカウントが必要です。アカウントをお持ちでない方は、こちらのページ (英語) からアカウントを新規作成してください。

このシナリオでは、前の手順で Azure Data Lake Store アカウントにコピーされたログを取得する ADF パイプラインを作成し、Azure Data Lake Analytics での処理段階の 1 つとして U-SQL スクリプトでログを処理します。この U-SQL スクリプトではリージョンごとにイベントの演算処理を実行し、その結果を下流工程で使用します。

ここでは、前のシナリオで作成した Data Factory (AzureDataLakeStoreAnalyticsSample) を再利用して、データを Azure Blob ストレージから Azure Data Lake Store にコピーします。

ADF とリンクされたサービスを作成する

Azure Data Lake Analytics とリンクされたサービスを作成します。これは、ログの処理を行う U-SQL スクリプトを実行する Azure Data Lake Analytics アカウントです。

[New Compute]、[Azure Data Lake Analytics] の順にクリックします。

Azure Data Lake Analytics とリンクされたサービスで必要なパラメーターを入力します。

  • AccountName: 上記の手順で作成するか、既存のものを使用します。
  • Authorization: このパラメーターを指定するには、[Authorize] をクリックします。ポップアップ ウィンドウが開くので、資格情報を入力します。

Azure Data Lake Analytics アカウントが異なるサブスクリプションに存在し、リソース グループ名が異なる場合、下記のオプション パラメーターも指定します。

  • SubscriptionID
  • ResourceGroupName

[Deploy] をクリックすると、Azure Data Lake Analytics とリンクされたサービスが作成されます。

注: JSON で [Optional] とされている行のうち、値を指定しない行は、[Deploy] をクリックする前に削除する必要があります。

Azure Data Lake Store とリンクされたサービスを作成する: Azure Data Lake Store (送信先) へデータを移動する場合について説明します。

注: 前のデータ コピーのシナリオに続けてこのシナリオを進める場合、このリンクされたサービスは既に作成されています。

[New Data Store]、[Azure Data Lake Store] の順にクリックします。

Azure Data Lake Store とリンクされたサービスで必要なパラメーターを入力します。

DataLakeUri: 上記の手順で作成するか、既存のものを使用します (例: https://<adlstoreaccountname>.azuredatalakestore.net/webhdfs/v1 の<adlstoreaccountname> の部分を Azure Data Lake Store のアカウント名に変更します)。

Authorization: このパラメーターを指定するには、[Authorize] をクリックします。ポップアップ ウィンドウが開くので、資格情報を入力します。

Azure Data Lake Store アカウントが異なるサブスクリプションに存在し、現在の Data Factory とリソース グループ名が異なる場合、下記のパラメーターも指定する必要があります。

  • AccountName
  • SubscriptionID
  • ResourceGroupName

[Deploy] をクリックすると、Azure Data Lake Store とリンクされたサービスが作成されます。

注: JSON で [Optional] とされている行のうち、値を指定しない行は、[Deploy] をクリックする前に削除する必要があります。

ADF のデータセットを作成する

Azure Data Lake Store の送信元データセットを作成する:

注: 前のデータ コピーのシナリオに続けてこのシナリオを進める場合、このデータセットは既に作成されています。

[New Dataset]、[Azure Data Lake Store] の順にクリックします。

Azure Data Lake Store のデータセット テンプレートが用意されるので、すべての値を入力します。

下記は、Azure Data Lake Store のデータセットの例です。わかりやすくするために、句による時間ごとのパーティション分割、および static フォルダーは使用していません。下記データセットでは、データを Azure Data Lake 内の datalake/input/ フォルダーにコピーするように指定しています。

 {
    "name": "DataLakeTable",
    "properties": {
        "published": false,
        "type": "AzureDataLakeStore",
        "linkedServiceName": "AzureDataLakeStoreLinkedService",
        "typeProperties": {
            "folderPath": "datalake/input/",
            "fileName": "SearchLog.tsv",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "\t"
            }
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Azure Data Lake Store の送信先データセットを作成する:

[New Dataset]、[Azure Data Lake Store] の順にクリックします。

下記の例では、EventsByEnGbRegionTable データセットを定義しています。このデータセットに対応するデータは、AzureDataLakeAnalytics という U-SQL スクリプトを実行し "en-gb" ロケールで日付の値が “2012/02/19” より小さいイベントをすべて取得した後に生成されます。

 {
    "name": "EventsByEnGbRegionTable",
    "properties": {
        "published": false,
        "type": "AzureDataLakeStore",
        "linkedServiceName": "AzureDataLakeStoreLinkedService",
        "typeProperties": {
            "folderPath": "datalake/output/"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

ADF パイプラインを作成する

ADF AzureDataLakeAnalytics パイプラインを作成する: パイプラインで U-SQL アクティビティを実行して処理する場合について説明します。

[New Pipeline] をクリックすると、パイプラインのテンプレートのサンプルが開きます。

また、[New Pipeline] をクリックした後に [Add Activity] をクリックして、 DataLakeAnalytics U-SQL アクティビティにテンプレートを追加することもできます。

下記のパイプラインでは Azure Data Lake Analytics U-SQL アクティビティが実行され、"en-gb" ロケールで日付の値が “2012/02/19” より小さいイベントがすべて取得されます。

パイプラインの定義

 {
    "name": "ComputeEventsByEnGbRegionPipeline",
    "properties": {
        "description": "This is a pipeline to compute events for en-gb locale and date less than 2012/02/19.",
        "activities": [
            {
                "type": "DataLakeAnalyticsU-SQL",
                "typeProperties": {
                    "scriptPath": "scripts\\kona\\SearchLogProcessing.txt",
                    "scriptLinkedService": "StorageLinkedService",
                    "degreeOfParallelism": 3,
                    "priority": 100,
                    "parameters": {
                        "in": "/datalake/input/SearchLog.tsv",
                        "out": "/datalake/output/Result.tsv"
                    }
                },
                "inputs": [
                    {
                        "name": "DataLakeTable"
                    }
                ],
                "outputs": [
                    {
                        "name": "EventsByEnGbRegionTable"
                    }
                ],
                "policy": {
                    "timeout": "06:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "EventsByRegion",
                "linkedServiceName": "AzureDataLakeAnalyticsLinkedService"
            }
        ],
        "start": "2015-08-08T00:00:00Z",
        "end": "2015-08-08T01:00:00Z",
        "isPaused": false
    }
}

上記のパイプラインで実行されるこの U-SQL スクリプトは、デプロイされた StorageLinkedService に対応する Azure Blob ストレージ アカウントの scripts/kona フォルダー内に存在します。

SearchLogProcessing.txt スクリプトの定義

 @searchlog =
        EXTRACT UserId             int,
                       Start               DateTime,
                       Region            string,
                       Query             string,
                       Duration         int?,
                       Urls                string,
                       ClickedUrls      string
    FROM @in
    USING Extractors.Tsv(nullEscape:"#NULL#");

@rs1 =
    SELECT Start, Region, Duration
    FROM @searchlog
WHERE Region == "en-gb";

@rs1 =
    SELECT Start, Region, Duration
    FROM @rs1
    WHERE Start <= DateTime.Parse("2012/02/19");

OUTPUT @rs1   
    TO @out
      USING Outputters.Tsv(quoting:false, dateTimeFormat:null);

上記の U-SQL スクリプトの @in パラメーターおよび @out パラメーターの値は、Parameters セクションの内容を受けて ADF が動的に指定します。パイプラインの定義の Parameters セクションを確認してください。

他にも、パイプラインの定義では、degreeOfParallelism や priority などのパラメーターを指定して Azure Data Lake Analytics サービスで実行されるジョブに適用することもできます。

ADF パイプラインを監視する

前のセクションで作成した ADF のコピー パイプラインは、実行頻度が 1 日 1 回、start と end が 2015 年 8 月 8 日と定義されているため、このパイプラインおよび U-SQL スクリプトは 1 回だけ実行されます。ADF パイプラインのスケジュール設定の詳細については、こちらのページを参照してください。

ADF の [Diagram] ビューに移動して、Data Factory の動作フローを表示します。ここでは、EgressBlobToDataLakePipeline (Azure BLOB ストレージから Azure Data Lake Store にデータをコピー) と ComputeEventsByEnGbRegionPipeline ("en-gb" ロケールで日付の値が “2012/02/19” より小さいイベントをすべて取得) の 2 つのパイプラインと対応するデータセットが表示されます。

[Diagram] ビューで [EventsByEnGbRegionTable] をクリックすると、実行されるアクティビティとそのステータスが表示されます。

ADF の ComputeEventsByEnGbRegionPipeline の U-SQL アクティビティが正常に実行され、Results.tsv ファイル (/datalake/output/Result.tsv) が AzureDataLakeStore アカウントで作成されていることがわかります。この Result.tsv には、"en-gb" ロケールで日付の値が “2012/02/19” よりも小さいイベントがすべて含まれます。また、Microsoft Azure ポータルにログインして Azure Data Lake データ エクスプローラーを使用しても、上記の手順の Azure Data Lake Store の処理で生成された Result.tsv を表示できます。

Azure Data Factory の AzureDataLakeAnalytics U-SQL アクティビティの詳細なドキュメントは、こちらから参照してください (英語)

今回ご紹介した手順では、Azure Data Factory を使用して E2E のビッグ データ パイプラインを作成することが可能で、またこのパイプラインを使用してデータを Azure Data Lake Store に移動することができます。さらに、Azure Data Lake Analytics で処理段階の 1 つとして U-SQL スクリプトを実行したり、ニーズに応じてスケールを動的に変更したりできます。

マイクロソフトでは、ビッグ データ処理と分析のワークフローを向上させるソリューション開発を今後も継続的に進めていきます。マイクロソフト クラウド プラットフォーム チームによる Microsoft Azure Data Lake の詳細については、こちらのページをご覧ください。また、こちらのビデオ (英語) では、Azure Data Factory でパイプラインを作成する方法や Data Factory の使用方法を説明しています。Data Factory に関するご意見や新機能のご要望がありましたら、ぜひ Azure Data Factory フォーラム (英語) まで投稿してください。