score:40
I found this approach here: Bulk data migration through Spark SQL.
The dbtable parameter can be any query wrapped in parentheses with an alias, so in my case I need to do this:
val query = """
(select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100) foo
"""
val df = sqlContext.read.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
option("dbtable",query).
load()
As expected, loading each table as its own DataFrame and joining them in Spark was very inefficient; pushing the whole query down to the database this way avoids that.
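Side note: Spark 2.4 and later also accept a query option on the JDBC source, so the parentheses-and-alias wrapping is no longer required. A minimal sketch with the same placeholder connection details (spark being the usual SparkSession):
val df2 = spark.read.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("user", "root").
option("password", "").
option("query", "select dl.DialogLineID from DialogLine as dl where dl.Active = 1").
load()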
score:0
TL;DR: just create a view in your database.
Detail: I have a table t_city in my postgres database, on which I create a view:
create view v_city_3500 as
select asciiname, country, population, elevation
from t_city
where elevation>3500
and population>100000;
select * from v_city_3500;
asciiname | country | population | elevation
-----------+---------+------------+-----------
Potosi | BO | 141251 | 3967
Oruro | BO | 208684 | 3936
La Paz | BO | 812799 | 3782
Lhasa | CN | 118721 | 3651
Puno | PE | 116552 | 3825
Juliaca | PE | 245675 | 3834
In the spark-shell:
val sx = new org.apache.spark.sql.SQLContext(sc)
val props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")
val url = "jdbc:postgresql://buya/dmn?user=dmn&password=dmn"
// Load the full table and the filtered view as DataFrames over JDBC.
val city_df = sx.read.jdbc(url, "t_city", props)
val city_3500_df = sx.read.jdbc(url, "v_city_3500", props)
Result:
city_df.count()
Long = 145725
city_3500_df.count()
Long = 6
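The same pushdown also works without creating a view: pass a parenthesized subquery with an alias as the table argument, the same trick as in the top answer. A sketch over the same connection (hypothetical, untested against this schema):
val sub = "(select asciiname, country, population, elevation from t_city where elevation>3500 and population>100000) as v"
val city_3500_df2 = sx.read.jdbc(url, sub, props)
city_3500_df2.count() // same 6 rows as the view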
score:0
With MySQL, read/load data something like below:
val conf = new SparkConf().setAppName("SparkMe Application").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password",
"dbtable" -> "TABLE_NAME")).load()
Write data to a table as below:
import java.util.Properties
val prop = new Properties()
prop.put("user", "<>")
prop.put("password", "simple$123")
val dfWriter = jdbcDF.write.mode("append")
dfWriter.jdbc("jdbc:mysql://<host>:3306/corbonJDBC?user=user&password=password", "tableName", prop)
To create a DataFrame from a query, do something like below:
val finalModelDataDF = {
// table_name must be registered first, e.g. jdbcDF.registerTempTable("table_name")
val query = "select * from table_name"
sqlContext.sql(query)
}
finalModelDataDF.show()
score:1
To save the output of a query to a new DataFrame, simply assign the result to a variable:
val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
Now newDataFrame is a DataFrame with all the usual DataFrame functionality available to it.
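For example (a hypothetical follow-up, assuming the column names from the query above):
newDataFrame.printSchema()
newDataFrame.filter("X > 0").show(10)
newDataFrame.createOrReplaceTempView("joined") // register it so further SQL can use it
spark.sql("select count(*) from joined").show()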
score:3
If you have your table already registered in your SQLContext, you can simply use the sql method.
val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
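If the table is not registered yet, registering a DataFrame under a name first is enough. A minimal sketch (fooDF is a placeholder; registerTempTable is the Spark 1.x API, replaced by createOrReplaceTempView in 2.x):
fooDF.registerTempTable("FOO") // Spark 1.x
// fooDF.createOrReplaceTempView("FOO") // Spark 2.x equivalent
val resultDF = sqlContext.sql("SELECT * FROM FOO")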