ClickHouse and Friends (2): MySQL Protocol and the Read Call Stack

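For an OLAP DBMS, two ends matter a great deal:

How users connect conveniently: this is the entry end. Besides its own client, ClickHouse also provides access over MySQL/PG/GRPC/HTTP and more.

How data is attached conveniently: this is the data-source end. Besides its own engines, ClickHouse can also mount external data sources such as MySQL/Kafka.

With the inside and outside connected like this, every extra friend is an extra road, which is what makes orchestration at the "data" level possible.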

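Today's topic is the MySQL protocol on the entry end, the first good friend of ClickHouse in this series. Users can connect to ClickHouse directly with a MySQL client or a compatible driver and then read and write data.

This post follows a MySQL Query request and uses call stacks to trace ClickHouse's whole data-read path.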

How is it implemented?

The entry file is MySQLHandler.cpp [https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/MySQLHandler.cpp]

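Handshake protocol

MySQLHandler sends a Greeting packet to MySQLClient (in the MySQL protocol the server speaks first)

MySQLClient replies with a Greeting-Response packet

That response carries the authentication data; a further authentication packet follows only if the server asks for a different auth method

MySQLHandler authenticates the client from the authentication data and returns the result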

The MySQL protocol is implemented in Core/MySQLProtocol.h [https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/MySQLProtocol.h]
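To make the wire format behind these handshake packets a little more concrete, here is a minimal sketch of MySQL packet framing: every packet starts with a 3-byte little-endian payload length and a 1-byte sequence id, followed by the payload. The helper names framePacket and payloadLength are hypothetical and exist only for illustration; they are not taken from MySQLProtocol.h.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Builds one MySQL wire packet:
// 3-byte little-endian payload length, 1-byte sequence id, then the payload.
std::vector<uint8_t> framePacket(const std::string & payload, uint8_t sequence_id)
{
    // A single packet carries at most 2^24 - 1 payload bytes; larger payloads
    // have to be split across packets, which this sketch does not attempt.
    assert(payload.size() < (1u << 24));
    const uint32_t len = static_cast<uint32_t>(payload.size());

    std::vector<uint8_t> out;
    out.reserve(4 + payload.size());
    out.push_back(static_cast<uint8_t>(len & 0xFF));
    out.push_back(static_cast<uint8_t>((len >> 8) & 0xFF));
    out.push_back(static_cast<uint8_t>((len >> 16) & 0xFF));
    out.push_back(sequence_id);
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}

// Reads the payload length back out of the 4-byte header.
uint32_t payloadLength(const std::vector<uint8_t> & packet)
{
    assert(packet.size() >= 4);
    return static_cast<uint32_t>(packet[0])
        | (static_cast<uint32_t>(packet[1]) << 8)
        | (static_cast<uint32_t>(packet[2]) << 16);
}
```

The sequence id lets both sides check packet order within one exchange such as the handshake.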

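Query request

Once authentication passes, normal data exchange can begin.

MySQLClient sends a request:
mysql> SELECT * FROM system.numbers LIMIT 5;

MySQLHandler's call stack:
MySQLHandler::comQuery -> executeQuery -> pipeline->execute -> MySQLOutputFormat::consume

MySQLClient receives the result

In step 2, executeQuery (executeQuery.cpp) is crucial. It is the entry point between every front-end server and the ClickHouse core: its first argument carries the SQL text ('select 1'), and its second says where the result set should be sent (the network socket).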
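The dispatch step can be sketched roughly as follows. In the MySQL protocol, the first payload byte of a command packet is the command id (0x03 for COM_QUERY) and the remaining bytes are the SQL text. The names handleCommand and QueryExecutor are hypothetical. Standard streams stand in for the ReadBuffer and WriteBuffer that the real executeQuery takes, and the real function accepts more arguments than shown here.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>

// Command byte for COM_QUERY in the MySQL client/server protocol.
constexpr uint8_t COM_QUERY = 0x03;

// Hypothetical stand-in for the engine entry point: reads SQL text from `in`
// and writes the result to `out`.
using QueryExecutor = std::function<void(std::istream & in, std::ostream & out)>;

// Dispatches one command payload.
// Returns false for empty payloads and for commands this sketch ignores.
bool handleCommand(const std::string & payload, std::ostream & out, const QueryExecutor & execute)
{
    assert(execute && "an executor must be provided");
    if (payload.empty() || static_cast<uint8_t>(payload[0]) != COM_QUERY)
        return false;

    // Everything after the command byte is the SQL text.
    std::istringstream sql(payload.substr(1));
    execute(sql, out);
    return true;
}
```

Passing the input and output as two separate streams mirrors the point made above: the front end only has to say where the SQL comes from and where the result should go.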

Call stack analysis

SELECT * FROM system.numbers LIMIT 5

1. Obtaining the data source

StorageSystemNumbers data source:

DB::StorageSystemNumbers::read(std::__1::vector, std::__1::allocator >, std::__1::allocator, std::__1::allocator > > > const&, std::__1::shared_ptr const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) StorageSystemNumbers.cpp:135
DB::ReadFromStorageStep::ReadFromStorageStep(std::__1::shared_ptr, std::__1::shared_ptr&, DB::SelectQueryOptions, DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr const&, std::__1::vector, std::__1::allocator >, std::__1::allocator, std::__1::allocator > > > const&) memory:3028
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr const&, std::__1::vector, std::__1::allocator >, std::__1::allocator, std::__1::allocator > > > const&) InterpreterSelectQuery.cpp:1361
DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan&, std::__1::shared_ptr const&, std::__1::optional) InterpreterSelectQuery.cpp:791
DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectQuery.cpp:472
DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectWithUnionQuery.cpp:183
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:198
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function, std::__1::allocator > const&,
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141

2. Building the pipeline

DB::LimitTransform::LimitTransform(DB::Block const&, unsigned long, unsigned long, unsigned long, bool, bool, std::__1::vector >) LimitTransform.cpp:21
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2214
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2299
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:3570
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:4400
DB::LimitStep::transformPipeline(DB::QueryPipeline&) LimitStep.cpp:33
DB::ITransformingStep::updatePipeline(std::__1::vector >, std::__1::allocator > > >) ITransformingStep.cpp:21
DB::QueryPlan::buildQueryPipeline() QueryPlan.cpp:154
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:200
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&)>) executeQuery.cpp:722
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141

3. Executing the pipeline

DB::LimitTransform::prepare(std::__1::vector > const&, std::__1::vector > const&) LimitTransform.cpp:67
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue > >&, std::__1::unique_lock) PipelineExecutor.cpp:291
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue > >&, std::__1::unique_lock) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue > >&, std::__1::unique_lock) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue > >&, std::__1::unique_lock) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue > >&, std::__1::unique_lock) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue > >&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue > >&, std::__1::unique_lock) PipelineExecutor.cpp:373
DB::PipelineExecutor::initializeExecution(unsigned long) PipelineExecutor.cpp:747
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:764
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&)>) executeQuery.cpp:833
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
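The trace above ends in LimitTransform::prepare, reached through PipelineExecutor::prepareProcessor. As a toy illustration of that style of execution, the sketch below assumes that prepare() only inspects state and reports what the processor can do next, while a separate work() moves the data. ToyLimit, Status and runToyExecutor are hypothetical names, and the three-value Status is a simplification made for this example, not ClickHouse's actual enum.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Simplified processor states (an assumption for this sketch).
enum class Status { NeedData, Ready, Finished };

// Toy limit processor: prepare() decides, work() moves one value.
class ToyLimit
{
public:
    explicit ToyLimit(size_t limit_) : limit(limit_) {}

    Status prepare() const
    {
        if (emitted >= limit)
            return Status::Finished; // limit reached, upstream can stop
        if (!input)
            return Status::NeedData; // nothing buffered yet
        return Status::Ready;        // work() may run
    }

    void push(int value) { input = value; }

    void work(std::vector<int> & output)
    {
        assert(input.has_value());
        output.push_back(*input);
        input.reset();
        ++emitted;
    }

private:
    size_t limit;
    size_t emitted = 0;
    std::optional<int> input;
};

// Single-threaded driver: ask prepare() what to do, then act on the answer.
std::vector<int> runToyExecutor(const std::vector<int> & source, size_t limit)
{
    ToyLimit processor(limit);
    std::vector<int> output;
    size_t next = 0;
    while (true)
    {
        const Status status = processor.prepare();
        if (status == Status::Finished)
            break;
        if (status == Status::NeedData)
        {
            if (next == source.size())
                break; // source exhausted before the limit was hit
            processor.push(source[next++]);
            continue;
        }
        processor.work(output);
    }
    return output;
}
```

Because the limit check lives in prepare(), the driver stops pulling from the source as soon as enough rows have been emitted, which is what lets LIMIT 5 terminate on an unbounded source.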

4. Output: executing and sending

DB::MySQLOutputFormat::consume(DB::Chunk) MySQLOutputFormat.cpp:53
DB::IOutputFormat::work() IOutputFormat.cpp:62
DB::executeJob(DB::IProcessor *) PipelineExecutor.cpp:155
operator() PipelineExecutor.cpp:172
DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic*) PipelineExecutor.cpp:630
DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) PipelineExecutor.cpp:546
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:812
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&)>) executeQuery.cpp:800
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141

Summary

ClickHouse is cleanly modular; like Lego bricks, its pieces can be combined and assembled. When we run:

SELECT * FROM system.numbers LIMIT 5

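First, the core parses the SQL statement into an AST and uses the AST to obtain the data source: pipeline.Add(Source). Next, it builds a QueryPlan from the AST and generates the corresponding Transforms from the QueryPlan: pipeline.Add(LimitTransform). Then it adds an Output Sink as the destination the data is sent to: pipeline.Add(OutputSink). Finally the pipeline is executed and each Transformer starts working.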
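The Lego-style assembly can be mimicked with a toy pipeline. This is only a sketch under simplifying assumptions: rows are single integers, stages are plain callables, and a transform returning std::nullopt means "stop the whole pipeline". ToyPipeline and runNumbersLimit are hypothetical names and bear no relation to ClickHouse's real QueryPipeline classes.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <sstream>
#include <string>
#include <vector>

// Toy pipeline: one source, any number of transforms, one sink.
struct ToyPipeline
{
    std::function<std::optional<uint64_t>()> source;
    std::vector<std::function<std::optional<uint64_t>(uint64_t)>> transforms;
    std::function<void(uint64_t)> sink;

    void execute()
    {
        assert(source && sink);
        // Pull rows until the source is exhausted or a transform says stop.
        while (std::optional<uint64_t> row = source())
        {
            std::optional<uint64_t> current = row;
            for (auto & transform : transforms)
            {
                current = transform(*current);
                if (!current)
                    return; // e.g. the LIMIT has been reached
            }
            sink(*current);
        }
    }
};

// Assembles numbers-source -> limit -> text sink, like the query in this post.
std::string runNumbersLimit(uint64_t limit)
{
    std::ostringstream out;
    ToyPipeline pipeline;

    // Source: an unbounded counter, standing in for system.numbers.
    uint64_t next = 0;
    pipeline.source = [&next]() -> std::optional<uint64_t> { return next++; };

    // Transform: pass rows through until `limit` rows have been seen.
    uint64_t seen = 0;
    pipeline.transforms.push_back([&seen, limit](uint64_t row) -> std::optional<uint64_t> {
        if (seen >= limit)
            return std::nullopt;
        ++seen;
        return row;
    });

    // Sink: write each row on its own line, standing in for the output format.
    pipeline.sink = [&out](uint64_t row) { out << row << '\n'; };

    pipeline.execute();
    return out.str();
}
```

Calling runNumbersLimit(5) yields the rows 0 through 4, one per line. Swapping the sink for one that writes to a socket would not touch the source or the transform, which is the point of the modular design.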

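ClickHouse's Transformer scheduling system is called Processor, and it is also a key module for performance; for details, see "Pipeline Processors and Scheduler". ClickHouse is a luxury sports car with a manual gearbox, and it is free to own, folks!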