728x90
반응형

쾅쾅 부딪히면서 배운 스파크 관련 내용을 그냥 실무에 적용만 해버리고 말면 다 까먹을 것 같아서 정리를 좀 해보려고 한다.

(근데 진짜 쾅쾅 부딪혔다는 점...ㅠ)


Spark 를 띄울 때 가장 기본적으로 설정해야 하는 요소인 Driver와 Executor 사이즈와 개수를 설정하는 방법과 관련 내용을 정리해 보자.

Spark job을 제출한 예시이다.

spark3-submit --master yarn --deploy-mode cluster --queue default --conf spark.dynamicAllocation.enabled=False --conf spark.sql.parquet.writeLegacyFormat=True --conf spark.sql.catalog.spark_catalog.type=hive --files hdfs://nameservice1/user/myConfig.conf --num-executors 3 --executor-cores 2 --executor-memory 4g --driver-memory 2G --name my_test_spark_job --class myApp.MyClass hdfs://nameservice1/user/my-jar.jar
  • executor 수를 정했고: --num-executor 3
  • executor 코어 수를 정했고: --executor-cores 2
  • executor 메모리 크기를 정했다: --executor-memory 4g
  • 추가로 driver 메모리 크기도 정했다: --driver-memory 2g

 

스파크 관련 기초 글은 아래를 참고하자!

2023.12.03 - [코딩해/Kafka, Spark, Data Engineering] - [Spark] 스파크 구조와 실행 과정 | 스파크 기초

 

[Spark] 스파크 구조와 실행 과정 | 스파크 기초

아주 기본적인 내용이지만 글로 정리해보려고 한다. 스파크 처음하는 사람들에게 조금이라도 도움이 될까 해서! 스파크는 Spark Application과 Cluster manager로 구성되어 있다. 그리고 Spark Application에

haonly.tistory.com


Executor에 관한 좀 더 깊은 내용을 알아보자

  • Executor는 캐싱과 실행을 위한 공간을 갖고 있는 JVM이다.
  • Executor와 driver 사이즈는 하나의 노드나 컨테이너에 할당된 자원보다 많은 메모리나 코어를 가질 수 없다.
  • Executor의 일부 공간은 스파크의 내부 메타 데이터와 사용자 자료구조를 위해 예약되어야 한다.(평균 약 25%) 이 공간은 spark.memory.fraction 설정으로 변경 가능하며, 기본값은 0.6으로 나머지 0.4는 캐싱에 쓰인다.
  • 하나의 partition이 여러 개의 executor에서 처리될 수 없다 --> 하나의 partition은 하나의 executor에서 처리

 

Executor 크기

Executor 크기는 어떻게 설정하면 좋을까?

executor의 수는 많게, 크기(core)를 작게 설정하면 

  1. OOM, spill 등이 생길 수 있다. -> 처리할 자원이 충분하지 않기 때문이다.
  2. 자원을 효율적으로 사용할 수 없다. -> 같은 노드 내 executor끼리 통신에도 비용이 필요하다.

executor의 수는 적게, 크기(core)를 크게 설정하면

  1. 너무 큰 executor는 힙 사이즈가 클수록 GC가 시작되는 시점을 지연시켜 Full GC로 인한 지연을 더욱 크게 만든다.
  2. executor당 많은 수의 코어를 쓰면 동시 스레드가 많아지면서 스레드를 다루는 HDFS의 제한으로 인해 성능이 더 떨어질 수도 있다. 

효율적인 세팅을 위해서

  • GPU 자원 기준으로 executor의 개수를 정하고,
  • executor당 메모리는 4GB 이상, executor당 core 수는 (1 < number of GPUs <= 5) 기준으로 설정한다면 일반적으로 적용될 수 있는 효율적인 세팅이라고 할 수 있다.

 

이상은 클러스터 환경에 따른 기본 설정을 고려해 볼 세팅 값이었고,

사용자가 제출한 애플리케이션의 규모, 성능 등을 고려하여 적절히 자원을 더 늘리거나 줄일 수 있어야 한다.

 

