Answer (score: 0)
import java.sql.{DriverManager, SQLException, SQLIntegrityConstraintViolationException}
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row

val tableHeader: String = dataFrame.columns.mkString(",")
dataFrame.foreachPartition((it: Iterator[Row]) => {
  println("partition index: " + TaskContext.getPartitionId())
  val url = "jdbc:..." + "user=;password=;"
  val conn = DriverManager.getConnection(url)
  conn.setAutoCommit(false) // commit manually; commit() fails when auto-commit is enabled
  val stmt = conn.createStatement()
  val batchSize = 10
  var i = 0
  while (it.hasNext) {
    val row = it.next()
    try {
      stmt.addBatch("UPDATE TABLE SET STATUS = 0, " +
        "DATE = '" + new java.sql.Timestamp(System.currentTimeMillis()) + "'" +
        " WHERE id = " + row.getAs("IDNUM"))
      i += 1
      if (i % batchSize == 0) {
        stmt.executeBatch()
        conn.commit()
      }
    } catch {
      case _: SQLIntegrityConstraintViolationException => // skip rows that violate constraints
      case e: SQLException => e.printStackTrace()
    }
  }
  // flush the final, partially filled batch
  val ret = stmt.executeBatch()
  println("Ret val: " + java.util.Arrays.toString(ret))
  println("Update count: " + stmt.getUpdateCount)
  conn.commit()
  stmt.close()
  conn.close()
})
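As an aside, the same per-partition batch update can be written with a PreparedStatement, which avoids concatenating the SQL by hand. This is only a minimal sketch, assuming the same placeholder JDBC URL and table, and assuming IDNUM is a numeric column:

import java.sql.{DriverManager, Timestamp}
import org.apache.spark.sql.Row

dataFrame.foreachPartition((it: Iterator[Row]) => {
  val conn = DriverManager.getConnection("jdbc:..." + "user=;password=;") // placeholder URL
  conn.setAutoCommit(false)
  val ps = conn.prepareStatement("UPDATE TABLE SET STATUS = 0, DATE = ? WHERE id = ?")
  var i = 0
  it.foreach { row =>
    ps.setTimestamp(1, new Timestamp(System.currentTimeMillis()))
    ps.setLong(2, row.getAs[Long]("IDNUM")) // assumes IDNUM is numeric
    ps.addBatch()
    i += 1
    if (i % 10 == 0) { ps.executeBatch(); conn.commit() }
  }
  ps.executeBatch(); conn.commit() // flush the final partial batch
  ps.close(); conn.close()
})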
Answer (score: 1)
The partition iterator has a grouped function that may be useful for you, for example:
iter.grouped(batchSize)
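To see what grouped does on its own (a minimal, Spark-independent example): it turns an Iterator[T] into an Iterator[Seq[T]] of fixed-size chunks, where the last chunk may be smaller than the batch size.

val it = (1 to 10).iterator
it.grouped(4).foreach(println)
// prints: List(1, 2, 3, 4)
//         List(5, 6, 7, 8)
//         List(9, 10)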
Below is a sample code snippet that does a batch insert with iter.grouped(batchSize); here the batch size is 1000 and the rows are inserted into the database.
df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you are planning to allow
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{DataFrame, Row}

def insertToTable(sqlDatabaseConnectionString: String,
                  sqlTableName: String,
                  dataFrame: DataFrame): Unit = {
  val tableHeader: String = dataFrame.columns.mkString(",")
  dataFrame.foreachPartition { (partition: Iterator[Row]) =>
    // NOTE: one connection per partition (a better approach is to use a connection pool)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used because some databases, e.g. Azure SQL,
    // cannot accept more than 1000 rows per INSERT statement.
    partition.grouped(1000).foreach { group =>
      val insertString: scala.collection.mutable.StringBuilder =
        new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        // quote each column value individually: ('v1','v2',...)
        insertString.append("(" + record.mkString("'", "','", "'") + "),")
      }
      val statement = sqlExecutorConnection.createStatement()
      statement.executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
        + insertString.stripSuffix(","))
      statement.close()
    }
    sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
  }
}
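A usage sketch for the method above (the connection string, table name and number of partitions are placeholders, and the DataFrame is passed in explicitly):

val repartitioned = df.repartition(8) // ~8 simultaneous DB connections
insertToTable("jdbc:sqlserver://...;user=...;password=...", "MyTable", repartitioned)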
Source: stackoverflow.com