Kafka新消費者配置
領取大資料開發、大資料探勘分析、Python人工智慧等試聽資料,可上加米穀大資料官網(http://www。dtinone。com/)諮詢領取或者加入加米穀大資料技術交流群領取:234648425
3。3。1 新消費者配置
新消費者配置:(注意,右面是可拖動的)
NAME
DESCRIPTION
TYPE
DEFAULT
VALID VALUES
IMPORTANCE
bootstrap。servers
因為這些伺服器地址僅用於初始化連線並發現完整的叢集成員(可能會動態的變化),所以此列表不需要包含完整的叢集地址(儘量多配置幾個,以防止配置的伺服器關閉)。
list
high
key。deserializer
key的解析序列化介面實現類(Deserializer)。
class
high
value。deserializer
value的解析序列化介面實現類(Deserializer)
class
high
fetch。min。bytes
伺服器哦拉取請求返回的最小資料量,如果資料不足,請求將等待資料積累。預設設定為1位元組,表示只要單個位元組的資料可用或者讀取等待請求超時,就會應答讀取請求。將此值設定的越大將導致伺服器等待資料累積的越長,這可能以一些額外延遲為代價提高伺服器吞吐量。
int
1
[0,。。。]
high
group。id
此消費者所屬消費者組的唯一標識。如果消費者用於訂閱或offset管理策略的組管理功能,則此屬性是必須的。
string
“”
high
heartbeat。interval。ms
當使用Kafka的分組管理功能時,心跳到消費者協調器之間的預計時間。心跳用於確保消費者的會話保持活動狀態,並當有新消費者加入或離開組時方便重新平衡。該值必須必比session。timeout。ms小,通常不高於1/3。它可以調整的更低,以控制正常重新平衡的預期時間。
int
3000
high
max。partition。fetch。bytes
伺服器將返回每個分割槽的最大資料量。如果拉取的第一個非空分割槽中第一個訊息大於此限制,則仍然會返回訊息,以確保消費者可以正常的工作。broker接受的最大訊息大小透過message。max。bytes(broker config)或max。message。bytes (topic config)定義。參閱fetch。max。bytes以限制消費者請求大小。
int
1048576
[0,。。。]
high
session。timeout。ms
用於發現消費者故障的超時時間。消費者週期性的傳送心跳到broker,表示其還活著。如果會話超時期滿之前沒有收到心跳,那麼broker將從分組中移除消費者,並啟動重新平衡。請注意,該值必須在broker配置的group。min。session。timeout。ms和group。max。session。timeout。ms允許的範圍內。
int
10000
high
ssl。key。password
金鑰儲存檔案中的私鑰的密碼。 客戶端可選
password
null
high
ssl。keystore。location
金鑰儲存檔案的位置, 這對於客戶端是可選的,並且可以用於客戶端的雙向認證。
string
null
high
ssl。keystore。password
金鑰倉庫檔案的倉庫密碼。客戶端可選,只有ssl。keystore。location配置了才需要。
password
null
high
ssl。truststore。location
信任倉庫檔案的位置
string
null
high
ssl。truststore。password
信任倉庫檔案的密碼
password
null
high
auto。offset。reset
當Kafka中沒有初始offset或如果當前的offset不存在時(例如,該資料被刪除了),該怎麼辦。
最早:自動將偏移重置為最早的偏移
最新:自動將偏移重置為最新偏移
none:如果消費者組找到之前的offset,則向消費者丟擲異常
其他:丟擲異常給消費者。
string
latest
[latest, earliest, none]
medium
connections。max。idle。ms
指定在多少毫秒之後關閉閒置的連線
long
540000
medium
enable。auto。commit
如果為true,消費者的offset將在後臺週期性的提交
boolean
true
medium
exclude。internal。topics
內部topic的記錄(如偏移量)是否應向消費者公開。如果設定為true,則從內部topic接受記錄的唯一方法是訂閱它。
boolean
true
medium
fetch。max。bytes
伺服器為拉取請求返回的最大資料值。這不是絕對的最大值,如果在第一次非空分割槽拉取的第一條訊息大於該值,該訊息將仍然返回,以確保消費者繼續工作。接收的最大訊息大小透過message。max。bytes (broker config) 或 max。message。bytes (topic config)定義。注意,消費者是並行執行多個提取的。
int
52428800
[0,。。。]
medium
max。poll。interval。ms
使用消費者組管理時poll()呼叫之間的最大延遲。消費者在獲取更多記錄之前可以空閒的時間量的上限。如果此超時時間期滿之前poll()沒有呼叫,則消費者被視為失敗,並且分組將重新平衡,以便將分割槽重新分配給別的成員。
int
300000
[1,。。。]
medium
max。poll。records
在單次呼叫poll()中返回的最大記錄數。
int
500
[1,。。。]
medium
partition。assignment。strategy
當使用組管理時,客戶端將使用分割槽分配策略的類名來分配消費者例項之間的分割槽所有權
list
class org。apache。kafka
。clients。consumer
。RangeAssignor
medium
receive。buffer。bytes
讀取資料時使用的TCP接收緩衝區(SO_RCVBUF)的大小。 如果值為-1,則將使用OS預設值。
int
65536
[-1,。。。]
medium
request。timeout。ms
配置控制客戶端等待請求響應的最長時間。 如果在超時之前未收到響應,客戶端將在必要時重新發送請求,如果重試耗盡則客戶端將重新發送請求。
int
305000
[0,。。。]
medium
sasl。jaas。config
JAAS配置檔案中SASL連線登入上下文引數。 這裡描述JAAS配置檔案格式。 該值的格式為: ‘(=)*;’
password
null
medium
sasl。kerberos。service。name
Kafka執行Kerberos principal名。可以在Kafka的JAAS配置檔案或在Kafka的配置檔案中定義。
string
null
medium
sasl。mechanism
用於客戶端連線的SASL機制。安全提供者可用的機制。GSSAPI是預設機制。
string
GSSAPI
medium
security。protocol
用於與broker通訊的協議。 有效值為:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。
string
PLAINTEXT
medium
send。buffer。bytes
傳送資料時要使用的TCP傳送緩衝區(SO_SNDBUF)的大小。 如果值為-1,則將使用OS預設值。
int
131072
[-1,。。。]
medium
ssl。enabled。protocols
啟用SSL連線的協議列表。
list
TLSv1。2,TLSv1。1,TLSv1
medium
ssl。keystore。type
key倉庫檔案的檔案格式,客戶端可選。
string
JKS
medium
ssl。protocol
用於生成SSLContext的SSL協議。 預設設定是TLS,這對大多數情況都是適用的。 最新的JVM中的允許值為TLS,TLSv1。1和TLSv1。2。 較舊的JVM可能支援SSL,SSLv2和SSLv3,但由於已知的安全漏洞,不建議使用SSL。
string
TLS
medium
ssl。provider
用於SSL連線的安全提供程式的名稱。 預設值是JVM的預設安全提供程式。
string
null
medium
ssl。truststore。type
信任儲存檔案的檔案格式。
string
JKS
medium
auto。commit。interval。ms
如果enable。auto。commit設定為true,則消費者偏移量自動提交給Kafka的頻率(以毫秒為單位)。
int
5000
[0,。。。]
low
check。crcs
自動檢查CRC32記錄的消耗。 這樣可以確保訊息發生時不會線上或磁碟損壞。 此檢查增加了一些開銷,因此在尋求極致效能的情況下可能會被禁用。
boolean
true
low
client。id
在發出請求時傳遞給伺服器的id字串。 這樣做的目的是透過允許將邏輯應用程式名稱包含在伺服器端請求日誌記錄中,來跟蹤ip/port的請求源。
string
“”
low
fetch。max。wait。ms
如果沒有足夠的資料滿足fetch。min。bytes,伺服器將在接收到提取請求之前阻止的最大時間。
int
500
[0,。。。]
low
interceptor。classes
用作攔截器的類的列表。 你可實現ConsumerInterceptor介面以允許攔截(也可能變化)消費者接收的記錄。 預設情況下,沒有攔截器。
list
null
low
metadata。max。age。ms
在一定時間段之後(以毫秒為單位的),強制更新元資料,即使沒有任何分割槽領導變化,任何新的broker或分割槽。
long
300000
[0,。。。]
low
metric。reporters
用作度量記錄員類的列表。實現MetricReporter介面以允許插入通知新的度量建立的類。JmxReporter始終包含在註冊JMX統計資訊中。
list
“”
low
metrics。num。samples
保持的樣本數以計算度量。
int
2
[1,。。。]
low
metrics。recording。level
最高的記錄級別。
string
INFO
[INFO, DEBUG]
low
metrics。sample。window。ms
The window of time a metrics sample is computed over。
long
30000
[0,。。。]
low
reconnect。backoff。ms
嘗試重新連線指定主機之前等待的時間,避免頻繁的連線主機,這種機制適用於消費者向broker傳送的所有請求。
long
50
[0,。。。]
low
retry。backoff。ms
嘗試重新發送失敗的請求到指定topic分割槽之前的等待時間。避免在某些故障情況下,頻繁的重複傳送。
long
100
[0,。。。]
low
sasl。kerberos。kinit。cmd Kerberos
kinit命令路徑。
string
/usr/bin/kinit
low
sasl。kerberos。min。time。before。relogin
嘗試/恢復之間的登入執行緒的休眠時間。
long
60000
low
sasl。kerberos。ticket。renew。jitter
新增到更新時間的隨機抖動百分比。
double
0。05
low
sasl。kerberos。ticket。renew。window。factor
登入執行緒將休眠,直到從上次重新整理到ticket的指定的時間視窗因子到期,此時將嘗試續訂ticket。
double
0。8
low
ssl。cipher。suites
密碼套件列表,用於TLS或SSL網路協議的安全設定,認證,加密,MAC和金鑰交換演算法的明明組合。預設情況下,支援所有可用的密碼套件。
list
null
low
ssl。endpoint。identification。algorithm
使用伺服器證書驗證伺服器主機名的端點識別演算法。
string
null
low
ssl。keymanager。algorithm
金鑰管理器工廠用於SSL連線的演算法。 預設值是為Java虛擬機器配置的金鑰管理器工廠演算法。
string
SunX509
low
ssl。secure。random。implementation
用於SSL加密操作的SecureRandom PRNG實現。
string
null
low
ssl。trustmanager。algorithm
信任管理器工廠用於SSL連線的演算法。 預設值是為Java虛擬機器配置的信任管理器工廠演算法。
string
PKIX
low