paint-brush
ClickHouse에서 Apache Doris로 Tencent Music 전환~에 의해@junzhang
2,888 판독값
2,888 판독값

ClickHouse에서 Apache Doris로 Tencent Music 전환

~에 의해 Jun Zhang13m2023/03/14
Read on Terminal Reader
Read this story w/o Javascript

너무 오래; 읽다

Tencent Music은 월간 활성 사용자가 8억 명에 달하는 음악 스트리밍 서비스 제공업체입니다. 음악 라이브러리에는 녹음된 음악, 라이브 음악, 오디오, 비디오 등 모든 형식과 유형의 데이터가 포함되어 있습니다. 우리는 대부분의 데이터를 Tencent Data Warehouse(TDW)에 저장하고 처리했습니다. ClickHouse는 플랫 테이블을 다루는 데 탁월했지만, 모든 데이터를 플랫 테이블에 붓고 일별로 분할하는 것은 스토리지 리소스의 엄청난 낭비입니다. Apache Doris가 솔루션이었습니다.

People Mentioned

Mention Thumbnail
featured image - ClickHouse에서 Apache Doris로 Tencent Music 전환
Jun Zhang HackerNoon profile picture
0-item

이 글은 나와 내 동료 Kai Dai가 공동으로 작성했습니다. 우리는 월간 활성 사용자가 무려 8억 명에 달하는 음악 스트리밍 서비스 제공업체인 Tencent Music(NYSE: TME)의 데이터 플랫폼 엔지니어입니다. 여기서 숫자를 빼는 것은 자랑이 아니라 나와 불쌍한 동료들이 매일 처리해야 하는 데이터의 바다를 암시하는 것입니다.


ClickHouse의 용도

Tencent Music의 음악 라이브러리에는 녹음된 음악, 라이브 음악, 오디오, 비디오 등 모든 형식과 유형의 데이터가 포함되어 있습니다. 데이터 플랫폼 엔지니어로서 우리의 임무는 팀원이 더 나은 결정을 내릴 수 있도록 데이터에서 정보를 추출하는 것입니다. 사용자와 음악 파트너를 지원합니다.


구체적으로, 우리는 노래, 가사, 멜로디, 앨범 및 아티스트에 대한 종합적인 분석을 수행하고 이 모든 정보를 데이터 자산으로 변환한 후 재고 계산, 사용자 프로파일링, 지표 분석 및 그룹 타겟팅을 위해 내부 데이터 사용자에게 전달합니다. .



우리는 대부분의 데이터를 다양한 태그 및 메트릭 시스템에 입력한 다음 각 개체(노래, 아티스트 등)를 중심으로 하는 평면 테이블을 생성하는 오프라인 데이터 플랫폼인 Tencent Data Warehouse(TDW)에 저장하고 처리했습니다.


그런 다음 분석을 위해 플랫 테이블을 ClickHouse로 가져왔고 데이터 검색 및 그룹 타겟팅을 위해 Elasticsearch로 가져왔습니다.


그 후, 우리의 데이터 분석가는 다양한 사용 시나리오에 대한 데이터세트를 구성하는 데 필요한 태그 및 측정항목 아래의 데이터를 사용했으며, 그 동안 자체 태그와 측정항목을 생성할 수 있었습니다.


데이터 처리 파이프라인은 다음과 같습니다.



ClickHouse의 문제점


