說到JMeter後端監聽器,大家接觸的一般是InfluxdbBackendListenerClient(Influxdb後端監聽器),可以將測試報告實時推送到Influxdb,然後用Grafana展示。但是這種方式在大併發情況下,會因為吞吐量過大,Influxdb本身的效能瓶頸,無法支撐(Influxdb崩潰是常有的事),所以使用Kafka監聽器就很有必要了,Kafka作為訊息佇列中介軟體,可以起到緩衝器的作用。
一、Kafka後端監聽器原理介紹
jmeter-backend-listener-kafka其實就是透過繼承AbstractBackendListenerClient來將非同步獲取到的JMeter測試結果集SampleResult進行相應處理(與JMeter原生自帶的influxdb、graphite後端監聽器原理一樣),然後將元資料上報至kakfa,這樣你就可以透過消費kafka Topic非同步來接收測試結果集:
KafkaBackendClient
透過實現的handleSampleResults方法來處理資料並上報至kafka:
handleSampleResults
二、下載和使用Kafka後端監聽器
我們可以從兩個地方下載原始碼或releases的jar包,如下:
原始碼路徑:https://github。com/rahulsinghai/jmeter-backend-listener-kafka
Metersphere官方路徑:https://github。com/metersphere/jmeter-backend-listener-kafka
把jar包jmeter。backendlistener。kafka-1。0。4。jar放到Jmeter的lib\ext後重啟JMeter即可支援:
Kafka後端監聽器
為了快速部署驗證環境,我這次用Docker裝了Kafka和Zookeeper叢集,裝了influxdb和Grafana,其中Kafka路徑和埠如上圖所標示,測試前請確保Kafka服務和埠是連通的。
三、通過後端監聽器收集測試結果
我們按上圖配置好後端監聽器,並執行JMeter測試,然後用Offset Explorer連線kafka可以檢視到我們監聽器收集到的報告資料:
kafka獲取的資料
由於儲存的是編碼後的Key-value格式,我們可以用Telegraf消費訊息,往influxdb儲存訊息,來看收到的是什麼訊息(當然,你也可以採用別的方式)。
Telegraf的配置如下:
首先配置Output(主要是influxdb的url和database):
################################################################################ OUTPUT PLUGINS ################################################################################# Configuration for sending metrics to InfluxDB[[outputs。influxdb]] ## The full HTTP or UDP URL for your InfluxDB instance。 ## ## Multiple URLs can be specified for a single cluster, only ONE of the ## urls will be written to each interval。 # urls = [“unix:///var/run/influxdb。sock”] # urls = [“udp://127。0。0。1:8089”] urls = [“http://172。17。2。130:8086”] ## The target database for metrics; will be created as needed。 ## For UDP url endpoint database needs to be configured on server side。 database = “kafka”
然後配置Input(為了方便檢視只配置kafka,把預設其他的CPU、disk等註釋掉,以免干擾):
# # Read metrics from Kafka topic(s) [[inputs。kafka_consumer]]# ## kafka servers brokers = [“172。17。2。43:9092”]# ## topic(s) to consume topics = [“JMETER_METRICS”]# ## Add topic as tag if topic_tag is not empty topic_tag = “JMETER_METRICS”# ## the name of the consumer group consumer_group = “telegraf_metrics_consumers”# ## Offset (must be either “oldest” or “newest”) offset = “oldest”# ## Data format to consume。# ## Each data format has its own unique set of configuration options, read# ## more about them here:# ## https://github。com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT。md data_format = “value” data_type = “string”
啟動telegraf,看同步資料的日誌是否正常:
2022-08-04T00:29:48Z I! Starting Telegraf 1。10。22022-08-04T00:29:48Z I! Loaded inputs: kafka_consumer2022-08-04T00:29:48Z I! Loaded aggregators: 2022-08-04T00:29:48Z I! Loaded processors: 2022-08-04T00:29:48Z I! Loaded outputs: influxdb2022-08-04T00:29:48Z I! Tags enabled: host=172。17。2。432022-08-04T00:29:48Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:“172。17。2。43”, Flush Interval:10s2022-08-04T00:29:48Z I! Started the kafka consumer service, brokers: [172。17。2。43:9092], topics: [JMETER_METRICS]
同步正常,我們就用InfluxDB Studio連線influxdb檢視收集到的資料:
influxdb收集到的資料
檢視value值,可以看到收集到的測試結果內容,value值如下:
{\“ContentType\”:\“text/html; charset\\u003dUTF-8\”,\“IdleTime\”:0,\“ElapsedTime\”:\“2022-08-04T00:00:01。000+0800\”,\“ErrorCount\”:0,\“Timestamp\”:\“2022-08-04T10:01:22。259+0800\”,\“URL\”:\“https://mp。weixin。qq。com/s/dWBD8ZNYnzuao5ca3gMi3Q\”,\“SampleStartTime\”:\“2022-08-04T10:01:22。259+0800\”,\“Success\”:true,\“Bytes\”:64438,\“SentBytes\”:689,\“AllThreads\”:1,\“TestElement。name\”:\“Thread-11\”,\“DataType\”:\“text\”,\“ResponseTime\”:396,\“SampleCount\”:1,\“FailureMessage\”:\“\”,\“ConnectTime\”:176,\“ResponseCode\”:\“200\”,\“TestStartTime\”:1659578481614,\“AssertionResults\”:[],\“Latency\”:342,\“InjectorHostname\”:\“ZGH-PC\”,\“GrpThreads\”:1,\“SampleEndTime\”:\“2022-08-04T10:01:22。655+0800\”,\“BodySize\”:61665,\“ThreadName\”:\“threadGroup 1-1\”,\“SampleLabel\”:\“chrome-192。168。1。246\”}
其實我們看kafka監聽器的原始碼/jmeter/backendlistener/model/MetricsRow。java,也能知道收集的測試結果資料格式:
public Map
大家發現這些內容,只要經過計算就可以生成JMeter測試報告,有執行緒數,有響應時間,有Sample名稱數量和成功標識、Bytes等指標。但是缺少TPS,90%響應時間等指標,這些指標可以參考Influxdb監聽器自己進行擴充套件和重計數。
四、透過Grafana進行結果展示
以上的Key Value格式是不利於在Grafana中展現的,我們可以在Telegraf中改變傳輸格式為json:
# # Read metrics from Kafka topic(s) [[inputs。kafka_consumer]]# ## kafka servers brokers = [“172。17。2。43:9092”]# ## topic(s) to consume 可以新增多個測試專案的topic topics = [“JMETER_METRICS”]# ## Add topic as tag if topic_tag is not empty topic_tag = “JMETER_METRICS”# ## the name of the consumer group consumer_group = “telegraf_metrics_consumers”# ## Offset (must be either “oldest” or “newest”) offset = “oldest”# ## Data format to consume。# ## Each data format has its own unique set of configuration options, read# ## more about them here:# ## https://github。com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT。md data_format = “json”
再次測試,這時候發現能夠在influxdb中按不同欄位顯示指標值了:
influxdb
但是展示的欄位不全,到telegraf官網檢視配置說明,發現可以新增顯示欄位:
telegraf inputs
修改telegraf。conf配置,在data_format配置下新增缺少的欄位,同時把SampleLabel新增為tag Key(也可以按需要新增多個):
# ## Data format to consume。# ## Each data format has its own unique set of configuration options, read# ## more about them here:# ## https://github。com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT。md data_format = “json” tag_keys = [“SampleLabel”] json_string_fields=[“Success”, “ThreadName”, “SampleLabel”, “Timestamp”, “URL”, “FailureMessage”, “ResponseCode”, “AssertionResults”, “InjectorHostname”, “SampleStartTime”, “SampleEndTime”]
再次測試,檢視inluxdb中同步的結果資料,發現能看到更多的欄位了:
influxdb
有了這些欄位,我們就可以在grafana中配置展示:
grafana
grafana
五、Kafka監聽器外掛擴充套件開發
透過上面的演示,我們發現不能像influxdb後端監聽器那樣收集到【點選率hits】等常規的效能指標,那麼我們可以參考Influxdb後端監聽器的原始碼對Kafka監聽器進行改造,同過由上面提到的監聽器原理可以知道,監聽器是透過handleSampleResults方法來處理資料並上報至kafka或influxdb,那麼我們就從這個函式著手,這個函式所屬於KafkaBackendClient。java類檔案中:
io。github。rahulsinghai。jmeter。backendlistener。kafka。KafkaBackendClient
我們將handleSampleResults方法修改如下(註釋為add的內容):
@Override public void handleSampleResults(List
我們在這個方法中增加了SamplerMetric的呼叫(上面標示的add 部分),關於SamplerMetric類中我們可以看到有我們需要的指標計算,可以get到我們所要的指標,如下:
public int getTotal() { return successes+failures; } public int getSuccesses() { return successes; } public int getFailures() { return failures; } public double getOkMaxTime() { return okResponsesStats。getMax(); } public double getOkMinTime() { return okResponsesStats。getMin(); } public double getOkMean() { return okResponsesStats。getMean(); } public double getOkPercentile(double percentile) { return okResponsesStats。getPercentile(percentile); } public double getKoMaxTime() { return koResponsesStats。getMax(); } public double getKoMinTime() { return koResponsesStats。getMin(); } public double getKoMean() { return koResponsesStats。getMean(); } public double getKoPercentile(double percentile) { return koResponsesStats。getPercentile(percentile); } public double getAllMaxTime() { return allResponsesStats。getMax(); } public double getAllMinTime() { return allResponsesStats。getMin(); } public double getAllMean() { return allResponsesStats。getMean(); } public double getAllPercentile(double percentile) { return pctResponseStats。getPercentile(percentile); } /** * Returns hits to server * @return the hits */ public int getHits() { return hits; } public Map
由於我們在MetricsRow方法呼叫時加了samplerMetric引數,所以需要改一下MetricsRow類的建構函式(add引數):
public MetricsRow( SampleResult sr, String testMode, String timeStamp, int buildNumber, boolean parseReqHeaders, boolean parseResHeaders, Set
然後我們在MetricsRow的getRowAsMap函式中就可以新增SamplerMetric類提供的指標,以下只具例了其中三個指標:
addFilteredMetricToMetricsMap( “Hits”, this。samplerMetric。getHits());addFilteredMetricToMetricsMap( “TotalRequest”, this。samplerMetric。getTotal());addFilteredMetricToMetricsMap( “AllMaxTime”, this。samplerMetric。getAllMaxTime());
重新構建 jmeter-backend-listener-kafka 的原始碼,生成jar包,替換Jmeter原來的jar包,重新測試,這回我們就可以看到資料庫中收集到指標就有Hits了:
influxdb
這樣新增指標的目的就達到了,如果還需要其他指標,也可以基於這個方式繼續在MetricsRow中的getRowAsMap函式中新增各類指標,以上過程其實不難理解,只要懂點Java的並在理解了監聽器原理後,參照influxdb監聽器的原始碼我們就輕鬆完成Kafka監聽器的改造,如果對效能指標的計算原理了解的話,還可以擴充套件個性化的效能指標計算。
當然,我們完全可以不用去改造jmeter-backend-listener-kafka,只要在外部加個處理程式,對收集到的基礎sampler指標值進行重計算,就像JMeter的html報告生成那樣,透過計算也能得到想要的效能測試報告。另外還可以像Metersphere那樣,加個 data-streaming 讀取kafka資料,並重計算後發給mysql儲存,最後從mysql讀取測試結果資料進行報告展現(其中data-streaming對測試結果資料的計算處理應該也是借鑑了JMeter原生程式碼)
六、有關influxdb2。x應用介紹
由於influxdb已經推出2。x版本,以上都是基於1。x版本,下一篇文章會提到influxdb2監聽器的使用《JMeter關於influxDB 2。x 後端監聽器使用》,對於Kafka監聽器來說,透過telegraf也可以支援influxdb2的資料格式傳輸,目前telegraf已經支援influxdb2的資料寫入:
[[outputs。influxdb_v2]] urls = [“http://localhost:8086”] token = “$INFLUX_TOKEN” organization = “example-org” bucket = “example-bucket”
參考influxdb的官方文件 Manually configure Telegraf for InfluxDB v2。0 | InfluxDB OSS 2。0 Documentation
傳給influxdb2的資料在influxdb介面上也可以查詢得到:
grafana Flux 編輯
透過InfluxDB 2。x的flux語法可以展示Hits圖:
from(bucket: “kafka”) |> range(start: v。timeRangeStart, stop: v。timeRangeStop) |> filter(fn: (r) => r[“_measurement”] == “kafka_consumer”) |> filter(fn: (r) => r[“JMETER_METRICS”] == “JMETER_METRICS”) |> filter(fn: (r) => r[“_field”] == “Hits”) |> aggregateWindow(every: v。windowPeriod, fn: mean, createEmpty: false) |> yield(name: “mean”)
Grafna Data Explorer
編輯目前主要是Grafana官網上提供Jmeter的influxdb2格式模板比較少,希望以後能多一些,因為基於influxdb2。x的Grafana展示效果會比influxdb1。x要好。