728x90
반응형

여러가지 이유로 Spark DataFrame 의 row count 보다는 크기를 계산하여야 한다.

  • Broadcast join 이 적합한지 확인용
  • Executor 자원을 얼마나 할당할지 확인용
  • 데이터 크기를 비교하여 DataFrame 이 정상적으로 생긴 것인지 확인용
  • 다양하게...

disk 에 저장하지 않고도 Spark 에서 DataFrame 의 크기를 알려주는 API 가 있다.

import org.apache.spark.util.SizeEstimator
// dataframe 생성
SizeEstimator.estimate(df)

이런 방법으로 데이터 크기를 측정할 수 있다.

728x90
반응형
728x90
반응형

Spark Dataframe 에서 칼럼 사이즈 구하기: 스칼라 & 파이썬

1. Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 스파크 세션 생성
val spark = SparkSession.builder()
  .appName("ArrayLengthExample")
  .getOrCreate()

// 예제 데이터프레임 생성
import spark.implicits._
val df = Seq(
  (Array(1.0, 2.0, 3.0)),
  (Array(4.0, 5.0)),
  (Array.empty[Double])
).toDF("double_array")

// double_array 배열의 길이를 계산
val dfWithArrayLength = df.withColumn(
  "array_length",
  size(col("double_array"))
)

// 결과 출력
dfWithArrayLength.show(truncate = false)

 

2. Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, size

# 스파크 세션 생성
spark = SparkSession.builder \
    .appName("ArrayLengthExample") \
    .getOrCreate()

# 예제 데이터프레임 생성
data = [
    ([1.0, 2.0, 3.0],),
    ([4.0, 5.0],),
    ([],)
]

df = spark.createDataFrame(data, ["double_array"])

# double_array 배열의 길이를 계산
df_with_array_length = df.withColumn(
    "array_length",
    size(col("double_array"))
)

# 결과 출력
df_with_array_length.show(truncate=False)
728x90
반응형
728x90
반응형

쾅쾅 부딪히면서 배운 스파크 관련 내용을 그냥 실무에 적용만 해버리고 말면 다 까먹을 것 같아서 정리를 좀 해보려고 한다.

(근데 진짜 쾅쾅 부딪혔다는 점...ㅠ)


Spark 를 띄울 때 가장 기본적으로 설정해야 하는 요소인 Driver와 Executor 사이즈와 개수를 설정하는 방법과 관련 내용을 정리해 보자.

Spark job을 제출한 예시이다.

spark3-submit --master yarn --deploy-mode cluster --queue default --conf spark.dynamicAllocation.enabled=False --conf spark.sql.parquet.writeLegacyFormat=True --conf spark.sql.catalog.spark_catalog.type=hive --files hdfs://nameservice1/user/myConfig.conf --num-executors 3 --executor-cores 2 --executor-memory 4g --driver-memory 2G --name my_test_spark_job --class myApp.MyClass hdfs://nameservice1/user/my-jar.jar
  • executor 수를 정했고: --num-executor 3
  • executor 코어 수를 정했고: --executor-cores 2
  • executor 메모리 크기를 정했다: --executor-memory 4g
  • 추가로 driver 메모리 크기도 정했다: --driver-memory 2g

 

스파크 관련 기초 글은 아래를 참고하자!

2023.12.03 - [코딩해/Kafka, Spark, Data Engineering] - [Spark] 스파크 구조와 실행 과정 | 스파크 기초

 

[Spark] 스파크 구조와 실행 과정 | 스파크 기초

아주 기본적인 내용이지만 글로 정리해보려고 한다. 스파크 처음하는 사람들에게 조금이라도 도움이 될까 해서! 스파크는 Spark Application과 Cluster manager로 구성되어 있다. 그리고 Spark Application에

haonly.tistory.com