위 파이프라인으로 작업할 때 몇 가지 어려움에 직면했습니다.


  1. 부분 업데이트 : 열의 부분 업데이트가 지원되지 않았습니다. 따라서 데이터 소스 중 하나에서 발생하는 대기 시간으로 인해 플랫 테이블 생성이 지연되어 데이터 적시성이 훼손될 수 있습니다.


  2. 높은 저장 비용 : 다양한 태그와 측정항목의 데이터가 다양한 빈도로 업데이트되었습니다. 클릭하우스가 플랫 테이블 처리에 뛰어난 만큼, 모든 데이터를 플랫 테이블에 쏟아 붓고 하루 단위로 파티션을 나누는 것은 엄청난 저장 자원 낭비였고 그에 따르는 유지 관리 비용도 적지 않았습니다.


  3. 높은 유지 관리 비용 : 구조적으로 말하면 ClickHouse는 스토리지 노드와 컴퓨팅 노드의 강력한 결합이 특징입니다. 해당 구성 요소는 상호 의존성이 커서 클러스터가 불안정해질 위험이 가중되었습니다. 게다가 ClickHouse와 Elasticsearch 전반의 통합 쿼리의 경우 엄청난 양의 연결 문제를 처리해야 했습니다. 정말 지루했어요.


아파치 도리스로 전환

실시간 분석 데이터베이스인 Apache Doris는 문제를 해결하는 데 꼭 필요한 몇 가지 기능을 자랑합니다.


  1. 부분 업데이트 : Doris는 다양한 데이터 모델을 지원하며, 그 중 Aggregate Model은 열의 실시간 부분 업데이트를 지원합니다. 이를 토대로 원시 데이터를 Doris에 직접 수집하고 거기에 플랫 테이블을 만들 수 있습니다. 수집은 다음과 같이 진행됩니다. 먼저 Spark를 사용하여 Kafka에 데이터를 로드합니다. 그런 다음 모든 증분 데이터는 Flink를 통해 Doris 및 Elasticsearch로 업데이트됩니다. 한편, Flink는 Doris와 Elasticsearch의 부담을 덜어주기 위해 데이터를 사전 집계할 예정입니다.


  2. 스토리지 비용 : Doris는 Hive, Iceberg, Hudi, MySQL 및 Elasticsearch 전반에 걸쳐 다중 테이블 조인 쿼리와 연합 쿼리를 지원합니다. 이를 통해 대규모 플랫 테이블을 더 작은 테이블로 분할하고 업데이트 빈도별로 분할할 수 있습니다. 이렇게 하면 스토리지 부담이 줄어들고 쿼리 처리량이 늘어나는 이점이 있습니다.


  3. 유지 관리 비용 : Doris는 간단한 아키텍처로 MySQL 프로토콜과 호환됩니다. Doris 배포에는 다른 시스템에 종속되지 않고 두 가지 프로세스(FE 및 BE)만 포함되므로 운영 및 유지 관리가 쉽습니다. 또한 Doris는 외부 ES 데이터 테이블 쿼리를 지원합니다. ES의 메타데이터와 쉽게 인터페이스하고 ES의 테이블 스키마를 자동으로 매핑할 수 있으므로 복잡한 연결 문제 없이 Doris를 통해 Elasticsearch 데이터에 대한 쿼리를 수행할 수 있습니다.


또한 Doris는 HDFS 및 S3와 같은 원격 스토리지에서 일괄 가져오기, MySQL binlog 및 Kafka에서 데이터 읽기, 실시간 데이터 동기화 또는 MySQL, Oracle 및 PostgreSQL에서 일괄 가져오기를 포함한 다양한 데이터 수집 방법을 지원합니다. 일관성 프로토콜을 통해 서비스 가용성과 데이터 신뢰성을 보장하며 자동 디버깅이 가능합니다. 이는 운영자와 유지관리자에게 좋은 소식입니다.


통계적으로 말하면, 이러한 기능으로 인해 스토리지 비용이 42%, 개발 비용이 40% 절감되었습니다.


Doris를 사용하는 동안 우리는 오픈 소스 Apache Doris 커뮤니티로부터 많은 지원을 받았으며 현재 Apache Doris의 상용 버전을 실행 중인 SelectDB 팀으로부터 적시에 도움을 받았습니다.



우리의 요구 사항을 충족하기 위한 추가 개선

의미 계층 도입

데이터 세트에 대해 말하자면, 우리 데이터 분석가는 자신의 편의에 따라 태그와 지표를 자유롭게 재정의하고 결합할 수 있습니다. 그러나 어두운 측면에서는 태그 및 미터법 시스템의 높은 이질성으로 인해 사용 및 관리가 더 어려워집니다.


