728x90
반응형

쾅쾅 부딪히면서 배운 스파크 관련 내용을 그냥 실무에 적용만 해버리고 말면 다 까먹을 것 같아서 정리를 좀 해보려고 한다.

(근데 진짜 쾅쾅 부딪혔다는 점...ㅠ)


Spark 를 띄울 때 가장 기본적으로 설정해야 하는 요소인 Driver와 Executor 사이즈와 개수를 설정하는 방법과 관련 내용을 정리해 보자.

Spark job을 제출한 예시이다.

spark3-submit --master yarn --deploy-mode cluster --queue default --conf spark.dynamicAllocation.enabled=False --conf spark.sql.parquet.writeLegacyFormat=True --conf spark.sql.catalog.spark_catalog.type=hive --files hdfs://nameservice1/user/myConfig.conf --num-executors 3 --executor-cores 2 --executor-memory 4g --driver-memory 2G --name my_test_spark_job --class myApp.MyClass hdfs://nameservice1/user/my-jar.jar
  • executor 수를 정했고: --num-executor 3
  • executor 코어 수를 정했고: --executor-cores 2
  • executor 메모리 크기를 정했다: --executor-memory 4g
  • 추가로 driver 메모리 크기도 정했다: --driver-memory 2g

 

스파크 관련 기초 글은 아래를 참고하자!

2023.12.03 - [코딩해/Kafka, Spark, Data Engineering] - [Spark] 스파크 구조와 실행 과정 | 스파크 기초

 

[Spark] 스파크 구조와 실행 과정 | 스파크 기초

아주 기본적인 내용이지만 글로 정리해보려고 한다. 스파크 처음하는 사람들에게 조금이라도 도움이 될까 해서! 스파크는 Spark Application과 Cluster manager로 구성되어 있다. 그리고 Spark Application에

haonly.tistory.com


Executor에 관한 좀 더 깊은 내용을 알아보자

  • Executor는 캐싱과 실행을 위한 공간을 갖고 있는 JVM이다.
  • Executor와 driver 사이즈는 하나의 노드나 컨테이너에 할당된 자원보다 많은 메모리나 코어를 가질 수 없다.
  • Executor의 일부 공간은 스파크의 내부 메타 데이터와 사용자 자료구조를 위해 예약되어야 한다.(평균 약 25%) 이 공간은 spark.memory.fraction 설정으로 변경 가능하며, 기본값은 0.6으로 나머지 0.4는 캐싱에 쓰인다.
  • 하나의 partition이 여러 개의 executor에서 처리될 수 없다 --> 하나의 partition은 하나의 executor에서 처리

 

Executor 크기

Executor 크기는 어떻게 설정하면 좋을까?

executor의 수는 많게, 크기(core)를 작게 설정하면 

  1. OOM, spill 등이 생길 수 있다. -> 처리할 자원이 충분하지 않기 때문이다.
  2. 자원을 효율적으로 사용할 수 없다. -> 같은 노드 내 executor끼리 통신에도 비용이 필요하다.

executor의 수는 적게, 크기(core)를 크게 설정하면

  1. 너무 큰 executor는 힙 사이즈가 클수록 GC가 시작되는 시점을 지연시켜 Full GC로 인한 지연을 더욱 크게 만든다.
  2. executor당 많은 수의 코어를 쓰면 동시 스레드가 많아지면서 스레드를 다루는 HDFS의 제한으로 인해 성능이 더 떨어질 수도 있다. 

효율적인 세팅을 위해서

  • GPU 자원 기준으로 executor의 개수를 정하고,
  • executor당 메모리는 4GB 이상, executor당 core 수는 (1 < number of GPUs <= 5) 기준으로 설정한다면 일반적으로 적용될 수 있는 효율적인 세팅이라고 할 수 있다.

 

이상은 클러스터 환경에 따른 기본 설정을 고려해 볼 세팅 값이었고,

사용자가 제출한 애플리케이션의 규모, 성능 등을 고려하여 적절히 자원을 더 늘리거나 줄일 수 있어야 한다.

 

스파크 기본 shuffle partition은 200으로 설정되어 있다.(내 클러스터만 그럴 수도 있다...!)

나의 경우 위의 제출 기준(--num-executors 3 --executor-cores 2 --executor-memory 4g)로는 원하는 성능이 나오지 않았으며 memory spill이 많이 일어나 아래와 같이 변경 적용하였다.

