diff --git a/CHANGELOG.md b/CHANGELOG.md index a0eaf984..785a6e8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +### 3.7.0 +- Adds a new sink for streaming writes "com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBBulkSinkProvider" that will use bulk ingestion internally + ### 3.6.8 - Reduces the performance overhead when a Spark DataFrame as many partitions - especially when using Cosmos DB as a sink in Spark Streaming scenarios diff --git a/pom.xml b/pom.xml index 657739da..25f2fb8c 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ limitations under the License. com.microsoft.azure azure-cosmosdb-spark_2.4.0_2.11 jar - 3.6.8 + 3.7.0 ${project.groupId}:${project.artifactId} Spark Connector for Microsoft Azure CosmosDB http://azure.microsoft.com/en-us/services/documentdb/ diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala index 46cd2865..61d58bda 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala @@ -23,6 +23,6 @@ package com.microsoft.azure.cosmosdb.spark object Constants { - val currentVersion = "2.4.0_2.11-3.6.8" + val currentVersion = "2.4.0_2.11-3.7.0" val userAgentSuffix = s" SparkConnector/$currentVersion" } \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala index 30127fbb..4ac2a855 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala @@ -48,6 +48,7 @@ import scala.reflect.ClassTag import scala.reflect.runtime.universe._ import scala.util.Random import scala.collection.JavaConversions._ +import java.time.Instant /** * The CosmosDBSpark allow fast creation of RDDs, DataFrames or Datasets from CosmosDBSpark. 
@@ -143,6 +144,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
    * @tparam D the type of the data in the RDD
    */
   def save[D: ClassTag](rdd: RDD[D], writeConfig: Config): Unit = {
+    val start = Instant.now.toEpochMilli()
+    logDebug("--> CosmosDBSpark.save")
     var numPartitions = 0
     try {
       numPartitions = rdd.getNumPartitions
@@ -150,6 +153,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
       case _: Throwable => // no op
     }
 
+    val originalNumPartitions = numPartitions
+
     val maxMiniBatchImportSizeKB: Int = writeConfig
       .get[String](CosmosDBConfig.MaxMiniBatchImportSizeKB)
       .getOrElse(CosmosDBConfig.DefaultMaxMiniBatchImportSizeKB.toString)
@@ -192,9 +197,34 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
       }
     }
 
-    val mapRdd = rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, numPartitions,
-      baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true)
+    // Coalesce only when the effective partition count differs from the original one.
+    val mapRdd = if (numPartitions == originalNumPartitions) {
+      rdd
+        .mapPartitions(
+          savePartition(
+            _,
+            writeConfig,
+            numPartitions,
+            baseMaxMiniBatchImportSizeKB * 1024,
+            writeThroughputBudgetPerCosmosPartition),
+          preservesPartitioning = false)
+    } else {
+      rdd
+        .coalesce(numPartitions)
+        .mapPartitions(
+          savePartition(
+            _,
+            writeConfig,
+            numPartitions,
+            baseMaxMiniBatchImportSizeKB * 1024,
+            writeThroughputBudgetPerCosmosPartition),
+          preservesPartitioning = true)
+    }
+
     mapRdd.collect()
+
+    val duration = Instant.now.toEpochMilli() - start
+    logInfo(s"<-- CosmosDBSpark.save - Duration: ${duration} ms, original #Partitions: ${originalNumPartitions}, effective #Partitions: ${numPartitions}")
   }
 
   private def bulkUpdate[D: ClassTag](iter: Iterator[D],
@@ -444,46 +474,48 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
                                         partitionCount: Int,
                                         baseMaxMiniBatchImportSize: Int,
                                         writeThroughputBudgetPerCosmosPartition: Option[Int]): Iterator[D] = {
-    val connection: CosmosDBConnection = CosmosDBConnection(config)
-    val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)
-
-    val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
-      getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
-      toBoolean
-
-    val upsert: Boolean = config
-      .getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
-      .toBoolean
-    val writingBatchSize = if (isBulkImporting) {
-      config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_BulkInsert))
+    // Empty partitions skip connection setup and config parsing entirely.
+    if (iter.nonEmpty) {
+      logDebug("--> savePartition (NON-EMPTY)")
+      val connection: CosmosDBConnection = CosmosDBConnection(config)
+      val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)
+
+      val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
+        getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
+        toBoolean
+
+      val upsert: Boolean = config
+        .getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
+        .toBoolean
+      val writingBatchSize = if (isBulkImporting) {
+        config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_BulkInsert))
+          .toInt
+      } else {
+        config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_PointInsert))
+          .toInt
+      }
+
+      val writingBatchDelayMs = config
+        .getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
         .toInt
-    } else {
-      config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_PointInsert))
+      val rootPropertyToSave = config
+        .get[String](CosmosDBConfig.RootPropertyToSave)
+      val isBulkUpdating = config.get[String](CosmosDBConfig.BulkUpdate).
+        getOrElse(CosmosDBConfig.DefaultBulkUpdate.toString).
+        toBoolean
+
+      val maxConcurrencyPerPartitionRange = config
+        .getOrElse[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange, String.valueOf(CosmosDBConfig.DefaultBulkImportMaxConcurrencyPerPartitionRange))
         .toInt