우리의 솔루션은 데이터 처리 파이프라인에 의미론적 계층을 도입하는 것입니다. 의미 계층은 모든 기술 용어가 내부 데이터 사용자를 위해 보다 이해하기 쉬운 개념으로 번역되는 곳입니다. 즉, 우리는 태그와 지표를 데이터 정의 및 관리를 위한 일류 시민으로 전환하고 있습니다.



이것이 왜 도움이 될까요?


데이터 분석가의 경우 모든 태그와 지표가 의미 계층에서 생성 및 공유되므로 혼란이 줄어들고 효율성이 높아집니다.


데이터 사용자의 경우 더 이상 자체 데이터 세트를 만들거나 각 시나리오에 어떤 것이 적용 가능한지 알아낼 필요가 없으며 지정된 태그 세트 및 메트릭 세트에 대해 쿼리를 수행하기만 하면 됩니다.

의미 계층 업그레이드

의미 계층에서 태그와 지표를 명시적으로 정의하는 것만으로는 충분하지 않았습니다. 표준화된 데이터 처리 시스템을 구축하기 위한 다음 목표는 전체 데이터 처리 파이프라인에 걸쳐 태그와 측정항목의 일관된 정의를 보장하는 것이었습니다.


이를 위해 우리는 의미론적 계층을 데이터 관리 시스템의 핵심으로 만들었습니다.



어떻게 작동하나요?


TDW의 모든 컴퓨팅 로직은 단일 태그 또는 메트릭의 형태로 의미 계층에서 정의됩니다.


의미 계층은 애플리케이션 측으로부터 논리 쿼리를 수신하고 그에 따라 엔진을 선택하며 SQL을 생성합니다. 그런 다음 실행을 위해 SQL 명령을 TDW로 보냅니다. 한편 구성 및 데이터 수집 작업을 Doris에 보내고 어떤 측정항목과 태그를 가속화해야 하는지 결정할 수도 있습니다.


이러한 방식으로 우리는 태그와 지표를 더 쉽게 관리할 수 있게 만들었습니다. 문제는 각 태그와 지표가 개별적으로 정의되기 때문에 쿼리에 대한 유효한 SQL 문 생성을 자동화하는 데 어려움을 겪고 있다는 것입니다. 이에 대해 어떤 생각이 있으시면 언제든지 저희에게 문의해 주시기 바랍니다.


Apache Doris에 전체 플레이 제공


보시다시피 Apache Doris는 우리 솔루션에서 중추적인 역할을 했습니다. Doris의 사용을 최적화하면 전반적인 데이터 처리 효율성이 크게 향상될 수 있습니다. 따라서 이 부분에서는 데이터 수집 및 쿼리를 가속화하고 비용을 절감하기 위해 Doris와 함께 수행하는 작업을 여러분과 공유할 것입니다.


우리가 원하는 것?


현재 TDW의 80개 이상의 소스 테이블에서 파생된 800개 이상의 태그와 1300개 이상의 측정항목이 있습니다. TDW에서 Doris로 데이터를 가져올 때 우리는 다음을 달성하기를 희망합니다.


  • 실시간 가용성 : 기존의 T+1 오프라인 데이터 수집 외에도 실시간 태그 지정이 필요합니다.


  • 부분 업데이트 : 각 소스 테이블은 자체 ETL 작업을 통해 다양한 속도로 데이터를 생성하고 태그 및 지표의 일부만 포함하므로 열의 부분 업데이트에 대한 지원이 필요합니다.


  • 고성능 : 그룹 타겟팅, 분석 및 보고 시나리오에서 단 몇 초의 응답 시간이 필요합니다.


  • 저렴한 비용 : 최대한 비용을 절감하고자 합니다.


우리는 무엇을 하는가?


  1. TDW 대신 Flink에서 플랫 테이블 생성



