Skywalking-11: The Skywalking Query Protocol, a Case Study

This article analyzes the Skywalking query protocol by following one concrete case: a query for Metrics data.

Overview

The Skywalking query protocol is based on GraphQL by default. If needed, it can be replaced by a custom extension: provide a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.

Capturing a request sent by the Skywalking UI

Request path

POST http://127.0.0.1:8080/graphql

Request body

{
  "query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\n label\n values {\n values {value}\n }\n }}",
  "variables": {
    "duration": {
      "start": "2021-07-03 1320",
      "end": "2021-07-03 1321",
      "step": "MINUTE"
    },
    "condition": {
      "name": "instance_jvm_thread_runnable_thread_count",
      "entity": {
        "scope": "ServiceInstance",
        "serviceName": "business-zone::projectA",
        "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
        "normal": true
      }
    }
  }
}

Response

{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          { "value": 22 },
          { "value": 22 }
        ]
      }
    }
  }
}
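The captured exchange can also be replayed outside the browser. Below is a minimal sketch, assuming Java 11 or later (for java.net.http). The class GraphqlPostSketch and its methods are hypothetical names introduced here for illustration, and the URL is simply the one from the capture above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for illustration; not part of Skywalking.
final class GraphqlPostSketch {

    // Escape a raw string so it can be embedded as a JSON string value.
    static String jsonString(String raw) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : raw.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:   sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Build the {"query": ..., "variables": ...} envelope seen in the capture.
    // variablesJson must already be a valid JSON object.
    static String buildBody(String query, String variablesJson) {
        return "{\"query\":" + jsonString(query) + ",\"variables\":" + variablesJson + "}";
    }

    // POST the body as JSON and return the raw response text.
    static String post(String url, String body) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        String query = "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n"
            + "  readMetricsValues(condition: $condition, duration: $duration) {\n"
            + "    label\n"
            + "    values { values { value } }\n"
            + "  }\n"
            + "}";
        String variables = "{\"duration\":{\"start\":\"2021-07-03 1320\",\"end\":\"2021-07-03 1321\",\"step\":\"MINUTE\"},"
            + "\"condition\":{\"name\":\"instance_jvm_thread_runnable_thread_count\","
            + "\"entity\":{\"scope\":\"ServiceInstance\",\"serviceName\":\"business-zone::projectA\","
            + "\"serviceInstanceName\":\"e8cf34a1d54a4058a8c98505877770e2@192.168.50.113\",\"normal\":true}}}";
        // Only prints the body; call post("http://127.0.0.1:8080/graphql", body) when a server is reachable.
        System.out.println(buildBody(query, variables));
    }
}
```

The query text and the variables travel as two separate fields of one JSON object, which is why the query string has to be escaped while the variables are passed through as they are.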

Finding the corresponding GraphQL definition in the Skywalking source code

Open the oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol directory and search for readMetricsValues, the keyword from the query template in the request body. The corresponding definition is in oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls:

extend type Query {
    # etc...
    # Read time-series values in the duration of required metrics
    readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
    # etc...
}

Input parameter definitions

input MetricsCondition {
    # Metrics name, which should be defined in OAL script
    # Such as:
    # Endpoint_avg = from(Endpoint.latency).avg()
    # Then, `Endpoint_avg`
    name: String!
    # Follow entity definition description.
    entity: Entity!
}

input Entity {
    # 1. scope=All, no name is required.
    # 2. scope=Service, ServiceInstance and Endpoint, set neccessary serviceName/serviceInstanceName/endpointName
    # 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
    #    serviceName/serviceInstanceName/endpointName is/are the source(s)
    #    destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
    #    set necessary names of sources and destinations.
    scope: Scope!
    serviceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    normal: Boolean
    serviceInstanceName: String
    endpointName: String
    destServiceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    destNormal: Boolean
    destServiceInstanceName: String
    destEndpointName: String
}

# The Duration defines the start and end time for each query operation.
# Fields: `start` and `end`
#   represents the time span. And each of them matches the step.
#   ref https://www.ietf.org/rfc/rfc3339.txt
#   The time formats are
#     `SECOND` step: yyyy-MM-dd HHmmss
#     `MINUTE` step: yyyy-MM-dd HHmm
#     `HOUR` step: yyyy-MM-dd HH
#     `DAY` step: yyyy-MM-dd
#     `MONTH` step: yyyy-MM
# Field: `step`
#   represents the accurate time point.
# e.g.
#   if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
#   then
#       metrics from the following time points expected
#       2017-11-08 9:00 -> 2017-11-08 19:00
#       there are 11 time points (hours) in the time span.
input Duration {
    start: String!
    end: String!
    step: Step!
}

enum Step {
    DAY
    HOUR
    MINUTE
    SECOND
}
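The Duration comments above say that start and end are both inclusive and must match the format of the chosen step, so a span from hour 09 to hour 19 yields 11 points. The following is a small sketch of that arithmetic using java.time. The class DurationPointsSketch and its points method are hypothetical names, and this is only an illustration of the rule in the comments, not the code Skywalking itself uses.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the Duration rules; not Skywalking code.
final class DurationPointsSketch {

    // Each step pairs the documented text format with the unit between two points.
    enum Step {
        DAY("yyyy-MM-dd", ChronoUnit.DAYS),
        HOUR("yyyy-MM-dd HH", ChronoUnit.HOURS),
        MINUTE("yyyy-MM-dd HHmm", ChronoUnit.MINUTES),
        SECOND("yyyy-MM-dd HHmmss", ChronoUnit.SECONDS);

        final DateTimeFormatter format;
        final ChronoUnit unit;

        Step(String pattern, ChronoUnit unit) {
            // Fields missing from the pattern default to zero so that every
            // step can be parsed into a LocalDateTime.
            this.format = new DateTimeFormatterBuilder()
                .appendPattern(pattern)
                .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
                .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
                .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
                .toFormatter();
            this.unit = unit;
        }
    }

    // List every time point from start to end, both inclusive, in the step's format.
    static List<String> points(Step step, String start, String end) {
        LocalDateTime from = LocalDateTime.parse(start, step.format);
        LocalDateTime to = LocalDateTime.parse(end, step.format);
        List<String> result = new ArrayList<>();
        for (LocalDateTime t = from; !t.isAfter(to); t = t.plus(1, step.unit)) {
            result.add(t.format(step.format));
        }
        return result;
    }

    public static void main(String[] args) {
        // The example from the schema comment: hours 09 through 19.
        System.out.println(points(Step.HOUR, "2017-11-08 09", "2017-11-08 19").size());
        // The captured UI request: two one-minute points.
        System.out.println(points(Step.MINUTE, "2021-07-03 1320", "2021-07-03 1321"));
    }
}
```

This also explains why the captured request, which spans 1320 to 1321 with step MINUTE, returned exactly two values.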

Return type definitions

type MetricsValues {
    # Could be null if no label assigned in the query condition
    label: String
    # Values of this label value.
    values: IntValues
}

type IntValues {
    values: [KVInt!]!
}

type KVInt {
    id: ID!
    # This is the value, the caller must understand the Unit.
    # Such as:
    # 1. If ask for cpm metric, the unit and result should be count.
    # 2. If ask for response time (p99 or avg), the unit should be millisecond.
    value: Long!
}

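Verifying the Skywalking UI request with the GraphQL IDEA plugin

Using the approach described in the section "Using GraphQL in Skywalking", reproduce the request that the front end sent in the section "Capturing a request sent by the Skywalking UI".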

Request template

query queryData($condition: MetricsCondition!, $duration: Duration!) {
    readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
        label
        values {
            values { id value }
        }
    }
}

Request variables

{
  "duration": {
    "start": "2021-07-03 1400",
    "end": "2021-07-03 1401",
    "step": "MINUTE"
  },
  "condition": {
    "name": "instance_jvm_thread_runnable_thread_count",
    "entity": {
      "scope": "ServiceInstance",
      "serviceName": "business-zone::projectA",
      "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
      "normal": true
    }
  }
}

Response

{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {
            "id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          },
          {
            "id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          }
        ]
      }
    }
  }
}
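The id values in this response look opaque, but they can be read by eye. Judging only from the sample above (this layout is inferred from the response, not taken from the source code), each id appears to be a minute time bucket, then the Base64 of the service name followed by a dot and a flag, then the Base64 of the instance name, joined by underscores. The sketch below decodes one id under that assumption. MetricsIdSketch and parse are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical decoder for the id layout inferred from the sample response.
final class MetricsIdSketch {

    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    // Split "<timeBucket>_<base64 service>.<flag>_<base64 instance>" into
    // { timeBucket, serviceName, flag, instanceName }.
    // The standard Base64 alphabet contains neither '_' nor '.', so both are safe separators.
    static String[] parse(String id) {
        String[] parts = id.split("_", 3);
        String servicePart = parts[1];
        int dot = servicePart.lastIndexOf('.');
        String serviceName = decode(servicePart.substring(0, dot));
        String flag = servicePart.substring(dot + 1);
        String instanceName = decode(parts[2]);
        return new String[] {parts[0], serviceName, flag, instanceName};
    }

    public static void main(String[] args) {
        String id = "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_"
            + "ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=";
        for (String part : parse(id)) {
            System.out.println(part);
        }
    }
}
```

Decoded this way, the two Base64 segments give back the serviceName and serviceInstanceName from the request variables, and the leading 202107031400 matches the start time 2021-07-03 1400.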


PS: if the query is written inline instead of as a template with variables, the plugin offers code completion while you type.

query queryData {
    readMetricsValues(
        duration: {start: "2021-07-03 1400", end: "2021-07-03 1401", step: MINUTE},
        condition: {
            name: "instance_jvm_thread_runnable_thread_count",
            entity: {
                scope: ServiceInstance,
                serviceName: "business-zone::projectA",
                serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
                normal: true
            }
        }
    ) {
        label
        values {
            values { id value }
        }
    }
}

How the GraphQL schema files are loaded into the program

Search for metrics-v2.graphqls. The loading code is in oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java:

// Initialize the GraphQL engine
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
    GraphQLSchema schema = SchemaParser.newParser()
                                       // etc...
                                       .file("query-protocol/metrics-v2.graphqls")
                                       // MetricsQuery implements the com.coxautodev.graphql.tools.GraphQLQueryResolver interface
                                       .resolvers(new MetricsQuery(getManager()))
                                       // etc...
                                       .build()
                                       .makeExecutableSchema();
    this.graphQL = GraphQL.newGraphQL(schema).build();
}

In the class org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery, find the readMetricsValues method:

/**
 * Read time-series values in the duration of required metrics
 */
public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
    if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
        final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
        MetricsValues values = new MetricsValues();
        pointOfTimes.forEach(pointOfTime -> {
            String id = pointOfTime.id(
                condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY"
            );
            final KVInt kvInt = new KVInt();
            kvInt.setId(id);
            kvInt.setValue(0);
            values.getValues().addKVInt(kvInt);
        });
        return values;
    }
    return getMetricsQueryService().readMetricsValues(condition, duration);
}

private MetricsQueryService getMetricsQueryService() {
    if (metricsQueryService == null) {
        this.metricsQueryService = moduleManager.find(CoreModule.NAME)
                                                .provider()
                                                .getService(MetricsQueryService.class);
    }
    return metricsQueryService;
}
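One detail of the resolver above is easy to miss: when the metric name is unknown or the entity is not valid, it does not raise an error. It returns one zero-valued entry per time point, using ILLEGAL_ENTITY in the id when the entity is invalid. The sketch below mirrors only that guard clause with plain collections. ZeroFillSketch, its parameters, and the storage callback are hypothetical stand-ins, and joining the time bucket and the entity id with an underscore is an assumption based on the ids in the sample response.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for the resolver's guard clause; not Skywalking code.
final class ZeroFillSketch {

    // entityId == null stands for "the entity is not valid".
    // storage stands for the real query path and is only used for known metrics.
    static Map<String, Long> readMetricsValues(Set<String> knownMetrics,
                                               String metricName,
                                               String entityId,
                                               List<String> timeBuckets,
                                               ToLongFunction<String> storage) {
        Map<String, Long> values = new LinkedHashMap<>();
        boolean entityValid = entityId != null;
        if (!knownMetrics.contains(metricName) || !entityValid) {
            // Fallback: one zero per time point instead of an error.
            String idSuffix = entityValid ? entityId : "ILLEGAL_ENTITY";
            for (String bucket : timeBuckets) {
                values.put(bucket + "_" + idSuffix, 0L);
            }
            return values;
        }
        // Normal path: delegate each point to the storage callback.
        for (String bucket : timeBuckets) {
            String id = bucket + "_" + entityId;
            values.put(id, storage.applyAsLong(id));
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Long> result = readMetricsValues(
            Collections.singleton("instance_jvm_thread_runnable_thread_count"),
            "no_such_metric",
            null,
            Arrays.asList("202107031400", "202107031401"),
            id -> 22L); // placeholder callback, never reached in this call
        System.out.println(result);
    }
}
```

A practical consequence for callers is that a series of zeros does not prove the metric exists; a misspelled metric name produces the same shape of response.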

org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues

/**
 * Read time-series values in the duration of required metrics
 */
public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
    return getMetricQueryDAO().readMetricsValues(
        condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
}

private IMetricsQueryDAO getMetricQueryDAO() {
    if (metricQueryDAO == null) {
        metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
    }
    return metricQueryDAO;
}

According to the Extend storage documentation, IMetricsQueryDAO is the data access object for metrics queries:

# Implement all DAOs
# Here is the list of all DAO interfaces in storage
IServiceInventoryCacheDAO
IServiceInstanceInventoryCacheDAO
IEndpointInventoryCacheDAO
INetworkAddressInventoryCacheDAO
IBatchDAO
StorageDAO
IRegisterLockDAO
ITopologyQueryDAO
IMetricsQueryDAO
ITraceQueryDAO
IMetadataQueryDAO
IAggregationQueryDAO
IAlarmQueryDAO
IHistoryDeleteDAO
IMetricsDAO
IRecordDAO
IRegisterDAO
ILogQueryDAO
ITopNRecordsQueryDAO
IBrowserLogQueryDAO

The class diagram shows that IMetricsQueryDAO has four implementations: ES, ES7, InfluxDB, and SQL.


How the GraphQL engine is registered with the Jetty server

// Register the GraphQL query handler with the Jetty server
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
    JettyHandlerRegister service = getManager().find(CoreModule.NAME)
                                               .provider()
                                               .getService(JettyHandlerRegister.class);
    service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
}

Analyzing GraphQLQueryProvider shows that this class is the Provider class of QueryModule (the query module).

This confirms the statement in the "Overview" section: the Skywalking query protocol is based on GraphQL by default, and it can be replaced by a custom extension that provides a query module implementing org.apache.skywalking.oap.server.core.query.QueryModule.

@Override
public String name() {
    return "graphql";
}

@Override
public Class<? extends ModuleDefine> module() {
    return QueryModule.class;
}

package org.apache.skywalking.oap.query.graphql;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.GraphQLError;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public class GraphQLQueryHandler extends JettyJsonHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);

    private static final String QUERY = "query";
    private static final String VARIABLES = "variables";
    private static final String DATA = "data";
    private static final String ERRORS = "errors";
    private static final String MESSAGE = "message";

    private final Gson gson = new Gson();
    private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
    }.getType();

    private final String path;

    private final GraphQL graphQL;

    @Override
    public String pathSpec() {
        return path;
    }

    @Override
    protected JsonElement doGet(HttpServletRequest req) {
        throw new UnsupportedOperationException("GraphQL only supports POST method");
    }

    @Override
    protected JsonElement doPost(HttpServletRequest req) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
        String line;
        StringBuilder request = new StringBuilder();
        while ((line = reader.readLine()) != null) {
            request.append(line);
        }

        JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);

        return execute(requestJson.get(QUERY)
                                  .getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
    }

    private JsonObject execute(String request, Map<String, Object> variables) {
        try {
            ExecutionInput executionInput = ExecutionInput.newExecutionInput()
                                                          .query(request)
                                                          .variables(variables)
                                                          .build();
            // Run the query through the GraphQL engine
            ExecutionResult executionResult = graphQL.execute(executionInput);
            LOGGER.debug("Execution result is {}", executionResult);
            // Wrap the result for the response
            Object data = executionResult.getData();
            List<GraphQLError> errors = executionResult.getErrors();
            JsonObject jsonObject = new JsonObject();
            if (data != null) {
                jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
            }

            if (CollectionUtils.isNotEmpty(errors)) {
                JsonArray errorArray = new JsonArray();
                errors.forEach(error -> {
                    JsonObject errorJson = new JsonObject();
                    errorJson.addProperty(MESSAGE, error.getMessage());
                    errorArray.add(errorJson);
                });
                jsonObject.add(ERRORS, errorArray);
            }
            return jsonObject;
        } catch (final Throwable e) {
            LOGGER.error(e.getMessage(), e);
            JsonObject jsonObject = new JsonObject();
            JsonArray errorArray = new JsonArray();
            JsonObject errorJson = new JsonObject();
            errorJson.addProperty(MESSAGE, e.getMessage());
            errorArray.add(errorJson);
            jsonObject.add(ERRORS, errorArray);
            return jsonObject;
        }
    }
}

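The Webapp gateway forwards GraphQL requests to OAP

Up to and including v8.6.0 the gateway was Zuul; from v8.7.0 onward it was replaced by Spring Cloud Gateway. Since this is not the focus of this article, it is not covered further here.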

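Summary

Skywalking's query protocol is implemented by default with GraphQL, a general-purpose query language, so clients can conveniently select exactly the data they need through it. For query requirements like Skywalking's, whose patterns are relatively fixed and change infrequently, it is a good fit.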

References

Extend storage
