I'm trying to convert some Pandas code to Spark for scaling. `myfunc` is a wrapper to a complex API that takes a string and returns a new string (meaning I can't use vectorized functions).

def myfunc(ds):
    for attribute, value in ds.items():
        value = api_function(attribute, value)
        ds[attribute] = value
    return ds

df = df.apply(myfunc, axis='columns')

`myfunc` takes a Series, breaks it up into individual cells, calls the API for each cell, and builds a new Series with the same column names. This effectively modifies all cells in the DataFrame.

I'm new to Spark and I want to translate this logic using `pyspark`. I've converted my pandas DataFrame to Spark:

spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)

This is where I get lost. Do I need a `UDF` or a `pandas_udf`? How do I iterate across all cells and return a new string for each using `myfunc`? `spark_df.foreach()` doesn't return anything, and it doesn't have a `map()` function.

I can modify `myfunc` from `Series -> Series` to `string -> string` if necessary.
score:5
The solution is:

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType

udf_func = udf(api_function, StringType())
for col_name in spark_df.columns:
    # withColumn with an existing column name replaces that column
    spark_df = spark_df.withColumn(col_name, udf_func(lit(col_name), col_name))

result = spark_df.toPandas()
There are three key insights that helped me figure this out:

- If you use `withColumn` with the name of an existing column (`col_name`), Spark "overwrites"/shadows the original column. This essentially gives the appearance of editing the column directly, as if it were mutable.
- By looping across the original columns and reusing the same DataFrame variable `spark_df`, I use the same principle to simulate a mutable DataFrame, creating a chain of column-wise transformations, each one "overwriting" a column (per the first point).
- Spark UDFs expect all parameters to be `Column` types, which means Spark attempts to resolve column values for each parameter. Because `api_function`'s first parameter is a literal value that will be the same for all rows, you must wrap it with the `lit()` function. Simply passing `col_name` to the function would attempt to extract the column values for that column. As far as I could tell, passing `col_name` is equivalent to passing `col(col_name)`.
Assuming 3 columns 'a', 'b' and 'c', unrolling this concept would look like this:

spark_df = spark_df.withColumn('a', udf_func(lit('a'), 'a')) \
                   .withColumn('b', udf_func(lit('b'), 'b')) \
                   .withColumn('c', udf_func(lit('c'), 'c'))
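For a self-contained sketch of the whole approach, with `api_function` stubbed out (the real one wraps a private API, so the stub below is purely illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('My app').getOrCreate()

# Hypothetical stand-in for the real API wrapper: string -> string
def api_function(attribute, value):
    return f"{attribute}={value}"

spark_df = spark.createDataFrame([('1', '4'), ('2', '5')], ['a', 'b'])

udf_func = udf(api_function, StringType())
for col_name in spark_df.columns:
    spark_df = spark_df.withColumn(col_name, udf_func(lit(col_name), col_name))

spark_df.show()  # each cell becomes "<column>=<old value>", e.g. a=1, b=4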
score:7
Option 1: Use a UDF on One Column at a Time
The simplest approach would be to rewrite your function to take a string as an argument (so that it is `string -> string`) and use a UDF. There's a nice example here. This works on one column at a time. So, if your DataFrame has a reasonable number of columns, you can apply the UDF (called `my_udf` below) to each column one at a time:

from pyspark.sql.functions import col

new_df = df.select(my_udf(col("col1")), my_udf(col("col2")), ...)
Example
df = sc.parallelize([[1, 4], [2, 5], [3, 6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
| 1| 4|
| 2| 5|
| 3| 6|
+----+----+
def plus1_udf(x):
    return x + 1

plus1 = spark.udf.register("plus1", plus1_udf)
new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
| 2| 5|
| 3| 6|
| 4| 7|
+-----------+-----------+
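One caveat worth noting (my addition, not part of the original answer): `select` names each output column after the expression, as the `plus1(col1)` headers above show. To keep the original names, alias each expression back:

from pyspark.sql.functions import col

# Apply the UDF to every column while preserving the original column names
new_df = df.select(*[plus1(col(c)).alias(c) for c in df.columns])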
Option 2: Map the entire DataFrame at once
`map` is available for Scala DataFrames but, at the moment, not in PySpark. The lower-level RDD API does have a `map` function in PySpark. So, if you have too many columns to transform one at a time, you can operate on every single cell in the DataFrame like this:

def map_fn(row):
    return [api_function(x) for (column, x) in row.asDict().items()]

column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(column_names)
Example
df = sc.parallelize([[1, 4], [2, 5], [3, 6]]).toDF(["col1", "col2"])

def map_fn(row):
    return [value + 1 for (_, value) in row.asDict().items()]

columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
| 2| 5|
| 3| 6|
| 4| 7|
+----+----+
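If, as in the question, `api_function` also needs the column name, the same pattern extends naturally. A sketch, assuming the two-argument `api_function(attribute, value)` signature from the question:

def map_fn(row):
    # row.asDict() preserves field order, so the output lines up with df.columns
    return [api_function(column, value) for (column, value) in row.asDict().items()]

new_df = df.rdd.map(map_fn).toDF(df.columns)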
Context
The documentation of `foreach` only gives the example of printing, but we can verify by looking at the code that it indeed does not return anything.
You can read about `pandas_udf` in this post, but it seems that it is most suited to vectorized functions, which, as you pointed out, you can't use because of `api_function`.
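That said, a `pandas_udf` can still wrap a per-element function; the vectorization mainly buys cheaper serialization between Spark and Python, not a faster `api_function`. A rough sketch of that variant, assuming the two-argument form of `api_function` (not something the original answer showed):

import pandas as pd
from pyspark.sql.functions import pandas_udf, lit
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def api_udf(attribute: pd.Series, value: pd.Series) -> pd.Series:
    # Still one API call per cell; pandas_udf only batches the data transfer
    return pd.Series([api_function(a, v) for a, v in zip(attribute, value)])

for col_name in spark_df.columns:
    spark_df = spark_df.withColumn(col_name, api_udf(lit(col_name), col_name))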