玩轉 jmeter backend listener kafka

說到JMeter後端監聽器,大家接觸的一般是InfluxdbBackendListenerClient(Influxdb後端監聽器),可以將測試報告實時推送到Influxdb,然後用Grafana展示。但是這種方式在大併發情況下,會因為吞吐量過大,Influxdb本身的效能瓶頸,無法支撐(Influxdb崩潰是常有的事),所以使用Kafka監聽器就很有必要了,Kafka作為訊息佇列中介軟體,可以起到緩衝器的作用。

一、Kafka後端監聽器原理介紹

jmeter-backend-listener-kafka其實就是透過繼承AbstractBackendListenerClient來將非同步獲取到的JMeter測試結果集SampleResult進行相應處理(與JMeter原生自帶的influxdb、graphite後端監聽器原理一樣),然後將元資料上報至kakfa,這樣你就可以透過消費kafka Topic非同步來接收測試結果集:

玩轉 jmeter backend listener kafka

KafkaBackendClient

透過實現的handleSampleResults方法來處理資料並上報至kafka:

玩轉 jmeter backend listener kafka

handleSampleResults

二、下載和使用Kafka後端監聽器

我們可以從兩個地方下載原始碼或releases的jar包,如下:

原始碼路徑:https://github。com/rahulsinghai/jmeter-backend-listener-kafka

Metersphere官方路徑:https://github。com/metersphere/jmeter-backend-listener-kafka

把jar包jmeter。backendlistener。kafka-1。0。4。jar放到Jmeter的lib\ext後重啟JMeter即可支援:

玩轉 jmeter backend listener kafka

Kafka後端監聽器

為了快速部署驗證環境,我這次用Docker裝了Kafka和Zookeeper叢集,裝了influxdb和Grafana,其中Kafka路徑和埠如上圖所標示,測試前請確保Kafka服務和埠是連通的。

三、通過後端監聽器收集測試結果

我們按上圖配置好後端監聽器,並執行JMeter測試,然後用Offset Explorer連線kafka可以檢視到我們監聽器收集到的報告資料:

玩轉 jmeter backend listener kafka

kafka獲取的資料

由於儲存的是編碼後的Key-value格式,我們可以用Telegraf消費訊息,往influxdb儲存訊息,來看收到的是什麼訊息(當然,你也可以採用別的方式)。

Telegraf的配置如下:

首先配置Output(主要是influxdb的url和database):

################################################################################ OUTPUT PLUGINS ################################################################################# Configuration for sending metrics to InfluxDB[[outputs。influxdb]] ## The full HTTP or UDP URL for your InfluxDB instance。 ## ## Multiple URLs can be specified for a single cluster, only ONE of the ## urls will be written to each interval。 # urls = [“unix:///var/run/influxdb。sock”] # urls = [“udp://127。0。0。1:8089”] urls = [“http://172。17。2。130:8086”] ## The target database for metrics; will be created as needed。 ## For UDP url endpoint database needs to be configured on server side。 database = “kafka”

然後配置Input(為了方便檢視只配置kafka,把預設其他的CPU、disk等註釋掉,以免干擾):

# # Read metrics from Kafka topic(s) [[inputs。kafka_consumer]]# ## kafka servers brokers = [“172。17。2。43:9092”]# ## topic(s) to consume topics = [“JMETER_METRICS”]# ## Add topic as tag if topic_tag is not empty topic_tag = “JMETER_METRICS”# ## the name of the consumer group consumer_group = “telegraf_metrics_consumers”# ## Offset (must be either “oldest” or “newest”) offset = “oldest”# ## Data format to consume。# ## Each data format has its own unique set of configuration options, read# ## more about them here:# ## https://github。com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT。md data_format = “value” data_type = “string”

啟動telegraf,看同步資料的日誌是否正常:

2022-08-04T00:29:48Z I! Starting Telegraf 1。10。22022-08-04T00:29:48Z I! Loaded inputs: kafka_consumer2022-08-04T00:29:48Z I! Loaded aggregators: 2022-08-04T00:29:48Z I! Loaded processors: 2022-08-04T00:29:48Z I! Loaded outputs: influxdb2022-08-04T00:29:48Z I! Tags enabled: host=172。17。2。432022-08-04T00:29:48Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:“172。17。2。43”, Flush Interval:10s2022-08-04T00:29:48Z I! Started the kafka consumer service, brokers: [172。17。2。43:9092], topics: [JMETER_METRICS]

同步正常,我們就用InfluxDB Studio連線influxdb檢視收集到的資料:

玩轉 jmeter backend listener kafka

influxdb收集到的資料

檢視value值,可以看到收集到的測試結果內容,value值如下:

{\“ContentType\”:\“text/html; charset\\u003dUTF-8\”,\“IdleTime\”:0,\“ElapsedTime\”:\“2022-08-04T00:00:01。000+0800\”,\“ErrorCount\”:0,\“Timestamp\”:\“2022-08-04T10:01:22。259+0800\”,\“URL\”:\“https://mp。weixin。qq。com/s/dWBD8ZNYnzuao5ca3gMi3Q\”,\“SampleStartTime\”:\“2022-08-04T10:01:22。259+0800\”,\“Success\”:true,\“Bytes\”:64438,\“SentBytes\”:689,\“AllThreads\”:1,\“TestElement。name\”:\“Thread-11\”,\“DataType\”:\“text\”,\“ResponseTime\”:396,\“SampleCount\”:1,\“FailureMessage\”:\“\”,\“ConnectTime\”:176,\“ResponseCode\”:\“200\”,\“TestStartTime\”:1659578481614,\“AssertionResults\”:[],\“Latency\”:342,\“InjectorHostname\”:\“ZGH-PC\”,\“GrpThreads\”:1,\“SampleEndTime\”:\“2022-08-04T10:01:22。655+0800\”,\“BodySize\”:61665,\“ThreadName\”:\“threadGroup 1-1\”,\“SampleLabel\”:\“chrome-192。168。1。246\”}

其實我們看kafka監聽器的原始碼/jmeter/backendlistener/model/MetricsRow。java,也能知道收集的測試結果資料格式:

public Map getRowAsMap(BackendListenerContext context, String servicePrefixName) throws UnknownHostException { SimpleDateFormat sdf = new SimpleDateFormat(this。kafkaTimestamp); // add all the default SampleResult parameters addFilteredMetricToMetricsMap(“AllThreads”, this。sampleResult。getAllThreads()); addFilteredMetricToMetricsMap(“BodySize”, this。sampleResult。getBodySizeAsLong()); addFilteredMetricToMetricsMap(“Bytes”, this。sampleResult。getBytesAsLong()); addFilteredMetricToMetricsMap(“SentBytes”, this。sampleResult。getSentBytes()); addFilteredMetricToMetricsMap(“ConnectTime”, this。sampleResult。getConnectTime()); addFilteredMetricToMetricsMap(“ContentType”, this。sampleResult。getContentType()); addFilteredMetricToMetricsMap(“DataType”, this。sampleResult。getDataType()); addFilteredMetricToMetricsMap(“ErrorCount”, this。sampleResult。getErrorCount()); addFilteredMetricToMetricsMap(“GrpThreads”, this。sampleResult。getGroupThreads()); addFilteredMetricToMetricsMap(“IdleTime”, this。sampleResult。getIdleTime()); addFilteredMetricToMetricsMap(“Latency”, this。sampleResult。getLatency()); addFilteredMetricToMetricsMap(“ResponseTime”, this。sampleResult。getTime()); addFilteredMetricToMetricsMap(“SampleCount”, this。sampleResult。getSampleCount()); addFilteredMetricToMetricsMap(“SampleLabel”, this。sampleResult。getSampleLabel()); addFilteredMetricToMetricsMap(“ThreadName”, this。sampleResult。getThreadName()); addFilteredMetricToMetricsMap(“URL”, this。sampleResult。getURL()); addFilteredMetricToMetricsMap(“ResponseCode”, this。sampleResult。getResponseCode()); addFilteredMetricToMetricsMap(“TestStartTime”, JMeterContextService。getTestStartTime()); addFilteredMetricToMetricsMap( “SampleStartTime”, sdf。format(new Date(this。sampleResult。getStartTime()))); addFilteredMetricToMetricsMap( “SampleEndTime”, sdf。format(new Date(this。sampleResult。getEndTime()))); addFilteredMetricToMetricsMap( “Timestamp”, sdf。format(new Date(this。sampleResult。getTimeStamp()))); addFilteredMetricToMetricsMap(“InjectorHostname”, InetAddress。getLocalHost()。getHostName()); // Add the details according to the mode that is set switch (this。kafkaTestMode) { case “debug”: case “error”: addDetails(); break; case “info”: if (!this。sampleResult。isSuccessful()) { addDetails(); } break; default: break; } addAssertions(); addElapsedTime(sdf); addCustomFields(context, servicePrefixName); parseHeadersAsJsonProps(this。allReqHeaders, this。allResHeaders); return this。metricsMap; }

大家發現這些內容,只要經過計算就可以生成JMeter測試報告,有執行緒數,有響應時間,有Sample名稱數量和成功標識、Bytes等指標。但是缺少TPS,90%響應時間等指標,這些指標可以參考Influxdb監聽器自己進行擴充套件和重計數。

四、透過Grafana進行結果展示

以上的Key Value格式是不利於在Grafana中展現的,我們可以在Telegraf中改變傳輸格式為json:

# # Read metrics from Kafka topic(s) [[inputs。kafka_consumer]]# ## kafka servers brokers = [“172。17。2。43:9092”]# ## topic(s) to consume 可以新增多個測試專案的topic topics = [“JMETER_METRICS”]# ## Add topic as tag if topic_tag is not empty topic_tag = “JMETER_METRICS”# ## the name of the consumer group consumer_group = “telegraf_metrics_consumers”# ## Offset (must be either “oldest” or “newest”) offset = “oldest”# ## Data format to consume。# ## Each data format has its own unique set of configuration options, read# ## more about them here:# ## https://github。com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT。md data_format = “json”

再次測試,這時候發現能夠在influxdb中按不同欄位顯示指標值了:

玩轉 jmeter backend listener kafka

influxdb

但是展示的欄位不全,到telegraf官網檢視配置說明,發現可以新增顯示欄位:

玩轉 jmeter backend listener kafka

telegraf inputs

修改telegraf。conf配置,在data_format配置下新增缺少的欄位,同時把SampleLabel新增為tag Key(也可以按需要新增多個):

# ## Data format to consume。# ## Each data format has its own unique set of configuration options, read# ## more about them here:# ## https://github。com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT。md data_format = “json” tag_keys = [“SampleLabel”] json_string_fields=[“Success”, “ThreadName”, “SampleLabel”, “Timestamp”, “URL”, “FailureMessage”, “ResponseCode”, “AssertionResults”, “InjectorHostname”, “SampleStartTime”, “SampleEndTime”]

再次測試,檢視inluxdb中同步的結果資料,發現能看到更多的欄位了:

玩轉 jmeter backend listener kafka

influxdb

有了這些欄位,我們就可以在grafana中配置展示:

玩轉 jmeter backend listener kafka

grafana

玩轉 jmeter backend listener kafka

grafana

五、Kafka監聽器外掛擴充套件開發

透過上面的演示,我們發現不能像influxdb後端監聽器那樣收集到【點選率hits】等常規的效能指標,那麼我們可以參考Influxdb後端監聽器的原始碼對Kafka監聽器進行改造,同過由上面提到的監聽器原理可以知道,監聽器是透過handleSampleResults方法來處理資料並上報至kafka或influxdb,那麼我們就從這個函式著手,這個函式所屬於KafkaBackendClient。java類檔案中:

io。github。rahulsinghai。jmeter。backendlistener。kafka。KafkaBackendClient

我們將handleSampleResults方法修改如下(註釋為add的內容):

@Override public void handleSampleResults(List results, BackendListenerContext context) { for (SampleResult sr : results) { String sampleLabel = sr。getSampleLabel(); // add部分 SamplerMetric samplerMetric = getSamplerMetric(sampleLabel); // add部分 /* jmeter 5。1。1之後版本,SamplerMetric支援addCumulated Pattern samplersToFilter; if (samplersToFilter。matcher(sampleLabel)。find()) { samplerMetric。add(sr); } samplerMetric = getSamplerMetric(“all”); samplerMetric。addCumulated(sr); */ samplerMetric。add(sr); // add部分 MetricsRow row = new MetricsRow( sr, context。getParameter(KAFKA_TEST_MODE), context。getParameter(KAFKA_TIMESTAMP), this。buildNumber, context。getBooleanParameter(KAFKA_PARSE_REQ_HEADERS, false), context。getBooleanParameter(KAFKA_PARSE_RES_HEADERS, false), fields, samplerMetric); // add引數samplerMetric if (validateSample(context, sr)) { try { // Prefix to skip from adding service specific parameters to the metrics row String servicePrefixName = “kafka。”; this。publisher。addToList(new Gson()。toJson(row。getRowAsMap(context, servicePrefixName))); } catch (Exception e) { logger。error( “The Kafka Backend Listener was unable to add sampler to the list of samplers to send。。。 More info in JMeter‘s console。”); e。printStackTrace(); } } }

我們在這個方法中增加了SamplerMetric的呼叫(上面標示的add 部分),關於SamplerMetric類中我們可以看到有我們需要的指標計算,可以get到我們所要的指標,如下:

public int getTotal() { return successes+failures; } public int getSuccesses() { return successes; } public int getFailures() { return failures; } public double getOkMaxTime() { return okResponsesStats。getMax(); } public double getOkMinTime() { return okResponsesStats。getMin(); } public double getOkMean() { return okResponsesStats。getMean(); } public double getOkPercentile(double percentile) { return okResponsesStats。getPercentile(percentile); } public double getKoMaxTime() { return koResponsesStats。getMax(); } public double getKoMinTime() { return koResponsesStats。getMin(); } public double getKoMean() { return koResponsesStats。getMean(); } public double getKoPercentile(double percentile) { return koResponsesStats。getPercentile(percentile); } public double getAllMaxTime() { return allResponsesStats。getMax(); } public double getAllMinTime() { return allResponsesStats。getMin(); } public double getAllMean() { return allResponsesStats。getMean(); } public double getAllPercentile(double percentile) { return pctResponseStats。getPercentile(percentile); } /** * Returns hits to server * @return the hits */ public int getHits() { return hits; } public Map getErrors() { return errors; } public long getSentBytes() { return sentBytes; } public long getReceivedBytes() { return receivedBytes; }

由於我們在MetricsRow方法呼叫時加了samplerMetric引數,所以需要改一下MetricsRow類的建構函式(add引數):

public MetricsRow( SampleResult sr, String testMode, String timeStamp, int buildNumber, boolean parseReqHeaders, boolean parseResHeaders, Set fields, SamplerMetric samplerMetric) { // add引數 samplerMetric this。sampleResult = sr; this。kafkaTestMode = testMode。trim(); this。kafkaTimestamp = timeStamp。trim(); this。ciBuildNumber = buildNumber; this。metricsMap = new HashMap<>(); this。allReqHeaders = parseReqHeaders; this。allResHeaders = parseResHeaders; this。fields = fields; this。samplerMetric = samplerMetric; }

然後我們在MetricsRow的getRowAsMap函式中就可以新增SamplerMetric類提供的指標,以下只具例了其中三個指標:

addFilteredMetricToMetricsMap( “Hits”, this。samplerMetric。getHits());addFilteredMetricToMetricsMap( “TotalRequest”, this。samplerMetric。getTotal());addFilteredMetricToMetricsMap( “AllMaxTime”, this。samplerMetric。getAllMaxTime());

重新構建 jmeter-backend-listener-kafka 的原始碼,生成jar包,替換Jmeter原來的jar包,重新測試,這回我們就可以看到資料庫中收集到指標就有Hits了:

玩轉 jmeter backend listener kafka

influxdb

這樣新增指標的目的就達到了,如果還需要其他指標,也可以基於這個方式繼續在MetricsRow中的getRowAsMap函式中新增各類指標,以上過程其實不難理解,只要懂點Java的並在理解了監聽器原理後,參照influxdb監聽器的原始碼我們就輕鬆完成Kafka監聽器的改造,如果對效能指標的計算原理了解的話,還可以擴充套件個性化的效能指標計算。

當然,我們完全可以不用去改造jmeter-backend-listener-kafka,只要在外部加個處理程式,對收集到的基礎sampler指標值進行重計算,就像JMeter的html報告生成那樣,透過計算也能得到想要的效能測試報告。另外還可以像Metersphere那樣,加個 data-streaming 讀取kafka資料,並重計算後發給mysql儲存,最後從mysql讀取測試結果資料進行報告展現(其中data-streaming對測試結果資料的計算處理應該也是借鑑了JMeter原生程式碼)

六、有關influxdb2。x應用介紹

由於influxdb已經推出2。x版本,以上都是基於1。x版本,下一篇文章會提到influxdb2監聽器的使用《JMeter關於influxDB 2。x 後端監聽器使用》,對於Kafka監聽器來說,透過telegraf也可以支援influxdb2的資料格式傳輸,目前telegraf已經支援influxdb2的資料寫入:

[[outputs。influxdb_v2]] urls = [“http://localhost:8086”] token = “$INFLUX_TOKEN” organization = “example-org” bucket = “example-bucket”

參考influxdb的官方文件 Manually configure Telegraf for InfluxDB v2。0 | InfluxDB OSS 2。0 Documentation

傳給influxdb2的資料在influxdb介面上也可以查詢得到:

玩轉 jmeter backend listener kafka

grafana Flux 編輯

透過InfluxDB 2。x的flux語法可以展示Hits圖:

from(bucket: “kafka”) |> range(start: v。timeRangeStart, stop: v。timeRangeStop) |> filter(fn: (r) => r[“_measurement”] == “kafka_consumer”) |> filter(fn: (r) => r[“JMETER_METRICS”] == “JMETER_METRICS”) |> filter(fn: (r) => r[“_field”] == “Hits”) |> aggregateWindow(every: v。windowPeriod, fn: mean, createEmpty: false) |> yield(name: “mean”)

玩轉 jmeter backend listener kafka

Grafna Data Explorer

編輯目前主要是Grafana官網上提供Jmeter的influxdb2格式模板比較少,希望以後能多一些,因為基於influxdb2。x的Grafana展示效果會比influxdb1。x要好。