Cosmos DB 用 Spark コネクタ - グローバル分散型マルチモデル データをシームレスに活用

執筆者: Denny Lee (Principal Program Manager, Azure CosmosDB)

このポストは、6 月 7 日に投稿された Spark Connector for #CosmosDB - seamless interaction with globally-distributed, multi-model data の翻訳です。

 

image

このたび、Azure Cosmos DB 用 Spark コネクタがマルチモデルに完全対応したことが発表されました。Azure Cosmos DB では、使い慣れたツールや API を利用してグローバル分散型アプリをさらに簡単に作成できる環境作りを目指しています。Azure Cosmos DB のデータベース エンジンは、SQL (DocumentDB) APIMongoDB API (英語)Gremlin (Graph) APIAzure Table Storage API をネイティブにサポートしています。新しい Azure Cosmos DB 用 Spark コネクタを使用すると、ドキュメント、テーブル、グラフの Azure Cosmos DB の各データ モデルを Apache Spark で操作できるようになります。

Azure Cosmos DB とは

Azure Cosmos DB は、マイクロソフトの基幹業務アプリケーション向けグローバル分散型 (英語) マルチモデル データベース サービスです。Azure Cosmos DB は、ターンキー方式のグローバル分散、世界規模でのスループットとストレージの弾力的なスケールアウト、99 パーセンタイルにおける 10 ミリ秒未満のレイテンシ、明確に定義された 5 種類の整合性レベル、高可用性などを、すべて業界最高レベルの包括的な SLA (英語) で保証しています。管理者がスキーマやインデックスを管理する必要がなく、すべてのデータのインデックスを自動で作成 (英語) します。さらにマルチモデル データベースとして、ドキュメント、キー/値、グラフ、列指向の各データ モデルをサポートしています。クラウド ネイティブなサービスの Azure Cosmos DB は、開発当初からマルチテナントやグローバルな分散に十分に配慮したうえで設計されています。

Apache Spark と Azure Cosmos DB で世界中に分散されたデータを使ったリアルタイム機械学習を実行

Azure Cosmos DB 用 Spark コネクタを使用すると、Azure Cosmos DB で世界中に分散しているデータを基にしたリアルタイムでのデータ サイエンス、機械学習、高度な分析、データ探索を行うことができます。Apache Spark を接続して Azure Cosmos DB 上で瞬時にデータの永続化やクエリを実行できるため、動きの速いデータ サイエンスにおける処理能力が向上します。Azure Cosmos DB 内で管理されているインデックスを効率的に利用して、分析時に列を更新することも可能です。さらに、変化の激しいグローバル分散型データに対するプッシュダウン述語フィルタリングを備えており、IoT、データ サイエンス、分析などのさまざまなシナリオ (英語) をサポートします。

このほか、Azure Cosmos DB と Spark を利用すると次のことが可能になります。

  • データの抽出、変換、読み込み (ETL) のストリーム化
  • データ エンリッチメント
  • イベント検出のトリガー
  • 複雑なセッション分析やカスタマイズ
  • 視覚的なデータ探索やインタラクティブ分析
  • データ探索、情報共有、共同作業などのノートブック エクスペリエンス

Azure Cosmos DB 用 Spark コネクタは、Azure DocumentDB Java SDK (英語) を使用しています。GitHub (英語) から Spark コネクタをダウンロードしてすぐに使用を開始 (英語) できます。

Azure Cosmos DB テーブルの使用方法

Azure Cosmos DB では、キー/値ストアが必要なアプリケーションに Table API を提供することで、柔軟なスキーマ、予測可能なパフォーマンス、グローバル分散に対応しています。Azure Cosmos DB は、Azure Table Storage SDK および REST API と連携させることができます。Azure Cosmos DB では、現在パブリック プレビュー版で提供されている、スループットに最適化されたテーブル (通称 Premium テーブル) をサポートしています。

image

以下は、Azure Cosmos DB 用 Spark コネクタを使用して Apache Spark を Azure Cosmos DB Table API に接続する場合の例です。

 // 初期化
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

val readConfig = Config(Map("Endpoint" -> "https://$tableContainer$.documents.azure.com:443/",
"Masterkey" -> "$masterkey$",
"Database" -> "$tableDatabase$",
"Collection" -> "$tableCollection$",
"SamplingRatio" -> "1.0"))


// コレクションの接続を作成
val tblCntr = spark.sqlContext.read.cosmosDB(readConfig)
tblCntr.createOrReplaceTempView("tableContainer")

テーブルへの接続が確立されたら、Spark DataFrame (前の例では tblCntr) を作成します。

 // tblCntr DataFrame スキーマを出力
scala> tblCntr.printSchema()
root
 |-- _etag: string (nullable = true)
 |-- $id: string (nullable = true)
 |-- _rid: string (nullable = true)
 |-- _attachments: string (nullable = true)
 |-- City: struct (nullable = true)
 |    |-- $t: integer (nullable = true)
 |    |-- $v: string (nullable = true)
 |-- State: struct (nullable = true)
 |    |-- $t: integer (nullable = true)
 |    |-- $v: string (nullable = true)
 |-- $pk: string (nullable = true)
 |-- id: string (nullable = true)
 |-- _self: string (nullable = true)
 |-- _ts: integer (nullable = true)



