From fff90f7057c099bc181a427673b2ddb0cda8d226 Mon Sep 17 00:00:00 2001 From: "dmy@lab" Date: Fri, 20 Nov 2015 14:19:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E4=BA=86Tunnel=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E5=8A=9F=E8=83=BD=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benckmark.go | 12 +++++--- main_test.go | 13 +++----- protocol/tcp.go | 27 ++++++++++++++--- protocol/tunnel.go | 16 +++++++++- stat/tunnelstatus.go | 70 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 18 deletions(-) create mode 100644 stat/tunnelstatus.go diff --git a/benckmark.go b/benckmark.go index e87e0ec..c14bdbe 100644 --- a/benckmark.go +++ b/benckmark.go @@ -31,7 +31,11 @@ func doHttp(c chan int) { log.Println("could not get:", err) return } - defer resp.Body.Close() + defer func() { + resp.Body.Close() + resp.Close = true + log.Println("body close") + }() body, err := ioutil.ReadAll(resp.Body) log.Printf("got %d bytes\n", len(body)) if err != nil { @@ -54,18 +58,18 @@ func randMillionSecond() time.Duration { func attack() { c := make(chan int) - for i := 0; i < 10; i++ { + for i := 0; i < 1; i++ { time.Sleep(time.Millisecond * randMillionSecond()) go doHttp(c) } - for i := 0; i < 10; i++ { + for i := 0; i < 1; i++ { <-c } log.Println("Finish") } func main() { - llog.LogTo("applog/log.txt", "DEBUG") + llog.LogTo("applog/log.txt", "INFO") defer profile.Start(profile.CPUProfile).Stop() benchSeed = 0 go transClient() diff --git a/main_test.go b/main_test.go index 69285a5..bc6133b 100644 --- a/main_test.go +++ b/main_test.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "github.com/TransX/log" + "github.com/TransX/protocol" "io" "net" "os" @@ -90,13 +91,7 @@ func serverBin(t *testing.T) { } } - //// - f, _ := os.Create("server.bin") - f.Write(bytes[1 : nBinByte+1]) - f.Close() - //// log.Info("Test Server. All Matches.") - // log.Info("Test Server Receive %s", string(bytes[:n])) _, err = conn.Write([]byte("OK")) log.Info("Test Server write") @@ -220,12 +215,12 @@ func clientText(t *testing.T) { } func TestTunnel(t *testing.T) { - log.LogTo("stdout", "ERROR") + log.LogTo("applog/test.log", "DEBUG") log.Info("Test Start testing.") go serverBin(t) go clientBin(t) - trans1 := NewTransTCP() + trans1 := protocol.NewTransTCP() go trans1.Start("1200", "127.0.0.1", "1201", "client") - trans2 := NewTransTCP() + trans2 := protocol.NewTransTCP() trans2.Start("1201", "127.0.0.1", "1244", "server") } diff --git a/protocol/tcp.go b/protocol/tcp.go index 74fdaad..1309f6b 100644 --- a/protocol/tcp.go +++ b/protocol/tcp.go @@ -4,10 +4,17 @@ import ( "errors" "fmt" "github.com/TransX/log" + "github.com/TransX/stat" "net" "os" ) +var tunMng *stat.TunnelStatusManager + +func init() { + tunMng = stat.NewTunnelStatusManager() +} + type TransTCP struct { } @@ -68,12 +75,24 @@ func (this *TransTCP) Start(listenPort, destIP, destPort string, clientOrServer log.Info("Dial %s", destConn.RemoteAddr().String()) //tunnel model : [ -->>server ---- client -->> ](this is a tunnel) if clientOrServer == "client" { - go NewTunnel(listenerConn, destConn, SEND).run() - go NewTunnel(destConn, listenerConn, RECEIVE).run() + ntSend := NewTunnel(listenerConn, destConn, SEND) + ntSend.SetRegChan(tunMng.GetRegChan()) + ntSend.SetUnRegChan(tunMng.GetUnregChan()) + ntRev := NewTunnel(destConn, listenerConn, RECEIVE) + ntRev.SetRegChan(tunMng.GetRegChan()) + ntRev.SetUnRegChan(tunMng.GetUnregChan()) + go ntSend.Run() + go ntRev.Run() } if clientOrServer == "server" { - go NewTunnel(listenerConn, destConn, RECEIVE).run() - go NewTunnel(destConn, listenerConn, SEND).run() + ntRev := NewTunnel(listenerConn, destConn, RECEIVE) + ntRev.SetRegChan(tunMng.GetRegChan()) + ntRev.SetUnRegChan(tunMng.GetUnregChan()) + ntSend := NewTunnel(destConn, listenerConn, SEND) + ntSend.SetRegChan(tunMng.GetRegChan()) + ntSend.SetUnRegChan(tunMng.GetUnregChan()) + go ntRev.Run() + go ntSend.Run() } }() diff --git a/protocol/tunnel.go b/protocol/tunnel.go index 2057903..cce921a 100644 --- a/protocol/tunnel.go +++ b/protocol/tunnel.go @@ -23,6 +23,8 @@ type Tunnel struct { src net.Conn dest net.Conn cipherDirection Direction + regChan chan interface{} + unregChan chan interface{} } func NewTunnel(src, dest net.Conn, cipherDirection Direction) *Tunnel { @@ -42,14 +44,26 @@ func (this *Tunnel) SetID(id string) { //rarely used this.id = id } +func (this *Tunnel) SetRegChan(c chan interface{}) { + this.regChan = c +} + +func (this *Tunnel) SetUnRegChan(c chan interface{}) { + this.unregChan = c +} + //tunnel model : [ -->>server ---- client -->> ](this is a tunnel) -func (this *Tunnel) run() { //单向的,从src发送到dest +func (this *Tunnel) Run() { //单向的,从src发送到dest + //进行注册 + this.regChan <- this src := this.src dest := this.dest cipherDirection := this.cipherDirection id := this.id defer func() { log.Info("tunnel id %s ends", id) + //注销 + this.unregChan <- this if r := recover(); r != nil { if src != nil { src.Close() diff --git a/stat/tunnelstatus.go b/stat/tunnelstatus.go new file mode 100644 index 0000000..fae7fea --- /dev/null +++ b/stat/tunnelstatus.go @@ -0,0 +1,70 @@ +package stat + +import ( + "container/list" + "fmt" + "github.com/TransX/log" + "sync" +) + +type TunnelStatusManager struct { + mux sync.Mutex + tunnelList list.List + regChan chan interface{} + unregChan chan interface{} +} + +func NewTunnelStatusManager() *TunnelStatusManager { + t := new(TunnelStatusManager) + t.regChan = make(chan interface{}) + t.unregChan = make(chan interface{}) + go t.chanListener() + return t +} + +func (this *TunnelStatusManager) GetRegChan() chan interface{} { + return this.regChan +} + +func (this *TunnelStatusManager) GetUnregChan() chan interface{} { + return this.unregChan +} + +func (this *TunnelStatusManager) register(t interface{}) { + this.mux.Lock() + defer this.mux.Unlock() + this.tunnelList.PushBack(t) +} + +func (this *TunnelStatusManager) unregister(t interface{}) { + this.mux.Lock() + defer this.mux.Unlock() + l := this.tunnelList + for e := l.Front(); e != nil; e = e.Next() { + if e == t { + l.Remove(e) + break + } + } + +} + +func (this *TunnelStatusManager) chanListener() { + for { + select { + case r := <-this.regChan: + this.register(r) + log.Info("A tunnel registered") + log.Info(this.QueryStatString()) + case ur := <-this.unregChan: + this.unregister(ur) + log.Info("A tunnel unregistered") + } + } +} + +func (this *TunnelStatusManager) QueryStatString() string { + this.mux.Lock() + defer this.mux.Unlock() + return fmt.Sprintf("There %d tunnels running.\n", this.tunnelList.Len()) +}