diff --git a/CHANGELOG.md b/CHANGELOG.md
index a0eaf984..785a6e8f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+### 3.7.0
+- Adds a new sink for streaming writes, "com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBBulkSinkProvider", which uses bulk ingestion internally
+
### 3.6.8
- Reduces the performance overhead when a Spark DataFrame has many partitions - especially when using Cosmos DB as a sink in Spark Streaming scenarios
diff --git a/pom.xml b/pom.xml
index 657739da..25f2fb8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@ limitations under the License.
com.microsoft.azure
azure-cosmosdb-spark_2.4.0_2.11
jar
- 3.6.8
+ 3.7.0
${project.groupId}:${project.artifactId}
Spark Connector for Microsoft Azure CosmosDB
http://azure.microsoft.com/en-us/services/documentdb/
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
index 46cd2865..61d58bda 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
@@ -23,6 +23,6 @@
package com.microsoft.azure.cosmosdb.spark
object Constants {
- val currentVersion = "2.4.0_2.11-3.6.8"
+ val currentVersion = "2.4.0_2.11-3.7.0"
val userAgentSuffix = s" SparkConnector/$currentVersion"
}
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
index 30127fbb..4ac2a855 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
@@ -48,6 +48,7 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import scala.util.Random
import scala.collection.JavaConversions._
+import java.time.Instant
/**
* The CosmosDBSpark allows fast creation of RDDs, DataFrames or Datasets from CosmosDB.
@@ -143,6 +144,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
* @tparam D the type of the data in the RDD
*/
def save[D: ClassTag](rdd: RDD[D], writeConfig: Config): Unit = {
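+    // Capture the wall-clock start time so the total duration of the save can be logged on exit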
+ val start = Instant.now.toEpochMilli()
+ logDebug("--> CosmosDBSpark.save")
var numPartitions = 0
try {
numPartitions = rdd.getNumPartitions
@@ -150,6 +153,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
case _: Throwable => // no op
}
+    val originalNumPartitions = numPartitions
+
val maxMiniBatchImportSizeKB: Int = writeConfig
.get[String](CosmosDBConfig.MaxMiniBatchImportSizeKB)
.getOrElse(CosmosDBConfig.DefaultMaxMiniBatchImportSizeKB.toString)
@@ -192,9 +197,33 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
}
}
- val mapRdd = rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, numPartitions,
- baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true)
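+    // Only coalesce when the target partition count actually differs from the RDD's original
+    // count; skipping the no-op coalesce avoids an unnecessary partition merge step.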
+    val mapRdd = if (numPartitions == originalNumPartitions) {
+ rdd
+ .mapPartitions(
+ savePartition(
+ _,
+ writeConfig,
+ numPartitions,
+ baseMaxMiniBatchImportSizeKB * 1024,
+ writeThroughputBudgetPerCosmosPartition),
+ preservesPartitioning = false)
+ } else {
+ rdd
+ .coalesce(numPartitions)
+ .mapPartitions(
+ savePartition(
+ _,
+ writeConfig,
+ numPartitions,
+ baseMaxMiniBatchImportSizeKB * 1024,
+ writeThroughputBudgetPerCosmosPartition),
+ preservesPartitioning = true)
+ }
+
mapRdd.collect()
+
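+    // One summary line per save: total duration plus original vs. effective partition counts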
+ val duration = Instant.now.toEpochMilli() - start
+ logInfo(s"<-- CosmosDBSpark.save - Duration: ${duration} ms, original #Partitions: ${orginalNumPartitions}, effective #Partitions: ${numPartitions}")
}
private def bulkUpdate[D: ClassTag](iter: Iterator[D],
@@ -444,46 +473,47 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
partitionCount: Int,
baseMaxMiniBatchImportSize: Int,
writeThroughputBudgetPerCosmosPartition: Option[Int]): Iterator[D] = {
- val connection: CosmosDBConnection = CosmosDBConnection(config)
- val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)
-
- val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
- getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
- toBoolean
-
- val upsert: Boolean = config
- .getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
- .toBoolean
- val writingBatchSize = if (isBulkImporting) {
- config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_BulkInsert))
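+    // Connections and write settings are only created when the partition actually contains rows;
+    // empty partitions return immediately without touching Cosmos DB.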
+ if (iter.nonEmpty) {
+ logError("--> savePartition (NON-EMPTY)")
+ val connection: CosmosDBConnection = CosmosDBConnection(config)
+ val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)
+
+ val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
+ getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
+ toBoolean
+
+ val upsert: Boolean = config
+ .getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
+ .toBoolean
+ val writingBatchSize = if (isBulkImporting) {
+ config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_BulkInsert))
+ .toInt
+ } else {
+ config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_PointInsert))
+ .toInt
+ }
+
+ val writingBatchDelayMs = config
+ .getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
.toInt
- } else {
- config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_PointInsert))
+ val rootPropertyToSave = config
+ .get[String](CosmosDBConfig.RootPropertyToSave)
+ val isBulkUpdating = config.get[String](CosmosDBConfig.BulkUpdate).
+ getOrElse(CosmosDBConfig.DefaultBulkUpdate.toString).
+ toBoolean
+
+ val maxConcurrencyPerPartitionRange = config
+ .getOrElse[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange, String.valueOf(CosmosDBConfig.DefaultBulkImportMaxConcurrencyPerPartitionRange))
.toInt
- }
-
- val writingBatchDelayMs = config
- .getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
- .toInt
- val rootPropertyToSave = config
- .get[String](CosmosDBConfig.RootPropertyToSave)
- val isBulkUpdating = config.get[String](CosmosDBConfig.BulkUpdate).
- getOrElse(CosmosDBConfig.DefaultBulkUpdate.toString).
- toBoolean
-
- val maxConcurrencyPerPartitionRange = config
- .getOrElse[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange, String.valueOf(CosmosDBConfig.DefaultBulkImportMaxConcurrencyPerPartitionRange))
- .toInt
- CosmosDBSpark.lastUpsertSetting = Some(upsert)
- CosmosDBSpark.lastWritingBatchSize = Some(writingBatchSize)
+ CosmosDBSpark.lastUpsertSetting = Some(upsert)
+ CosmosDBSpark.lastWritingBatchSize = Some(writingBatchSize)
- if (iter.nonEmpty) {
if (isBulkUpdating) {
- logDebug(s"Writing partition with bulk update")
+ logError(s"Writing partition with bulk update")
bulkUpdate(iter, connection, writingBatchSize)
} else if (isBulkImporting) {
- logDebug(s"Writing partition with bulk import")
+ logError(s"Writing partition with bulk import")
bulkImport(
iter,
connection,
@@ -495,10 +525,14 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
baseMaxMiniBatchImportSize,
writeThroughputBudgetPerCosmosPartition)
} else {
- logDebug(s"Writing partition with rxjava")
+ logError(s"Writing partition with rxjava")
asyncConnection.importWithRxJava(iter, asyncConnection, writingBatchSize, writingBatchDelayMs, rootPropertyToSave, upsert)
}
- }
+
+ logError("<-- savePartition (NON-EMPTY)")
+ } else {
+ logError("<--> savePartition (EMPTY)")
+ }
new ListBuffer[D]().iterator
}
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSink.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSink.scala
new file mode 100644
index 00000000..65434f60
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSink.scala
@@ -0,0 +1,53 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmosdb.spark.streaming
+
+import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
+import org.apache.spark.sql.cosmosdb.util.StreamingWriteTask
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
+import com.microsoft.azure.cosmosdb.spark.config.Config
+
+private[spark] class CosmosDBBulkSink(sqlContext: SQLContext,
+ configMap: Map[String, String])
+ extends Sink with CosmosDBLoggingTrait with Serializable {
+
+ private var lastBatchId: Long = -1L
+ private val config = Config(configMap)
+
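+  /**
+   * Writes one micro-batch to Cosmos DB by converting the batch's query plan to an RDD
+   * and delegating to CosmosDBSpark.save, which performs the ingestion according to the
+   * supplied write configuration.
+   */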
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ if (batchId <= lastBatchId) {
+ logDebug(s"Rerun batchId $batchId")
+ } else {
+ logDebug(s"Run batchId $batchId")
+ lastBatchId = batchId
+ }
+
+ val queryExecution: QueryExecution = data.queryExecution
+ val schemaOutput = queryExecution.analyzed.output
+ val rdd = queryExecution.toRdd
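+    // Hand the micro-batch RDD to the batch writer; whether bulk import is used is decided there from the config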
+ CosmosDBSpark.save(rdd, config)
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSinkProvider.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSinkProvider.scala
new file mode 100644
index 00000000..a013bf12
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSinkProvider.scala
@@ -0,0 +1,42 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmosdb.spark.streaming
+
+import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
+
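+/**
+ * StreamSinkProvider that routes structured-streaming writes through the bulk ingestion path.
+ *
+ * Illustrative usage sketch (the option keys below are assumptions based on the connector's
+ * configuration names; adjust to your environment):
+ *
+ * {{{
+ * df.writeStream
+ *   .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBBulkSinkProvider")
+ *   .outputMode("append")
+ *   .option("Endpoint", "https://<account>.documents.azure.com:443/")
+ *   .option("Masterkey", "<key>")
+ *   .option("Database", "<database>")
+ *   .option("Collection", "<container>")
+ *   .option("checkpointLocation", "/tmp/cosmosdb-bulk-checkpoint")
+ *   .start()
+ * }}}
+ */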
+class CosmosDBBulkSinkProvider extends DataSourceRegister
+ with StreamSinkProvider with CosmosDBLoggingTrait {
+
+ override def shortName(): String = "CosmosDBBulkSinkProvider"
+
+ override def createSink(sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String],
+ outputMode: OutputMode): Sink = {
+ new CosmosDBBulkSink(sqlContext, parameters)
+ }
+}