// Azure Cosmos DB テーブルに対して Spark SQL クエリを実行
scala > spark.sql("select `$id`, `$pk`, City.`$v` as City, State.`$v` as State from tableContainer where City.`$v` = 'Seattle'").show()
+----+-----+-------+-----+
| $id|  $pk|   City|State|
+----+-----+-------+-----+
|John|Smith|Seattle|   WA|
+----+-----+-------+-----+

すばやく簡単にスキーマを操作し、Azure Cosmos DB テーブルに対して Spark SQL クエリを実行できます。

Azure Cosmos DB グラフの使用方法

Azure Cosmos DB では、ターンキー方式のグローバルな分散、ストレージとスループットの弾力的なスケールアウト、99 パーセンタイル値におけるレイテンシ (読み取り 10 ミリ秒未満および書き込み 15 ミリ秒未満)、インデックスとクエリの自動作成、整合性レベルの調整、99.99% の可用性を保証する包括的な SLA を提供していますが、これらに加え、グラフ モデリングおよびトラバーサル API も提供しています。Apache TinkerPop (英語) のグラフ トラバーサル言語である Gremlin (英語) のクエリを実行でき、また Apache Spark GraphX などの TinkerPop と互換性を持つ他のグラフ システムともシームレスに統合されています。

image

Azure Cosmos DB 用 Spark コネクタを使用して Apache Spark を Azure Cosmos DB Graph API に接続する場合の例を次に示します。

 // 初期化
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config


// 地図
val baseConfigMap = Map(
"Endpoint" -> "https://$graphContainer$.documents.azure.com:443/",
"Masterkey" -> "$masterKey$"
"Database" -> "$database$",
"Collection" -> "$collection$", 
"SamplingRatio" -> "1.0",
"schema_samplesize" -> "1000"
)

val airportConfigMap = baseConfigMap ++ Map("query_custom" -> "select * from c where c.label='airport'") 
val delayConfigMap = baseConfigMap ++ Map("query_custom" -> "select * from c where c.label='flight'") 


// 構成
// 空港データの取得 (頂点)
val airportConfig = Config(airportConfigMap)
val airportColl = spark.sqlContext.read.cosmosDB(airportConfig)
airportColl.createOrReplaceTempView("airportColl") 

// フライトの遅延データの取得 (辺)
val delayConfig = Config(delayConfigMap)
val delayColl = spark.sqlContext.read.cosmosDB(delayConfig)
delayColl.createOrReplaceTempView("delayColl") 

この例では、空港データ (頂点) とフライトの遅延データ (辺) の 2 つの Spark DataFrame を作成しました。Azure Cosmos DB に格納されているグラフは下図のように視覚化されます。この図では、頂点は空港を表す青い円、辺は都市間のフライトを表す黒い直線で示されています。この例では、フライト (辺) の出発地はシアトル (すべての辺の起点となっている地図左上の青い円) です。

image

図: Airport D3.js で視覚化された空港 (青い円) と都市間のフライト (黒い直線)

Spark を Cosmos DB グラフに統合するメリット

Azure Cosmos DB グラフと Spark コネクタを統合する大きなメリットの 1 つは、他の Spark クエリのように、Gremlin のクエリや Spark DataFrame を同一データ コンテナー (グラフ、テーブル、ドキュメント コレクションなど) に対して実行できるという点です。次の例では、Azure Cosmos DB グラフに格納されているフライト グラフに対して簡単な Gremlin Groovy クエリを実行しています。

          \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
gremlin> :remote connect tinkerpop.server conf/remote-secure.yaml
==>Configured tychostation.graphs.azure.com/52.173.137.146:443

gremlin> // シアトルから特定の都市へのフライト数
==>true
gremlin> :> g.V().has('iata', 'SEA').outE('flight').inV().values('city').groupCount()
==>[Chicago:1088,New York:432,Dallas:800,Miami:90,Washington DC:383,Newark:345,Boston:315,Orlando:116,Philadelphia:193,Fort Lauderdale:90,Minneapolis:601,Juneau:180,Ketchikan:270,Anchorage:1097,Fairbanks:260,San Jose:611,San Francisco:1698,San Diego:617,Oakland:798,Sacramento:629,Los Angeles:1804,Orange County:569,Burbank:266,Ontario:201,Palm Springs:236,Las Vegas:1204,Phoenix:1228,Tucson:90,Austin:90,Denver:1231,Spokane:269,San Antonio:90,Salt Lake City:860,Houston:568,Atlanta:521,St. Louis:90,Kansas City:95,Honolulu, Oahu:415,Kahului, Maui:270,Lihue, Kauai:128,Long Beach:345,Detroit:244,Cincinnati:4,Omaha:90,Santa Barbara:90,Fresno:142,Colorado Springs:90,Portland:602,Jackson Hole:13,Cleveland:6,Charlotte:169,Albuquerque:105,Reno:90,Milwaukee:82]