TDW에서 플랫 테이블을 생성하면 몇 가지 단점이 있습니다.


  • 높은 스토리지 비용 : TDW는 80개 이상의 개별 소스 테이블과 별도로 추가 플랫 테이블을 유지해야 합니다. 엄청난 중복입니다.


  • 낮은 실시간성 : 소스 테이블의 지연이 증가하고 전체 데이터 링크가 지연됩니다.


  • 높은 개발 비용 : 실시간성을 달성하려면 추가 개발 노력과 리소스가 필요합니다.

반대로 Doris에서 플랫 테이블을 생성하는 것이 훨씬 쉽고 비용도 저렴합니다. 프로세스는 다음과 같습니다.


  • Spark를 사용하여 오프라인 방식으로 새 데이터를 Kafka로 가져옵니다.
  • Flink를 사용하여 Kafka 데이터를 사용하세요.
  • 기본 키 ID를 통해 플랫 테이블을 만듭니다.
  • 플랫 테이블을 Doris로 가져옵니다. 아래 그림과 같이 Flink는 "ID"=1인 5줄의 데이터를 Doris에서 한 줄로 집계하여 Doris의 데이터 쓰기 부담을 줄였습니다.


TDW는 더 이상 두 개의 데이터 복사본을 유지할 필요가 없고 KafKa는 수집을 위해 보류 중인 새 데이터만 저장하면 되므로 스토리지 비용을 크게 줄일 수 있습니다. 또한 Flink에 원하는 ETL 로직을 추가하고 오프라인 및 실시간 데이터 수집을 위해 많은 개발 로직을 재사용할 수 있습니다.


  1. 열 이름을 현명하게 지정


앞서 언급했듯이 Doris의 집계 모델은 열의 부분 업데이트를 허용합니다. 여기서는 참고용으로 Doris의 다른 데이터 모델에 대한 간단한 소개를 제공합니다.


고유 모델 : 기본 키 고유성이 필요한 시나리오에 적용 가능합니다. 동일한 기본 키 ID의 최신 데이터만 유지합니다. (우리가 아는 한, Apache Doris 커뮤니티는 고유 모델의 열 부분 업데이트도 포함할 계획입니다.)


Duplicate Model : 이 모델은 사전 집계나 중복 제거 없이 모든 원본 데이터를 그대로 저장합니다.


데이터 모델을 결정한 후에는 열 이름을 어떻게 지정할지 고민해야 했습니다. 다음과 같은 이유로 태그나 지표를 열 이름으로 사용하는 것은 선택 사항이 아닙니다.


Ⅰ. 내부 데이터 사용자는 측정항목이나 태그의 이름을 바꿔야 할 수도 있지만 Doris 1.1.3은 열 이름 수정을 지원하지 않습니다.


Ⅱ. 태그는 자주 온라인과 오프라인으로 처리될 수 있습니다. 여기에 열을 추가하고 삭제하는 작업이 포함되면 시간이 많이 걸릴 뿐만 아니라 쿼리 성능에도 해를 끼칠 수 있습니다. 대신 우리는 다음을 수행합니다.


  • 태그 및 지표의 유연한 이름 변경을 위해 MySQL 테이블을 사용하여 메타데이터(이름, 전역 고유 ID, 상태 등)를 저장합니다. 이름에 대한 변경 사항은 메타데이터에서만 발생하며 Doris의 테이블 스키마에는 영향을 미치지 않습니다. 예를 들어 song_name 에 ID가 4로 부여되면 Doris에서는 a4라는 열 이름으로 저장됩니다. 그런 다음 song_name 이 쿼리에 포함된 경우 SQL에서 a4로 변환됩니다.


  • 태그의 온라인화와 오프라인화를 위해 태그가 사용되는 빈도에 따라 태그를 정렬합니다. 가장 적게 사용된 것에는 메타데이터에 오프라인 표시가 부여됩니다. 오프라인 태그 아래에는 새 데이터가 추가되지 않지만 해당 태그 아래의 기존 데이터는 계속 사용할 수 있습니다.


  • 새로 추가된 태그 및 지표를 실시간으로 사용할 수 있도록 이름 ID 매핑을 기반으로 Doris 테이블에 몇 가지 ID 열을 미리 구축합니다. 이러한 예약된 ID 열은 새로 추가된 태그 및 지표에 할당됩니다. 따라서 테이블 스키마 변경과 그에 따른 오버헤드를 피할 수 있습니다. 경험에 따르면 태그와 지표를 추가한 후 10분만 지나면 그 아래의 데이터를 사용할 수 있습니다.


