PySpark: Convert Result of mapPartitions to Spark DataFrame
Solution 1:
If you want to stay with the RDD API: mapPartitions accepts an iterator of one type and expects an iterator of another type as its result. A pandas DataFrame is not an iterator type that mapPartitions can deal with directly. If you must work with the pandas API, you can create a proper generator from pandas.DataFrame.iterrows. This way, the overall mapPartitions result will be a single RDD of your row type instead of an RDD of pandas DataFrames, and such an RDD can be seamlessly converted back into a DataFrame with on-the-fly schema discovery:
from pyspark.sql import Row

def some_function(rows):
    pandas_df = some_pandas_result(rows)
    # yield one Row per pandas row so the RDD contains rows, not DataFrames
    for index, row in pandas_df.iterrows():
        yield Row(id=index, foo=row['foo'], bar=row['bar'])

rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(some_function)
df = spark.createDataFrame(rdd)
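For completeness, here is a self-contained sketch of this approach. The helper some_pandas_result is hypothetical (the answer leaves it undefined); here it simply materializes a partition's rows into a pandas DataFrame with foo and bar columns:

import pandas as pd
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical helper: build a pandas DataFrame from a partition's rows
def some_pandas_result(rows):
    return pd.DataFrame([r.asDict() for r in rows])

def some_function(rows):
    pandas_df = some_pandas_result(rows)
    for index, row in pandas_df.iterrows():
        # cast numpy scalars to plain Python types so schema inference works
        yield Row(id=int(index), foo=float(row['foo']), bar=str(row['bar']))

sp_df = spark.createDataFrame([Row(foo=1.0, bar='a'), Row(foo=2.0, bar='b')])
df = spark.createDataFrame(sp_df.rdd.mapPartitions(some_function))
df.show()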
Solution 2:
You can use a grouped pandas UDF directly on the DataFrame instead of rdd.mapPartitions. The function itself receives a group as a pandas DataFrame and returns a pandas DataFrame. When it is used together with the Spark DataFrame apply API, Spark automatically combines the returned pandas DataFrames into a new Spark DataFrame.
from pyspark.sql.functions import pandas_udf, PandasUDFType

# a grouped pandas_udf receives the whole group as a pandas dataframe
# it must also return a pandas dataframe
# the first schema string parameter must describe the return dataframe schema
# in this example the result dataframe contains 2 columns: id and value
@pandas_udf("id long, value double", PandasUDFType.GROUPED_MAP)
def some_function(pdf):
    return pdf.apply(some_pdf_func)

df.groupby(df.partition_key).apply(some_function).show()
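Note that PandasUDFType.GROUPED_MAP is deprecated since Spark 3.0; the same pattern is now written with applyInPandas, which takes the function and the return schema directly. A minimal sketch, assuming a df with columns id, value, and a grouping key partition_key:

import pandas as pd

def some_function(pdf):
    # one whole group in, one pandas DataFrame out; columns must match the schema
    return pd.DataFrame({'id': pdf['id'], 'value': pdf['value'] * 2})

df.groupby(df.partition_key).applyInPandas(some_function, schema="id long, value double").show()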
Solution 3:
You can do the following. Since foreach runs on the executors and returns None, the per-partition DataFrames have to be built and unioned on the driver:

sp = None

def f(rows):
    return spark.createDataFrame(rows)

# glom() turns each partition into a list of rows
for part in rdd.glom().collect():
    if part:  # skip empty partitions (no rows to infer a schema from)
        sp = f(part) if sp is None else sp.union(f(part))
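Finally, if the goal is simply "run pandas code per partition and get a Spark DataFrame back", Spark 3.0+ offers mapInPandas, which feeds each partition to your function as an iterator of pandas DataFrames and stitches the returned pandas DataFrames into a single Spark DataFrame. A minimal sketch, assuming sp_df has columns foo (double) and bar (string):

def per_partition(pdf_iter):
    # each element is a chunk of one partition as a pandas DataFrame
    for pdf in pdf_iter:
        yield pdf.assign(baz=pdf['foo'] * 2)

result = sp_df.mapInPandas(per_partition, schema="foo double, bar string, baz double")
result.show()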