스파크 DataFrame 열을 파이썬 목록으로 변환
저는 mvv와 count라는 두 개의 컬럼으로 데이터 프레임 작업을 합니다.
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
저는 mvv 값과 count 값이 포함된 두 개의 목록을 얻고 싶습니다.비슷한 것
mvv = [1,2,3,4]
count = [5,9,3,1]
그래서 저는 다음 코드를 시도했습니다.첫 번째 줄은 파이썬 행 목록을 반환해야 합니다.첫 번째 가치를 보고 싶었습니다.
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
그러나 두 번째 줄에 오류 메시지가 표시됩니다.
특성 오류: 가져오기인트
보세요, 왜 당신이 하고 있는 이 방법이 효과가 없는지 보세요.먼저 행 유형에서 정수를 가져오려고 하면 수집 결과는 다음과 같습니다.
>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)
다음과 같은 것을 사용할 수 있습니다.
>>> firstvalue = mvv_list[0].mvv
Out: 1
다음과 같은 정보를 얻을 수 있습니다.mvv
값. 배열의 모든 정보를 원하는 경우 다음과 같은 방법을 사용할 수 있습니다.
>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()]
>>> mvv_array
Out: [1,2,3,4]
그러나 다른 열에서도 동일하게 시도하면 다음과 같은 결과를 얻을 수 있습니다.
>>> mvv_count = [int(row.count) for row in mvv_list.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'
이는 다음과 같은 이유로 발생합니다.count
는 기본 제공 메서드입니다.열 이름은 다음과 같습니다.count
이를 위한 해결 방법은 다음의 열 이름을 변경하는 것입니다.count
로._count
:
>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]
그러나 사전 구문을 사용하여 열에 액세스할 수 있으므로 이 해결 방법은 필요하지 않습니다.
>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_list.collect()]
그리고 그것은 마침내 작동할 것입니다!
라이너 하나를 따라가면 원하는 목록이 나옵니다.
mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
그러면 모든 요소가 목록으로 표시됩니다.
mvv_list = list(
mvv_count_df.select('mvv').toPandas()['mvv']
)
벤치마킹 분석을 해봤는데,list(mvv_count_df.select('mvv').toPandas()['mvv'])
가장 빠른 방법입니다.저는 매우 놀랐습니다.
Spark 2.4.5를 사용하여 5노드 i3.x 대규모 클러스터(각 노드에는 30.5GB의 RAM과 4개의 코어가 있음)를 사용하여 10만/1억 행 데이터셋에 대해 다양한 접근 방식을 실행했습니다.데이터는 하나의 열로 20개의 신속하게 압축된 Parquet 파일에 고르게 분산되었습니다.
다음은 벤치마크 결과(실행 시간(초)입니다.
+-------------------------------------------------------------+---------+-------------+
| Code | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect() | 0.4 | 55.3 |
| list(df.select('col_name').toPandas()['col_name']) | 0.4 | 17.5 |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()| 0.9 | 69 |
| [row[0] for row in df.select('col_name').collect()] | 1.0 | OOM |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] | 1.2 | * |
+-------------------------------------------------------------+---------+-------------+
* cancelled after 800 seconds
드라이버 노드에서 데이터를 수집할 때 따라야 하는 골든 규칙:
- 다른 방법으로 문제를 해결해 보십시오.드라이버 노드로 데이터를 수집하는 것은 비용이 많이 들고 스파크 클러스터의 전원을 사용하지 않으므로 가능한 한 피해야 합니다.
- 가능한 한 적은 수의 행을 수집합니다.데이터를 수집하기 전에 열을 집계, 중복제거, 필터링 및 잘라냅니다.가능한 한 적은 데이터를 드라이버 노드로 보냅니다.
toPandas
스파크 2.3에서 상당히 개선되었습니다.Spark 2.3 이전 버전을 사용하는 경우에는 최상의 방법이 아닐 수 있습니다.
자세한 내용/벤치마크 결과는 여기를 참조하십시오.
데이터에서 다음과 같은 벤치마크를 얻었습니다.
>>> data.select(col).rdd.flatMap(lambda x: x).collect()
0.52초
>>> [row[col] for row in data.collect()]
0.271초
>>> list(data.select(col).toPandas()[col])
0.427초
결과는 동일합니다.
다음 코드는 당신에게 도움이 될 것입니다.
mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()
은 ▁the를 사용하는 입니다.collect_list()
에서 합니다.pyspark.sql.functions
이렇게 하면 모든 열 값이 수집될 때 파이썬 목록으로 변환되는 pyspark 배열로 집계됩니다.
mvv_list = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0]
아래 오류가 발생하는 경우:
특성 오류: 'list' 개체에 'collect' 특성이 없습니다.
이 코드는 다음과 같은 문제를 해결합니다.
mvv_list = mvv_count_df.select('mvv').collect()
mvv_array = [int(i.mvv) for i in mvv_list]
문제의 데이터 프레임을 생성합니다.
df_test = spark.createDataFrame(
[
(1, 5),
(2, 9),
(3, 3),
(4, 1),
],
['mvv', 'count']
)
df_test.show()
이것이 주는 것
+---+-----+
|mvv|count|
+---+-----+
| 1| 5|
| 2| 9|
| 3| 3|
| 4| 1|
+---+-----+
그런 다음 rdd.flatMap(f).collect()를 적용하여 목록을 가져옵니다.
test_list = df_test.select("mvv").rdd.flatMap(list).collect()
print(type(test_list))
print(test_list)
이는
<type 'list'>
[1, 2, 3, 4]
먼저 행 유형의 will return 목록으로 df를 수집할 수 있습니다.
row_list = df.select('mvv').collect()
목록으로 변환할 행을 반복합니다.
sno_id_array = [ int(row.mvv) for row in row_list]
sno_id_array
[1,2,3,4]
플랫 맵 사용
sno_id_array = df.select("mvv").rdd.flatMap(lambda x: x).collect()
많은 답변에도 불구하고 일부 답변은 목록과 함께 사용해야 할 때 작동하지 않습니다.when
그리고.isin
명령을 실행합니다.값의 평평한 목록을 생성하는 가장 간단하면서도 효과적인 접근법은 목록 이해를 사용하는 것입니다.[0]
이름을 : 다음과 같이 .
flatten_list_from_spark_df=[i[0] for i in df.select("your column").collect()]
다른 접근법은 팬더 데이터 프레임을 사용한 다음list
기능은 하지만 이것만큼 편리하지 않고 효과적입니다.a
언급URL : https://stackoverflow.com/questions/38610559/convert-spark-dataframe-column-to-python-list
'source' 카테고리의 다른 글
Firebase 스토리지 및 액세스 제어 - 오리진 허용 (0) | 2023.06.14 |
---|---|
사용자 지정 헤더 값을 추출하는 방법은 무엇입니까? (0) | 2023.06.14 |
Fragments의 인스턴스 상태를 백스택에 올바르게 저장하는 방법은 무엇입니까? (0) | 2023.06.09 |
Android 런타임: 치명적 예외: Androidmapsapi-ZoomTableManager (0) | 2023.06.09 |
C,C++의 메모리 누수, 무료, 삭제를 잊었습니다. (0) | 2023.06.09 |