golang 通用連線池 高效能連線各種tcp連線,例項rabbitmq,mysql

golang 通用連線池 高效能連線各種tcp連線,例項rabbitmq,mysql ,redis。。。。

完整程式碼在https://github。com/human2312/tcp-pool

package Pool// @Time : 2020年3月13日12:27:33// @Author : Lemyhello// @Desc: 通用連線池import ( “errors” “fmt” “sync” “time”)var ( //ErrMaxActiveConnReached 連線池超限 ErrMaxActiveConnReached = errors。New(“MaxActiveConnReached”))// Config 連線池相關配置type Config struct { //連線池中擁有的最小連線數 InitialCap int //最大併發存活連線數 MaxCap int //最大空閒連線 MaxIdle int //生成連線的方法 Factory func() (interface{}, error) //關閉連線的方法 Close func(interface{}) error //檢查連線是否有效的方法 Ping func(interface{}) error //連線最大空閒時間,超過該事件則將失效 IdleTimeout time。Duration}// channelPool 存放連線資訊type channelPool struct { mu sync。RWMutex conns chan *idleConn factory func() (interface{}, error) close func(interface{}) error ping func(interface{}) error idleTimeout, waitTimeOut time。Duration maxActive int openingConns int}type idleConn struct { conn interface{} t time。Time}var ( //ErrClosed 連線池已經關閉Error ErrClosed = errors。New(“pool is closed”))// Pool 基本方法type Pool interface { Get() (interface{}, error) Put(interface{}) error Close(interface{}) error Release() Len() int}// NewChannelPool 初始化連線func NewChannelPool(poolConfig *Config) (Pool, error) { if ! (poolConfig。InitialCap <= poolConfig。MaxIdle && poolConfig。MaxCap >= poolConfig。MaxIdle && poolConfig。InitialCap >= 0 ){ return nil, errors。New(“invalid capacity settings”) } if poolConfig。Factory == nil { return nil, errors。New(“invalid factory func settings”) } if poolConfig。Close == nil { return nil, errors。New(“invalid close func settings”) } c := &channelPool{ conns: make(chan *idleConn, poolConfig。MaxIdle), factory: poolConfig。Factory, close: poolConfig。Close, idleTimeout: poolConfig。IdleTimeout, maxActive: poolConfig。MaxCap, openingConns: poolConfig。InitialCap, } if poolConfig。Ping != nil { c。ping = poolConfig。Ping } for i := 0; i < poolConfig。InitialCap; i++ { conn, err := c。factory() if err != nil { c。Release() return nil, fmt。Errorf(“factory is not able to fill the pool: %s”, err) } c。conns <- &idleConn{conn: conn, t: time。Now()} } return c, nil}// getConns 獲取所有連線func (c *channelPool) getConns() chan *idleConn { c。mu。Lock() conns := c。conns c。mu。Unlock() return conns}// Get 從pool中取一個連線func (c *channelPool) Get() (interface{}, error) { conns := c。getConns() if conns == nil { return nil, ErrClosed } for { select { case wrapConn := <-conns: if wrapConn == nil { return nil, ErrClosed } //判斷是否超時,超時則丟棄 if timeout := c。idleTimeout; timeout > 0 { if wrapConn。t。Add(timeout)。Before(time。Now()) { //丟棄並關閉該連線 c。Close(wrapConn。conn) continue } } //判斷是否失效,失效則丟棄,如果使用者沒有設定 ping 方法,就不檢查 if c。ping != nil { if err := c。Ping(wrapConn。conn); err != nil { c。Close(wrapConn。conn) continue } } return wrapConn。conn, nil default: c。mu。Lock() defer c。mu。Unlock() if c。openingConns >= c。maxActive { return nil, ErrMaxActiveConnReached } if c。factory == nil { return nil, ErrClosed } conn, err := c。factory() if err != nil { return nil, err } c。openingConns++ return conn, nil } }}// Put 將連線放回pool中func (c *channelPool) Put(conn interface{}) error { if conn == nil { return errors。New(“connection is nil。 rejecting”) } c。mu。Lock() if c。conns == nil { c。mu。Unlock() return c。Close(conn) } select { case c。conns <- &idleConn{conn: conn, t: time。Now()}: c。mu。Unlock() return nil default: c。mu。Unlock() //連線池已滿,直接關閉該連線 return c。Close(conn) }}// Close 關閉單條連線func (c *channelPool) Close(conn interface{}) error { if conn == nil { return errors。New(“connection is nil。 rejecting”) } c。mu。Lock() defer c。mu。Unlock() if c。close == nil { return nil } c。openingConns—— return c。close(conn)}// Ping 檢查單條連線是否有效func (c *channelPool) Ping(conn interface{}) error { if conn == nil { return errors。New(“connection is nil。 rejecting”) } return c。ping(conn)}// Release 釋放連線池中所有連線func (c *channelPool) Release() { c。mu。Lock() conns := c。conns c。conns = nil c。factory = nil c。ping = nil closeFun := c。close c。close = nil c。mu。Unlock() if conns == nil { return } close(conns) for wrapConn := range conns { //log。Printf(“Type %v\n”,reflect。TypeOf(wrapConn。conn)) closeFun(wrapConn。conn) }}// Len 連線池中已有的連線func (c *channelPool) Len() int { return len(c。getConns())}

main函式例項除錯

package main// @Time : 2020年3月13日19:35:35// @Author : Lemyhello// @Desc: 展示如何使用Pool連線池拿到tcp各種應用例項import ( “fmt” “github。com/streadway/amqp” “net” “time” “tcp-pool/Pool”)var ( mqurl = “amqp://root:root@127。0。0。1:5672/test” //根據實際情況填寫mq配置連線 mqPool Pool。Pool)func main() { rabbitmq() mysql() redis() //拿到一個連線 mq,_ := mqPool。Get() //例項化物件 mqconn :=mq。(*amqp。Connection) //將連線放回連線池中 defer mqPool。Put(mq) //開始操作rabbitmq。。。 mqconn。Channel() //do something。。。。}//rabbitmq rabbitmq連線池func rabbitmq() { //factory 建立連線的方法 factory := func() (interface{}, error) { return amqp。Dial(mqurl) } //close 關閉連線的方法 close := func(v interface{}) error { return v。(net。Conn)。Close() } //建立一個連線池: 初始化2,最大連線5,空閒連線數是4 poolConfig := &Pool。Config{ InitialCap: 2, MaxIdle: 5, MaxCap: 4, Factory: factory, Close: close, //連線最大空閒時間,超過該時間的連線 將會關閉,可避免空閒時連線EOF,自動失效的問題 IdleTimeout: 15 * time。Second, } mqPool, _ = Pool。NewChannelPool(poolConfig) //從連線池中取得一個連線 //v, err := p。Get() //do something //conn :=v。(*amqp。Connection) //將連線放回連線池中 //p。Put(v) //釋放連線池中的所有連線 //p。Release() //檢視當前連線中的數量 current := mqPool。Len() fmt。Println(“len=”, current) return}//mysql mysql連線池func mysql() {}//redis redis連線池func redis() {}

最後測試一下連線池能不能正常使用

golang 通用連線池 高效能連線各種tcp連線,例項rabbitmq,mysql

完美。。。