주목할 만한 점은 최근 출시된 Doris 1.2.0은 Light Schema Change를 지원한다는 것입니다. 즉, 열을 추가하거나 제거하려면 FE에서 메타데이터만 수정하면 됩니다. 또한 테이블에 대해 Light Schema Change를 활성화한 경우 데이터 테이블의 열 이름을 바꿀 수 있습니다. 이것은 우리에게 큰 문제를 해결해 줍니다.


  1. 날짜 작성 최적화


다음은 일일 오프라인 데이터 수집 시간을 75% 줄이고 CUMU 압축 점수를 600+에서 100으로 단축한 몇 가지 사례입니다.


  • Flink 사전 집계: 위에서 언급한 바와 같습니다.


  • 쓰기 배치 자동 크기 조정: Flink 리소스 사용량을 줄이기 위해 하나의 Kafka Topic의 데이터를 다양한 Doris 테이블에 쓸 수 있도록 하고 데이터 양에 따라 배치 크기의 자동 변경을 실현합니다.


  • Doris 데이터 쓰기 최적화: 태블릿과 버킷의 크기는 물론 각 시나리오에 대한 압축 매개변수를 미세 조정합니다.


     max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas


  • BE 커밋 논리 최적화: BE 목록의 정기적인 캐싱을 수행하고, 이를 일괄적으로 BE 노드에 커밋하고, 보다 세밀한 로드 밸런싱 세분성을 사용합니다.



  1. 쿼리에 Dori-on-ES 사용

    데이터 쿼리의 약 60%는 그룹 타겟팅과 관련됩니다. 그룹 타겟팅은 일련의 태그를 필터로 사용하여 타겟 데이터를 찾는 것입니다. 이는 데이터 처리 아키텍처에 대한 몇 가지 요구 사항을 제시합니다.


  • APP 사용자와 관련된 그룹 타겟팅에는 매우 복잡한 논리가 포함될 수 있습니다. 이는 시스템이 수백 개의 태그를 동시에 필터로 지원해야 함을 의미합니다.


  • 대부분의 그룹 타겟팅 시나리오에는 최신 태그 데이터만 필요합니다. 그러나 메트릭 쿼리는 기록 데이터를 지원해야 합니다.


  • 데이터 사용자는 그룹 타겟팅 후 지표 데이터에 대한 추가 집계 분석을 수행해야 할 수도 있습니다.


  • 데이터 사용자는 그룹 타겟팅 후 태그 및 지표에 대한 자세한 쿼리를 수행해야 할 수도 있습니다.


고민 끝에 Doris-on-ES를 채택하기로 결정했습니다. Doris는 각 시나리오의 메트릭 데이터를 파티션 테이블로 저장하는 곳이고 Elasticsearch는 모든 태그 데이터를 저장하는 곳입니다. Doris-on-ES 솔루션은 Doris의 분산 쿼리 계획 기능과 Elasticsearch의 전체 텍스트 검색 기능을 결합합니다. 쿼리 패턴은 다음과 같습니다.


 SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag


표시된 것처럼 Elasticsearch에 있는 ID 데이터는 메트릭 분석을 위해 Doris의 하위 쿼리에 사용됩니다. 실제로 쿼리 응답 시간은 대상 그룹의 크기와 관련이 있음을 알 수 있습니다. 대상 그룹에 100만 개 이상의 개체가 포함된 경우 쿼리에 최대 60초가 소요됩니다. 그보다 더 크면 시간 초과 오류가 발생할 수 있습니다. 조사 결과, 우리는 가장 큰 시간 낭비 요인 두 가지를 확인했습니다.


