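For an OLAP DBMS, two ends matter a great deal:
The entry end: how users can connect conveniently. Besides its own client, ClickHouse offers MySQL, PostgreSQL, gRPC, HTTP and other access methods.
The data-source end: how data can be attached conveniently. Besides its own storage engines, ClickHouse can mount external data sources such as MySQL and Kafka.
With the inside and the outside connected like this (as the saying goes, more friends means more roads), ClickHouse gains orchestration capability at the "data" level.
Today's topic is the MySQL protocol on the entry end, the first of ClickHouse's "good friends" in this series. Users can connect to ClickHouse directly with a MySQL client or a MySQL driver and read and write data.
This article follows a MySQL query request and uses call stacks to walk through how ClickHouse reads data from start to finish.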
How is it implemented?
The entry point is MySQLHandler.cpp (https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/MySQLHandler.cpp).
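Handshake protocol
MySQLHandler sends a Greeting (initial handshake) packet to the MySQL client
The MySQL client replies with a Greeting-Response (handshake response) packet
The MySQL client's authentication data travels in that response, or in a follow-up packet if the server asks it to switch authentication method
MySQLHandler verifies the credentials and returns the authentication result
The MySQL protocol is implemented in Core/MySQLProtocol.h (https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/MySQLProtocol.h).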
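As a minimal sketch of the framing that carries the handshake and every later exchange (independent of the actual classes in MySQLProtocol.h): each MySQL packet is a 4-byte header, made of a 3-byte little-endian payload length and a 1-byte sequence id, followed by the payload. The helper names `encodePacket` and `decodeHeader` are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Header that precedes every MySQL packet on the wire.
struct PacketHeader {
    uint32_t payload_length;  // 3 bytes on the wire, little-endian
    uint8_t sequence_id;      // incremented for each packet within one exchange
};

// Prepends the 4-byte header to `payload`.
std::vector<uint8_t> encodePacket(const std::string& payload, uint8_t sequence_id) {
    // Payloads of 0xFFFFFF bytes or more must be split across packets; not handled here.
    assert(payload.size() < 0xFFFFFF);
    const uint32_t len = static_cast<uint32_t>(payload.size());
    std::vector<uint8_t> out = {
        static_cast<uint8_t>(len & 0xFF),
        static_cast<uint8_t>((len >> 8) & 0xFF),
        static_cast<uint8_t>((len >> 16) & 0xFF),
        sequence_id,
    };
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}

// Reads the header back from the first 4 bytes of a packet.
PacketHeader decodeHeader(const std::vector<uint8_t>& packet) {
    assert(packet.size() >= 4);
    PacketHeader header;
    header.payload_length = static_cast<uint32_t>(packet[0]) |
                            (static_cast<uint32_t>(packet[1]) << 8) |
                            (static_cast<uint32_t>(packet[2]) << 16);
    header.sequence_id = packet[3];
    return header;
}
```

In the simplest handshake, without an authentication-method switch, the server's greeting carries sequence id 0, the client's response 1 and the server's OK or ERR packet 2; the sequence id starts again at 0 with each new command, such as a query.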
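Query request
Once authentication succeeds, normal data exchange can begin.
1. The MySQL client sends the request:
mysql> SELECT * FROM system.numbers LIMIT 5;
2. The call stack inside MySQLHandler:
MySQLHandler::comQuery -> executeQuery -> pipeline->execute -> MySQLOutputFormat::consume
3. The MySQL client receives the result
In step 2, executeQuery (executeQuery.cpp) is the key call: it is the entry point through which every front-end server reaches the ClickHouse core. Its first argument is a read buffer holding the SQL text (for example 'select 1'), and its second is the write buffer the result set is sent through (backed by the network socket).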
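A rough sketch of that shape, assuming hypothetical names (`handleMySQLCommand`, `executeQuerySketch`) and replacing ClickHouse's ReadBuffer/WriteBuffer with a std::string and a std::ostream: the MySQL front end only has to recognize the COM_QUERY command (byte 0x03, followed by the SQL text) and pass the SQL plus an output destination to one shared core function.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical stand-in for DB::executeQuery: the one entry point every front end
// calls with the SQL text and a destination for the result. A real engine would
// parse, plan and execute the query; this sketch only echoes it.
void executeQuerySketch(const std::string& sql, std::ostream& result_out) {
    result_out << "executed: " << sql;
}

// In the MySQL protocol, a COM_QUERY payload is the command byte 0x03 followed by the SQL text.
constexpr uint8_t kComQuery = 0x03;

// Hypothetical MySQL front end: recognize the command, then hand off to the shared core.
// An HTTP or gRPC front end would differ only in how it extracts the SQL text and
// where the output stream leads.
bool handleMySQLCommand(const std::string& payload, std::ostream& socket_out) {
    if (payload.empty()) {
        return false;
    }
    if (static_cast<uint8_t>(payload[0]) == kComQuery) {
        executeQuerySketch(payload.substr(1), socket_out);
        return true;
    }
    return false;  // other commands (COM_PING, COM_QUIT, ...) are out of scope for this sketch
}
```

Keeping the core behind a single call is what lets the different front ends share the parsing, planning and execution work.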
Call stack analysis
SELECT * FROM system.numbers LIMIT 5
1. Fetching the data source
The StorageSystemNumbers data source:
DB::StorageSystemNumbers::read(...) StorageSystemNumbers.cpp:135
DB::ReadFromStorageStep::ReadFromStorageStep(...) memory:3028
DB::InterpreterSelectQuery::executeFetchColumns(...) InterpreterSelectQuery.cpp:1361
DB::InterpreterSelectQuery::executeImpl(...) InterpreterSelectQuery.cpp:791
DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectQuery.cpp:472
DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectWithUnionQuery.cpp:183
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:198
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(...)
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
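StorageSystemNumbers::read sits at the top of this stack, creating the source that will feed the pipeline. The following is a hypothetical, much simplified analogue (not ClickHouse's class): a source that hands out consecutive numbers starting from 0 in fixed-size blocks. Like system.numbers, it never runs out on its own, so something downstream has to stop it.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// A chunk is a batch of rows; here a single UInt64 column.
using Chunk = std::vector<uint64_t>;

// Hypothetical, simplified analogue of the source behind system.numbers:
// every generate() call returns the next block of consecutive numbers.
// It never runs out on its own; a downstream LIMIT has to stop the query.
class NumbersSourceSketch {
public:
    explicit NumbersSourceSketch(uint64_t max_block_size) : block_size_(max_block_size) {
        assert(max_block_size > 0);
    }

    Chunk generate() {
        Chunk chunk;
        chunk.reserve(block_size_);
        for (uint64_t i = 0; i < block_size_; ++i) {
            chunk.push_back(next_++);
        }
        return chunk;
    }

private:
    uint64_t block_size_;
    uint64_t next_ = 0;  // first number of the next block
};
```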
2. Building the pipeline
DB::LimitTransform::LimitTransform(...) LimitTransform.cpp:21
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2214
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2299
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:3570
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:4400
DB::LimitStep::transformPipeline(DB::QueryPipeline&) LimitStep.cpp:33
DB::ITransformingStep::updatePipeline(...) ITransformingStep.cpp:21
DB::QueryPlan::buildQueryPipeline() QueryPlan.cpp:154
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:200
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(...) executeQuery.cpp:722
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
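The pipeline built here chains the numbers source into a LimitTransform. The sketch below illustrates, under simplifying assumptions, what a LIMIT/OFFSET transform has to do with a stream of chunks; the class name `LimitTransformSketch` and the single-column `Chunk` are illustrative, not ClickHouse's LimitTransform interface.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// A chunk is a batch of rows; here a single UInt64 column.
using Chunk = std::vector<uint64_t>;

// Hypothetical, simplified LIMIT/OFFSET transform over a stream of chunks.
class LimitTransformSketch {
public:
    LimitTransformSketch(uint64_t limit, uint64_t offset) : limit_(limit), offset_(offset) {}

    // Returns the rows of `input` that survive OFFSET and LIMIT.
    Chunk transform(const Chunk& input) {
        Chunk out;
        for (uint64_t value : input) {
            if (emitted_ >= limit_) {
                break;  // LIMIT reached: the rest of this chunk is dropped
            }
            if (skipped_ < offset_) {
                ++skipped_;  // still inside OFFSET
                continue;
            }
            out.push_back(value);
            ++emitted_;
        }
        return out;
    }

    // Once true, upstream can be told to stop producing data.
    bool finished() const { return emitted_ >= limit_; }

private:
    uint64_t limit_;
    uint64_t offset_;
    uint64_t skipped_ = 0;
    uint64_t emitted_ = 0;
};
```

With LIMIT 5 and the blocks {0, 1, 2} and {3, 4, 5}, the first call passes the whole chunk through, the second keeps only 3 and 4, and finished() then returns true.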
3. Executing the pipeline
DB::LimitTransform::prepare(...) LimitTransform.cpp:67
DB::PipelineExecutor::prepareProcessor(...) PipelineExecutor.cpp:291
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(...) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(...) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(...) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(...) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(...) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(...) PipelineExecutor.cpp:373
DB::PipelineExecutor::initializeExecution(unsigned long) PipelineExecutor.cpp:747
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:764
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(...) executeQuery.cpp:833
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
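The repeated prepareProcessor / tryAddProcessorToStackIfUpdated frames show the executor calling each processor's prepare() to decide what can run next. Below is a minimal single-threaded sketch of that prepare/work split, assuming a linear chain and a naive "sweep until everything is finished" loop; judging by the call stack, the real PipelineExecutor instead follows the edges whose ports changed, and it can use several threads. All names here are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

using Chunk = std::vector<uint64_t>;

// Single-slot connection between two neighbouring processors.
struct Port {
    std::optional<Chunk> data;    // at most one chunk in flight
    bool no_more_data = false;    // producer is done
    bool no_more_wanted = false;  // consumer needs nothing more
};

enum class Status { NeedData, PortFull, Ready, Finished };

// prepare() only inspects ports and moves chunks; work() does the computation.
struct Processor {
    virtual ~Processor() = default;
    virtual Status prepare() = 0;
    virtual void work() = 0;
};

// Endless 0, 1, 2, ... in blocks, like system.numbers.
struct Source : Processor {
    Port& out; uint64_t block_size; uint64_t next = 0; std::optional<Chunk> pending;
    Source(Port& o, uint64_t bs) : out(o), block_size(bs) {}
    Status prepare() override {
        if (out.no_more_wanted) return Status::Finished;
        if (out.data) return Status::PortFull;
        if (pending) { out.data = std::move(pending); pending.reset(); return Status::PortFull; }
        return Status::Ready;  // output port is free: generate the next block
    }
    void work() override {
        Chunk c;
        for (uint64_t i = 0; i < block_size; ++i) c.push_back(next++);
        pending = std::move(c);
    }
};

struct Limit : Processor {
    Port& in; Port& out; uint64_t limit; uint64_t emitted = 0;
    std::optional<Chunk> current, pending;
    Limit(Port& i, Port& o, uint64_t l) : in(i), out(o), limit(l) {}
    Status prepare() override {
        if (out.no_more_wanted || (emitted >= limit && !pending)) {
            in.no_more_wanted = true;  // tell upstream to stop producing
            out.no_more_data = true;
            return Status::Finished;
        }
        if (pending) {
            if (!out.data) { out.data = std::move(pending); pending.reset(); }
            return Status::PortFull;
        }
        if (in.data) { current = std::move(in.data); in.data.reset(); return Status::Ready; }
        if (in.no_more_data) { out.no_more_data = true; return Status::Finished; }
        return Status::NeedData;
    }
    void work() override {
        Chunk c;
        for (uint64_t v : *current) {
            if (emitted >= limit) break;
            c.push_back(v);
            ++emitted;
        }
        current.reset();
        pending = std::move(c);
    }
};

// Collects rows where a real output format would send them to the client.
struct Sink : Processor {
    Port& in; std::optional<Chunk> current; Chunk result;
    explicit Sink(Port& i) : in(i) {}
    Status prepare() override {
        if (in.data) { current = std::move(in.data); in.data.reset(); return Status::Ready; }
        return in.no_more_data ? Status::Finished : Status::NeedData;
    }
    void work() override {
        result.insert(result.end(), current->begin(), current->end());
        current.reset();
    }
};

// Naive scheduler: sweep over all processors, run work() where prepare() says Ready,
// and stop once every processor reports Finished.
void execute(const std::vector<Processor*>& processors) {
    bool all_finished = false;
    while (!all_finished) {
        all_finished = true;
        for (Processor* p : processors) {
            const Status s = p->prepare();
            if (s == Status::Ready) p->work();
            if (s != Status::Finished) all_finished = false;
        }
    }
}
```

With a block size of 3 and a limit of 5, the sink ends up with 0, 1, 2, 3, 4, and the limit step marks its input as no longer wanted, so the endless source stops instead of running forever.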
4. Producing and sending the output
DB::MySQLOutputFormat::consume(DB::Chunk) MySQLOutputFormat.cpp:53
DB::IOutputFormat::work() IOutputFormat.cpp:62
DB::executeJob(DB::IProcessor *) PipelineExecutor.cpp:155
operator() PipelineExecutor.cpp:172
DB::PipelineExecutor::executeStepImpl(...) PipelineExecutor.cpp:630
DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) PipelineExecutor.cpp:546
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:812
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(...) executeQuery.cpp:800
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
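MySQLOutputFormat::consume receives a Chunk and has to turn it into packets a MySQL client understands. For a query sent with COM_QUERY, the MySQL text protocol transmits each result row as a sequence of length-encoded strings, with NULL written as the single byte 0xFB. The sketch below encodes one row payload that way; it illustrates the protocol rule rather than ClickHouse's implementation, the function names are hypothetical, and it leaves out the column-definition packets and the packet header.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Appends the lowest `bytes` bytes of `value` in little-endian order.
void writeLittleEndian(std::string& buf, uint64_t value, int bytes) {
    for (int i = 0; i < bytes; ++i) {
        buf.push_back(static_cast<char>((value >> (8 * i)) & 0xFF));
    }
}

// MySQL length-encoded integer: 1 byte below 251, otherwise a marker byte
// (0xFC, 0xFD, 0xFE) followed by 2, 3 or 8 little-endian bytes.
void writeLengthEncodedInt(std::string& buf, uint64_t n) {
    if (n < 251) {
        buf.push_back(static_cast<char>(n));
    } else if (n < (1ULL << 16)) {
        buf.push_back(static_cast<char>(0xFC));
        writeLittleEndian(buf, n, 2);
    } else if (n < (1ULL << 24)) {
        buf.push_back(static_cast<char>(0xFD));
        writeLittleEndian(buf, n, 3);
    } else {
        buf.push_back(static_cast<char>(0xFE));
        writeLittleEndian(buf, n, 8);
    }
}

// Payload of one text-protocol result row: each value as a length-encoded string,
// NULL as the single byte 0xFB. Values are assumed to be already rendered as text.
std::string encodeTextRow(const std::vector<std::optional<std::string>>& values) {
    std::string payload;
    for (const auto& value : values) {
        if (!value) {
            payload.push_back(static_cast<char>(0xFB));
            continue;
        }
        writeLengthEncodedInt(payload, value->size());
        payload += *value;
    }
    return payload;
}
```

For `SELECT * FROM system.numbers LIMIT 5`, the row holding the number 3 would be the two bytes 0x01 '3', which then go out behind a regular 4-byte packet header.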
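Summary
ClickHouse is quite cleanly modularized: its components snap together like Lego bricks. When we run:
SELECT * FROM system.numbers LIMIT 5
First, the core parses the SQL statement into an AST and obtains the data source (Source) from it: pipeline.Add(Source). Next, it builds a QueryPlan from the AST and generates the matching Transform from that plan: pipeline.Add(LimitTransform). Then it adds an output sink as the object that sends the data out: pipeline.Add(OutputSink). Finally the pipeline is executed and each Transformer starts working.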
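The Lego-style composition can be sketched as a query plan whose steps each append their processors to a pipeline, with the output sink attached at the end. This is a toy model that borrows names from the call stacks above (ReadFromStorageStep, LimitStep, QueryPlan::buildQueryPipeline) but is not ClickHouse's API; `addOutputSink` is hypothetical, and the pipeline is reduced to a list of processor names.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy pipeline: just the ordered names of the processors it contains.
using Pipeline = std::vector<std::string>;

// Toy plan step: each step knows how to add its own processors to the pipeline.
struct PlanStep {
    virtual ~PlanStep() = default;
    virtual void transformPipeline(Pipeline& pipeline) const = 0;
};

struct ReadFromStorageStep : PlanStep {
    std::string storage;
    explicit ReadFromStorageStep(std::string s) : storage(std::move(s)) {}
    void transformPipeline(Pipeline& pipeline) const override {
        pipeline.push_back("Source(" + storage + ")");
    }
};

struct LimitStep : PlanStep {
    std::size_t limit;
    explicit LimitStep(std::size_t l) : limit(l) {}
    void transformPipeline(Pipeline& pipeline) const override {
        pipeline.push_back("LimitTransform(" + std::to_string(limit) + ")");
    }
};

// Steps are stored in data-flow order, so building the pipeline is one pass over them.
struct QueryPlan {
    std::vector<std::unique_ptr<PlanStep>> steps;
    Pipeline buildQueryPipeline() const {
        Pipeline pipeline;
        for (const auto& step : steps) {
            step->transformPipeline(pipeline);
        }
        return pipeline;
    }
};

// The output sink is attached last, after the plan's own processors.
void addOutputSink(Pipeline& pipeline, const std::string& format) {
    pipeline.push_back("OutputSink(" + format + ")");
}
```

Because every step only appends to the pipeline it is given, steps can be added, removed or reordered independently, which is the Lego-like property described above.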
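In ClickHouse, the scheduling system for these Transformers is called Processor, and it is one of the modules that determine performance; for details, see "Pipeline Processors and Scheduler". ClickHouse is a luxury sports car with a manual gearbox, and you get to own it for free. Have fun, folks!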