前言
本文主要給大家介紹了關于Golang實現TCP連接的雙向拷貝的相關內容,分享出來供大家參考學習,下面話不多說了,來一起看看詳細的介紹吧。
最簡單的實現
每次來一個Server的連接,就新開一個Client的連接。用一個goroutine從server拷貝到client,再用另外一個goroutine從client拷貝到server。任何一方斷開連接,雙向都斷開連接。
func main() { runtime.GOMAXPROCS(1) listener, err := net.Listen("tcp", "127.0.0.1:8848") if err != nil { panic(err) } for { conn, err := listener.Accept() if err != nil { panic(err) } go handle(conn.(*net.TCPConn)) } } func handle(server *net.TCPConn) { defer server.Close() client, err := net.Dial("tcp", "127.0.0.1:8849") if err != nil { fmt.Print(err) return } defer client.Close() go func() { defer server.Close() defer client.Close() buf := make([]byte, 2048) io.CopyBuffer(server, client, buf) }() buf := make([]byte, 2048) io.CopyBuffer(client, server, buf) }
一個值得注意的地方是io.Copy的默認buffer比較大,給一個小的buffer可以支持更多的并發連接。
這兩個goroutine并序在一個退出之后,另外一個也退出。這個的實現是通過關閉server或者client的socket來實現的。因為socket被關閉了,io.CopyBuffer 就會退出。
Client端實現連接池
一個顯而易見的問題是,每次Server的連接進來之后都需要臨時去建立一個新的Client的端的連接。這樣在代理的總耗時里就包括了一個tcp連接的握手時間。如果能夠讓Client端實現連接池復用已有連接的話,可以縮短端到端的延遲。
var pool = make(chan net.Conn, 100) func borrow() (net.Conn, error) { select { case conn := <- pool: return conn, nil default: return net.Dial("tcp", "127.0.0.1:8849") } } func release(conn net.Conn) error { select { case pool <- conn: // returned to pool return nil default: // pool is overflow return conn.Close() } } func handle(server *net.TCPConn) { defer server.Close() client, err := borrow() if err != nil { fmt.Print(err) return } defer release(client) go func() { defer server.Close() defer release(client) buf := make([]byte, 2048) io.CopyBuffer(server, client, buf) }() buf := make([]byte, 2048) io.CopyBuffer(client, server, buf) }
這個版本的實現是顯而易見有問題的。因為連接在歸還到池里的時候并不能保證是還保持連接的狀態。另外一個更嚴重的問題是,因為client的連接不再被關閉了,當server端關閉連接時,從client向server做io.CopyBuffer的goroutine就無法退出了。
所以,有以下幾個問題要解決:
通過SetDeadline中斷Goroutine
一個普遍的觀點是Goroutine是無法被中斷的。當一個Goroutine在做conn.Read時,這個協程就被阻塞在那里了。實際上并不是毫無辦法的,我們可以通過conn.Close來中斷Goroutine。但是在連接池的情況下,又無法Close鏈接。另外一種做法就是通過SetDeadline為一個過去的時間戳來中斷當前正在進行的阻塞讀或者阻塞寫。
var pool = make(chan net.Conn, 100) type client struct { conn net.Conn inUse *sync.WaitGroup } func borrow() (clt *client, err error) { var conn net.Conn select { case conn = <- pool: default: conn, err = net.Dial("tcp", "127.0.0.1:18849") } if err != nil { return nil, err } clt = &client{ conn: conn, inUse: &sync.WaitGroup{}, } return } func release(clt *client) error { clt.conn.SetDeadline(time.Now().Add(-time.Second)) clt.inUse.Done() clt.inUse.Wait() select { case pool <- clt.conn: // returned to pool return nil default: // pool is overflow return clt.conn.Close() } } func handle(server *net.TCPConn) { defer server.Close() clt, err := borrow() if err != nil { fmt.Print(err) return } clt.inUse.Add(1) defer release(clt) go func() { clt.inUse.Add(1) defer server.Close() defer release(clt) buf := make([]byte, 2048) io.CopyBuffer(server, clt.conn, buf) }() buf := make([]byte, 2048) io.CopyBuffer(clt.conn, server, buf) }
通過SetDeadline實現了goroutine的中斷,然后通過sync.WaitGroup來保證這些使用方都退出了之后再歸還給連接池。否則一個連接被復用的時候,之前的使用方可能還沒有退出。
連接有效性
為了保證在歸還給pool之前,連接仍然是有效的。連接在被讀寫的過程中如果發現了error,我們就要標記這個連接是有問題的,會釋放之后直接close掉。但是SetDeadline必然會導致讀取或者寫入的時候出現一次timeout的錯誤,所以還需要把timeout排除掉。
var pool = make(chan net.Conn, 100) type client struct { conn net.Conn inUse *sync.WaitGroup isValid int32 } const maybeValid = 0 const isValid = 1 const isInvalid = 2 func (clt *client) Read(b []byte) (n int, err error) { n, err = clt.conn.Read(b) if err != nil { if !isTimeoutError(err) { atomic.StoreInt32(&clt.isValid, isInvalid) } } else { atomic.StoreInt32(&clt.isValid, isValid) } return } func (clt *client) Write(b []byte) (n int, err error) { n, err = clt.conn.Write(b) if err != nil { if !isTimeoutError(err) { atomic.StoreInt32(&clt.isValid, isInvalid) } } else { atomic.StoreInt32(&clt.isValid, isValid) } return } type timeoutErr interface { Timeout() bool } func isTimeoutError(err error) bool { timeoutErr, _ := err.(timeoutErr) if timeoutErr == nil { return false } return timeoutErr.Timeout() } func borrow() (clt *client, err error) { var conn net.Conn select { case conn = <- pool: default: conn, err = net.Dial("tcp", "127.0.0.1:18849") } if err != nil { return nil, err } clt = &client{ conn: conn, inUse: &sync.WaitGroup{}, isValid: maybeValid, } return } func release(clt *client) error { clt.conn.SetDeadline(time.Now().Add(-time.Second)) clt.inUse.Done() clt.inUse.Wait() if clt.isValid == isValid { return clt.conn.Close() } select { case pool <- clt.conn: // returned to pool return nil default: // pool is overflow return clt.conn.Close() } } func handle(server *net.TCPConn) { defer server.Close() clt, err := borrow() if err != nil { fmt.Print(err) return } clt.inUse.Add(1) defer release(clt) go func() { clt.inUse.Add(1) defer server.Close() defer release(clt) buf := make([]byte, 2048) io.CopyBuffer(server, clt, buf) }() buf := make([]byte, 2048) io.CopyBuffer(clt, server, buf) }
判斷 error 是否是 timeout 需要類型強轉來實現。
對于連接池里的conn是否仍然是有效的,如果用后臺不斷ping的方式來實現成本比較高。因為不同的協議要連接保持需要不同的ping的方式。一個最簡單的辦法就是下次用的時候試一下。如果連接不好用了,則改成新建一個連接,避免連續拿到無效的連接。通過這種方式把無效的連接給淘汰掉。
關于正確性
本文在杭州機場寫成,完全不保證內容的正確性
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。