This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Adding BulkSink for streaming writes #441

Open · wants to merge 1 commit into base: 2.4
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
### 3.7.0
- Adds a new sink for streaming writes, "com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBBulkSinkProvider", which uses bulk ingestion internally (see the usage sketch below)

### 3.6.8
- Reduces the performance overhead when a Spark DataFrame has many partitions - especially when using Cosmos DB as a sink in Spark Streaming scenarios

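A minimal usage sketch for the provider named in the 3.7.0 entry above. This is illustrative only: the rate source is just a toy input, all option values are placeholders, and the Endpoint/Masterkey/Database/Collection option names are assumed to follow the connector's existing configuration keys.

import org.apache.spark.sql.SparkSession

// Sketch: stream a toy rate source into Cosmos DB through the new bulk sink.
// Every option value below is a placeholder - substitute real account settings.
val spark = SparkSession.builder.appName("cosmosdb-bulk-sink-demo").getOrCreate()

val source = spark.readStream
  .format("rate")                     // built-in test source emitting (timestamp, value) rows
  .option("rowsPerSecond", "100")
  .load()

val query = source.writeStream
  .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBBulkSinkProvider")
  .option("Endpoint", "https://<account>.documents.azure.com:443/")
  .option("Masterkey", "<master-key>")
  .option("Database", "<database>")
  .option("Collection", "<collection>")
  .option("Upsert", "true")
  .option("checkpointLocation", "/tmp/cosmosdb-bulk-sink-checkpoint")
  .start()

query.awaitTermination()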
2 changes: 1 addition & 1 deletion pom.xml
@@ -21,7 +21,7 @@ limitations under the License.
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-spark_2.4.0_2.11</artifactId>
<packaging>jar</packaging>
<version>3.6.8</version>
<version>3.7.0</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>Spark Connector for Microsoft Azure CosmosDB</description>
<url>http://azure.microsoft.com/en-us/services/documentdb/</url>
2 changes: 1 addition & 1 deletion src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
@@ -23,6 +23,6 @@
package com.microsoft.azure.cosmosdb.spark

object Constants {
val currentVersion = "2.4.0_2.11-3.6.8"
val currentVersion = "2.4.0_2.11-3.7.0"
val userAgentSuffix = s" SparkConnector/$currentVersion"
}
108 changes: 71 additions & 37 deletions src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
@@ -48,6 +48,7 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import scala.util.Random
import scala.collection.JavaConversions._
import java.time.Instant

/**
* The CosmosDBSpark helper allows fast creation of RDDs, DataFrames or Datasets from CosmosDB.
@@ -143,13 +144,17 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
* @tparam D the type of the data in the RDD
*/
def save[D: ClassTag](rdd: RDD[D], writeConfig: Config): Unit = {
val start = Instant.now.toEpochMilli()
logDebug("--> CosmosDBSpark.save")
var numPartitions = 0
try {
numPartitions = rdd.getNumPartitions
} catch {
case _: Throwable => // no op
}

val originalNumPartitions = numPartitions

val maxMiniBatchImportSizeKB: Int = writeConfig
.get[String](CosmosDBConfig.MaxMiniBatchImportSizeKB)
.getOrElse(CosmosDBConfig.DefaultMaxMiniBatchImportSizeKB.toString)
@@ -192,9 +197,33 @@
}
}

val mapRdd = rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, numPartitions,
baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true)
val mapRdd = if (numPartitions == originalNumPartitions) {
rdd
.mapPartitions(
savePartition(
_,
writeConfig,
numPartitions,
baseMaxMiniBatchImportSizeKB * 1024,
writeThroughputBudgetPerCosmosPartition),
preservesPartitioning = false)
} else {
rdd
.coalesce(numPartitions)
.mapPartitions(
savePartition(
_,
writeConfig,
numPartitions,
baseMaxMiniBatchImportSizeKB * 1024,
writeThroughputBudgetPerCosmosPartition),
preservesPartitioning = true)
}

mapRdd.collect()

val duration = Instant.now.toEpochMilli() - start
logInfo(s"<-- CosmosDBSpark.save - Duration: ${duration} ms, original #Partitions: ${originalNumPartitions}, effective #Partitions: ${numPartitions}")
}

private def bulkUpdate[D: ClassTag](iter: Iterator[D],
@@ -444,46 +473,47 @@
partitionCount: Int,
baseMaxMiniBatchImportSize: Int,
writeThroughputBudgetPerCosmosPartition: Option[Int]): Iterator[D] = {
val connection: CosmosDBConnection = CosmosDBConnection(config)
val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)

val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
toBoolean

val upsert: Boolean = config
.getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
.toBoolean
val writingBatchSize = if (isBulkImporting) {
config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_BulkInsert))
if (iter.nonEmpty) {
logError("--> savePartition (NON-EMPTY)")
val connection: CosmosDBConnection = CosmosDBConnection(config)
val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config)

val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport).
getOrElse(CosmosDBConfig.DefaultBulkImport.toString).
toBoolean

val upsert: Boolean = config
.getOrElse(CosmosDBConfig.Upsert, String.valueOf(CosmosDBConfig.DefaultUpsert))
.toBoolean
val writingBatchSize = if (isBulkImporting) {
config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_BulkInsert))
.toInt
} else {
config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_PointInsert))
.toInt
}

val writingBatchDelayMs = config
.getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
.toInt
} else {
config.getOrElse(CosmosDBConfig.WritingBatchSize, String.valueOf(CosmosDBConfig.DefaultWritingBatchSize_PointInsert))
val rootPropertyToSave = config
.get[String](CosmosDBConfig.RootPropertyToSave)
val isBulkUpdating = config.get[String](CosmosDBConfig.BulkUpdate).
getOrElse(CosmosDBConfig.DefaultBulkUpdate.toString).
toBoolean

val maxConcurrencyPerPartitionRange = config
.getOrElse[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange, String.valueOf(CosmosDBConfig.DefaultBulkImportMaxConcurrencyPerPartitionRange))
.toInt
}

val writingBatchDelayMs = config
.getOrElse(CosmosDBConfig.WritingBatchDelayMs, String.valueOf(CosmosDBConfig.DefaultWritingBatchDelayMs))
.toInt
val rootPropertyToSave = config
.get[String](CosmosDBConfig.RootPropertyToSave)
val isBulkUpdating = config.get[String](CosmosDBConfig.BulkUpdate).
getOrElse(CosmosDBConfig.DefaultBulkUpdate.toString).
toBoolean

val maxConcurrencyPerPartitionRange = config
.getOrElse[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange, String.valueOf(CosmosDBConfig.DefaultBulkImportMaxConcurrencyPerPartitionRange))
.toInt

CosmosDBSpark.lastUpsertSetting = Some(upsert)
CosmosDBSpark.lastWritingBatchSize = Some(writingBatchSize)
CosmosDBSpark.lastUpsertSetting = Some(upsert)
CosmosDBSpark.lastWritingBatchSize = Some(writingBatchSize)

if (iter.nonEmpty) {
if (isBulkUpdating) {
logDebug(s"Writing partition with bulk update")
logError(s"Writing partition with bulk update")
bulkUpdate(iter, connection, writingBatchSize)
} else if (isBulkImporting) {
logDebug(s"Writing partition with bulk import")
logError(s"Writing partition with bulk import")
bulkImport(
iter,
connection,
@@ -495,10 +525,14 @@
baseMaxMiniBatchImportSize,
writeThroughputBudgetPerCosmosPartition)
} else {
logDebug(s"Writing partition with rxjava")
logError(s"Writing partition with rxjava")
asyncConnection.importWithRxJava(iter, asyncConnection, writingBatchSize, writingBatchDelayMs, rootPropertyToSave, upsert)
}
}

logError("<-- savePartition (NON-EMPTY)")
} else {
logError("<--> savePartition (EMPTY)")
}
new ListBuffer[D]().iterator
}

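The core of the save() change above: coalesce is applied only when the effective partition count differs from the RDD's original count, so the common streaming case (counts already equal) skips the extra coalesce step, and the new duration log reports both counts. A standalone sketch of that pattern follows; the names writePartitioned and process are hypothetical and not part of the connector.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Sketch only: coalesce just when the target differs from the current count,
// then force the job with collect(), mirroring what CosmosDBSpark.save does.
def writePartitioned[T: ClassTag](rdd: RDD[T], targetPartitions: Int)
                                 (process: Iterator[T] => Iterator[T]): Unit = {
  val effective =
    if (targetPartitions > 0 && targetPartitions != rdd.getNumPartitions)
      rdd.coalesce(targetPartitions)
    else
      rdd
  effective.mapPartitions(process, preservesPartitioning = true).collect()
}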
53 changes: 53 additions & 0 deletions src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSink.scala
@@ -0,0 +1,53 @@
/**
* The MIT License (MIT)
* Copyright (c) 2016 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.cosmosdb.spark.streaming

import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
import org.apache.spark.sql.cosmosdb.util.StreamingWriteTask
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, SQLContext}
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark
import com.microsoft.azure.cosmosdb.spark.config.Config

private[spark] class CosmosDBBulkSink(sqlContext: SQLContext,
configMap: Map[String, String])
extends Sink with CosmosDBLoggingTrait with Serializable {

private var lastBatchId: Long = -1L
private val config = Config(configMap)

override def addBatch(batchId: Long, data: DataFrame): Unit = {
// Structured Streaming can re-deliver a batch after a recovery; the sink
// logs the rerun but still writes it, so delivery is at-least-once.
if (batchId <= lastBatchId) {
logDebug(s"Rerun batchId $batchId")
} else {
logDebug(s"Run batchId $batchId")
lastBatchId = batchId
}

// Hand the micro-batch to the bulk save path: toRdd exposes it as an RDD of
// Catalyst InternalRows without an extra deserialization pass; schemaOutput
// captures the analyzed schema but is currently unused.
val queryExecution: QueryExecution = data.queryExecution
val schemaOutput = queryExecution.analyzed.output
val rdd = queryExecution.toRdd
CosmosDBSpark.save(rdd, config)
}
}
42 changes: 42 additions & 0 deletions src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBBulkSinkProvider.scala
@@ -0,0 +1,42 @@
/**
* The MIT License (MIT)
* Copyright (c) 2016 Microsoft Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package com.microsoft.azure.cosmosdb.spark.streaming

import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class CosmosDBBulkSinkProvider extends DataSourceRegister
with StreamSinkProvider with CosmosDBLoggingTrait {

override def shortName(): String = "CosmosDBBulkSinkProvider"

override def createSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new CosmosDBBulkSink(sqlContext, parameters)
}
}
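Usage note: Spark resolves a sink's shortName() only when the class is registered in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, and no such registration appears in this diff; until one exists, callers should pass the fully qualified class name to format(...), as in the sketch after the CHANGELOG above.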