스파크 기본 shuffle partition은 200으로 설정되어 있다.(내 클러스터만 그럴 수도 있다...!)

나의 경우 위의 제출 기준(--num-executors 3 --executor-cores 2 --executor-memory 4g)로는 원하는 성능이 나오지 않았으며 memory spill이 많이 일어나 아래와 같이 변경 적용하였다.

--num-executors 3 --executor-cores 100 --executor-memory 18g

이때 partition은 300개로 유지될 수 있도록 해주었다.

 

 

다음 글로는 스파크 properties 정리(공식 문서), 파티션 개수, 크기 정하는 내용, 스파크 튜닝한 내용 등을 정리해 봐야겠다.

스파크 어려운데 너무 강력한 놈이라서 많이 배워야겠다.

728x90
반응형
728x90
반응형

아주 기본적인 내용이지만 글로 정리해보려고 한다.

스파크 처음하는 사람들에게 조금이라도 도움이 될까 해서!


스파크는 Spark Application과 Cluster manager로 구성되어 있다.

그리고 Spark Application에는 Driver와 Executor라는 두가지 JVM 프로세스가 포함되어 있다.

  • Driver: Driver는 SparkSession/SparkContext를 생성하고, Job 을 제출하고 task로 변환하고, worker 간의 task 실행을 조정하는 주요 프로세스이다.
  • Executor: Executor는 주로 특정 계산 작업을 수행하고 결과를 driver에게 반환하는 일을 담당한다.

Spark Application은 실제 일을 수행하며, Cluster manager는 Spark Application 사이에 자원을 중계해주는 역할을 담당한다.

Spark Application

<Spark The Definitive Guide> 참고

Spark Driver 와 Executor에 대해 더 자세히 살펴보자.

  • Spark Driver: 한 개의 노드에서 실행되며, 스파크 전체의 main() 함수를 실행한다. 어플리케이션 내 정보를 유지/관리한다. 사용자가 제출한 job을 task 단위로 변환하여 executor에게 전달한다.
  • Executor: 다수의 worker 노드에서 실행되는 프로세스로 spark driver가 할당한 작업(task)를 수행하여 결과를 반환한다. 또한 블록매니저를 통해 cache 하는 RDD를 저장ㅎ나다.

Cluster Manager(클러스터 매니저)

Cluster Manager 는 스파크와 붙이거나 뗄 수 있는 컴포넌트로, Spark Application의 리소스를 효율적으로 분배하는 역할을 담당한다.

Spark는 executor에 task를 할당하고 관리하기 위하여 Cluster manager에 의존한다.

Spark는 단지 cluster manager와 통신하며 할당 가능한 Executor를 전달받으며 clustor manager의 상세 동작을 알지는 못한다.

스파크3.0 기준으로 Cluster manager의 종류는

  • Spark StandAlone
  • Hadoop Yarn
  • apache mesos
  • kubernetes 등이 있다.

Spark Application 실행 과정(흐름)

Spark를 사용할 때의 대력적인 실행 흐름이다.

  1. 사용자가 spark-submit을 통해 애플리케이션을 제출한다.
  2. Spark driver가 main()을 싱행하여 SparkContext를 생성한다.
  3. SparkContext가 Cluster Manager와 연결된다.
  4. Spark Driver가 Cluster Manager로부터 Executor 실행을 위한 리소스를 요청한다.
  5. Spark Context는 작업 내용을 Task 단위로 분할하여 Executor에게 전달한다.
  6. executor들은 작업을 수행하고 결과를 저장하여 driver에게 제출한다.
  7. Driver 의 main()이 끝나거나 SparkContext.stop()이 호출된다면 Executor들은 중지되고 Cluster manager에 사용했던 자원을 반환한다.

기타 개념 정리

Deploy mode

cluster 사용시 driver 의 실행 위치를 지정한다.

  • Client mode: Driver가 Cluster 외부에 위차치할 때

  • Cluster mode: Driver가 Cluster 내부에 위치할 때

