
PySpark: Convert Result of mapPartitions to Spark DataFrame

I have a job that needs to run on a partitioned Spark dataframe, and the process looks like: rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_fun

Solution 1:

If you want to stay with the RDD API: mapPartitions accepts an iterator of one type and expects an iterator of another type as its result. A pandas dataframe is not an iterator type that mapPartitions can deal with directly. If you must work with the pandas API, you can create a proper generator from pandas_df.iterrows().

This way the overall mapPartitions result is a single RDD of your row type instead of an RDD of pandas dataframes. Such an RDD can be converted back into a Spark dataframe seamlessly, with on-the-fly schema discovery.

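from pyspark.sql import Row

def some_function(rows):
    # rows is the iterator over one partition; run the pandas step on it,
    # then yield one Row per line of the pandas result
    pandas_df = some_pandas_result(rows)
    for index, row in pandas_df.iterrows():
        yield Row(id=index, foo=row['foo'], bar=row['bar'])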

rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)
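If schema inference is not wanted, a variation on the snippet above is to yield plain tuples and pass an explicit schema string to createDataFrame. This is only a sketch under assumptions: some_pandas_result here is a hypothetical stand-in for the real pandas step, the column names id, foo and bar and their types are guesses based on the Row example, and to_spark_df is an illustrative helper name, not something from the question.

```python
import pandas as pd


def some_pandas_result(rows):
    # Hypothetical stand-in for the real per-partition pandas computation.
    # Spark Row objects are tuples, so they can be loaded like plain tuples.
    pdf = pd.DataFrame(list(rows), columns=["id", "foo", "bar"])
    pdf["foo"] = pdf["foo"] * 2.0
    return pdf


def some_function(rows):
    # Generator: consumes the partition iterator, yields one tuple per result row.
    pandas_df = some_pandas_result(rows)
    for rec in pandas_df.itertuples(index=False):
        # Convert numpy scalars to plain Python values before handing them to Spark.
        yield (int(rec.id), float(rec.foo), float(rec.bar))


def to_spark_df(spark, sp_df, n_partitions, partition_key):
    # Illustrative helper: spark and sp_df are supplied by the caller.
    rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(some_function)
    # Explicit schema (assumed column types), so nothing has to be inferred.
    return spark.createDataFrame(rdd, schema="id long, foo double, bar double")
```

Because some_function is an ordinary generator, it can be tried locally on a list of tuples before it is handed to mapPartitions. An empty partition simply yields nothing.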

Solution 2:

You can use the newer pandas grouped UDF directly on the dataframe instead of rdd.mapPartitions. The function itself accepts a group as a pandas dataframe and returns a pandas dataframe.

When it is used together with the Spark dataframe apply API, Spark automatically combines the per-group pandas dataframes into a new Spark dataframe.

from pyspark.sql.functions import pandas_udf, PandasUDFType

# a grouped pandas_udf receives the whole group as a pandas dataframe
# it must also return a pandas dataframe
# the first schema string parameter must describe the return dataframe schema
# in this example the result dataframe contains 2 columns id and value
@pandas_udf("id long, value double", PandasUDFType.GROUPED_MAP)
def some_function(pdf):
    return pdf.apply(some_pdf_func)
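On Spark 3.0 and later, the same grouped-map pattern can also be written with GroupedData.applyInPandas, which takes a plain Python function and the output schema. The following is a rough sketch, assuming sp_df has the columns id and value; center_value and apply_per_group are hypothetical names used only for illustration.

```python
import pandas as pd


def center_value(pdf):
    # Hypothetical per-group computation: subtract the group mean from "value".
    # Receives one whole group as a pandas dataframe and returns a pandas dataframe.
    return pdf.assign(value=pdf["value"] - pdf["value"].mean())


def apply_per_group(sp_df, partition_key):
    # Illustrative helper: sp_df is a Spark dataframe supplied by the caller.
    # The schema string must describe the dataframe returned by center_value.
    return sp_df.groupby(partition_key).applyInPandas(
        center_value, schema="id long, value double"
    )
```

Since center_value is plain pandas, it can be checked on a small pandas dataframe without a Spark session, and Spark only comes into play in apply_per_group.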


Solution 3:

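You can build a Spark dataframe from each element and union the results. Note that rdd.foreach returns None and the SparkSession is only usable on the driver, so the elements have to be collected first:

from functools import reduce

def f(x):
    # x is one element of the rdd, e.g. a pandas dataframe
    return spark.createDataFrame(x)

# collect() brings every element to the driver, so this only suits small results
sp = reduce(lambda a, b: a.union(b), [f(x) for x in rdd.collect()])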

Refer to:

Spark SQL DataFrame

Spark RDD

