PySpark UDF on withColumn to Replace Column
Solution 1:
I tried creating a sample dataframe and then made use of the lit
function in PySpark.
It seems to work fine; this was tested in a Databricks notebook.
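A minimal sketch of that approach (the column name and replacement value are my assumptions, since the original answer didn't include the code):
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName('lit-example').getOrCreate()
# sample dataframe standing in for the real data (assumed shape)
df = spark.createDataFrame([('a',), ('b',)], ['L1'])
# lit() wraps a Python constant in a Column, so withColumn
# overwrites every row of L1 with the same value
df.withColumn('L1', F.lit('replacement value')).show()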
Solution 2:
The error occurs because you are using PySpark functions inside a UDF. It would also be very helpful to know the contents of your L1, L2... variables.
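To illustrate the problem (a hypothetical sketch, not the asker's actual code): a udf must return plain Python values, so Column expressions like F.lit belong outside of it.
from pyspark.sql import functions as F
# anti-pattern (sketch): returning a Column from a udf raises an error
# when the udf is evaluated, since Spark expects a plain Python value here
# broken = F.udf(lambda value: F.lit('constant'))
# df.withColumn('L1', broken(F.col('L1')))
#
# correct: keep the Column expression outside the udf
# df.withColumn('L1', F.lit('constant'))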
However, if I am understanding what you want to do correctly, you don't need a udf. I am assuming L1, L2, etc. are constants, right? If not, let me know and I'll adjust the code accordingly. Here's an example:
from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F
conf = SparkConf()
spark_session = SparkSession.builder \
    .config(conf=conf) \
    .appName('test') \
    .getOrCreate()
data = [{'L1': "test", 'L2': "data"}, {'L1': "other test", 'L2': "other data"}]
df = spark_session.createDataFrame(data)
df.show()
# +----------+----------+
# |        L1|        L2|
# +----------+----------+
# |      test|      data|
# |other test|other data|
# +----------+----------+
L1 = 'some other data'
updatedDF = df.withColumn(
    "L1",
    F.lit(L1)
)
updatedDF.show()
# +---------------+----------+
# |             L1|        L2|
# +---------------+----------+
# |some other data|      data|
# |some other data|other data|
# +---------------+----------+

# or if you need to replace the value in a more complex way
pattern = r'\w+'  # raw string so \w isn't treated as an invalid string escape
updatedDF = updatedDF.withColumn(
    "L1",
    F.regexp_replace(F.col("L1"), pattern, "testing replace")
)
updatedDF.show()
# +--------------------+----------+
# |                  L1|        L2|
# +--------------------+----------+
# |testing replace t...|      data|
# |testing replace t...|other data|
# +--------------------+----------+

# or even something more complicated:
# set the L1 value into the L2 column when L2 equals 'data'; otherwise leave L2 as it is
updatedDF = df.withColumn(
    "L2",
    F.when(F.col('L2') == 'data', L1).otherwise(F.col('L2'))
)
updatedDF.show()
# +----------+---------------+
# |        L1|             L2|
# +----------+---------------+
# |      test|some other data|
# |other test|     other data|
# +----------+---------------+
So your example would be:
DF = orig_df.withColumn("L1", pyspark_func.lit(L_1))
...
Also, please make sure you have an active Spark session before this point.
I hope this helps.
Edit: If L1, L2, etc. are lists, then one option is to create a dataframe from them and join it to the initial df. Unfortunately we'll need indexes for the join, and since your dataframe is quite big, I don't think this is a very performant solution. We could also use a broadcast with a udf, or a broadcast with a join.
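For the broadcast-and-udf variant, a rough sketch might look like this (the indexing via zipWithIndex is the same trick as in the join example below; the names here are mine, not from the question):
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName('broadcast-udf-example').getOrCreate()
L1 = ['row 1 L1', 'row 2 L1']
df = spark.createDataFrame([('test',), ('other test',)], ['L1'])
# ship the replacement list to every executor once
bc_L1 = spark.sparkContext.broadcast(L1)
# the udf looks up each row's replacement in the broadcast list by index
replace_by_index = F.udf(lambda i: bc_L1.value[i], StringType())
# attach a row index, then overwrite L1 via the udf
indexed = df.rdd.zipWithIndex().map(lambda t: (t[0]['L1'], t[1])).toDF(['L1', 'row_index'])
indexed.withColumn('L1', replace_by_index(F.col('row_index'))).show()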
Here's a (suboptimal, I think) example of how to do the join:
L1 = ['row 1 L1', 'row 2 L1']
L2 = ['row 1 L2', 'row 2 L2']
# create a df with indexes
to_update_df = spark_session.createDataFrame(
    [{"row_index": i, "L1": row[0], "L2": row[1]} for i, row in enumerate(zip(L1, L2))]
)
# add indexes to the initial df
indexed_df = updatedDF.rdd.zipWithIndex().toDF()
indexed_df.show()
# +--------------------+---+
# |                  _1| _2|
# +--------------------+---+
# |[test, some other...|  0|
# |[other test, othe...|  1|
# +--------------------+---+

# bring the df back to its initial form
indexed_df = indexed_df.withColumn('row_number', F.col("_2")) \
    .withColumn('L1', F.col("_1").getItem('L1')) \
    .withColumn('L2', F.col("_1").getItem('L2')) \
    .select('row_number', 'L1', 'L2')
indexed_df.show()
# +----------+----------+---------------+
# |row_number|        L1|             L2|
# +----------+----------+---------------+
# |         0|      test|some other data|
# |         1|other test|     other data|
# +----------+----------+---------------+

# join with your results and keep the updated columns
final_df = indexed_df.alias('initial_data').join(
    to_update_df.alias('other_data'),
    F.col('row_index') == F.col('row_number'),
    how='left'
)
final_df = final_df.select('initial_data.row_number', 'other_data.L1', 'other_data.L2')
final_df.show()
# +----------+--------+--------+
# |row_number|      L1|      L2|
# +----------+--------+--------+
# |         0|row 1 L1|row 1 L2|
# |         1|row 2 L1|row 2 L2|
# +----------+--------+--------+
This ^ can definitely be improved in terms of performance.