--num-executors 3 --executor-cores 100 --executor-memory 18g

이때 partition은 300개로 유지될 수 있도록 해주었다.

 

 

다음 글로는 스파크 properties 정리(공식 문서), 파티션 개수, 크기 정하는 내용, 스파크 튜닝한 내용 등을 정리해 봐야겠다.

스파크 어려운데 너무 강력한 놈이라서 많이 배워야겠다.

728x90
반응형
728x90
반응형

아주 기본적인 내용이지만 글로 정리해보려고 한다.

스파크 처음하는 사람들에게 조금이라도 도움이 될까 해서!


스파크는 Spark Application과 Cluster manager로 구성되어 있다.

그리고 Spark Application에는 Driver와 Executor라는 두가지 JVM 프로세스가 포함되어 있다.

  • Driver: Driver는 SparkSession/SparkContext를 생성하고, Job 을 제출하고 task로 변환하고, worker 간의 task 실행을 조정하는 주요 프로세스이다.
  • Executor: Executor는 주로 특정 계산 작업을 수행하고 결과를 driver에게 반환하는 일을 담당한다.

Spark Application은 실제 일을 수행하며, Cluster manager는 Spark Application 사이에 자원을 중계해주는 역할을 담당한다.

Spark Application

<Spark The Definitive Guide> 참고

Spark Driver 와 Executor에 대해 더 자세히 살펴보자.

  • Spark Driver: 한 개의 노드에서 실행되며, 스파크 전체의 main() 함수를 실행한다. 어플리케이션 내 정보를 유지/관리한다. 사용자가 제출한 job을 task 단위로 변환하여 executor에게 전달한다.
  • Executor: 다수의 worker 노드에서 실행되는 프로세스로 spark driver가 할당한 작업(task)를 수행하여 결과를 반환한다. 또한 블록매니저를 통해 cache 하는 RDD를 저장ㅎ나다.

Cluster Manager(클러스터 매니저)

Cluster Manager 는 스파크와 붙이거나 뗄 수 있는 컴포넌트로, Spark Application의 리소스를 효율적으로 분배하는 역할을 담당한다.

Spark는 executor에 task를 할당하고 관리하기 위하여 Cluster manager에 의존한다.

Spark는 단지 cluster manager와 통신하며 할당 가능한 Executor를 전달받으며 clustor manager의 상세 동작을 알지는 못한다.

스파크3.0 기준으로 Cluster manager의 종류는

  • Spark StandAlone
  • Hadoop Yarn
  • apache mesos
  • kubernetes 등이 있다.

Spark Application 실행 과정(흐름)

Spark를 사용할 때의 대력적인 실행 흐름이다.

  1. 사용자가 spark-submit을 통해 애플리케이션을 제출한다.
  2. Spark driver가 main()을 싱행하여 SparkContext를 생성한다.
  3. SparkContext가 Cluster Manager와 연결된다.
  4. Spark Driver가 Cluster Manager로부터 Executor 실행을 위한 리소스를 요청한다.
  5. Spark Context는 작업 내용을 Task 단위로 분할하여 Executor에게 전달한다.
  6. executor들은 작업을 수행하고 결과를 저장하여 driver에게 제출한다.
  7. Driver 의 main()이 끝나거나 SparkContext.stop()이 호출된다면 Executor들은 중지되고 Cluster manager에 사용했던 자원을 반환한다.

기타 개념 정리

Deploy mode

cluster 사용시 driver 의 실행 위치를 지정한다.

  • Client mode: Driver가 Cluster 외부에 위차치할 때

  • Cluster mode: Driver가 Cluster 내부에 위치할 때

Job, Stage, Task

  • Job: stage의 집합. Application 에서 spark에 요청하는 일련의 작업
  • Stage: task의 집합
  • Task: 하나의 Executor에서 수행되는 최소 작업 단위
728x90
반응형
728x90
반응형

에러 메시지

Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the commit log. Multiple streaming jobs detected for

 

원인

로그에 동시 업데이트 하기 때문에 발생

 

스파크 스트리밍에서 동일한 체크포인트 위치에 두 개 이상의 다른 스파크 작업이 업데이트 하려 할 때 발생한다.

이 때 스파크 설정의 checkpointLocation을 다른 위치로 사용하면 해결할 수 있다.

 

두 개의 다른 스파크 스트리밍 작업에서 checkpoint 위치를 다르게 한다.

      .option("checkpointLocation", checkpointPath1)
      .option("checkpointLocation", checkpointPath2)