Job, Stage, Task

  • Job: stage의 집합. Application 에서 spark에 요청하는 일련의 작업
  • Stage: task의 집합
  • Task: 하나의 Executor에서 수행되는 최소 작업 단위
728x90
반응형
728x90
반응형

에러 메시지

Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the commit log. Multiple streaming jobs detected for

 

원인

로그에 동시 업데이트 하기 때문에 발생

 

스파크 스트리밍에서 동일한 체크포인트 위치에 두 개 이상의 다른 스파크 작업이 업데이트 하려 할 때 발생한다.

이 때 스파크 설정의 checkpointLocation을 다른 위치로 사용하면 해결할 수 있다.

 

두 개의 다른 스파크 스트리밍 작업에서 checkpoint 위치를 다르게 한다.

      .option("checkpointLocation", checkpointPath1)
      .option("checkpointLocation", checkpointPath2)
728x90
반응형
728x90
반응형

본 글은 유데미 Fundamentals of Database Engineering 강의를 들으며 나만 알아볼 수 있게 휘갈긴 정리본임.

섹션2 - ACID -1) What is a Transaction?

쿼리의 모임 - 트랜잭션

하나의 unit으로 작동하는 것임

예) 돈을 송금한다면 1. 계좌에 돈이 있는지 확인(select), 2. 그 계좌에서 돈 빼기(update), 3. 새 계좌에 돈 넣기(update)

트랜잭션은 항상 일어난다.

 

BEGIN: 트랜잭션 시작

COMMIT: 모든 작업을 커밋(변화를 쓴다.)

  - 디스크에 하느냐 메모리에 하느냐 이런 장단점을 이해해야한다.

ROLLBACK: 모두 취소하는 것. 장애시 원상태로 돌려놔야함

  - 메모리에 저장하고 있었다면 쉽지만 디스크라면, 서버가 죽었는데 디스크라면 어떻게 롤백할 것인가? 이러한 고민을 해야함

 

섹션2 - ACID -2) Atomicity

섹션2 - ACID -3) Isolation

  • Dirty reads
  • Non-repeatable reads: 더티 기드의 경우에서 롤백이 아닌 커밋된 경우
  • Phantom reads: 존재하지 않았던 데이터가 다시 조회했을 때 존재하는 현상
  • Lost updates: 업데이트한 결과를 잃는다.

-> 읽기에서 발생할 수 있는 현상들

격리를 통해 막아야 함

1. Read uncommitted -> 빠르다, 격리하지 않는다

2. Read committed -> 일관성은 없음, 커밋된 데이터만 읽는다.

3. Repeatable read -> 트랜젝션이 진행중인 동안 값은 동일하게 유지

4. Snapshot -> 그 순간의 스냅샷 버전을 읽는다

5. Serializable -> 직렬화하여 거의 같은 값을 읽는다(?) 가장 엄격한 격리 수준

고립 수준은 모든 DBMS에서 다 다르다.

--> 결국 모든 격리는 그 순간 순간의 버전인 것

다양하게 격리 레벨들이 있으며 자세히는 천천히 공부해봐야겠음

 

섹션2 - ACID -4) Consistency

데이터의 일관성

사용자에 의해 정의되며 참조 무결성과 관련이 있다

원자성과 격리

읽기(read)의 일관성

어떤 값에 변화가 있을때 그 변화를 바로 알아아 하며, 작업 이후 전체 복제가 되어야 한다.

근데 일관성 유지를 위한 과도한 데이터 정규화는 성능과 조회 속도를 저하시킨다.

-> 복제와도 관련이 있음

 

섹션2 - ACID -4) Durability

단단해야함. 망가져도 다시 돌아와야함 -> 내구성: 트랜젝션 실행 후 서버가 다운되더라도 변화를 볼 수 있어야 한다.

느려질 수 있음

WAL, Asynchronous snapshot, AOF 를 통해 Durability 보장

WAL: 큰 데이터를 디스크에 쓰기에는 느릴 수 밖에 없음 -> 변경사항에 대해 세그먼트 로그를 디스크로 먼저 보낸다

OS Casche : OS 메모리에 저장하는 것은 일단 메모리에 적고 디스크에 한번에 나중에 적재해라. 라고 알려줌

