score:-1
Let's say it is a Hive table you are reading and overwriting. Introduce a timestamp into the Hive table location, as follows:
create table table_name (
  id int,
  dtdontquery string,
  name string
)
location 'hdfs://user/table_name/timestamp'
Since overwriting in place is not possible, we write the output to a new location using the DataFrame API:
df.write.orc("hdfs://user/xx/tablename/newtimestamp/")
Once the data is written, alter the Hive table location to point at the new directory:
alter table tablename set location 'hdfs://user/xx/tablename/newtimestamp/'
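Putting the whole flow together, a minimal Scala sketch of this approach; it assumes Spark 1.x with a HiveContext, an existing SparkContext sc, and uses the placeholder table name and HDFS paths from above:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

// Read the current contents and apply whatever updates are needed.
val df = sqlContext.table("tablename")
val updated = df // ...your transformations here...

// Write the result to a fresh, timestamped directory.
val newLocation = s"hdfs://user/xx/tablename/${System.currentTimeMillis}/"
updated.write.orc(newLocation)

// Repoint the table at the new directory; the old files stay behind
// until you clean them up.
sqlContext.sql(s"ALTER TABLE tablename SET LOCATION '$newLocation'")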
score:0
I would approach it this way:
>>> df = sqlContext.sql("select * from t")
>>> df.show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
| 2| fitness|
| 3| footwear|
| 4| apparel|
| 5| golf|
| 6| outdoors|
| 7| fan shop|
+-------------+---------------+
To mimic your flow, I create two DataFrames, union them, and write the result back to the same table t (deliberately removing department_id = 4 in this example):
>>> df1 = sqlContext.sql("select * from t where department_id < 4")
>>> df2 = sqlContext.sql("select * from t where department_id > 4")
>>> df3 = df1.unionAll(df2)
>>> df3.registerTempTable("df3")
>>> sqlContext.sql("insert overwrite table t select * from df3")
DataFrame[]
>>> sqlContext.sql("select * from t").show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
| 2| fitness|
| 3| footwear|
| 5| golf|
| 6| outdoors|
| 7| fan shop|
+-------------+---------------+
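The session above uses the Spark 1.x API names; on Spark 2.x the same flow would look roughly like this (a sketch, with spark standing in for an existing SparkSession):

val df1 = spark.sql("select * from t where department_id < 4")
val df2 = spark.sql("select * from t where department_id > 4")
val df3 = df1.union(df2)            // unionAll was renamed to union in 2.0
df3.createOrReplaceTempView("df3")  // replaces registerTempTable
spark.sql("insert overwrite table t select * from df3")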
score:1
Following is an approach you can try.
Instead of using the registerTempTable API, write the result into another table using the saveAsTable API:
dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")
Then write it into the employees table:
val dy = sqlContext.table("intermediate_result")
dy.write.mode("overwrite").insertInto("employees")
Finally, drop the intermediate_result table.
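Put together, a sketch of the round trip under the same assumptions (Spark 1.x with a HiveContext; employees is the target table and dfFinal holds your updated rows):

// Stage the updated rows in a separate managed table, so the write
// does not depend on the files it is reading from.
dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")

// Copy the staged rows back into the original table.
val dy = sqlContext.table("intermediate_result")
dy.write.mode("overwrite").insertInto("employees")

// Clean up the staging table.
sqlContext.sql("DROP TABLE intermediate_result")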
score:2
Is my approach correct to change the department of two employees?
It is not. Just to repeat something that has been said multiple times on Stack Overflow: Apache Spark is not a database. It is not designed for fine-grained updates. If your project requires operations like this, use one of the many databases available on Hadoop.
Why am I getting this error when I have released the DataFrames?
Because you didn't. All you've done is add a name to the execution plan. Checkpointing would be the closest thing to "releasing", but you really don't want to end up in a situation where you lose an executor in the middle of a destructive operation.
You could write to a temporary directory, delete the input, and move the temporary files, but really: just use a tool that is fit for the job.
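For completeness, the temporary-directory route mentioned above could look like this; a sketch only, with illustrative paths, assuming df holds the updated rows and sc is an existing SparkContext:

import org.apache.hadoop.fs.{FileSystem, Path}

val tmpDir   = new Path("hdfs://user/xx/tablename_tmp")
val tableDir = new Path("hdfs://user/xx/tablename")

// Write the result somewhere that does not overlap the input files.
df.write.orc(tmpDir.toString)

// Swap the directories. This is not atomic: a failure between the
// delete and the rename loses data, which is exactly the risk the
// answer warns about.
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(tableDir, true)   // recursively delete the old files
fs.rename(tmpDir, tableDir)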
Source: stackoverflow.com