728x90
반응형
728x90
반응형

본 글은 유데미 Fundamentals of Database Engineering 강의를 들으며 나만 알아볼 수 있게 휘갈긴 정리본임.

섹션2 - ACID -1) What is a Transaction?

쿼리의 모임 - 트랜잭션

하나의 unit으로 작동하는 것임

예) 돈을 송금한다면 1. 계좌에 돈이 있는지 확인(select), 2. 그 계좌에서 돈 빼기(update), 3. 새 계좌에 돈 넣기(update)

트랜잭션은 항상 일어난다.

 

BEGIN: 트랜잭션 시작

COMMIT: 모든 작업을 커밋(변화를 쓴다.)

  - 디스크에 하느냐 메모리에 하느냐 이런 장단점을 이해해야한다.

ROLLBACK: 모두 취소하는 것. 장애시 원상태로 돌려놔야함

  - 메모리에 저장하고 있었다면 쉽지만 디스크라면, 서버가 죽었는데 디스크라면 어떻게 롤백할 것인가? 이러한 고민을 해야함

 

섹션2 - ACID -2) Atomicity

섹션2 - ACID -3) Isolation

  • Dirty reads
  • Non-repeatable reads: 더티 기드의 경우에서 롤백이 아닌 커밋된 경우
  • Phantom reads: 존재하지 않았던 데이터가 다시 조회했을 때 존재하는 현상
  • Lost updates: 업데이트한 결과를 잃는다.

-> 읽기에서 발생할 수 있는 현상들

격리를 통해 막아야 함

1. Read uncommitted -> 빠르다, 격리하지 않는다

2. Read committed -> 일관성은 없음, 커밋된 데이터만 읽는다.

3. Repeatable read -> 트랜젝션이 진행중인 동안 값은 동일하게 유지

4. Snapshot -> 그 순간의 스냅샷 버전을 읽는다

5. Serializable -> 직렬화하여 거의 같은 값을 읽는다(?) 가장 엄격한 격리 수준

고립 수준은 모든 DBMS에서 다 다르다.

--> 결국 모든 격리는 그 순간 순간의 버전인 것

다양하게 격리 레벨들이 있으며 자세히는 천천히 공부해봐야겠음

 

섹션2 - ACID -4) Consistency

데이터의 일관성

사용자에 의해 정의되며 참조 무결성과 관련이 있다

원자성과 격리

읽기(read)의 일관성

어떤 값에 변화가 있을때 그 변화를 바로 알아아 하며, 작업 이후 전체 복제가 되어야 한다.

근데 일관성 유지를 위한 과도한 데이터 정규화는 성능과 조회 속도를 저하시킨다.

-> 복제와도 관련이 있음

 

섹션2 - ACID -4) Durability

단단해야함. 망가져도 다시 돌아와야함 -> 내구성: 트랜젝션 실행 후 서버가 다운되더라도 변화를 볼 수 있어야 한다.

느려질 수 있음

WAL, Asynchronous snapshot, AOF 를 통해 Durability 보장

WAL: 큰 데이터를 디스크에 쓰기에는 느릴 수 밖에 없음 -> 변경사항에 대해 세그먼트 로그를 디스크로 먼저 보낸다

OS Casche : OS 메모리에 저장하는 것은 일단 메모리에 적고 디스크에 한번에 나중에 적재해라. 라고 알려줌

근데, 디스크에 적기 전에 컴퓨터가 망가지면 OS 메모리에 있던 것은 날라가게 되는 것. 자주 있는 사고는 아니지만..

결국 하나의 트랜젝션 커밋까지의 내구성을 보장하는 것.

snapshot : 비동기 스냅샷. 데이터 백업/복제/복구 수행 가능

--> NoSQL에서 특히 매우 중요하다.

728x90
반응형
728x90
반응형

3부로 넘어왔다.

 

1부와 2부에서는 요청과 응답, 질의와 결과에 대한 내용을 주로 다뤘다.

시스템은 세가지 유형으로 구분 가능하다.

  • 서비스(온라인 시스템)
  • 일괄 처리 시스템(오프라인 시스템)
  • 스트림 처리 시스템(준실시간 시스템)

이번 장에서는 일괄처리 알고리즘인 맵리듀스를 알아보고 다른 일괄 처리 알고리즘과 프레임워크도 살펴볼 것이다.

 

단순 로그 분석