gremlin> // シアトル発リノ行きのフライトの遅延
==>true
gremlin> :> g.V().has('iata', 'SEA').outE('flight').as('e').inV().has('iata', 'RNO').select('e').values('delay').sum()
==>963

前の例のコードでは、tychostation グラフ (tychostation.graphs.azure.com) に接続して Gremlin Groovy クエリを実行しています。

  • データセット内のシアトルからリッスン対象の都市へのフライト数を導き出すには、グラフ トラバーサルと groupCount() を使用します (例ではシアトル発シカゴ行きは 1,088 便)。
  • シアトル発リノ行きの 90 便の合計遅延 (分) を導き出すには、グラフを使用します (例では 963 分の遅延)。

Spark コネクタで同じ tychostation グラフを使用すると、独自の Spark DataFrame クエリを実行することもできます。Spark コネクタのコード スニペットに続いて、独自の Spark SQL クエリを紹介します。この例では、HDInsight Jupyter ノートブック サービスを使用しています。

シアトル発フライトの目的地トップ 5 都市

 %%sql
select a.city, sum(f.delay) as TotalDelay 
from delays f 
join airports a 
  on a.iata = f.dst 
where f.src = 'SEA' and f.delay < 0 
group by a.city 
order by sum(f.delay) limit 5

image

シアトル発フライトの目的地までの遅延時間の中央値

 %%sql
select a.city, percentile_approx(f.delay, 0.5) as median_delay 
from delays f 
join airports a 
  on a.iata = f.dst 
where f.src = 'SEA' and f.delay < 0 
group by a.city 
order by median_delay

image

Azure Cosmos DB では、同一のグラフに対して Apache Tinkerpop Gremlin クエリと Apache Spark DataFrame クエリの両方を扱えます。

Azure Cosmos DB ドキュメント データ モデルの使用方法

Azure Cosmos DB では、グラフ、テーブル、ドキュメントのいずれのモデルの場合でも、Azure Cosmos DB 用 Spark コネクタで使用するコードは同じです。手順は下記のとおりです。

  1. 接続を構成します。
  2. 独自の Config と DataFrame を構築します。
  3. これで Apache Spark と Azure Cosmos DB を連携させることができます。
 // 初期化
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config


// 接続を構成
val baseConfigMap = Map(
"Endpoint" -> "https://$documentContainer$.documents.azure.com:443/",
"Masterkey" -> "$masterKey$"
"Database" -> "$database$",
"Collection" -> "$collection$", 
"SamplingRatio" -> "1.0",
"schema_samplesize" -> "1000"
)

// Config と DataFrame を構築
val baseConfig = Config(baseConfigMap)
val baseColl = spark.sqlContext.read.cosmosDB(baseConfig)

また、Azure Cosmos DB 用 Spark コネクタでは、Spark ワーカー ノードと Azure Cosmos DB データ パーティションの間でデータが並列化されます。このため、データがテーブル、グラフ、ドキュメントのいずれに格納されている場合でも、Azure Cosmos DB のパフォーマンスやスケーラビリティ、スループット、整合性を活用して、Apache Spark で機械学習やデータ サイエンスの処理を行うことができます。

image

次のステップ

今回の記事では、Azure Cosmos DB 用 Spark コネクタを使用して Azure Cosmos DB でサポートされている複数のデータ モデルをシームレスに操作する方法を紹介しました。Apache Spark と Azure Cosmos DB を併用すると、ビッグ データに対するアドホックのインタラクティブなクエリ、高度な分析やデータ サイエンス、機械学習、人工知能などのいずれの用途にも対応できます。Azure Cosmos DB では、ソーシャル分析、時系列、ゲームやアプリケーション テレメトリ、販売カタログ、最新のトレンドやカウンター、監査ログ システムなど、世界中の多様なソースから続々と収集されるデータを取り込むことができます。また、Spark では、Azure Cosmos DB 内のグローバルに分散された大規模データに対して、高度な分析や AI アルゴリズムを実行できます。Azure Cosmos DB は、業界初のグローバル分散型マルチモデル データベース サービスとして、今後 Spark コネクタをテーブル、グラフ、ドキュメント以外のさまざまなデータ モデルに対応させていく予定です。

Azure Cosmos DB でクエリを実行するには、まず Azure ポータル で Azure Cosmos DB アカウントを新規作成し、Azure-CosmosDB-Spark GitHub リポジトリ (英語) のプロジェクトで作業を行います。

Azure Cosmos DB に関する最新のニュースや機能の情報は、Twitter アカウント (@AzureCosmosDB) やハッシュタグ (#CosmosDB) でご確認いただけます。また、Stack Overflow の開発者フォーラム (英語) もぜひご利用ください。