-    }
-
-    val writingBatchDelayMs = config
-      .getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
-      .toInt
-    val rootPropertyToSave = config
-      .get[String](CosmosDBConfig.RootPropertyToSave)
-    val isBulkUpdating = config.get[String](CosmosDBConfig.BulkUpdate).
-      getOrElse(CosmosDBConfig.DefaultBulkUpdate.toString).
-      toBoolean
-
-    val maxConcurrencyPerPartitionRange = config
-      .getOrElse[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange, String.valueOf(CosmosDBConfig.DefaultBulkImportMaxConcurrencyPerPartitionRange))
-      .toInt
 
-    CosmosDBSpark.lastUpsertSetting = Some(upsert)
-    CosmosDBSpark.lastWritingBatchSize = Some(writingBatchSize)
+      CosmosDBSpark.lastUpsertSetting = Some(upsert)
+      CosmosDBSpark.lastWritingBatchSize = Some(writingBatchSize)
 
-    if (iter.nonEmpty) {
       if (isBulkUpdating) {
         logDebug(s"Writing partition with bulk update")
         bulkUpdate(iter, connection, writingBatchSize)
       } else if (isBulkImporting) {
         logDebug(s"Writing partition with bulk import")
         bulkImport(
           iter,
           connection,
@@ -495,10 +527,14 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
           baseMaxMiniBatchImportSize,
           writeThroughputBudgetPerCosmosPartition)
       } else {
         logDebug(s"Writing partition with rxjava")
         asyncConnection.importWithRxJava(iter, asyncConnection, writingBatchSize, writingBatchDelayMs, rootPropertyToSave, upsert)
       }
-    }
+
+      logDebug("<-- savePartition (NON-EMPTY)")
+    } else {
+      logDebug("<--> savePartition (EMPTY)")
+    }
 
     new ListBuffer[D]().iterator
   }
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSink.scala
b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSink.scala
new file mode 100644
index 00000000..65434f60
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSink.scala
@@ -0,0 +1,68 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmosdb.spark.streaming
+
+import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
+import org.apache.spark.sql.cosmosdb.util.StreamingWriteTask
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
+import com.microsoft.azure.cosmosdb.spark.config.Config
+
+/**
+ * Structured Streaming sink that hands every micro-batch to `CosmosDBSpark.save`.
+ *
+ * @param sqlContext the SQL context passed in by the sink provider (not used by this class)
+ * @param configMap  the sink options, wrapped into a `Config` for the write
+ */
+private[spark] class CosmosDBBulkSink(sqlContext: SQLContext,
+                                      configMap: Map[String, String])
+  extends Sink with CosmosDBLoggingTrait with Serializable {
+
+  // Highest batch id seen so far; only used to tell first runs from reruns in the debug log.
+  private var lastBatchId: Long = -1L
+  private val config = Config(configMap)
+
+  /**
+   * Saves one micro-batch through `CosmosDBSpark.save`.
+   *
+   * A batch whose id is not greater than the last seen id is logged as a rerun,
+   * but it is still written.
+   *
+   * @param batchId id of the micro-batch
+   * @param data    rows of the micro-batch
+   */
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (batchId <= lastBatchId) {
+      logDebug(s"Rerun batchId $batchId")
+    } else {
+      logDebug(s"Run batchId $batchId")
+      lastBatchId = batchId
+    }
+
+    val queryExecution: QueryExecution = data.queryExecution
+    val rdd = queryExecution.toRdd
+    CosmosDBSpark.save(rdd, config)
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSinkProvider.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSinkProvider.scala
new file mode 100644
index 00000000..a013bf12
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSinkProvider.scala
@@ -0,0 +1,52 @@
+/**
+ * The MIT License (MIT)
+ * Copyright (c) 2016 Microsoft Corporation
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package com.microsoft.azure.cosmosdb.spark.streaming
+
+import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
+
+/**
+ * Stream sink provider that creates a `CosmosDBBulkSink` for streaming writes.
+ */
+class CosmosDBBulkSinkProvider extends DataSourceRegister
+  with StreamSinkProvider with CosmosDBLoggingTrait {
+
+  /** Short name under which this data source is registered. */
+  override def shortName(): String = "CosmosDBBulkSinkProvider"
+
+  /**
+   * Creates the sink for a streaming query.
+   *
+   * Only `sqlContext` and `parameters` are forwarded to the sink;
+   * `partitionColumns` and `outputMode` are not used.
+   */
+  override def createSink(sqlContext: SQLContext,
+                          parameters: Map[String, String],
+                          partitionColumns: Seq[String],
+                          outputMode: OutputMode): Sink = {
+    new CosmosDBBulkSink(sqlContext, parameters)
+  }
+}