分散式檔案儲存選型比較、架構設計及fastdfs原始碼分析

1。 分散式檔案儲存選型比較、架構設計

1。1 分散式檔案儲存的來源

在這個資料爆炸的時代,產生的資料量不斷地在攀升,從GB,TB,PB,ZB。挖掘其中資料的價值也是企業在不斷地追求的終極目標。但是要想對海量的資料進行挖掘,首先要考慮的就是海量資料的儲存問題,比如Tb量級的資料。

談到資料的儲存,則不得不說的是磁碟的資料讀寫速度問題。早在上個世紀90年代初期,普通硬碟的可以儲存的容量大概是1G左右,硬碟的讀取速度大概為4。4MB/s。讀取一張硬碟大概需要5分鐘時間,但是如今硬碟的容量都在1TB左右了,相比擴充套件了近千倍。但是硬碟的讀取速度大概是100MB/s。讀完一個硬碟所需要的時間大概是2。5個小時。所以如果是基於TB級別的資料進行分析的話,光硬碟讀取完資料都要好幾天了,更談不上計算分析了。那麼該如何處理大資料的儲存,計算分析呢?

1。2 常用的分散式檔案儲存

1。2。1 常見的分散式檔案系統

GFS、HDFS、Lustre 、Ceph 、GridFS 、mogileFS、TFS、FastDFS等。各自適用於不同的領域。它們都不是系統級的分散式檔案系統,而是應用級的分散式檔案存 儲服務。

分散式檔案儲存選型比較

整理了很多Linux後臺架構師學習資料,影片,面試題,

請私信

1。2。2 系統整體對比

對比說明

/檔案系統

TFS

FastDFS

MogileFS

MooseFS

GlusterFS

Ceph

開發語言

C++

C

Perl

C

C

C++

開源協議

GPL V2

GPL V3

GPL

GPL V3

GPL V3

LGPL

資料儲存方式

檔案/Trunk

檔案

檔案/

物件/

檔案/塊

叢集節點通訊協議

