5——新消費者配置——kafka0.10.x-全系列-加米穀大資料

5——新消費者配置——kafka0.10.x-全系列-加米穀大資料

Kafka新消費者配置

領取大資料開發、大資料探勘分析、Python人工智慧等試聽資料,可上加米穀大資料官網(http://www。dtinone。com/)諮詢領取或者加入加米穀大資料技術交流群領取:234648425

3。3。1 新消費者配置

新消費者配置:(注意,右面是可拖動的)

NAME

DESCRIPTION

TYPE

DEFAULT

VALID VALUES

IMPORTANCE

bootstrap。servers

因為這些伺服器地址僅用於初始化連線並發現完整的叢集成員(可能會動態的變化),所以此列表不需要包含完整的叢集地址(儘量多配置幾個,以防止配置的伺服器關閉)。

list

high

key。deserializer

key的解析序列化介面實現類(Deserializer)。

class

high

value。deserializer

value的解析序列化介面實現類(Deserializer)

class

high

fetch。min。bytes

伺服器哦拉取請求返回的最小資料量,如果資料不足,請求將等待資料積累。預設設定為1位元組,表示只要單個位元組的資料可用或者讀取等待請求超時,就會應答讀取請求。將此值設定的越大將導致伺服器等待資料累積的越長,這可能以一些額外延遲為代價提高伺服器吞吐量。

int

1

[0,。。。]

high

group。id

此消費者所屬消費者組的唯一標識。如果消費者用於訂閱或offset管理策略的組管理功能,則此屬性是必須的。

string

“”

high

heartbeat。interval。ms

當使用Kafka的分組管理功能時,心跳到消費者協調器之間的預計時間。心跳用於確保消費者的會話保持活動狀態,並當有新消費者加入或離開組時方便重新平衡。該值必須必比session。timeout。ms小,通常不高於1/3。它可以調整的更低,以控制正常重新平衡的預期時間。

int

3000

high

max。partition。fetch。bytes

伺服器將返回每個分割槽的最大資料量。如果拉取的第一個非空分割槽中第一個訊息大於此限制,則仍然會返回訊息,以確保消費者可以正常的工作。broker接受的最大訊息大小透過message。max。bytes(broker config)或max。message。bytes (topic config)定義。參閱fetch。max。bytes以限制消費者請求大小。

int

1048576

[0,。。。]

high

session。timeout。ms

用於發現消費者故障的超時時間。消費者週期性的傳送心跳到broker,表示其還活著。如果會話超時期滿之前沒有收到心跳,那麼broker將從分組中移除消費者,並啟動重新平衡。請注意,該值必須在broker配置的group。min。session。timeout。ms和group。max。session。timeout。ms允許的範圍內。

int

10000

high

ssl。key。password

金鑰儲存檔案中的私鑰的密碼。 客戶端可選

password

null

high

ssl。keystore。location

金鑰儲存檔案的位置, 這對於客戶端是可選的,並且可以用於客戶端的雙向認證。

string

null

high

ssl。keystore。password

金鑰倉庫檔案的倉庫密碼。客戶端可選,只有ssl。keystore。location配置了才需要。

password

null

high

ssl。truststore。location

信任倉庫檔案的位置

string

null

high

ssl。truststore。password

信任倉庫檔案的密碼

password

null

high

auto。offset。reset

當Kafka中沒有初始offset或如果當前的offset不存在時(例如,該資料被刪除了),該怎麼辦。

最早:自動將偏移重置為最早的偏移

最新:自動將偏移重置為最新偏移

none:如果消費者組找到之前的offset,則向消費者丟擲異常

其他:丟擲異常給消費者。

string

latest

[latest, earliest, none]

medium

connections。max。idle。ms

指定在多少毫秒之後關閉閒置的連線

long

540000

medium

enable。auto。commit

如果為true,消費者的offset將在後臺週期性的提交

boolean

true

medium

exclude。internal。topics

內部topic的記錄(如偏移量)是否應向消費者公開。如果設定為true,則從內部topic接受記錄的唯一方法是訂閱它。

boolean

true

medium

fetch。max。bytes

伺服器為拉取請求返回的最大資料值。這不是絕對的最大值,如果在第一次非空分割槽拉取的第一條訊息大於該值,該訊息將仍然返回,以確保消費者繼續工作。接收的最大訊息大小透過message。max。bytes (broker config) 或 max。message。bytes (topic config)定義。注意,消費者是並行執行多個提取的。

int

52428800

[0,。。。]

medium

max。poll。interval。ms

使用消費者組管理時poll()呼叫之間的最大延遲。消費者在獲取更多記錄之前可以空閒的時間量的上限。如果此超時時間期滿之前poll()沒有呼叫,則消費者被視為失敗,並且分組將重新平衡,以便將分割槽重新分配給別的成員。

int

300000

[1,。。。]

medium

max。poll。records

在單次呼叫poll()中返回的最大記錄數。

int

500

[1,。。。]

medium

partition。assignment。strategy

當使用組管理時,客戶端將使用分割槽分配策略的類名來分配消費者例項之間的分割槽所有權

list

class org。apache。kafka

。clients。consumer

。RangeAssignor

medium

receive。buffer。bytes

讀取資料時使用的TCP接收緩衝區(SO_RCVBUF)的大小。 如果值為-1,則將使用OS預設值。

int

65536

[-1,。。。]

medium

request。timeout。ms

配置控制客戶端等待請求響應的最長時間。 如果在超時之前未收到響應,客戶端將在必要時重新發送請求,如果重試耗盡則客戶端將重新發送請求。

int

305000

[0,。。。]

medium

sasl。jaas。config

JAAS配置檔案中SASL連線登入上下文引數。 這裡描述JAAS配置檔案格式。 該值的格式為: ‘(=)*;’

password

null

medium

sasl。kerberos。service。name

Kafka執行Kerberos principal名。可以在Kafka的JAAS配置檔案或在Kafka的配置檔案中定義。

string

null

medium

sasl。mechanism

用於客戶端連線的SASL機制。安全提供者可用的機制。GSSAPI是預設機制。

string

GSSAPI

medium

security。protocol

用於與broker通訊的協議。 有效值為:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。

string

PLAINTEXT

medium

send。buffer。bytes

傳送資料時要使用的TCP傳送緩衝區(SO_SNDBUF)的大小。 如果值為-1,則將使用OS預設值。

int

131072

[-1,。。。]

medium

ssl。enabled。protocols

啟用SSL連線的協議列表。

list

TLSv1。2,TLSv1。1,TLSv1

medium

ssl。keystore。type

key倉庫檔案的檔案格式,客戶端可選。

string

JKS

medium

ssl。protocol

用於生成SSLContext的SSL協議。 預設設定是TLS,這對大多數情況都是適用的。 最新的JVM中的允許值為TLS,TLSv1。1和TLSv1。2。 較舊的JVM可能支援SSL,SSLv2和SSLv3,但由於已知的安全漏洞,不建議使用SSL。

string

TLS

medium

ssl。provider

用於SSL連線的安全提供程式的名稱。 預設值是JVM的預設安全提供程式。

string

null

medium

ssl。truststore。type

信任儲存檔案的檔案格式。

string

JKS

medium

auto。commit。interval。ms

如果enable。auto。commit設定為true,則消費者偏移量自動提交給Kafka的頻率(以毫秒為單位)。

int

5000

[0,。。。]

low

check。crcs

自動檢查CRC32記錄的消耗。 這樣可以確保訊息發生時不會線上或磁碟損壞。 此檢查增加了一些開銷,因此在尋求極致效能的情況下可能會被禁用。

boolean

true

low

client。id

在發出請求時傳遞給伺服器的id字串。 這樣做的目的是透過允許將邏輯應用程式名稱包含在伺服器端請求日誌記錄中,來跟蹤ip/port的請求源。

string

“”

low

fetch。max。wait。ms

如果沒有足夠的資料滿足fetch。min。bytes,伺服器將在接收到提取請求之前阻止的最大時間。

int

500

[0,。。。]

low

interceptor。classes

用作攔截器的類的列表。 你可實現ConsumerInterceptor介面以允許攔截(也可能變化)消費者接收的記錄。 預設情況下,沒有攔截器。

list

null

low

metadata。max。age。ms

在一定時間段之後(以毫秒為單位的),強制更新元資料,即使沒有任何分割槽領導變化,任何新的broker或分割槽。

long

300000

[0,。。。]

low

metric。reporters

用作度量記錄員類的列表。實現MetricReporter介面以允許插入通知新的度量建立的類。JmxReporter始終包含在註冊JMX統計資訊中。

list

“”

low

metrics。num。samples

保持的樣本數以計算度量。

int

2

[1,。。。]

low

metrics。recording。level

最高的記錄級別。

string

INFO

[INFO, DEBUG]

low

metrics。sample。window。ms

The window of time a metrics sample is computed over。

long

30000

[0,。。。]

low

reconnect。backoff。ms

嘗試重新連線指定主機之前等待的時間,避免頻繁的連線主機,這種機制適用於消費者向broker傳送的所有請求。

long

50

[0,。。。]

low

retry。backoff。ms

嘗試重新發送失敗的請求到指定topic分割槽之前的等待時間。避免在某些故障情況下,頻繁的重複傳送。

long

100

[0,。。。]

low

sasl。kerberos。kinit。cmd Kerberos

kinit命令路徑。

string

/usr/bin/kinit

low

sasl。kerberos。min。time。before。relogin

嘗試/恢復之間的登入執行緒的休眠時間。

long

60000

low

sasl。kerberos。ticket。renew。jitter

新增到更新時間的隨機抖動百分比。

double

0。05

low

sasl。kerberos。ticket。renew。window。factor

登入執行緒將休眠,直到從上次重新整理到ticket的指定的時間視窗因子到期,此時將嘗試續訂ticket。

double

0。8

low

ssl。cipher。suites

密碼套件列表,用於TLS或SSL網路協議的安全設定,認證,加密,MAC和金鑰交換演算法的明明組合。預設情況下,支援所有可用的密碼套件。

list

null

low

ssl。endpoint。identification。algorithm

使用伺服器證書驗證伺服器主機名的端點識別演算法。

string

null

low

ssl。keymanager。algorithm

金鑰管理器工廠用於SSL連線的演算法。 預設值是為Java虛擬機器配置的金鑰管理器工廠演算法。

string

SunX509

low

ssl。secure。random。implementation

用於SSL加密操作的SecureRandom PRNG實現。

string

null

low

ssl。trustmanager。algorithm

信任管理器工廠用於SSL連線的演算法。 預設值是為Java虛擬機器配置的信任管理器工廠演算法。

string

PKIX

low