score:0

// Per-partition JDBC update: for every row of `dataFrame`, queues an UPDATE that sets
// STATUS = 0 and DATE = now for the record whose id equals the row's IDNUM column.
// NOTE(review): this fragment is incomplete as shown -- the lambda opened on the
// foreachPartition line is never closed in the visible lines.
// NOTE(review): tableHeader is not used anywhere in the visible fragment.
val tableHeader: String = dataFrame.columns.mkString(",")
dataFrame.foreachPartition((it: Iterator[Row]) => {
      println("partition index: " )
      // NOTE(review): the URL literal below looks redacted/malformed (unbalanced quotes).
      val url = "jdbc:...+ "user=;password=;"
      // One connection and one statement are opened per partition.
      val conn = DriverManager.getConnection(url)
      // NOTE(review): auto-commit is enabled here, yet conn.commit is called explicitly
      // below; the JDBC docs say commit() should only be used with auto-commit disabled
      // -- confirm how the driver in use behaves.
      conn.setAutoCommit(true)
      val stmt = conn.createStatement()
      // Intended number of queued statements per executeBatch call.
      val batchSize = 10
      // Count of statements queued so far in this partition.
      var i =0
      while (it.hasNext) {
        val row = it.next
        import java.sql.SQLException
        import java.sql.SQLIntegrityConstraintViolationException
        // NOTE(review): the SQL text is assembled by string concatenation from row data.
        try {
          stmt.addBatch(" UPDATE TABLE SET STATUS = 0 , " +
            " DATE ='" + new java.sql.Timestamp(System.currentTimeMillis()) +"'" +
            " where id = " + row.getAs("IDNUM")  )
          i += 1
          // Flush once every batchSize queued statements.
          if (  i  % batchSize == 0 ) {
            stmt.executeBatch
            conn.commit
          }
        } catch {
          // Constraint violations are ignored silently (empty case body).
          case e: SQLIntegrityConstraintViolationException =>
          // Any other SQL error is printed and the loop continues with the next row.
          case e: SQLException =>
            e.printStackTrace()
        }
        // NOTE(review): this finally block sits inside the while loop, so executeBatch and
        // commit run after every single row; the batchSize check above therefore never
        // gets to accumulate more than one statement.
        finally {
             stmt.executeBatch
             conn.commit
        }

      }
      import java.util
      // Final flush for anything still queued after the loop, then report the results.
      val ret = stmt.executeBatch
      System.out.println("Ret val: " + util.Arrays.toString(ret))
      System.out.println("Update count: " + stmt.getUpdateCount)
      conn.commit
      // NOTE(review): only the statement is closed in the visible lines; conn is not closed
      // here, and nothing is closed if an exception escapes the loop.
      stmt.close

score:1

The iterator you get for each partition has a `grouped` function that may be useful for you.

for example :

iter.grouped(batchSize)

Here is a sample code snippet that does a batch insert with iter.grouped(batchSize); the batch size here is 1000, and I'm inserting into a database.

   df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you are planning to allow...
/**
 * Inserts every row of `dataFrame` into `sqlTableName`.
 *
 * One JDBC connection is opened per non-empty partition, and the rows of that partition
 * are written with one multi-row `INSERT ... VALUES (...),(...)` statement per group of
 * up to 1000 rows. The connection and statement are always closed, even when an insert
 * fails; the failure itself is propagated to the caller (Spark task).
 *
 * Values are emitted as SQL literals: a null becomes `NULL`; any other value is rendered
 * with `toString`, wrapped in single quotes, with embedded single quotes doubled.
 *
 * @param sqlDatabaseConnectionString JDBC URL handed to `DriverManager.getConnection`
 * @param sqlTableName                target table name (emitted inside square brackets)
 */
def insertToTable(sqlDatabaseConnectionString: String,
                  sqlTableName: String): Unit = {

  val tableHeader: String = dataFrame.columns.mkString(",")
  // Batch size of 1000 is used since some databases can't take more than 1000 rows
  // per statement (for example Azure SQL).
  val batchSize = 1000

  // Renders one column value as a SQL literal (see the method doc for the rules).
  def sqlLiteral(value: Any): String = value match {
    case null  => "NULL"
    case other => "'" + other.toString.replace("'", "''") + "'"
  }

  dataFrame.foreachPartition { partition =>
    // Skip empty partitions so they do not consume a database connection.
    if (partition.nonEmpty) {
      // NOTE: each partition uses one connection (a connection pool would be better).
      val sqlExecutorConnection: Connection =
        DriverManager.getConnection(sqlDatabaseConnectionString)
      try {
        // A single statement is reused for every group of this partition.
        val statement = sqlExecutorConnection.createStatement()
        try {
          partition.grouped(batchSize).foreach { group =>
            // Each row becomes "(v1,v2,...)" with every column quoted on its own.
            val valueTuples: String = group
              .map(record => record.toSeq.map(sqlLiteral).mkString("(", ",", ")"))
              .mkString(",")

            statement.executeUpdate(
              s"INSERT INTO [$sqlTableName] ($tableHeader) VALUES $valueTuples")
          }
        } finally {
          statement.close()
        }
      } finally {
        // Close the connection so that connections are not exhausted.
        sqlExecutorConnection.close()
      }
    }
  }
}

Related Query

More Query from same tag