score:-1

Let's say it is a Hive table you are reading and overwriting.

Introduce a timestamp into the Hive table's location, as follows:

    create table table_name (
      id            int,
      dtdontquery   string,
      name          string
    )
    location 'hdfs://user/table_name/timestamp';

As overwriting the location we are reading from is not possible, we will write the output to a new location.

Write the data to that new location using the DataFrame API:

    df.write.orc("hdfs://user/xx/tablename/newtimestamp/")

Once the data is written, alter the Hive table location to point to the new location:

    alter table tablename set location 'hdfs://user/xx/tablename/newtimestamp/';
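
For completeness, here is a minimal Scala sketch of the whole flow. It assumes a HiveContext (as in the other snippets in this thread); the table name, base path, and the placeholder transformation are not from the original answer:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("overwrite-via-new-location"))
    val sqlContext = new HiveContext(sc)

    // Read the table and apply your transformations (placeholder shown here).
    val result = sqlContext.table("tablename").filter("id is not null")

    // Write the result to a fresh, timestamped directory, never the current location.
    val newLocation = s"hdfs://user/xx/tablename/${System.currentTimeMillis()}/"
    result.write.orc(newLocation)

    // Once the write succeeds, repoint the Hive table at the new directory.
    sqlContext.sql(s"ALTER TABLE tablename SET LOCATION '$newLocation'")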

score:0

I would approach it this way.

>>> df = sqlContext.sql("select * from t")
>>> df.show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        fitness|
|            3|       footwear|
|            4|        apparel|
|            5|           golf|
|            6|       outdoors|
|            7|       fan shop|
+-------------+---------------+

To mimic your flow, I am creating 2 DataFrames, doing a union, and writing back to the same table t (deliberately removing department_id = 4 in this example):

>>> df1 = sqlContext.sql("select * from t where department_id < 4")
>>> df2 = sqlContext.sql("select * from t where department_id > 4")
>>> df3 = df1.unionAll(df2)
>>> df3.registerTempTable("df3")
>>> sqlContext.sql("insert overwrite table t select * from df3")
DataFrame[]
>>> sqlContext.sql("select * from t").show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        fitness|
|            3|       footwear|
|            5|           golf|
|            6|       outdoors|
|            7|       fan shop|
+-------------+---------------+

score:1

Following is an approach you can try.

Instead of using the registerTempTable API, you can write it into another table using the saveAsTable API:

    dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")

Then, write it into the employees table:

    val dy = sqlContext.table("intermediate_result")
    dy.write.mode("overwrite").insertInto("employees")

Finally, drop the intermediate_result table.
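
Putting the three steps together, a sketch assuming the same sqlContext and that dfFinal is the DataFrame you computed from the employees table:

    // 1. Persist the result to a staging table; this materialises the data,
    //    so the plan no longer depends on scanning "employees".
    dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")

    // 2. Read the staging table back and overwrite the original table.
    val dy = sqlContext.table("intermediate_result")
    dy.write.mode("overwrite").insertInto("employees")

    // 3. Clean up the staging table once "employees" has been rewritten.
    sqlContext.sql("DROP TABLE IF EXISTS intermediate_result")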

score:2

Is my approach correct to change the department of two employees?

It is not. Just to repeat something that has been said multiple times on Stack Overflow: Apache Spark is not a database. It is not designed for fine-grained updates. If your project requires operations like this, use one of the many databases available on Hadoop.

Why am I getting this error when I have released the DataFrames?

Because you didn't. All you've done is add a name to the execution plan. Checkpointing would be the closest thing to "releasing", but you really don't want to end up in a situation where you lose an executor in the middle of a destructive operation.
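
For illustration only, a hedged sketch of what checkpointing looks like, assuming Spark 2.1+ (where Dataset.checkpoint is available) and a session with Hive support; the checkpoint directory, table, and transformation are placeholders, not something from the original answer:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sparkContext.setCheckpointDir("hdfs://user/xx/checkpoints/")

    // checkpoint() eagerly writes the rows out and truncates the lineage, so the
    // write below no longer scans "employees" when it runs.
    val updated = spark.table("employees")
      .withColumn("department", lit("new_department"))  // placeholder update
      .checkpoint()

    // Even so, losing an executor part-way through this destructive write still
    // leaves "employees" half-overwritten, which is the objection above.
    updated.write.mode("overwrite").insertInto("employees")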

You could write to a temporary directory, delete the input, and move the temporary files, but really: just use a tool that is fit for the job.
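
If you do go the temporary-directory route, the swap might look like the sketch below. It assumes the table's data lives in a single HDFS directory and that nothing else reads it during the swap; all paths, the table name, and the filter are placeholders:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    val tableDir = new Path("hdfs://user/xx/tablename")
    val tmpDir   = new Path("hdfs://user/xx/tablename_tmp")

    // 1. Compute the new contents and fully materialise them in the temporary
    //    directory before touching the original data.
    val result = spark.table("tablename").filter("id is not null")  // placeholder
    result.write.orc(tmpDir.toString)

    // 2. Delete the old files and move the new ones into place. If the job dies
    //    between these two calls the table is left empty, which is exactly the
    //    fragility warned about above.
    val fs = tableDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.delete(tableDir, true)
    fs.rename(tmpDir, tableDir)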