유닉스 셸(웹사이트에서 가장 인기 높은 페이지 5개 출력)

cat 명령어 좀 공부해야겠다..!

연쇄 명령 대 맞춤형 프로그램

유닉스 연쇄 명령 대신 같은 작업을 하는 간단한 프로그램을 작성할 수도 있다.

루비로는 이렇게 작성한다고 한다.

-> 유닉스 연쇄 파이프보다 간결하지는 않지만  더 읽기 쉬우며 뭘 선택하는지는 취향의 문제이다.

정렬 대 인메모리 집계

허용 메머리보다 작업 세트가 크다면 정렬 접근법을 사용하여 디스크를 효율적으로 사용하는 것이 좋다.

유닉스 철학

연쇄 명령을 사용해 쉽게 로그파일을 분석할 수 있었던 것은 유닉스의 핵심 설계 아이디어 중 하나였다.

유닉스에서 빌려올 수 있는 아이디어에는 무엇이 더 있을까

  • 유닉스 파이프: "다른 방법으로 데이터 처리가 필요할 때 정원 호스와 같이 여러 다른 프로그램을 연결하는 방법이 필요하다. 이것은 I/O 방식이기도 하다" -> 배관 공사와 비슷한 점에 착안해 파이프로 프로그램을 연결하는 아이디어이며 이것이 지금은 유닉스 철학의 일부가 됐다.

유닉스 철학

  • 각 프로그램이 한 가지 일만 하도록 작성하라. 새 작업을 하려면 기존 프로그램을 고쳐 새로은 "기능"을 추가해 프로그램을 복잡하게 만들기보다는 새로운 프로그램을 작성하라.
  • 모든 프로그램의 출력은 아직 알려지지 않은 다른 프로그램의 입력으로 쓰일 수 있다고 생각하라. 불필요한 정보로 출력이 너저분해서는 안된다. 입력 형식으로 엄격하게 열을 맞춘다거나 이진형태를 사용하지 마라. 대화형 입력을 고집하지 마라.
  • 소프트웨어를 빠르게 써볼 수 있게 설계하고 구축하라. 심지어 운영체제도 마찬가지다. 수 주 안에 끝내는 것이 이상적이다. 거슬리는 부분은 과감히 버리고 새로 구축하라.
  • 프로그래밍 작업을 줄이려면 미숙한 도움보단 도구를 사용하라. 도구를 빌드하기 위해 한참 둘러가야 하고 게다가 사용 후 바로 버린다고 할지라도 도구를 써라.

동일 인터페이스

특정 프로그램이 다른 어떤 프로그램과도 연결 가능하려면 프로그램 모두가 같은 입출력 인터페이스를 사용해야 한다는 의미.

로직과 연결의 분리

유닉스 도구의 다른 특징으로 표준 입력과 표준 출력을 사용한다는 점이 있다.

입력은 키보드, 출력은 화면으로 설정되어 있다.

파이프는 한 프로세스의 출력을 다른 프로세스의 입력과 연결한다. 이 때 중간 데이터를 디스크에 쓰지 않고 작은 인메모리 버퍼를 사용해 프로세스 간 데이터를 전송한다.

투명성과 실험

유닉스 도구가 성공적인 이유 중 하나는 진행 사항을 파악하기가 상당히 쉽기 때문이다.

단순하지만 놀라울 정도로 유용하다.

 


 

맵리듀스와 분산 파일 시스템

맵리듀스는 유닉스 도구와 마찬가지로 상당히 불친절하고 무차별 대입 방법이지만 대신 엄청나게 효율적인 도구다.

단일 맵리듀스 작업은 하나 이상의 입력을 받아 하나 이상의 출력을 만들어 낸다는 점에서 단일 유닉스 프로세스와 유사하다.

유닉스 도구는 stdin과 stdout을 입력과 출력으로 사용하는데 맵리듀스 작업은 분산 파일 시스템상의 파일을 입력과 출력으로 사용한다.

하둡 맵리듀스 구현에서는 HDFS라고 하는 파일 시스템을 사용한다.

HDFS는 비공유 원칙을 기반으로 하며 각 장비에서 실행되는 데몬 프로세스로 구성된다.

네임노드라고 부르는 중앙 서버가 있고 파일 블록들을 여러 장비에 복제한다.

맵리듀스 작업 실행하기

맵리듀스의 분산 실행

사용자 활동 이벤트 분석 예제

728x90
반응형
728x90
반응형
728x90
반응형

+ Recent posts