728x90
반응형

여러가지 이유로 Spark DataFrame 의 row count 보다는 크기를 계산하여야 한다.

  • Broadcast join 이 적합한지 확인용
  • Executor 자원을 얼마나 할당할지 확인용
  • 데이터 크기를 비교하여 DataFrame 이 정상적으로 생긴 것인지 확인용
  • 다양하게...

disk 에 저장하지 않고도 Spark 에서 DataFrame 의 크기를 알려주는 API 가 있다.

import org.apache.spark.util.SizeEstimator
// dataframe 생성
SizeEstimator.estimate(df)

이런 방법으로 데이터 크기를 측정할 수 있다.

728x90
반응형
728x90
반응형

Spark Dataframe 에서 칼럼 사이즈 구하기: 스칼라 & 파이썬

1. Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 스파크 세션 생성
val spark = SparkSession.builder()
  .appName("ArrayLengthExample")
  .getOrCreate()

// 예제 데이터프레임 생성
import spark.implicits._
val df = Seq(
  (Array(1.0, 2.0, 3.0)),
  (Array(4.0, 5.0)),
  (Array.empty[Double])
).toDF("double_array")

// double_array 배열의 길이를 계산
val dfWithArrayLength = df.withColumn(
  "array_length",
  size(col("double_array"))
)

// 결과 출력
dfWithArrayLength.show(truncate = false)

 

2. Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, size

# 스파크 세션 생성
spark = SparkSession.builder \
    .appName("ArrayLengthExample") \
    .getOrCreate()

# 예제 데이터프레임 생성
data = [
    ([1.0, 2.0, 3.0],),
    ([4.0, 5.0],),
    ([],)
]

df = spark.createDataFrame(data, ["double_array"])

# double_array 배열의 길이를 계산
df_with_array_length = df.withColumn(
    "array_length",
    size(col("double_array"))
)

# 결과 출력
df_with_array_length.show(truncate=False)
728x90
반응형
728x90
반응형

하둡에 적재된 Iceberg 테이블에 각 칼럼들에 대한 description 을 추가하여 
칼럼 설명을 달 수 있다.


테이블을 생성하며 하는 것이 더 효율적이겠지만 칼럼이 추가되거나 comment 를 추가하지 못했었다면 
spark shell 명령어로도 추가할 수 있다.

spark.sql("ALTER TABLE my_table ALTER COLUMN my_col_1 COMMENT '1번 칼럼입니다';")

 

728x90
반응형
728x90
반응형

쾅쾅 부딪히면서 배운 스파크 관련 내용을 그냥 실무에 적용만 해버리고 말면 다 까먹을 것 같아서 정리를 좀 해보려고 한다.

(근데 진짜 쾅쾅 부딪혔다는 점...ㅠ)


Spark 를 띄울 때 가장 기본적으로 설정해야 하는 요소인 Driver와 Executor 사이즈와 개수를 설정하는 방법과 관련 내용을 정리해 보자.

Spark job을 제출한 예시이다.

spark3-submit --master yarn --deploy-mode cluster --queue default --conf spark.dynamicAllocation.enabled=False --conf spark.sql.parquet.writeLegacyFormat=True --conf spark.sql.catalog.spark_catalog.type=hive --files hdfs://nameservice1/user/myConfig.conf --num-executors 3 --executor-cores 2 --executor-memory 4g --driver-memory 2G --name my_test_spark_job --class myApp.MyClass hdfs://nameservice1/user/my-jar.jar
  • executor 수를 정했고: --num-executor 3
  • executor 코어 수를 정했고: --executor-cores 2
  • executor 메모리 크기를 정했다: --executor-memory 4g
  • 추가로 driver 메모리 크기도 정했다: --driver-memory 2g

 

스파크 관련 기초 글은 아래를 참고하자!

2023.12.03 - [코딩해/Kafka, Spark, Data Engineering] - [Spark] 스파크 구조와 실행 과정 | 스파크 기초

 

[Spark] 스파크 구조와 실행 과정 | 스파크 기초

아주 기본적인 내용이지만 글로 정리해보려고 한다. 스파크 처음하는 사람들에게 조금이라도 도움이 될까 해서! 스파크는 Spark Application과 Cluster manager로 구성되어 있다. 그리고 Spark Application에

haonly.tistory.com


Executor에 관한 좀 더 깊은 내용을 알아보자

  • Executor는 캐싱과 실행을 위한 공간을 갖고 있는 JVM이다.
  • Executor와 driver 사이즈는 하나의 노드나 컨테이너에 할당된 자원보다 많은 메모리나 코어를 가질 수 없다.
  • Executor의 일부 공간은 스파크의 내부 메타 데이터와 사용자 자료구조를 위해 예약되어야 한다.(평균 약 25%) 이 공간은 spark.memory.fraction 설정으로 변경 가능하며, 기본값은 0.6으로 나머지 0.4는 캐싱에 쓰인다.
  • 하나의 partition이 여러 개의 executor에서 처리될 수 없다 --> 하나의 partition은 하나의 executor에서 처리

 

Executor 크기

Executor 크기는 어떻게 설정하면 좋을까?

executor의 수는 많게, 크기(core)를 작게 설정하면 

  1. OOM, spill 등이 생길 수 있다. -> 처리할 자원이 충분하지 않기 때문이다.
  2. 자원을 효율적으로 사용할 수 없다. -> 같은 노드 내 executor끼리 통신에도 비용이 필요하다.

executor의 수는 적게, 크기(core)를 크게 설정하면

  1. 너무 큰 executor는 힙 사이즈가 클수록 GC가 시작되는 시점을 지연시켜 Full GC로 인한 지연을 더욱 크게 만든다.
  2. executor당 많은 수의 코어를 쓰면 동시 스레드가 많아지면서 스레드를 다루는 HDFS의 제한으로 인해 성능이 더 떨어질 수도 있다. 