Executor에 관한 좀 더 깊은 내용을 알아보자

  • Executor는 캐싱과 실행을 위한 공간을 갖고 있는 JVM이다.
  • Executor와 driver 사이즈는 하나의 노드나 컨테이너에 할당된 자원보다 많은 메모리나 코어를 가질 수 없다.
  • Executor의 일부 공간은 스파크의 내부 메타 데이터와 사용자 자료구조를 위해 예약되어야 한다.(평균 약 25%) 이 공간은 spark.memory.fraction 설정으로 변경 가능하며, 기본값은 0.6으로 나머지 0.4는 캐싱에 쓰인다.
  • 하나의 partition이 여러 개의 executor에서 처리될 수 없다 --> 하나의 partition은 하나의 executor에서 처리

 

Executor 크기

Executor 크기는 어떻게 설정하면 좋을까?

executor의 수는 많게, 크기(core)를 작게 설정하면 

  1. OOM, spill 등이 생길 수 있다. -> 처리할 자원이 충분하지 않기 때문이다.
  2. 자원을 효율적으로 사용할 수 없다. -> 같은 노드 내 executor끼리 통신에도 비용이 필요하다.

executor의 수는 적게, 크기(core)를 크게 설정하면

  1. 너무 큰 executor는 힙 사이즈가 클수록 GC가 시작되는 시점을 지연시켜 Full GC로 인한 지연을 더욱 크게 만든다.
  2. executor당 많은 수의 코어를 쓰면 동시 스레드가 많아지면서 스레드를 다루는 HDFS의 제한으로 인해 성능이 더 떨어질 수도 있다. 

효율적인 세팅을 위해서

  • GPU 자원 기준으로 executor의 개수를 정하고,
  • executor당 메모리는 4GB 이상, executor당 core 수는 (1 < number of GPUs <= 5) 기준으로 설정한다면 일반적으로 적용될 수 있는 효율적인 세팅이라고 할 수 있다.

 

이상은 클러스터 환경에 따른 기본 설정을 고려해 볼 세팅 값이었고,

사용자가 제출한 애플리케이션의 규모, 성능 등을 고려하여 적절히 자원을 더 늘리거나 줄일 수 있어야 한다.

 

스파크 기본 shuffle partition은 200으로 설정되어 있다.(내 클러스터만 그럴 수도 있다...!)

나의 경우 위의 제출 기준(--num-executors 3 --executor-cores 2 --executor-memory 4g)로는 원하는 성능이 나오지 않았으며 memory spill이 많이 일어나 아래와 같이 변경 적용하였다.

--num-executors 3 --executor-cores 100 --executor-memory 18g

이때 partition은 300개로 유지될 수 있도록 해주었다.

 

 

다음 글로는 스파크 properties 정리(공식 문서), 파티션 개수, 크기 정하는 내용, 스파크 튜닝한 내용 등을 정리해 봐야겠다.

스파크 어려운데 너무 강력한 놈이라서 많이 배워야겠다.

728x90
반응형
728x90
반응형

에러 메시지

Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the commit log. Multiple streaming jobs detected for

 

원인

로그에 동시 업데이트 하기 때문에 발생

 

스파크 스트리밍에서 동일한 체크포인트 위치에 두 개 이상의 다른 스파크 작업이 업데이트 하려 할 때 발생한다.

이 때 스파크 설정의 checkpointLocation을 다른 위치로 사용하면 해결할 수 있다.

 

두 개의 다른 스파크 스트리밍 작업에서 checkpoint 위치를 다르게 한다.

      .option("checkpointLocation", checkpointPath1)
      .option("checkpointLocation", checkpointPath2)
728x90
반응형
728x90
반응형

에러

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized
results of XXXX tasks (X.0 MB) is bigger than spark.driver.maxResultSize (X.0 MB)

 

원인

나와있는데로 rdd로 분산되어 있던 데이터가 spark job 을 통해 driver로 합쳐지면서 driver 최대 메모리 크기를 초과해서 발생한 에러이다.

 

해결

메모리 최대 크기 늘려주면 된다.

resource 설정을 하면 되는데 SparkConf를 통해서 하거나 conf 파일을 수정하거나 spark-shell 실행 시 매개변수를 통해 설정을 할 수 있다.

나는 spark-shell 실행 시 매개변수를 주었다.

spark-shell --conf spark.driver.maxResultSize=6G

 

끝.

어렵다 어려워

728x90
반응형

+ Recent posts