加入了Tunnel统计功能。

This commit is contained in:
dmy@lab 2015-11-20 14:19:10 +08:00
parent 6f8bda54e4
commit fff90f7057
5 changed files with 120 additions and 18 deletions

View File

@ -31,7 +31,11 @@ func doHttp(c chan int) {
log.Println("could not get:", err)
return
}
defer resp.Body.Close()
defer func() {
resp.Body.Close()
resp.Close = true
log.Println("body close")
}()
body, err := ioutil.ReadAll(resp.Body)
log.Printf("got %d bytes\n", len(body))
if err != nil {
@ -54,18 +58,18 @@ func randMillionSecond() time.Duration {
func attack() {
c := make(chan int)
for i := 0; i < 10; i++ {
for i := 0; i < 1; i++ {
time.Sleep(time.Millisecond * randMillionSecond())
go doHttp(c)
}
for i := 0; i < 10; i++ {
for i := 0; i < 1; i++ {
<-c
}
log.Println("Finish")
}
func main() {
llog.LogTo("applog/log.txt", "DEBUG")
llog.LogTo("applog/log.txt", "INFO")
defer profile.Start(profile.CPUProfile).Stop()
benchSeed = 0
go transClient()

View File

@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"github.com/TransX/log"
"github.com/TransX/protocol"
"io"
"net"
"os"
@ -90,13 +91,7 @@ func serverBin(t *testing.T) {
}
}
////
f, _ := os.Create("server.bin")
f.Write(bytes[1 : nBinByte+1])
f.Close()
////
log.Info("Test Server. All Matches.")
// log.Info("Test Server Receive %s", string(bytes[:n]))
_, err = conn.Write([]byte("OK"))
log.Info("Test Server write")
@ -220,12 +215,12 @@ func clientText(t *testing.T) {
}
func TestTunnel(t *testing.T) {
log.LogTo("stdout", "ERROR")
log.LogTo("applog/test.log", "DEBUG")
log.Info("Test Start testing.")
go serverBin(t)
go clientBin(t)
trans1 := NewTransTCP()
trans1 := protocol.NewTransTCP()
go trans1.Start("1200", "127.0.0.1", "1201", "client")
trans2 := NewTransTCP()
trans2 := protocol.NewTransTCP()
trans2.Start("1201", "127.0.0.1", "1244", "server")
}

View File

@ -4,10 +4,17 @@ import (
"errors"
"fmt"
"github.com/TransX/log"
"github.com/TransX/stat"
"net"
"os"
)
var tunMng *stat.TunnelStatusManager
func init() {
tunMng = stat.NewTunnelStatusManager()
}
type TransTCP struct {
}
@ -68,12 +75,24 @@ func (this *TransTCP) Start(listenPort, destIP, destPort string, clientOrServer
log.Info("Dial %s", destConn.RemoteAddr().String())
//tunnel model : [ -->>server ---- client -->> ](this is a tunnel)
if clientOrServer == "client" {
go NewTunnel(listenerConn, destConn, SEND).run()
go NewTunnel(destConn, listenerConn, RECEIVE).run()
ntSend := NewTunnel(listenerConn, destConn, SEND)
ntSend.SetRegChan(tunMng.GetRegChan())
ntSend.SetUnRegChan(tunMng.GetUnregChan())
ntRev := NewTunnel(destConn, listenerConn, RECEIVE)
ntRev.SetRegChan(tunMng.GetRegChan())
ntRev.SetUnRegChan(tunMng.GetUnregChan())
go ntSend.Run()
go ntRev.Run()
}
if clientOrServer == "server" {
go NewTunnel(listenerConn, destConn, RECEIVE).run()
go NewTunnel(destConn, listenerConn, SEND).run()
ntRev := NewTunnel(listenerConn, destConn, RECEIVE)
ntRev.SetRegChan(tunMng.GetRegChan())
ntRev.SetUnRegChan(tunMng.GetUnregChan())
ntSend := NewTunnel(destConn, listenerConn, SEND)
ntSend.SetRegChan(tunMng.GetRegChan())
ntSend.SetUnRegChan(tunMng.GetUnregChan())
go ntRev.Run()
go ntSend.Run()
}
}()

View File

@ -23,6 +23,8 @@ type Tunnel struct {
src net.Conn
dest net.Conn
cipherDirection Direction
regChan chan interface{}
unregChan chan interface{}
}
func NewTunnel(src, dest net.Conn, cipherDirection Direction) *Tunnel {
@ -42,14 +44,26 @@ func (this *Tunnel) SetID(id string) { //rarely used
this.id = id
}
func (this *Tunnel) SetRegChan(c chan interface{}) {
this.regChan = c
}
func (this *Tunnel) SetUnRegChan(c chan interface{}) {
this.unregChan = c
}
//tunnel model : [ -->>server ---- client -->> ](this is a tunnel)
func (this *Tunnel) run() { //单向的从src发送到dest
func (this *Tunnel) Run() { //单向的从src发送到dest
//进行注册
this.regChan <- this
src := this.src
dest := this.dest
cipherDirection := this.cipherDirection
id := this.id
defer func() {
log.Info("tunnel id %s ends", id)
//注销
this.unregChan <- this
if r := recover(); r != nil {
if src != nil {
src.Close()

70
stat/tunnelstatus.go Normal file
View File

@ -0,0 +1,70 @@
package stat
import (
"container/list"
"fmt"
"github.com/TransX/log"
"sync"
)
type TunnelStatusManager struct {
mux sync.Mutex
tunnelList list.List
regChan chan interface{}
unregChan chan interface{}
}
func NewTunnelStatusManager() *TunnelStatusManager {
t := new(TunnelStatusManager)
t.regChan = make(chan interface{})
t.unregChan = make(chan interface{})
go t.chanListener()
return t
}
func (this *TunnelStatusManager) GetRegChan() chan interface{} {
return this.regChan
}
func (this *TunnelStatusManager) GetUnregChan() chan interface{} {
return this.unregChan
}
func (this *TunnelStatusManager) register(t interface{}) {
this.mux.Lock()
defer this.mux.Unlock()
this.tunnelList.PushBack(t)
}
func (this *TunnelStatusManager) unregister(t interface{}) {
this.mux.Lock()
defer this.mux.Unlock()
l := this.tunnelList
for e := l.Front(); e != nil; e = e.Next() {
if e == t {
l.Remove(e)
break
}
}
}
func (this *TunnelStatusManager) chanListener() {
for {
select {
case r := <-this.regChan:
this.register(r)
log.Info("A tunnel registered")
log.Info(this.QueryStatString())
case ur := <-this.unregChan:
this.unregister(ur)
log.Info("A tunnel unregistered")
}
}
}
func (this *TunnelStatusManager) QueryStatString() string {
this.mux.Lock()
defer this.mux.Unlock()
return fmt.Sprintf("There %d tunnels running.\n", this.tunnelList.Len())
}