근데, 디스크에 적기 전에 컴퓨터가 망가지면 OS 메모리에 있던 것은 날라가게 되는 것. 자주 있는 사고는 아니지만..

결국 하나의 트랜젝션 커밋까지의 내구성을 보장하는 것.

snapshot : 비동기 스냅샷. 데이터 백업/복제/복구 수행 가능

--> NoSQL에서 특히 매우 중요하다.

728x90
반응형
728x90
반응형

기본서이긴 하지만 기본이 없는 나를 위한 정리! 

(진짜 진짜 뭐라도 공부 좀 하자 싶어서 ^^)

인터넷은 잘 동작하고 있어서 대부분의 사람들은 인공의 무언가라기보다 태평양 같은 천연자원으로 생각한다. 이런 규모의 기술이 이토록 오류가 없었던 적은 언제가 마지막일까?
- 앨런 케이, Dr. Dobbs 저널 인터뷰에서(2012)

-> 내가 든 생각은 누군가의 영혼이 갈려서..? ㅋㅋㅋ

 


데이터 중심 애플리케이션의 경우 CPU 성능은 애플리케이션을 제한하는 요소가 아니며, 더 큰 문제는 보통 데이터의 양, 데이터의 복잡도, 데이터의 변화속도다.

데이터 중심 애플리케이션은 

  • 데이터베이스
  • 캐시(읽기 속도 향상)
  • 검색 색인
  • 스트림 처리
  • 일괄 처리

를 필요로 한다.

그러나, 애플리케이션마다 요구사항이 다르기 때문에 애플리케이션을 만들 때 어떤 도구와 어떤 접근 방식이 수행 중인 작업에 가장 적합한지 생각해야 한다.

-> 신뢰성 / 확장성 / 유지보수성


데이터 시스템에 대한 생각

데이터 저장과 처리를 위한 여러 새로운 도구들이 모여 만드는 데이터 시스템에 대한 이해가 필요!

데이터 시스템에서의 좋은 API는 어떤 모습일까?

 

1. 신뢰성

하드웨어나 소프트웨어 결함, 심지어 휴먼에러 같은 역경에 직면하더라도 시스템은 지속적으로 올바르게 동작해야 한다.

-> 올바르게, 무언가 잘못되더라도 지속적으로 올바르게 동작함.

잘못될 수 있는 일: 결함

결함을 예측하고 대처할 수 있는 시스템: 내결함성, 탄력성(resilient)

사실 모든 결함을 막을 수 없기에(블랙홀이 지구와 지구상의 모든 서버를 삼켜버려도 웹 호스팅이 가능한 내결함성을 지닐 순 없기에...ㅋㅋ),

특정 유형의 결함 내성에 대해서만 이야기 하는 것이 타당하다.

결함과 장애는 다름

결함: 사양에서 벗어난 시스템의 한 구성 요소

장애: 사용자에게 필요한 서비스를 제공하지 못하고 시스템 전체가 멈춘 경우.

내결함성 시스템을 훈련시킴 -> 넷플릭스 카오스 몽키 예시

 

오류와 결함의 종류:

하드웨어 결함 / 소프트웨어 오류(신속한 해결책이 없음) / 인적 오류(롤백 필요, 테스트 추가, 모니터링 대책, 조기 교육(ㅜㅜ))

 

신뢰성은 단순한 것이 아니다. 일상적인 애플리케이션 조차도 안정적으로 작동해야 한다.

"중요하지 않은" 애플리케이션도 사용자에 대한 책임이 있어야 한다. 사진 애플리케이션에서 사진이 모두 사라진다면 어떻게 될 것인가? 백업을 복원하는 방법을 알고 있을까?

2. 확장성

시스템의 데이터 양, 트래픽 양, 복잡도가 증가하면서 이를 처리할 수 있는 적절한 방법이 있어야 한다.

시스템이 현재 안정적으로 동작한다고 해서 미래에도 안정적으로 동작한다는 보장은 없다. 