I. Doris BE가 Elasticsearch에서 데이터를 가져올 때(기본적으로 한 번에 1024줄), 대상 그룹이 백만 개 이상인 경우 네트워크 I/O 오버헤드가 엄청날 수 있습니다.


II. 데이터를 가져온 후 Doris BE는 SHUFFLE/BROADCAST를 통해 로컬 메트릭 테이블과 조인 작업을 수행해야 하며 이는 많은 비용이 소요될 수 있습니다.



따라서 우리는 다음과 같은 최적화를 수행합니다.


  • 최적화 활성화 여부를 지정하는 쿼리 세션 변수 es_optimize 추가합니다.


  • ES에 데이터를 쓸 때 기본 키 ID를 해싱한 후 버킷 번호를 저장하기 위해 BK 열을 추가합니다. 알고리즘은 Doris(CRC32)의 버킷팅 알고리즘과 동일합니다.


  • Doris BE를 사용하여 버킷 조인 실행 계획을 생성하고 버킷 번호를 BE ScanNode에 전달한 후 ES로 푸시합니다.


  • ES를 사용하여 쿼리된 데이터를 압축합니다. 여러 데이터 가져오기를 하나로 바꾸고 네트워크 I/O 오버헤드를 줄입니다.


  • Doris BE가 로컬 메트릭 테이블과 관련된 버킷의 데이터만 가져오고 로컬 조인 작업을 직접 수행하여 Doris BE 간의 데이터 셔플링을 방지하는지 확인하세요.



    그 결과, 대규모 그룹 타겟팅에 대한 쿼리 응답 시간이 60초에서 3.7초로 놀라운 수준으로 단축되었습니다. 커뮤니티 정보에 따르면 Doris는 곧 출시될 버전 2.0.0부터 역인덱싱을 지원할 예정입니다. 이번 새 버전에서는 텍스트 유형에 대한 전체 텍스트 검색, 텍스트, 숫자, 날짜/시간의 동등성 또는 범위 필터링을 수행할 수 있으며, 역인덱싱이 배열 유형을 지원하므로 필터링에서 AND, OR, NOT 논리를 편리하게 결합할 수 있습니다. Doris의 이 새로운 기능은 동일한 작업에 대해 Elasticsearch보다 3~5배 더 나은 성능을 제공할 것으로 예상됩니다.


  1. 데이터 관리 개선


Doris의 콜드 및 핫 데이터 분리 기능은 데이터 처리 비용 절감 전략의 기초를 제공합니다.


  • Doris의 TTL 메커니즘을 기반으로 Doris에는 해당 연도의 데이터만 저장하고 그 이전의 기록 데이터는 TDW에 저장하여 저장 비용을 절감합니다.


  • 다양한 데이터 파티션에 대한 복사본 수를 변경합니다. 예를 들어 자주 사용하는 최근 3개월의 데이터에는 3개의 복사본을 설정하고, 6개월이 지난 데이터에는 1개의 복사본, 그 사이의 데이터에는 2개의 복사본을 설정합니다.


Doris는 핫 데이터를 콜드 데이터로 전환하는 기능을 지원하므로 지난 7일간의 데이터만 SSD에 저장하고 그보다 오래된 데이터는 HDD로 전송하여 더 저렴한 스토리지로 저장할 수 있습니다.

결론

여기까지 스크롤하여 긴 읽기를 마무리해 주셔서 감사합니다. ClickHouse에서 Doris로 전환하는 동안 여러분에게 도움이 될 수 있는 환호와 눈물, 배운 교훈, 몇 가지 관행을 공유했습니다. Apache Doris 커뮤니티와 SelectDB 팀의 도움에 진심으로 감사드립니다. 하지만 콜드 데이터와 핫 데이터의 자동 식별, 자주 사용되는 태그/메트릭의 사전 계산, 구체화된 뷰 등을 사용하여 코드 논리를 단순화합니다.