source

스파크 DataFrame 열을 파이썬 목록으로 변환

manysource 2023. 6. 14. 21:55

스파크 DataFrame 열을 파이썬 목록으로 변환

저는 mvv와 count라는 두 개의 컬럼으로 데이터 프레임 작업을 합니다.

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |

저는 mvv 값과 count 값이 포함된 두 개의 목록을 얻고 싶습니다.비슷한 것

mvv = [1,2,3,4]
count = [5,9,3,1]

그래서 저는 다음 코드를 시도했습니다.첫 번째 줄은 파이썬 행 목록을 반환해야 합니다.첫 번째 가치를 보고 싶었습니다.

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)

그러나 두 번째 줄에 오류 메시지가 표시됩니다.

특성 오류: 가져오기인트

보세요, 왜 당신이 하고 있는 이 방법이 효과가 없는지 보세요.먼저 유형에서 정수를 가져오려고 하면 수집 결과는 다음과 같습니다.

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

다음과 같은 것을 사용할 수 있습니다.

>>> firstvalue = mvv_list[0].mvv
Out: 1

다음과 같은 정보를 얻을 수 있습니다.mvv값. 배열의 모든 정보를 원하는 경우 다음과 같은 방법을 사용할 수 있습니다.

>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()]
>>> mvv_array
Out: [1,2,3,4]

그러나 다른 열에서도 동일하게 시도하면 다음과 같은 결과를 얻을 수 있습니다.

>>> mvv_count = [int(row.count) for row in mvv_list.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

이는 다음과 같은 이유로 발생합니다.count는 기본 제공 메서드입니다.열 이름은 다음과 같습니다.count이를 위한 해결 방법은 다음의 열 이름을 변경하는 것입니다.count로._count:

>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]

그러나 사전 구문을 사용하여 열에 액세스할 수 있으므로 이 해결 방법은 필요하지 않습니다.

>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_list.collect()]

그리고 그것은 마침내 작동할 것입니다!

라이너 하나를 따라가면 원하는 목록이 나옵니다.

mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()

그러면 모든 요소가 목록으로 표시됩니다.

mvv_list = list(
    mvv_count_df.select('mvv').toPandas()['mvv']
)

벤치마킹 분석을 해봤는데,list(mvv_count_df.select('mvv').toPandas()['mvv'])가장 빠른 방법입니다.저는 매우 놀랐습니다.

Spark 2.4.5를 사용하여 5노드 i3.x 대규모 클러스터(각 노드에는 30.5GB의 RAM과 4개의 코어가 있음)를 사용하여 10만/1억 행 데이터셋에 대해 다양한 접근 방식을 실행했습니다.데이터는 하나의 열로 20개의 신속하게 압축된 Parquet 파일에 고르게 분산되었습니다.

다음은 벤치마크 결과(실행 시간(초)입니다.

+-------------------------------------------------------------+---------+-------------+
|                          Code                               | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect()    |     0.4 | 55.3        |
| list(df.select('col_name').toPandas()['col_name'])          |     0.4 | 17.5        |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()|     0.9 | 69          |
| [row[0] for row in df.select('col_name').collect()]         |     1.0 | OOM         |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] |     1.2 | *           |
+-------------------------------------------------------------+---------+-------------+

* cancelled after 800 seconds

드라이버 노드에서 데이터를 수집할 때 따라야 하는 골든 규칙:

  • 다른 방법으로 문제를 해결해 보십시오.드라이버 노드로 데이터를 수집하는 것은 비용이 많이 들고 스파크 클러스터의 전원을 사용하지 않으므로 가능한 한 피해야 합니다.
  • 가능한 한 적은 수의 행을 수집합니다.데이터를 수집하기 전에 열을 집계, 중복제거, 필터링 및 잘라냅니다.가능한 한 적은 데이터를 드라이버 노드로 보냅니다.

toPandas 스파크 2.3에서 상당히 개선되었습니다.Spark 2.3 이전 버전을 사용하는 경우에는 최상의 방법이 아닐 수 있습니다.

자세한 내용/벤치마크 결과는 여기를 참조하십시오.

데이터에서 다음과 같은 벤치마크를 얻었습니다.

>>> data.select(col).rdd.flatMap(lambda x: x).collect()

0.52초

>>> [row[col] for row in data.collect()]

0.271초

>>> list(data.select(col).toPandas()[col])

0.427초

결과는 동일합니다.

다음 코드는 당신에게 도움이 될 것입니다.

mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()

은 ▁the를 사용하는 입니다.collect_list()에서 합니다.pyspark.sql.functions이렇게 하면 모든 열 값이 수집될 때 파이썬 목록으로 변환되는 pyspark 배열로 집계됩니다.

mvv_list   = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0] 

아래 오류가 발생하는 경우:

특성 오류: 'list' 개체에 'collect' 특성이 없습니다.

이 코드는 다음과 같은 문제를 해결합니다.

mvv_list = mvv_count_df.select('mvv').collect()

mvv_array = [int(i.mvv) for i in mvv_list]

문제의 데이터 프레임을 생성합니다.

df_test = spark.createDataFrame(
    [
        (1, 5),
        (2, 9),
        (3, 3),
        (4, 1),
    ],
    ['mvv', 'count']
)
df_test.show()

이것이 주는 것

+---+-----+
|mvv|count|
+---+-----+
|  1|    5|
|  2|    9|
|  3|    3|
|  4|    1|
+---+-----+

그런 다음 rdd.flatMap(f).collect()를 적용하여 목록을 가져옵니다.

test_list = df_test.select("mvv").rdd.flatMap(list).collect()
print(type(test_list))
print(test_list)

이는

<type 'list'>
[1, 2, 3, 4]

먼저 행 유형의 will return 목록으로 df를 수집할 수 있습니다.

row_list = df.select('mvv').collect()

목록으로 변환할 행을 반복합니다.

sno_id_array = [ int(row.mvv) for row in row_list]

sno_id_array 
[1,2,3,4]

플랫 맵 사용

sno_id_array = df.select("mvv").rdd.flatMap(lambda x: x).collect()

많은 답변에도 불구하고 일부 답변은 목록과 함께 사용해야 할 때 작동하지 않습니다.when그리고.isin명령을 실행합니다.값의 평평한 목록을 생성하는 가장 간단하면서도 효과적인 접근법은 목록 이해를 사용하는 것입니다.[0]이름을 : 다음과 같이 .

flatten_list_from_spark_df=[i[0] for i in df.select("your column").collect()]

다른 접근법은 팬더 데이터 프레임을 사용한 다음list기능은 하지만 이것만큼 편리하지 않고 효과적입니다.a

언급URL : https://stackoverflow.com/questions/38610559/convert-spark-dataframe-column-to-python-list