0

Применение UDF в GroupedData в PySpark (с работающим примером на Python)

15

У меня есть следующий код на Python, который работает локально с использованием DataFrame из библиотеки pandas:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))

Я хотел бы запустить этот код в PySpark, но сталкиваюсь с проблемами при работе с объектом pyspark.sql.group.GroupedData.

Я попробовал следующее:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 

Но это приводит к ошибке:

KeyError: 'A'

Я предполагаю, что причина в том, что 'A' больше не является столбцом, и я не могу найти эквивалент для x.name.

Затем я попытался сделать так:

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()

Однако это вызывает следующую ошибку:

AttributeError: 'GroupedData' object has no attribute 'map'

Буду признателен за любые советы!

2 ответ(ов)

0

Вы можете реализовать аналогичную логику, как в pandas.groupby().apply, в PySpark, используя декоратор @pandas_udf, который является методом векторизации и работает быстрее, чем простой UDF.

Вот пример кода:

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

df3 = spark.createDataFrame([('a', 1, 0), ('a', -1, 42), ('b', 3, -1),
                             ('b', 10, -2)], ('key', 'value1', 'value2'))

from pyspark.sql.types import *

schema = StructType([StructField('key', StringType()),
                     StructField('avg_value1', DoubleType()),
                     StructField('avg_value2', DoubleType()),
                     StructField('sum_avg', DoubleType()),
                     StructField('sub_avg', DoubleType())])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr] + [x] + [y] + [w] + [z]])

df3.groupby('key').apply(g).show()

Вы получите следующий результат:

+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  b|       6.5|      -1.5|    5.0|    8.0|
|  a|       0.0|      21.0|   21.0|  -21.0|
+---+----------+----------+-------+-------+

Таким образом, вы можете выполнять дополнительные вычисления с другими полями в сгруппированных данных и добавлять их в DataFrame в формате списка.

0

В новой версии PySpark 3.0.0 была добавлена функция applyInPandas, которая позволяет выполнять операции с использованием pandas внутри групп. Это может быть особенно полезно, когда необходимо использовать более сложные агрегации или когда вам нужно взаимодействовать с DataFrame в формате pandas.

Вот пример использования applyInPandas. Мы создаем DataFrame и группируем его по столбцу id, а затем применяем функцию mean_func, которая вычисляет среднее значение столбца v для каждой группы.

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], 
                            ("id", "v"))

def mean_func(key, pdf):
   # key - это кортеж из одного numpy.int64, который является значением
   # 'id' для текущей группы
   return pd.DataFrame([key + (pdf.v.mean(),)])

df.groupby('id').applyInPandas(mean_func, schema="id long, v double").show()

Результатом выполнения кода будет:

+---+---+
| id|  v|
+---+---+
|  1|1.5|
|  2|6.0|
+---+---+

Для получения дополнительной информации вы можете посетить официальную документацию PySpark.

Чтобы ответить на вопрос, пожалуйста, войдите или зарегистрируйтесь