私有協議(TCP

私有協議(TCP

HTTP

私有協議(TCP

私有協議(TCP

)/ RDAM(遠端直接訪問記憶體)

私有協議(TCP

專用元資料儲存點

佔用NS

佔用DB

佔用MFS

佔用MDS

線上擴容

支援

支援

支援

支援

支援

支援

冗餘備份

支援

支援

-

支援

支援

支援

單點故障

存在

不存在

存在

存在

不存在

存在

跨叢集同步

支援

部分支援

-

-

支援

不適用

易用性

安裝複雜,官方文件少

安裝簡單,社群相對活躍

-

安裝簡單,官方文件多

安裝簡單,官方文件專業化

安裝簡單,官方文件專業化

適用場景

跨叢集的小檔案

單叢集的中小檔案

-

單叢集的大中檔案

跨叢集雲端儲存

單叢集的大中小檔案

1。3 知名開源分散式檔案儲存

1。3。1 GFS(Google File System)

Google公司為了滿足本公司需求而開發的基於Linux的專有分散式檔案系統。儘管Google公佈了該系統的一些技術細節,但Google並沒有將該系統的軟體部分作為開源軟體釋出。

1。3。2 HDFS

Hadoop 實現了一個分散式檔案系統(Hadoop Distributed File System),簡稱HDFS。 Hadoop是Apache Lucene創始人Doug Cutting開發的使用廣泛的文字搜尋庫。它起源於Apache Nutch,

後者是一個開源的網路搜尋引擎,本身也是Luene專案的一部分。Aapche Hadoop架構是MapReduce演算法的一種開源應用,是Google開創其帝國的重要基石。

1。3。3 TFS

TFS(Taobao FileSystem)是一個高可擴充套件、高可用、高效能、面向網際網路服務的分散式檔案系統,主要針對海量的非結構化資料,它構築在普通的Linux機器 叢集上,可為外部提供高可靠

和高併發的儲存訪問。TFS為淘寶提供海量小檔案儲存,通常檔案大小不超過1M,滿足了淘寶對小檔案儲存的需求,被廣泛地應用 在淘寶各項應用中。它採用了HA架構和平滑擴容,保證了整個檔案系統的可用性和擴充套件性。同時扁平化的資料組織結構,可將檔名對映到檔案的物理地址,簡化 了檔案的訪問流程,一定程度上為TFS提供了良好的讀寫效能。

Google學術論文,這是眾多分散式檔案系統的起源,HDFS和TFS都是參考Google的GFS設計出來的。

1。4 典型的分散式檔案儲存的架構設計

我以hadoop的HDFS為例,畢竟開源的分散式檔案儲存使用的最多。

Hadoop分散式檔案系統(HDFS)被設計成適合執行在通用硬體(commodity hardware)上的分散式檔案系統。HDFS是一個高度容錯性的系統,適合部署在廉價的機器上。HDFS能提供高吞吐量的資料訪問,非常適合大規模資料集上的應用。HDFS放寬了一部分POSIX約束,來實現流式讀取檔案系統資料的目的。

1。4。1 大規模資料集

執行在HDFS上的應用具有很大的資料集。HDFS上的一個典型檔案大小一般都在G位元組至T位元組。因此,HDFS被調節以支援大檔案儲存。它應該能提供整體上高的資料傳輸頻寬,能在一個集群裡擴充套件到數百個節點。一個單一的HDFS例項應該能支撐數以千萬計的檔案。

1。4。2 簡單的一致性模型

HDFS應用需要一個“一次寫入多次讀取”的檔案訪問模型。一個檔案經過建立、寫入和關閉之後就不需要改變。這一假設簡化了資料一致性問題,並且使高吞吐量的資料訪問成為可能。Map/Reduce應用或者網路爬蟲應用都非常適合這個模型。目前還有計劃在將來擴充這個模型,使之支援檔案的附加寫操作。

1。4。3 異構軟硬體平臺間的可移植性

HDFS在設計的時候就考慮到平臺的可移植性。這種特性方便了HDFS作為大規模資料應用平臺的推廣。

1。4。4 Namenode 和 Datanode

HDFS採用master/slave架構。一個HDFS叢集是由一個Namenode和一定數目的Datanodes組成。

Namenode是一箇中心伺服器,負責管理檔案系統的名字空間(namespace)以及客戶端對檔案的訪問。

叢集中的Datanode一般是一個節點一個,負責管理它所在節點上的儲存。HDFS暴露了檔案系統的名字空間,使用者能夠以檔案的形式在上面儲存資料。從內部看,一個檔案其實被分成一個或多個數據塊,這些塊儲存在一組Datanode上。

Namenode執行檔案系統的名字空間操作,比如開啟、關閉、重新命名檔案或目錄。它也負責確定資料塊到具體Datanode節點的對映。Datanode負責處理檔案系統客戶端的讀寫請求。在Namenode的統一排程下進行資料塊的建立、刪除和複製。

分散式檔案儲存選型比較、架構設計及fastdfs原始碼分析

Namenode和Datanode被設計成可以在普通的商用機器上執行。這些機器一般執行著GNU/Linux作業系統(OS)。HDFS採用Java語言開發,因此任何支援Java的機器都可以部署Namenode或Datanode。由於採用了可移植性極強的Java語言,使得HDFS可以部署到多種型別的機器上。一個典型的部署場景是一臺機器上只執行一個Namenode例項,而叢集中的其它機器分別執行一個Datanode例項。這種架構並不排斥在一臺機器上執行多個Datanode,只不過這樣的情況比較少見。

2。 fastdfs原始碼分析

fastdfs是一個輕量級的分散式檔案系統,主要由

tracker server,

storage server

以及client組成,這裡主要涉及兩點 :

1)客戶端上傳檔案流程和協議分析

2)實現一個簡單的檔案上傳函式

2。1 檔案上傳的基本流程

分散式檔案儲存選型比較、架構設計及fastdfs原始碼分析

檔案上傳

fastdfs中上傳一個檔案,主要涉及以下幾個步驟:

上傳連線請求,客戶端會向tracker server發出上傳檔案的請求

tracker收到請求後,返回storage server的ip和埠

客戶端連線storage,並且上傳檔案

檔案上傳完成後,storage返回路徑資訊

以下具體分析檔案上傳過程中的協議和各種操作

2。1。1 fastdfs協議頭部

typedef struct{    char pkg_len[FDFS_PROTO_PKG_LEN_SIZE];  //body length, not including header(8個位元組)    char cmd;    //command code     TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE    char status; //status code for response} TrackerHeader;

fastdfs協議的頭部是由10個位元組大小的結構體構成,

傳送

:傳送資料時,先發送TrackerHeader到伺服器,隨後傳送具體的資料

接收

:接收資料時,先接收sizeof(TrackerHeader)大小的報文頭部,隨後接收pkg_len長度的報文體

status

: 傳送的時候設定為0

cmd

: 命令

pkg_len

:一個int64_t的整型,除去TrackerHeader長度的報文長度

2。2 客戶端向tracker server傳送獲取storage地址請求

//            協議頭//            pkg_len | cmd     | status//            8 bytes | 1 bytes | 1 bytes//向tracker server請求storage server cmd#define TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE     101     TrackerHeader header;//協議頭部    memset(&header, 0, sizeof(TrackerHeader));    header。cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;    //向tracker server 請求 storage,tcpsenddata 返回非0,表示傳送成功    if(tcpsenddata(sockfd, &header, sizeof(TrackerHeader), 10, &count) != 0)    {        fprintf(stderr, “tcpsenddata error: %s\n”, strerror(errno));        return 1;    }    else//請求傳送成功,等待tracker回覆    {        //接收頭部,頭部是一個TrackerHeader型別,10個位元組        TrackerHeader resp;        if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 10, &count)) != 0)         {            fprintf(stderr, “tcprecvdata error: %s\n”, strerror(ret_code));            return 1;        }        //開始接收報文體        //int64_t read_int64(const char *buff)        //{        //      unsigned char *p;        //      p = (unsigned char *)buff;        //      return  (((int64_t)(*p)) << 56) | \        //      (((int64_t)(*(p+1))) << 48) |  \        //      (((int64_t)(*(p+2))) << 40) |  \        //      (((int64_t)(*(p+3))) << 32) |  \        //      (((int64_t)(*(p+4))) << 24) |  \        //      (((int64_t)(*(p+5))) << 16) |  \        //      (((int64_t)(*(p+6))) << 8) | \        //      ((int64_t)(*(p+7)));        //}        int size = read_int64(resp。pkg_len);//獲取報體長度        char *buf = (char*)calloc(size + 1, sizeof(char));        if((ret_code = tcprecvdata(sockfd, buf, size, 10, &count) != 0))         {            fprintf(stderr, “tcprecvdata error: %s\n”, strerror(ret_code));            return 1;        }        //  報文體        //  group_name  |ip         |port       |storage_index        //  16 bytes    |16 bytes   |8 bytes    |        //#define TRACKER_QUERY_STORAGE_STORE_BODY_LEN 40        if(count != TRACKER_QUERY_STORAGE_STORE_BODY_LEN)        {            fprintf(stderr, “invalid message”);            return 1;        }        //group name            //#define FDFS_GROUP_NAME_MAX_LEN  16        char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]  = {0};        memcpy(group_name, buf, FDFS_GROUP_NAME_MAX_LEN);        group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0’;        //ip: port        //#define IP_ADDRESS_SIZE 16         //port:8 bytes        char ip[IP_ADDRESS_SIZE + 1] = {0};        memcpy(ip, buf + FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1);        char szPort[8] = {0};        memcpy(szPort, buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1, 8);        ip[IP_ADDRESS_SIZE] = ‘\0’;        int port = read_int64(szPort);        //storage index;        char *storage_index = buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE;        。。。    }

2。3 以上步驟完成後,獲取storage的ip 和 port後,就可以上傳檔案了

在官方的客戶端中,檔案操作有upload,download, append,delete等,這裡只涉及upload

上傳檔案中,官方給出了三種方式

透過buffer上傳,即將檔案讀取進記憶體,然後在傳送

使用sendfile,sendfile是Linux提供的一個庫函式

透過回撥函式的方式

這裡主要涉及的是第一種,透過buffer上傳的方式

檔案上傳協議:

//檔案上傳協議頭部    10 bytes        | 1 bytes        | 8 bytes    | 6 bytes        |    TrackerHeader   | storage_index  | 檔案長度    | 檔名或者全為0)  |//storage_index 是客戶端向tracker server申請storage index時候返回的結果//檔名 如果不為空,那麼取前6位,或者可以全部設定為0//上傳完成 storage回覆客戶端協議10 bytes      | 16 bytes    | TrackerHeader。pkg_len - 16bytes TrackerHeader | groupname   | remote file name  void uploadfile(int sockfd, const char *filepath, char *storage_index) {    char out_buf[512];    TrackerHeader *pHeader;    char *p = out_buf;    char *buf = NULL;     //TrackerHeader 10 bytes    //檔案上傳協議頭部    //10 bytes        | 1 bytes        | 8 bytes    | 6 bytes        |    //TrackerHeader   | storage_index  | 檔案長度    | 檔名或者全為0)  |    pHeader = (TrackerHeader*)out_buf;    p += sizeof(TrackerHeader);    //storage index  1 bytes    *p++ = *storage_index;    //filesize 8bytes    long int filesize = 0;    int ret = 0;    //讀取檔案到buf,並且返回檔案長度 filesize    if((ret = getfilebuf(&buf, &filesize, filepath) != 0))    {        fprintf(stderr, “getfilebuf failed: %s\n”, strerror(ret));        return;    }    //void write_int64(int64_t n, char *buff)    //{    //  unsigned char *p;    //  p = (unsigned char *)buff;    //  *p++ = (n >> 56) & 0xFF;    //  *p++ = (n >> 48) & 0xFF;    //  *p++ = (n >> 40) & 0xFF;    //  *p++ = (n >> 32) & 0xFF;    //  *p++ = (n >> 24) & 0xFF;    //  *p++ = (n >> 16) & 0xFF;    //  *p++ = (n >> 8) & 0xFF;    //  *p++ = n & 0xFF;    //}    write_int64(filesize, p);    //#define FDFS_PROTO_PKG_LEN_SIZE 8    p += FDFS_PROTO_PKG_LEN_SIZE;    //ext_name    //#define FDFS_FILE_EXT_NAME_MAX_LEN 6    memset(p, 0, FDFS_FILE_EXT_NAME_MAX_LEN);    p += FDFS_FILE_EXT_NAME_MAX_LEN;    //set TrackerHeader    write_int64(p - out_buf + filesize - sizeof(TrackerHeader), pHeader->pkg_len);    //#define STORAGE_PROTO_CMD_UPLOAD_FILE 11    pHeader->cmd  = STORAGE_PROTO_CMD_UPLOAD_FILE;    pHeader->status = 0;    //傳送報文頭部    int count;    int ret_code = 0;    if((ret_code = tcpsenddata(sockfd, out_buf, p - out_buf, 10, &count) != 0)) {        fprintf(stderr, “tcpsenddata failed: %s\n”, strerror(errno));        return;    }    //傳送報文體,具體檔案資料    if((ret_code = tcpsenddata(sockfd, buf, filesize, 10, &count)) != 0) {        fprintf(stderr, “tcpsenddata body failed: %s\n”, strerror(errno));        return;    }    //接收storage server回覆    //上傳完成 storage回覆客戶端協議    //10 bytes      | 16 bytes    | TrackerHeader。pkg_len - 16bytes     //TrackerHeader | groupname   | remote file name    TrackerHeader resp;    if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 1000, &count)) != 0) {        fprintf(stderr, “tcprecvdata failed: %s\n”, strerror(ret_code));        return;    }       if(count != sizeof(TrackerHeader)) {        fprintf(stderr, “invalid header”);        return;    }    int64_t bodylen = read_int64(resp。pkg_len);    //接收報文體    char *in_buf = (char*)calloc(bodylen + 1, sizeof(char));    if((ret_code = tcprecvdata(sockfd, in_buf, bodylen, 10, &count)) != 0)     {        fprintf(stderr, “read body failed: %s\n”, strerror(ret_code));        return;    }    //groupname    //#define FDFS_GROUP_NAME_MAX_LEN  16    char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];    memcpy(group_name, in_buf, FDFS_GROUP_NAME_MAX_LEN);    group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0’;    //remote filename    char remote_filename[bodylen - FDFS_GROUP_NAME_MAX_LEN + 1];    memcpy(remote_filename, in_buf + FDFS_GROUP_NAME_MAX_LEN, bodylen - FDFS_GROUP_NAME_MAX_LEN + 1);    cout << “groupname: ” << group_name << endl;    cout << “remote_filename: ” << remote_filename << endl;    char httpaddr[128] = {0};    sprintf(httpaddr, “http://106。75。129。177:8080/%s/%s”, group_name, remote_filename);    cout << “httpaddr: ” <<  httpaddr << endl;//http地址}

以下附上完整程式碼, ubuntu14位, 編譯器 g++,測試已透過

#include #include #include #include #include #include #include #include #include using namespace std;#define FDFS_GROUP_NAME_MAX_LEN     16#define FDFS_PROTO_PKG_LEN_SIZE     8#define IP_ADDRESS_SIZE         16//cmd#define TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE     101#define STORAGE_PROTO_CMD_UPLOAD_FILE       11 #define TRACKER_QUERY_STORAGE_STORE_BODY_LEN    (FDFS_GROUP_NAME_MAX_LEN \    + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)#define FDFS_FILE_EXT_NAME_MAX_LEN 6typedef struct {    char pkg_len[FDFS_PROTO_PKG_LEN_SIZE];    char cmd;    char status;}TrackerHeader; //set socketfd nonblockingint setnonblocking(int sockfd);int tcprecvdata(int sockfd, void *data, const int size,\    const int timeout_ms, int *count);int tcpsenddata(int sockfd, void *data, const int size,\const int timeout_ms, int *count); int64_t read_int64(const char* buf);void write_int64(int64_t n, char* buf); void uploadfile(int sockfd, const char *filepath, char *storage_index);int getfilebuf(char **buf, long int *filesize, const char* filepath);//apply storage address from tracker serverint main() {    const char *ip = “127。0。0。1”;    uint16_t port = 22122;    int ret_code = 0;    int sockfd = -1;    int count = 0;    //connect tracker server    if((sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)     {        fprintf(stderr, “socket errnor: %s\n”, strerror(errno));        return 1;    }    if((ret_code = setnonblocking(sockfd)) != 0)     {        fprintf(stderr, “setnonblocking error: %s\n”, strerror(ret_code));        return 1;    }    struct sockaddr_in addr;    addr。sin_addr。s_addr = inet_addr(ip);    addr。sin_port = htons(port);addr。sin_family = AF_INET;     socklen_t len = sizeof(struct sockaddr);    if(connect(sockfd, (struct sockaddr*)&addr, len) < 0)     {        fprintf(stderr, “connect error: %s\n”, strerror(errno));        return 1;}     TrackerHeader header;    memset(&header, 0, sizeof(TrackerHeader));header。cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;     if(tcpsenddata(sockfd, &header, sizeof(TrackerHeader), 10, &count) != 0)    {        fprintf(stderr, “tcpsenddata error: %s\n”, strerror(errno));        return 1;    }    else    {        //recv header        TrackerHeader resp;        if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 10, &count)) != 0)         {            fprintf(stderr, “tcprecvdata error: %s\n”, strerror(ret_code));            return 1;        }         cout << “recv header: ” << count << endl;         //read body;        int size = read_int64(resp。pkg_len);        char *buf = (char*)calloc(size + 1, sizeof(char));         if((ret_code = tcprecvdata(sockfd, buf, size, 10, &count) != 0))         {            fprintf(stderr, “tcprecvdata error: %s\n”, strerror(ret_code));            return 1;        }         //body        //  group_name  |ip     |port       |storage_index        //  16bytes     |16bytes    |8bytes     |        cout << “read body: ” << count << endl;        if(count != TRACKER_QUERY_STORAGE_STORE_BODY_LEN)        {            fprintf(stderr, “invalid message”);            return 1;        }        //group name            char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]  = {0};        memcpy(group_name, buf, FDFS_GROUP_NAME_MAX_LEN);        group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0’;        cout << “group name: ” << group_name << endl;        //ip: port        char ip[IP_ADDRESS_SIZE + 1] = {0};        memcpy(ip, buf + FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1);        char szPort[8] = {0};        memcpy(szPort, buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1, 8);        ip[IP_ADDRESS_SIZE] = ‘\0’;        int port = read_int64(szPort);        cout << “address: ” << ip << “:” << port << endl;        //storage index;        char *storage_index = buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE;        cout << “storage_index: ” << storage_index << endl;        free(buf);         //connect storage server        sockaddr_in st_addr;        st_addr。sin_addr。s_addr = inet_addr(ip);        st_addr。sin_family = AF_INET;        st_addr。sin_port = htons(port);         int storage_fd = socket(AF_INET, SOCK_STREAM, 0);        if(storage_fd < 0) {            fprintf(stderr, “socket failed: %s\n”, strerror(errno));            return 1;        }        socklen_t len2 = sizeof(sockaddr_in);        if(connect(storage_fd, (struct sockaddr*)&st_addr, len2) < 0) {            fprintf(stderr, “connect failed: %s\n”, strerror(errno));            return 1;        }         uploadfile(storage_fd, “1。jpg”, storage_index);    }    return 0;} int getfilebuf(char **buf, long int *filesize, const char *filepath) {    int ret_code = 0;    FILE *fp = fopen(filepath, “rb+”);    if(fp == NULL)    {        ret_code = errno;        return ret_code;    }    //get filesize;    fseek(fp, 0, SEEK_END);    *filesize = ftell(fp);    fseek(fp, 0, SEEK_SET);    cout << “get filesize: ” <<*filesize << endl;    //malloc buf    *buf = (char*)calloc(*filesize + 1, sizeof(char));    if(*buf == NULL) {        ret_code = errno;        return ret_code;}     int read_bytes = 0;    int left_bytes = *filesize;    char *p = *buf;    while(left_bytes > 0) {        read_bytes = fread(p, sizeof(char), left_bytes, fp);        left_bytes -= read_bytes;        p += read_bytes;    }       return ret_code;}void uploadfile(int sockfd, const char *filepath, char *storage_index) {    char out_buf[512];    TrackerHeader *pHeader;    char *p = out_buf;char *buf = NULL;     //TrackerHeader 10 bytes    pHeader = (TrackerHeader*)out_buf;p += sizeof(TrackerHeader);     //storage index  1 bytes    *p++ = *storage_index;    //filesize 8bytes    long int filesize = 0;    int ret = 0;    if((ret = getfilebuf(&buf, &filesize, filepath) != 0))    {        fprintf(stderr, “getfilebuf failed: %s\n”, strerror(ret));        return;    }    printf(“filesize: %ld\n”, filesize);    write_int64(filesize, p);p += FDFS_PROTO_PKG_LEN_SIZE;     //ext_name    memset(p, 0, FDFS_FILE_EXT_NAME_MAX_LEN);p += FDFS_FILE_EXT_NAME_MAX_LEN;     //set TrackerHeader    write_int64(p - out_buf + filesize - sizeof(TrackerHeader), pHeader->pkg_len);    pHeader->cmd  = STORAGE_PROTO_CMD_UPLOAD_FILE;pHeader->status = 0;     //send header    int count;    int ret_code = 0;    if((ret_code = tcpsenddata(sockfd, out_buf, p - out_buf, 10, &count) != 0)) {        fprintf(stderr, “tcpsenddata failed: %s\n”, strerror(errno));        return;    }    //send body    if((ret_code = tcpsenddata(sockfd, buf, filesize, 10, &count)) != 0) {        fprintf(stderr, “tcpsenddata body failed: %s\n”, strerror(errno));        return;}     //recv response    TrackerHeader resp;    if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 1000, &count)) != 0) {        fprintf(stderr, “tcprecvdata failed: %s\n”, strerror(ret_code));        return;    }       if(count != sizeof(TrackerHeader)) {        fprintf(stderr, “invalid header”);        return;} int64_t bodylen = read_int64(resp。pkg_len);     char *in_buf = (char*)calloc(bodylen + 1, sizeof(char));    if((ret_code = tcprecvdata(sockfd, in_buf, bodylen, 10, &count)) != 0)     {        fprintf(stderr, “read body failed: %s\n”, strerror(ret_code));        return;}     //groupname    char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];    memcpy(group_name, in_buf, FDFS_GROUP_NAME_MAX_LEN);group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0’;     //remote filename    char remote_filename[bodylen - FDFS_GROUP_NAME_MAX_LEN + 1];memcpy(remote_filename, in_buf + FDFS_GROUP_NAME_MAX_LEN, bodylen - FDFS_GROUP_NAME_MAX_LEN + 1);     cout << “groupname: ” << group_name << endl;    cout << “remote_filename: ” << remote_filename << endl;     char httpaddr[128] = {0};    sprintf(httpaddr, “http://127。0。0。1:8080/%s/%s”, group_name, remote_filename);    cout << “httpaddr” <<  httpaddr << endl;} void write_int64(int64_t n, char *buff){    unsigned char *p;    p = (unsigned char *)buff;    *p++ = (n >> 56) & 0xFF;    *p++ = (n >> 48) & 0xFF;    *p++ = (n >> 40) & 0xFF;    *p++ = (n >> 32) & 0xFF;    *p++ = (n >> 24) & 0xFF;    *p++ = (n >> 16) & 0xFF;    *p++ = (n >> 8) & 0xFF;    *p++ = n & 0xFF;}int64_t read_int64(const char *buff){    unsigned char *p;    p = (unsigned char *)buff;    return  (((int64_t)(*p)) << 56) | \        (((int64_t)(*(p+1))) << 48) |  \        (((int64_t)(*(p+2))) << 40) |  \        (((int64_t)(*(p+3))) << 32) |  \        (((int64_t)(*(p+4))) << 24) |  \        (((int64_t)(*(p+5))) << 16) |  \        (((int64_t)(*(p+6))) << 8) | \        ((int64_t)(*(p+7)));} int setnonblocking(int sockfd){    int ret_code = 0;    if(fcntl(sockfd, F_SETFD, O_NONBLOCK) < 0) {        ret_code = errno;    }    return ret_code;}int tcpsenddata(int sockfd, void *data, const int size,\const int timeout_ms, int *count) {    int left_bytes = size;    int write_bytes = 0;    int ret_code = 0;    int res = 0;char *p = (char*)data;     fd_set rfds;    FD_ZERO(&rfds);    FD_SET(sockfd, &rfds);    while(left_bytes > 0) {        write_bytes = send(sockfd, p, left_bytes, 0);        if(write_bytes > 0)         {            left_bytes -= write_bytes;            p += write_bytes;            continue;        }        else if(write_bytes < 0)         {            if(!(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))            {                ret_code = errno == 0 ? errno : EINTR;                break;            }        }        else        {            ret_code = ENOTCONN;            break;        }        if(timeout_ms <= 0)         {            res = select(sockfd + 1, &rfds, NULL, NULL, NULL);         }        else         {            struct timeval tv;            tv。tv_usec = timeout_ms;            tv。tv_sec = 0;            res = select(sockfd + 1, &rfds, NULL, NULL, &tv);        }        if(res == 0)        {            ret_code = ETIMEDOUT;            break;        }        if(res < 0)         {            if(errno == EINTR)            {                continue;            }            ret_code = errno == 0 ? errno : EINTR;        }    }    if(count != NULL)    {        *count = size - left_bytes;    }    return ret_code;} int tcprecvdata(int sockfd, void *data, const int size,\const int timeout_ms, int *count) {     int left_bytes = size;    int read_bytes = 0;    int ret_code = 0;    int res = 0;char *p = (char*)data;     fd_set rfds;    FD_ZERO(&rfds);FD_SET(sockfd, &rfds);     while(left_bytes > 0) {        read_bytes = recv(sockfd, p, left_bytes, 0);        if(read_bytes > 0)         {            left_bytes -= read_bytes;            p += read_bytes;            continue;        }         else if(read_bytes < 0)         {            if(!(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))             {                ret_code = errno != 0 ? errno : EINTR;                break;              }        }        else         {            ret_code = ENOTCONN;            break;        }         if(timeout_ms <= 0)         {            res = select(sockfd + 1, &rfds, NULL, NULL, NULL);        }           else         {            struct timeval tv;            tv。tv_usec = timeout_ms;            tv。tv_sec  = 0;            res = select(sockfd + 1, &rfds, NULL, NULL, &tv);        }         if(res == 0)         {            ret_code = ETIMEDOUT;            break;        }        if(res < 0)        {            if(errno == EINTR)            {                continue;            }            ret_code = errno == 0 ? errno : EINTR;            break;                  }}     if(count != NULL)    {        *count = size - left_bytes;    }    return ret_code;}