효율적인 세팅을 위해서

  • GPU 자원 기준으로 executor의 개수를 정하고,
  • executor당 메모리는 4GB 이상, executor당 core 수는 (1 < number of GPUs <= 5) 기준으로 설정한다면 일반적으로 적용될 수 있는 효율적인 세팅이라고 할 수 있다.

 

이상은 클러스터 환경에 따른 기본 설정을 고려해 볼 세팅 값이었고,

사용자가 제출한 애플리케이션의 규모, 성능 등을 고려하여 적절히 자원을 더 늘리거나 줄일 수 있어야 한다.

 

스파크 기본 shuffle partition은 200으로 설정되어 있다.(내 클러스터만 그럴 수도 있다...!)

나의 경우 위의 제출 기준(--num-executors 3 --executor-cores 2 --executor-memory 4g)로는 원하는 성능이 나오지 않았으며 memory spill이 많이 일어나 아래와 같이 변경 적용하였다.

--num-executors 3 --executor-cores 100 --executor-memory 18g

이때 partition은 300개로 유지될 수 있도록 해주었다.

 

 

다음 글로는 스파크 properties 정리(공식 문서), 파티션 개수, 크기 정하는 내용, 스파크 튜닝한 내용 등을 정리해 봐야겠다.

스파크 어려운데 너무 강력한 놈이라서 많이 배워야겠다.

728x90
반응형
728x90
반응형

아주 기본적인 내용이지만 글로 정리해보려고 한다.

스파크 처음하는 사람들에게 조금이라도 도움이 될까 해서!


스파크는 Spark Application과 Cluster manager로 구성되어 있다.

그리고 Spark Application에는 Driver와 Executor라는 두가지 JVM 프로세스가 포함되어 있다.

  • Driver: Driver는 SparkSession/SparkContext를 생성하고, Job 을 제출하고 task로 변환하고, worker 간의 task 실행을 조정하는 주요 프로세스이다.
  • Executor: Executor는 주로 특정 계산 작업을 수행하고 결과를 driver에게 반환하는 일을 담당한다.

Spark Application은 실제 일을 수행하며, Cluster manager는 Spark Application 사이에 자원을 중계해주는 역할을 담당한다.

Spark Application

<Spark The Definitive Guide> 참고

Spark Driver 와 Executor에 대해 더 자세히 살펴보자.

  • Spark Driver: 한 개의 노드에서 실행되며, 스파크 전체의 main() 함수를 실행한다. 어플리케이션 내 정보를 유지/관리한다. 사용자가 제출한 job을 task 단위로 변환하여 executor에게 전달한다.
  • Executor: 다수의 worker 노드에서 실행되는 프로세스로 spark driver가 할당한 작업(task)를 수행하여 결과를 반환한다. 또한 블록매니저를 통해 cache 하는 RDD를 저장ㅎ나다.

Cluster Manager(클러스터 매니저)

Cluster Manager 는 스파크와 붙이거나 뗄 수 있는 컴포넌트로, Spark Application의 리소스를 효율적으로 분배하는 역할을 담당한다.

Spark는 executor에 task를 할당하고 관리하기 위하여 Cluster manager에 의존한다.

Spark는 단지 cluster manager와 통신하며 할당 가능한 Executor를 전달받으며 clustor manager의 상세 동작을 알지는 못한다.

스파크3.0 기준으로 Cluster manager의 종류는

  • Spark StandAlone
  • Hadoop Yarn
  • apache mesos
  • kubernetes 등이 있다.

Spark Application 실행 과정(흐름)

Spark를 사용할 때의 대력적인 실행 흐름이다.

  1. 사용자가 spark-submit을 통해 애플리케이션을 제출한다.
  2. Spark driver가 main()을 싱행하여 SparkContext를 생성한다.
  3. SparkContext가 Cluster Manager와 연결된다.
  4. Spark Driver가 Cluster Manager로부터 Executor 실행을 위한 리소스를 요청한다.
  5. Spark Context는 작업 내용을 Task 단위로 분할하여 Executor에게 전달한다.
  6. executor들은 작업을 수행하고 결과를 저장하여 driver에게 제출한다.
  7. Driver 의 main()이 끝나거나 SparkContext.stop()이 호출된다면 Executor들은 중지되고 Cluster manager에 사용했던 자원을 반환한다.

기타 개념 정리

Deploy mode

cluster 사용시 driver 의 실행 위치를 지정한다.

  • Client mode: Driver가 Cluster 외부에 위차치할 때

  • Cluster mode: Driver가 Cluster 내부에 위치할 때

Job, Stage, Task

  • Job: stage의 집합. Application 에서 spark에 요청하는 일련의 작업
  • Stage: task의 집합
  • Task: 하나의 Executor에서 수행되는 최소 작업 단위
728x90
반응형
728x90
반응형

에러 메시지

Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the commit log. Multiple streaming jobs detected for

 

원인

로그에 동시 업데이트 하기 때문에 발생

 

스파크 스트리밍에서 동일한 체크포인트 위치에 두 개 이상의 다른 스파크 작업이 업데이트 하려 할 때 발생한다.

이 때 스파크 설정의 checkpointLocation을 다른 위치로 사용하면 해결할 수 있다.

 

두 개의 다른 스파크 스트리밍 작업에서 checkpoint 위치를 다르게 한다.

      .option("checkpointLocation", checkpointPath1)
      .option("checkpointLocation", checkpointPath2)
728x90
반응형

+ Recent posts