I really, really suggest you look into Spark for this; it sounds like a typical ETL use case. The problem is that you are materializing thousands of records into memory, which is killing your GC and stalling the Futures. The fact that you have to do it record by record also makes it very, very slow. If you load this into Spark Datasets instead, it'll take far less space, because Spark doesn't actually materialize the records as objects on the heap (it uses an extremely compact binary in-memory serialization which 'spills' to disk if needed), which saves your heap from GC annihilation. It also performs the load and the transformations for many records in parallel. That should give you the performance characteristics to make your problem tractable.
Here's roughly what I would do:
- Load the records into Spark Datasets using spark.read.jdbc
- Join the Datasets and group by the parent record
- Write the records out to MongoDB using the Mongo Spark Connector
The code itself should look something like this:
import java.util.Properties
import org.apache.spark.sql.{Dataset, SparkSession}
import com.mongodb.spark._
// probably import some other stuff

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("Load records to mongo")
  // Configure the spark->mongo connector
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
  .getOrCreate()
import spark.implicits._ // needed for the .as[Person] / .as[Kid] encoders

case class PersonWithKids(person: Person, kids: List[Kid])

// make sure the MySQL JDBC driver is on the classpath, because Spark is about to use it
val props = new Properties() // user, password, etc.
val people = spark.read.jdbc("jdbc:mysql://...", "(select * from people) as data", props).as[Person]
val kids   = spark.read.jdbc("jdbc:mysql://...", "(select * from kids) as data", props).as[Kid]

val joined =
  people
    .joinWith(kids, people("id") === kids("parent_id"))
    .map { case (person, kid) => PersonWithKids(person, List(kid)) }
    .groupByKey(_.person)
    .flatMapGroups { case (person, group) =>
      // flatMapGroups must return an iterator, hence the Iterator(...) wrapper
      Iterator(PersonWithKids(person, group.toList.flatMap(_.kids)))
    }

// make sure you did stuff correctly
// joined.show()
// joined.limit(100).write.json("someFileWhereYouCanDoubleCheck.json")

MongoSpark.save(joined)
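One more thing worth knowing: a plain jdbc read pulls the whole table through a single connection. If the source tables are big, you can tell Spark to read in parallel slices. A minimal sketch (the partition column id, the bounds, and the partition count are assumptions; adjust them to your actual schema and data volume):

```scala
// Partitioned JDBC read: Spark opens numPartitions connections, each
// fetching one slice of the id range, instead of one serial cursor.
val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://...")                    // your MySQL URL
  .option("dbtable", "(select * from people) as data")
  .option("partitionColumn", "id")                      // assumed numeric primary key
  .option("lowerBound", "1")                            // assumed min id
  .option("upperBound", "1000000")                      // assumed max id
  .option("numPartitions", "8")                         // parallel connections
  .load()
  .as[Person]
```

The bounds don't have to be exact; rows outside them still get read, they just land in the edge partitions.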
You'll probably need the following SBT dependencies:
"org.apache.spark" %% "spark-core" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-hive" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-sql" % "2.3.1" // or 2.4.0 might be latest now
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1" // or 2.4.0 might be latest now
Good luck!
Source: stackoverflow.com