시스템은 전에 처리했던 양보다 더 많은 데이터를 처리하고 있을지도 모른다.

확장성은 증가한 부하에 대처하는 시스템 능력을 말하는데, 이때 고려한 질문은,

"시스템이 특정 방식으로 커지면 이에 대처하기 위한 선택은 무엇인가?", "추가 부하를 다루기 위해 계산 자원을 어떻게 투입할까?"라는 구체적인 용어이다.

부하 기술하기

부하 매개변수: 웹 서버의 초당 요청 수, 데이터베이스의 읽기 대 쓰기 비율, 활성 사용자 수, 등

트위터 예시: 

사용자는 팔로워에게 새로운 메시지를 게시할 수 있다(평균 초당 4.6k 요청, 피크일 때 초당 12k 요청 이상)
사용자는 팔로우한 사람이 작성한 트윗을 볼 수 있다(초당 300k 요청)

성능 기술하기

1. 부하 매개변수를 증가시키고 시스템 자원은 변경하지 않고 유지하면 시스템 성능은 어떻게 영향을 받을까?

2. 부하 매개변수를 증가시켰을 때 성능이 변하지 않고 유지되길 원한다면 자원을 얼마나 많이 늘려야 할까?

시스템 성능 면에서 일괄 처리 시스템은 처리량, 온라인 시스템은 응답시간이 중요한 성능 지표이다.

평균보다 여러가지 상황을 고려한 백분위 응답시간을 사용하는 것이 좋다.

사용자가 보통 얼마나 오랫동안 기다려야 하는지 알고 싶다면 중앙값이 좋은 지표다.(p50)

응답 시간 지연에 따라 매출에 영향을 주기도 하는 시스템이 있다는 것을 기억하자!

 

시스템의 확장성을 테스트하려고 인위적으로 부하를 생성하는 경우 부하 생성 클라이언트는 응답 시간과 독립적으로 요청을 지속적으로 보내야 한다. 만약 클라이언트가 다음 요청을 보내기 전에 이전 요청이 완료되길 기다리면 테스트에서 인위적으로 대기 시간을 실제보다 더 짧게 만들어 평가를 왜곡한다.

 

부하 대응 접근 방식

성능 측정을 위한 부하와 지표 매개변수를 확인했다.

부하 매개변수가 어느 정도 증가하더라도 좋은 성능을 유지하려면 어떻게 해야 할까?

흔히 아는 내용: 스케일 업 / 스케일 아웃

적절한 사양의 장비 몇 대가 다량의 낮은 사양 가상 장비보다 훨씬 간단하고 저렴함

일부 시스템은 탄력적이다. 컴퓨팅 자원을 자동으로 추가할 수 있다는 점.

그렇지 않은 시스템은 수동으로 확장해야 한다.(수동으로 확장하는 시스템이 더 간단하고 운영상 예상치 못한 일이 더 적다. -> 이해 안됨!)

다수의 장비에 stateless 서비스를 배포하는 일은 상당히 간단하지만,

단일 노드에 stateful 데이터 시스템을 분산 설치하는 일은 복잡하다.

그래서 대용량 데이터와 트래픽을 다루지 않는 사용 사례에도 분산 데이터 시스템이 향후 기본 아키텍처로 자리 잡을 가능성이 있다.

 

특정 애플리케이션에 적합한 확장성을 갖춘 아키텍처는 주요 동작이 무엇이고 잘하지 않는 동작이 무엇인지에 대한 가정을 바탕으로 구축하고 이 가정은 곧 부하 매개변수가 된다.

3. 유지보수성

시간이 지남에 따라 여러 다양한 사람들이 시스템 상에서 작업할 것이기 때문에 모든 사용자가 시스템 상에서 생산적으로 작업할 수 있게 해야 한다.

초기 개발 그 이후 지속해서 이어지는 유지보수에 소프트웨어 비용의 대부분이 들어간다.

레거시 시스템 유지보수 작업은 모두가 싫어하는 일이다(나도..)

그래서 이러한 유지보수 중 고통을 최소화하고 레거시 소프트웨어를 직접 만들지 않게끔 애초에 설계를 잘해야 한다. 

