Применение UDF в GroupedData в PySpark (с работающим примером на Python)
У меня есть следующий код на Python, который работает локально с использованием DataFrame из библиотеки pandas:
df_result = pd.DataFrame(df
.groupby('A')
.apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Я хотел бы запустить этот код в PySpark, но сталкиваюсь с проблемами при работе с объектом pyspark.sql.group.GroupedData.
Я попробовал следующее:
sparkDF
.groupby('A')
.agg(myFunction(zip('B', 'C'), 'A'))
Но это приводит к ошибке:
KeyError: 'A'
Я предполагаю, что причина в том, что 'A' больше не является столбцом, и я не могу найти эквивалент для x.name.
Затем я попытался сделать так:
sparkDF
.groupby('A')
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))
.toDF()
Однако это вызывает следующую ошибку:
AttributeError: 'GroupedData' object has no attribute 'map'
Буду признателен за любые советы!
2 ответ(ов)
Вы можете реализовать аналогичную логику, как в pandas.groupby().apply
, в PySpark, используя декоратор @pandas_udf
, который является методом векторизации и работает быстрее, чем простой UDF.
Вот пример кода:
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
df3 = spark.createDataFrame([('a', 1, 0), ('a', -1, 42), ('b', 3, -1),
('b', 10, -2)], ('key', 'value1', 'value2'))
from pyspark.sql.types import *
schema = StructType([StructField('key', StringType()),
StructField('avg_value1', DoubleType()),
StructField('avg_value2', DoubleType()),
StructField('sum_avg', DoubleType()),
StructField('sub_avg', DoubleType())])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
gr = df['key'].iloc[0]
x = df.value1.mean()
y = df.value2.mean()
w = df.value1.mean() + df.value2.mean()
z = df.value1.mean() - df.value2.mean()
return pd.DataFrame([[gr] + [x] + [y] + [w] + [z]])
df3.groupby('key').apply(g).show()
Вы получите следующий результат:
+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
| b| 6.5| -1.5| 5.0| 8.0|
| a| 0.0| 21.0| 21.0| -21.0|
+---+----------+----------+-------+-------+
Таким образом, вы можете выполнять дополнительные вычисления с другими полями в сгруппированных данных и добавлять их в DataFrame в формате списка.
В новой версии PySpark 3.0.0 была добавлена функция applyInPandas
, которая позволяет выполнять операции с использованием pandas внутри групп. Это может быть особенно полезно, когда необходимо использовать более сложные агрегации или когда вам нужно взаимодействовать с DataFrame в формате pandas.
Вот пример использования applyInPandas
. Мы создаем DataFrame и группируем его по столбцу id
, а затем применяем функцию mean_func
, которая вычисляет среднее значение столбца v
для каждой группы.
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def mean_func(key, pdf):
# key - это кортеж из одного numpy.int64, который является значением
# 'id' для текущей группы
return pd.DataFrame([key + (pdf.v.mean(),)])
df.groupby('id').applyInPandas(mean_func, schema="id long, v double").show()
Результатом выполнения кода будет:
+---+---+
| id| v|
+---+---+
| 1|1.5|
| 2|6.0|
+---+---+
Для получения дополнительной информации вы можете посетить официальную документацию PySpark.
Псевдонимы столбцов после groupBy в PySpark
Как посчитать количество каждой уникальной значения в DataFrame PySpark?
Как изменить порядок столбцов в DataFrame?
'pip' не распознан как командa внутреннего или внешнего формата
Почему statistics.mean() работает так медленно?