This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Added sbt-based project for Scala samples #110

Open
wants to merge 4 commits into base: master
16 changes: 16 additions & 0 deletions samples/scala/.gitignore
@@ -0,0 +1,16 @@
airports.dat
lib/
spark-warehouse/

dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
.history
.cache
.lib/
.env
.idea/

40 changes: 40 additions & 0 deletions samples/scala/README.md
@@ -0,0 +1,40 @@
azure-cosmosdb-spark-samples-scala
============================
Examples of Cosmos DB on Spark that can serve as starting points for
Scala-based Cosmos DB Spark projects.

Setup
-----
Before running any of the samples, you will need:

1. A working installation of sbt (version 0.13.15 or higher recommended).

2. The following environment variables set (the samples read these via `sys.env`; see the sketch after this list):

```
export COSMOS_DB_ENDPOINT=
export COSMOS_DB_MASTER_KEY=
```

3. A Cosmos DB database set up on Azure. You can create one with Azure CLI
commands like the following:

```
az cosmosdb database create --db-name samples --url-connection $COSMOS_DB_ENDPOINT --key $COSMOS_DB_MASTER_KEY
az cosmosdb collection create --collection-name airports --db-name samples --throughput 5000 --url-connection $COSMOS_DB_ENDPOINT --key $COSMOS_DB_MASTER_KEY
```
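
The samples read the endpoint and key from the environment at startup. A
minimal sketch that fails fast when the variables from step 2 are missing
(`CheckCosmosEnv` is a hypothetical helper, not part of the samples):

```
// Hypothetical helper (not part of the samples): fail fast if the
// variables from step 2 are missing. The samples themselves read them
// with sys.env("COSMOS_DB_ENDPOINT") and sys.env("COSMOS_DB_MASTER_KEY").
object CheckCosmosEnv {
  def main(args: Array[String]): Unit = {
    val endpoint = sys.env.getOrElse("COSMOS_DB_ENDPOINT",
      sys.error("COSMOS_DB_ENDPOINT is not set"))
    sys.env.getOrElse("COSMOS_DB_MASTER_KEY",
      sys.error("COSMOS_DB_MASTER_KEY is not set"))
    // Print only the endpoint; never log the master key.
    println(s"Cosmos DB endpoint configured: $endpoint")
  }
}
```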

Running Samples
---------------
The easiest way to run the samples is to build an uber jar and submit it to
Spark like so (replace `com.microsoft.partnercatalyst.cosmosdb.samples.CountByCountry`
with your preferred sample):

```
sbt assembly
spark-submit --class com.microsoft.partnercatalyst.cosmosdb.samples.CountByCountry target/scala-2.11/azure-cosmosdb-spark-samples-assembly-0.1-SNAPSHOT.jar
```

You may also run the examples by opening this project in IntelliJ.
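
All three samples share the same read pattern; a minimal sketch of it,
assuming an existing `SparkSession` named `spark` as in the sources below:

```
// Shared read pattern (see CountAggregates and CountByCountry below):
// build a Config from the environment and load the collection as a
// Dataset[Airport]. Assumes an existing SparkSession named `spark`.
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.schema._

val cosmosConfig = Config(Map(
  CosmosDBConfig.Endpoint -> sys.env("COSMOS_DB_ENDPOINT"),
  CosmosDBConfig.Masterkey -> sys.env("COSMOS_DB_MASTER_KEY"),
  CosmosDBConfig.Database -> "samples",
  CosmosDBConfig.Collection -> "airports",
  CosmosDBConfig.ConnectionMode -> "Gateway"
))
import spark.implicits._
val airports = spark.read.cosmosDB(cosmosConfig).as[Airport]
airports.show()
```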

45 changes: 45 additions & 0 deletions samples/scala/build.sbt
@@ -0,0 +1,45 @@
organization := "com.github.catalystcode"
name := "azure-cosmosdb-spark-samples"
description := "Scala examples of Cosmos DB on Spark that can serve as starting points for Cosmos DB-based Spark projects."

scalaVersion := "2.11.7"

scalacOptions ++= Seq(
"-unchecked",
"-deprecation",
"-feature"
)

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
"com.microsoft.azure" % "azure-documentdb" % "1.12.0",
"com.microsoft.azure" % "azure-documentdb-rx" % "0.9.0-rc1",
"com.microsoft.azure" % "azure-cosmosdb-spark_2.2.0_2.11" % "0.0.3" excludeAll(
ExclusionRule(organization = "org.apache.tinkerpop"),
ExclusionRule(organization = "com.fasterxml.jackson.core", name = "jackson-databind")
),

"org.apache.tinkerpop" % "spark-gremlin" % "3.2.5" excludeAll(
ExclusionRule(organization = "org.apache.spark"),
ExclusionRule(organization = "org.scala-lang")
),
"org.apache.tinkerpop" % "tinkergraph-gremlin" % "3.2.5",

"com.fasterxml.jackson.core" % "jackson-annotations" % "2.8.0",
"com.fasterxml.jackson.core" % "jackson-core" % "2.8.3",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.3",
"com.fasterxml.jackson.module" % "jackson-module-paranamer" % "2.8.9",
"com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.9",
@jcjimenez (Author) commented on Aug 21, 2017:

Please note that before adding these (and the corresponding exclusion above), I was getting an exception citing a conflict in the minor version(s) of jackson modules. I'm happy to file an issue as well as to roll this change into #105.


"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
).map(_ % "compile")

assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
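
Regarding the jackson conflict noted in the review comment above: with sbt
0.13, a possible alternative to declaring each jackson module as a direct
dependency is `dependencyOverrides`, which pins the versions resolved for
transitive modules. A sketch of that approach (an assumption, not tested
against this build):

```
// Possible alternative (untested here) to the explicit jackson pins above:
// force the transitive jackson versions without adding direct dependencies.
// sbt 0.13.x expects a Set[ModuleID] for dependencyOverrides.
dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core" % "jackson-core" % "2.8.3",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.3",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.9"
)
```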

2 changes: 2 additions & 0 deletions samples/scala/project/assembly.sbt
@@ -0,0 +1,2 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

1 change: 1 addition & 0 deletions samples/scala/project/build.properties
@@ -0,0 +1 @@
sbt.version=0.13.15
16 changes: 16 additions & 0 deletions samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/Airport.scala
@@ -0,0 +1,16 @@
package com.microsoft.partnercatalyst.cosmosdb.samples

case class Airport(airportId: String,
name: String,
city: String,
country: String,
iata: String /* 3-letter IATA code. Null if not assigned/unknown.*/ ,
icao: String /* 4-letter ICAO code.*/ ,
latitude: Double,
longitude: Double,
altitude: Double,
timezone: Double,
dst: String,
tz: String,
airportType: String,
source: String)
41 changes: 41 additions & 0 deletions samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountAggregates.scala
@@ -0,0 +1,41 @@
package com.microsoft.partnercatalyst.cosmosdb.samples

import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.schema._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object CountAggregates {

def main(args: Array[String]): Unit = {
val appName = this.getClass.getSimpleName
val conf = new SparkConf()
.setAppName(appName)
.setIfMissing("spark.master", "local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Minutes(1))
val spark = SparkSession.builder().appName(ssc.sparkContext.appName).getOrCreate()
import spark.implicits._

sc.setLogLevel("ERROR")

val configMap = Map[String, String](
CosmosDBConfig.Endpoint -> sys.env("COSMOS_DB_ENDPOINT"),
CosmosDBConfig.Masterkey -> sys.env("COSMOS_DB_MASTER_KEY"),
CosmosDBConfig.Database -> "samples",
CosmosDBConfig.Collection -> "airports",
CosmosDBConfig.ConnectionMode -> "Gateway"
@jcjimenez (Author) commented:

Please note that omitting the ConnectionMode entry resulted in an exception like the following:

Caused by: com.microsoft.azure.documentdb.DocumentClientException: Server could not parse the Url., StatusCode: NotFound
	at com.microsoft.azure.documentdb.internal.ErrorUtils.maybeThrowException(ErrorUtils.java:69)
	at com.microsoft.azure.documentdb.internal.GatewayProxy.performGetRequest(GatewayProxy.java:240)
	at com.microsoft.azure.documentdb.internal.GatewayProxy.doReadFeed(GatewayProxy.java:109)
	at com.microsoft.azure.documentdb.internal.GatewayProxy.processMessage(GatewayProxy.java:338)
	at com.microsoft.azure.documentdb.DocumentClient$9.apply(DocumentClient.java:2996)
	at com.microsoft.azure.documentdb.internal.RetryUtility.executeDocumentClientRequest(RetryUtility.java:58)
	at com.microsoft.azure.documentdb.DocumentClient.doReadFeed(DocumentClient.java:3006)
	at com.microsoft.azure.documentdb.DocumentQueryClientInternal.doReadFeed(DocumentQueryClientInternal.java:36)
	at com.microsoft.azure.documentdb.internal.query.AbstractQueryExecutionContext.executeRequest(AbstractQueryExecutionContext.java:215)
	at com.microsoft.azure.documentdb.internal.query.DefaultQueryExecutionContext.executeOnce(DefaultQueryExecutionContext.java:131)
	at com.microsoft.azure.documentdb.internal.query.DefaultQueryExecutionContext.fillBuffer(DefaultQueryExecutionContext.java:101)
	at com.microsoft.azure.documentdb.internal.query.DefaultQueryExecutionContext.next(DefaultQueryExecutionContext.java:84)

)
val cosmosConfig = Config(configMap)

val airportsDF = spark.read.cosmosDB(cosmosConfig).as[Airport]
airportsDF.createOrReplaceTempView("airports")

spark.sqlContext.sql("select airports.country, count(1) as airport_count from airports group by airports.country")
.createOrReplaceTempView("airport_counts")

spark.sqlContext.sql("select min(airport_count), max(airport_count), avg(airport_count), stddev_pop(airport_count) from airport_counts")
.show()
}
}
68 changes: 68 additions & 0 deletions samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/CountByCountry.scala
@@ -0,0 +1,68 @@
package com.microsoft.partnercatalyst.cosmosdb.samples

import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.schema._
import org.apache.spark.rdd.RDD._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object CountByCountry {

def main(args: Array[String]): Unit = {
val appName = this.getClass.getSimpleName
val conf = new SparkConf()
.setAppName(appName)
.setIfMissing("spark.master", "local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Minutes(1))
val spark = SparkSession.builder().appName(ssc.sparkContext.appName).getOrCreate()
import spark.implicits._

val cosmosConfig = Config(Map(
CosmosDBConfig.Endpoint -> sys.env("COSMOS_DB_ENDPOINT"),
CosmosDBConfig.Masterkey -> sys.env("COSMOS_DB_MASTER_KEY"),
CosmosDBConfig.Database -> "samples",
CosmosDBConfig.Collection -> "airports",
CosmosDBConfig.ConnectionMode -> "Gateway"
))

val airportsDF = spark.read.cosmosDB(cosmosConfig).as[Airport]
airportsDF.cache()
airportsDF.show()

// Count the number of airports by country using reduceByKey on the underlying RDD
val rddStart = System.currentTimeMillis()
val reducedByKey = airportsDF.map(r=>(r.country, 1))
.rdd
.reduceByKey(_+_)
.toDF("country", "airport_count")
reducedByKey.show()
val rddStop = System.currentTimeMillis()

airportsDF.createOrReplaceTempView("airports")
val airportsTempView = airportsDF.sqlContext.table("airports")
airportsTempView.show()

// Perform the same count using SQL
val sqlStart = System.currentTimeMillis()
val counts = airportsDF.sqlContext.sql("select airports.country, count(1) as airport_count from airports group by airports.country")
counts.show()
val sqlStop = System.currentTimeMillis()

// Once again, perform the count by fetching from the temp table and calling reduceByKey.
val rdd2Start = System.currentTimeMillis()
airportsTempView.select("country", "airportId")
.rdd
.map(row=>(row.getString(1), 1))
.reduceByKey(_+_)
.toDF("country", "airport_count")
.show()
val rdd2Stop = System.currentTimeMillis()

println(s"sql time in millis: ${sqlStop - sqlStart}")
println(s"rdd time in millis: ${rddStop - rddStart}")
println(s"rdd2 time in millis: ${rdd2Stop - rdd2Start}")
}

}
72 changes: 72 additions & 0 deletions samples/scala/src/main/scala/com/microsoft/partnercatalyst/cosmosdb/samples/UploadToCosmos.scala
@@ -0,0 +1,72 @@
package com.microsoft.partnercatalyst.cosmosdb.samples

import java.io.{File, PrintWriter}

import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.schema._
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object UploadToCosmos {

def main(args: Array[String]): Unit = {
val appName = this.getClass.getSimpleName
val conf = new SparkConf()
.setAppName(appName)
.setIfMissing("spark.master", "local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Minutes(1))
val spark = SparkSession.builder().appName(ssc.sparkContext.appName).getOrCreate()

// Originally referenced in https://github.com/dennyglee/databricks/blob/master/notebooks/Users/denny%40databricks.com/flights/On-Time%20Flight%20Performance.py
// Download the raw OpenFlights airports data to a local file.
val writer = new PrintWriter(new File("airports.dat"))
try scala.io.Source.fromURL("https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat").getLines().foreach(writer.println)
finally writer.close() // close so the file is fully flushed before Spark reads it

import spark.implicits._
val airports = spark.read.csv("airports.dat").map(row => {
Airport(
row.getString(0),
row.getString(1),
row.getString(2),
row.getString(3),
row.getString(4),
row.getString(5),
SafeStringToDouble(row.getString(6)),
SafeStringToDouble(row.getString(7)),
SafeStringToDouble(row.getString(8)),
SafeStringToDouble(row.getString(9)),
row.getString(10),
row.getString(11),
row.getString(12),
row.getString(13)
)
})
airports.show()

val cosmosConfig = Config(Map(
CosmosDBConfig.Endpoint -> sys.env("COSMOS_DB_ENDPOINT"),
CosmosDBConfig.Masterkey -> sys.env("COSMOS_DB_MASTER_KEY"),
CosmosDBConfig.Database -> "samples",
CosmosDBConfig.Collection -> "airports",
CosmosDBConfig.SamplingRatio -> "1.0",
CosmosDBConfig.QueryMaxRetryOnThrottled -> "10",
CosmosDBConfig.QueryMaxRetryWaitTimeSecs -> "10"
))

airports.write.mode(SaveMode.Append).cosmosDB(cosmosConfig)
}

// Parses a double from the raw CSV, mapping null or unparseable values to 0.0.
object SafeStringToDouble extends Serializable {
def apply(str: String): Double = {
try {
if (str == null) 0.0 else str.toDouble
} catch {
case _: NumberFormatException => 0.0
}
}
}

}