一、認識kafka
面試官提問:什麼是 Kafka ?用來幹嘛的?
官方定義如下:
Kafka is used for building real-time data pipelines and streaming apps。 It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies。
翻譯過來,大致的意思就是,這是一個實時資料處理系統,可以橫向擴充套件,並高可靠!
實時資料處理
,從名字上看,很好理解,就是將資料進行實時處理,在現在流行的微服務開發中,最常用實時資料處理平臺有 RabbitMQ、RocketMQ 等訊息中介軟體。
這些中介軟體,最大的特點主要有兩個:
服務解耦
流量削峰
在早期的 web 應用程式開發中,當請求量突然上來了時候,我們會將要處理的資料推送到一個佇列通道中,然後另起一個執行緒來不斷輪訓拉取佇列中的資料,從而加快程式的執行效率。
但是隨著請求量不斷的增大,並且佇列通道的資料一致處於高負載,在這種情況下,應用程式的記憶體佔用率會非常高,稍有不慎,會出現記憶體不足,造成程式記憶體溢位,從而導致服務不可用。
隨著業務量的不斷擴張,在一個應用程式內,使用這種模式已然無法滿足需求,因此之後,就誕生了各種訊息中介軟體,例如
ActiveMQ
、
RabbitMQ
、
RocketMQ
等中介軟體。
採用這種模型,本質就是將要推送的資料,不在存放在當前應用程式的記憶體中,而是將資料存放到另一個專門負責資料處理的應用程式中,從而實現服務解耦。
訊息中介軟體
:主要的職責就是保證能接受到訊息,並將訊息儲存到磁碟,即使其他服務都掛了,資料也不會丟失,同時還可以對資料消費情況做好監控工作。
應用程式
:只需要將訊息推送到訊息中介軟體,然後啟用一個執行緒來不斷從訊息中介軟體中拉取資料,進行消費確認即可!
引入訊息中介軟體之後,整個服務開發會變得更加簡單,各負其責。
Kafka 本質其實也是訊息中介軟體的一種,Kafka 出自於 LinkedIn 公司,與 2010 年開源到 github。
LinkedIn 的開發團隊,為了解決資料管道問題,起初採用了 ActiveMQ 來進行資料交換,大約是在 2010 年前後,那時的 ActiveMQ 還遠遠無法滿足 LinkedIn 對資料傳遞系統的要求,經常由於各種缺陷而導致訊息阻塞或者服務無法正常訪問,為了能夠解決這個問題,LinkedIn 決定研發自己的訊息傳遞系統,
Kafka 由此誕生
。
在 LinkedIn 公司,Kafka 可以有效地處理每天數十億條訊息的指標和使用者活動跟蹤,其強大的處理能力,已經被業界所認可,併成為大資料流水線的首選技術。
二、架構介紹
先來看一張圖,
下面這張圖就是 kafka 生產與消費的核心架構模型
!
如果你看不懂這些概念沒關係,我會帶著大家一起梳理一遍!
Producer
:Producer 即生產者,訊息的產生者,是訊息的入口
Broker
:Broker 是 kafka 一個例項,每個伺服器上有一個或多個 kafka 的例項,簡單的理解就是一臺 kafka 伺服器,
kafka cluster
表示叢集的意思
Topic
:訊息的主題,可以理解為訊息佇列,kafka的資料就儲存在topic。在每個 broker 上都可以建立多個 topic 。
Partition
:Topic的分割槽,每個 topic 可以有多個分割槽,分割槽的作用是做負載,提高 kafka 的吞吐量。
同一個 topic 在不同的分割槽的資料是不重複的,partition 的表現形式就是一個一個的資料夾
!
Replication
:每一個分割槽都有多個副本,副本的作用是做備胎,主分割槽(Leader)會將資料同步到從分割槽(Follower)。當主分割槽(Leader)故障的時候會選擇一個備胎(Follower)上位,成為 Leader。在kafka中預設副本的最大數量是10個,且副本的數量不能大於Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分割槽也只可能存放一個副本
Message
:每一條傳送的訊息主體。
Consumer
:消費者,即訊息的消費方,是訊息的出口。
Consumer Group
:我們可以將多個消費組組成一個消費者組,
在 kafka 的設計中同一個分割槽的資料只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分割槽的資料,這也是為了提高kafka的吞吐量
!
Zookeeper
:kafka 叢集依賴 zookeeper 來儲存叢集的的元資訊,來保證系統的可用性。
簡而言之,kafka 本質就是一個訊息系統,與大多數的訊息系統一樣,主要的特點如下:
使用推拉模型將生產者和消費者分離
為訊息傳遞系統中的訊息資料提供永續性,以允許多個消費者
提供高可用叢集服務,主從模式,同時支援橫向水平擴充套件
與
ActiveMQ
、
RabbitMQ
、
RocketMQ
不同的地方在於,它有一個
分割槽Partition
的概念。
這個分割槽的意思就是說,如果你建立的
topic
有5個分割槽,當你一次性向 kafka 中推 1000 條資料時,這 1000 條資料預設會分配到 5 個分割槽中,其中每個分割槽儲存 200 條資料。
這樣做的目的,就是方便消費者從不同的分割槽拉取資料,假如你啟動 5 個執行緒同時拉取資料,每個執行緒拉取一個分割槽,消費速度會非常非常快!
這是 kafka 與其他的訊息系統最大的不同!
2。1、傳送資料
和其他的中介軟體一樣,kafka 每次傳送資料都是向
Leader
分割槽傳送資料,並順序寫入到磁碟,然後
Leader
分割槽會將資料同步到各個從分割槽
Follower
,即使主分割槽掛了,也不會影響服務的正常執行。
那 kafka 是如何將資料寫入到對應的分割槽呢?kafka中有以下幾個原則:
1、資料在寫入的時候可以指定需要寫入的分割槽,如果有指定,則寫入對應的分割槽
2、如果沒有指定分割槽,但是設定了資料的key,則會根據key的值hash出一個分割槽
3、如果既沒指定分割槽,又沒有設定key,則會輪詢選出一個分割槽
2。2、消費資料
與生產者一樣,消費者主動的去kafka叢集拉取訊息時,也是從
Leader
分割槽去拉取資料。
這裡我們需要重點了解一個名詞:
消費組
!
考慮到多個消費者的場景,kafka 在設計的時候,可以由多個消費者組成一個消費組,同一個消費組者的消費者可以消費同一個 topic 下不同分割槽的資料,同一個分割槽只會被一個消費組內的某個消費者所消費,防止出現重複消費的問題!
但是不同的組,可以消費同一個分割槽的資料!
你可以這樣理解,一個消費組就是一個客戶端,一個客戶端可以由很多個消費者組成,以便加快訊息的消費能力。
但是,如果一個組下的消費者數量大於分割槽數量,就會出現很多的消費者閒置。
如果分割槽數量大於一個組下的消費者數量,會出現一個消費者負責多個分割槽的消費,會出現消費效能不均衡的情況。
因此,在實際的應用中,建議消費者組的
consumer
的數量與
partition
的數量保持一致!
三、kafka 安裝
光說理論可沒用,下面我們就以 centos7 為例,介紹一下 kafka 的安裝和使用。
kafka 需要 zookeeper 來儲存服務例項的元資訊,因此在安裝 kafka 之前,我們需要先安裝 zookeeper。
3。1、安裝zookeeper
zookeeper 安裝環境依賴於 jdk,因此我們需要事先安裝 jdk
# 安裝jdk1。8yum -y install java-1。8。0-openjdk
下載zookeeper,並解壓檔案包
#線上下載zookeeperwget http://mirrors。hust。edu。cn/apache/zookeeper/zookeeper-3。4。12/zookeeper-3。4。12。tar。gz#解壓tar -zxvf zookeeper-3。4。12。tar。gz
建立資料、日誌目錄
#建立資料和日誌存放目錄cd /usr/zookeeper/mkdir datamkdir log#把conf下的zoo_sample。cfg備份一份,然後重新命名為zoo。cfgcd conf/cp zoo_sample。cfg zoo。cfg
配置zookeeper
#編輯zoo。cfg檔案vim zoo。cfg
重新配置
dataDir
和
dataLogDir
的儲存路徑
最後,啟動 Zookeeper 服務
#進入Zookeeper的bin目錄cd zookeeper/zookeeper-3。4。12/bin#啟動Zookeeper。/zkServer。sh start#查詢Zookeeper狀態。/zkServer。sh status#關閉Zookeeper狀態。/zkServer。sh stop
3。2、安裝kafka
到官網
http://kafka。apache。org/downloads。html
下載想要的版本,我這裡下載是最新穩定版
2。8。0
。
#下載kafka 安裝包wget https://apache。osuosl。org/kafka/2。8。0/kafka-2。8。0-src。tgz#解壓檔案包tar -xvf kafka-2。8。0-src。tgz
按需修改配置檔案
server。properties
(可選)
#進入配置資料夾cd kafka-2。8。0-src/config#編輯server。propertiesvim server。properties
server。properties
檔案內容如下:
broker。id=0listeners=PLAINTEXT://localhost:9092num。network。threads=3num。io。threads=8socket。send。buffer。bytes=102400socket。receive。buffer。bytes=102400socket。request。max。bytes=104857600log。dirs=/tmp/kafka-logsnum。partitions=1num。recovery。threads。per。data。dir=1offsets。topic。replication。factor=1transaction。state。log。replication。factor=1transaction。state。log。min。isr=1log。retention。hours=168log。segment。bytes=1073741824log。retention。check。interval。ms=300000zookeeper。connect=localhost:2181zookeeper。connection。timeout。ms=6000group。initial。rebalance。delay。ms=0
其中有四個重要的引數:
broker。id
:唯一標識ID
listeners=PLAINTEXT://localhost:9092
:kafka服務監聽地址和埠
log。dirs
:日誌儲存目錄
zookeeper。connect
:指定
zookeeper
服務地址
可根據自己需求修改對應的配置!
3。3、啟動 kafka 服務
# 進入bin指令碼目錄cd kafka-2。8。0-src/bin
啟動 kafka 服務
nohup kafka-server-start。sh 。。/config/server。properties server。log 2> server。err &
3。4、建立主題topics
建立一個名為
testTopic
的主題,它只包含一個分割槽,只有一個副本:
# 進入bin指令碼目錄cd kafka-2。8。0-src/bin#建立topicskafka-topics。sh ——create ——zookeeper localhost:2181 ——replication-factor 1 ——partitions 1 ——topic testTopic
執行
list topic
命令,可以看到該主題。
# 進入bin指令碼目錄cd kafka-2。8。0-src/bin#查詢當前kafka上所有的主題kafka-topics。sh ——list ——zookeeper localhost:2181
輸出內容:
testTopic
3。5、傳送訊息
Kafka 附帶一個命令列客戶端,它將從檔案或標準輸入中獲取輸入,並將其作為訊息傳送到 Kafka 叢集。預設情況下,每行將作為單獨的訊息傳送。
執行生產者,然後在控制檯中鍵入一些訊息以傳送到伺服器。
# 進入bin指令碼目錄cd kafka-2。8。0-src/bin#執行一個生產者,向testTopic主題中發訊息kafka-console-producer。sh ——broker-list localhost:9092 ——topic testTopic
輸入兩條內容並回車:
Hello kafka!This is a message
3。5、接受訊息
Kafka 還有一個命令列使用者,它會將訊息轉儲到標準輸出。
# 進入bin指令碼目錄cd kafka-2。8。0-src/bin#執行一個消費者,從testTopic主題中拉取訊息kafka-console-consumer。sh ——bootstrap-server localhost:9092 ——topic testTopic ——from-beginning
輸出結果如下:
Hello kafka!This is a message
四、小結
本文主要圍繞 kafka 的架構模型和安裝環境做了一些初步的介紹,難免會有理解不對的地方,歡迎網友批評、吐槽。
由於篇幅原因,會在下期文章中詳細介紹 java 環境下 kafka 應用場景!