
I really, really suggest you look into Spark for this; it sounds like a typical ETL use case. The problem is that you are materializing thousands of records into memory, which is killing your GC and stalling the Futures, and having to do it record by record makes it very, very slow. If you load this into Spark Datasets instead, it'll take waaaay less space, because Spark doesn't actually materialize the records on the heap (it uses an extremely compact binary in-memory serialization which 'spills' to disk if needed), which saves your heap from GC-annihilation. It also performs the load and the transformations for many, many records in parallel. That should give you the performance characteristics to make your problem tractable.
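
To get that parallel load, Spark's JDBC source can split the read across several concurrent connections if you point it at a numeric column to partition on. A minimal sketch (spark is the SparkSession you'll build below; the column name and bounds are made up, so adjust them to your schema):

val peopleDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/mydb")
  .option("dbtable", "people")
  .option("user", "...")
  .option("password", "...")
  .option("partitionColumn", "id")  // numeric column Spark range-splits on
  .option("lowerBound", "1")        // min value of that column
  .option("upperBound", "1000000")  // max value of that column
  .option("numPartitions", "8")     // number of parallel JDBC connections
  .load()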

Here's roughly what I would do:

  1. Load the records into Spark Datasets using spark.read.jdbc
  2. Join the Datasets and then group by the parent record
  3. Write the records out to MongoDB using the MongoDB Spark Connector

The code itself should look something like this:

import java.util.Properties

import org.apache.spark.sql.{Dataset, SparkSession}
import com.mongodb.spark._
// probably import some other stuff

val spark = SparkSession.builder()
  .master("local")
  .appName("Load records to mongo")
  // Configure the spark->mongo connector
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for the .as[Person] / .as[Kid] encoders below

// Person and Kid would be case classes that mirror your two MySQL tables
case class PersonWithKids(person: Person, kids: List[Kid])

// make sure the MySQL JDBC driver is on the classpath because Spark is about to use it
val jdbcUrl = "jdbc:mysql://..." // your MySQL connection string
val connectionProperties = new Properties() // user, password, etc.

val people = spark.read.jdbc(jdbcUrl, "(select * from people) as data", connectionProperties).as[Person]
val kids = spark.read.jdbc(jdbcUrl, "(select * from kids) as data", connectionProperties).as[Kid]
val joined =
  people
    .joinWith(kids, people("id") === kids("parent_id"))
    .map { case (person, kid) => PersonWithKids(person, List(kid)) }
    .groupByKey(_.person)
    .mapGroups { (person, group) =>
      // collapse each person's joined rows into one record holding all the kids
      PersonWithKids(person, group.toList.flatMap(_.kids))
    }

// make sure you did stuff correctly
// joined.show()
// joined.limit(100).write.json("someFileWhereYouCanDoubleCheck.json")

MongoSpark.save(joined)
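
For what it's worth, MongoSpark.save picks up the spark.mongodb.output.uri you set on the builder above, so the Dataset should end up in test.myCollection. If you'd rather use the DataFrame writer API, something like this should work too (a sketch based on the connector docs, so double-check the format name against your connector version):

joined.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("append")
  .save()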

You'll probably need the following SBT dependencies:

"org.apache.spark" %% "spark-core" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-hive" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-sql" % "2.3.1" // or 2.4.0 might be latest now
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1" // or 2.4.0 might be latest now

Good luck!