이러한 원칙으로는

  • 운용성: 운영팀이 시스템을 원활하게 운영할 수 있게 쉽게 만들어라.
  • 단순성: 시스템에서 복잡도를 최대한 제거해 새로운 엔지니어가 시스템을 이해하기 쉽게 만들어라.
  • 발전성: 엔지니어가 이후에 시스템을 쉽게 변경할 수 있게 하라. 그래야 요구사항 변경 같은 예기치 않은 사용 사례를 적용하기가 쉽다. 이 속성은 유연성/수정가능성/적응성으로 알려져 있다.

운용성 책임 작업 중 기억에 남는 작업:

  • 시스템 장애, 성능 저하 등의 문제의 원인을 추적
  • 예측 가능한 운영과 안정적인 서비스 환경을 유지하기 위한 절차 정의
  • 개인 인사 이동에도 시스템에 대한 조직의 지식을 보존함

단순성에서 기억에 남는 내용:

  • 변수 명명, 모듈 간 강한 커플링, 임시방편으로 문제를 해결한 사례, 복잡한 의존성 등등이 복잡도의 다양한 증상이다.

복잡도 때문에 시스템 유지보수가 어려울 때 예산과 일정이 초과되며 버그가 생길 위험이 더 크다.

시스템을 단순하게 ㅁ나든느 일이 반드시 기능을 줄인다는 의미는 아니다. 우발적 복잡도를 줄인다는 뜻일 수 있다.

추상화하면 우발적 복잡도를 제거할 수 있다.

발전성: 변화를 쉽게 만들기

시스템의 요구사항이 영원히 바뀌지 않을 가능성은 매우 적다.

(최근 진행한 업무에서 짠 스크립트는 버전 26까지 갔던 거 보면 사실인 듯하다 ㅋㅋ)

조직 프로세스 측면에서 애자일 작업 패턴은 변화에 적응하기 위한 프레임워크를 제공한다. 

애자일 커뮤니티에서는 자주 변화하는 환경에서 소프트웨어를 개발할 때 도움이 되는 기술 도구와 패턴을 개발하고 있다.


정리

데이터 중심 애플리케이션을 생각하는 기본적인 방법 몇 가지를 알아봤다.

애플리케이션이 유용하려면 충족되어야 할 요구사항(비기능적 요구사항, 기능적 요구사항) 중 신뢰성/유지보수성/확장성을 살펴봤다.

신뢰성: 결함이 발생해도 시스템이 올바르게 동작하게 만드는 것.

확장성: 부하가 증가해도 좋은 성능을 유지하기 위한 전략.

유지보수성: 본질은 시스템에서 작업한느 엔지니어와 운영팀의 삶을 개선. 좋은 추상화를 통한 복잡도를 줄이기.

 

안타깝게도 애플리케이션을 신뢰할 수 있고, 확장 가능하며 유지보수하기 쉽게 만들어주는 간단한 해결책은 없다. 

하지만 여러 애플리케이션에서 계속 재현되는 특정 패턴과 기술이 있다.

728x90
반응형
728x90
반응형

spark-sql이 데이터를 빠르게 search 하는데 효과적이긴 하지만 데이터가 너무 크거나 복잡한 연산을 필요로 할 때는 Spark RDD API를 사용하는 것이 더 나을 때가 있다.
스칼라로 코드를 작성할 수 있기 때문에 복잡한 연산을 표현하기에 더 직관적이고 적절하다.

combine 관련 연산 중 가장 많이 쓰이는 두 개의 메소드가 groupByKey와 reduceByKey이다.
groupByKey보다는 reduceByKey가 더 효율적으로 구현되어 있다.

그래서 공식 API 문서를 비롯한 여러 곳에서 groupByKey 사용을 권장하지 않고 있다.


이유는 map-side combine이 효율적인데 groupByKey는 map-side combine을 하지 않도록 되어있기 때문이다.


https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
(groupByKey 보다 reduceByKey가 더 나은 이유)

728x90
반응형

+ Recent posts