transx/protocol/tunnel.go

162 lines
3.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package protocol
import (
"bytes"
"crypto/md5"
"encoding/hex"
// "fmt"
"github.com/TransX/cache"
"github.com/TransX/log"
"github.com/TransX/tscipher"
"net"
"strconv"
"sync/atomic"
"time"
)
// seed is the package-wide sequence counter that tunnelID mixes into every
// generated tunnel ID. Its zero value (0) is the starting point, so no init
// function is required to reset it.
var seed int32
// Tunnel is a one-way relay between two connections: Run builds a receiving
// carrier on src and a sending carrier on dest.
type Tunnel struct {
	// id identifies the tunnel. NewTunnel fills it from tunnelID and SetID
	// can overwrite it.
	id string

	// src backs the carrier handed to receive; dest backs the carrier
	// handed to send.
	src, dest net.Conn

	// cipherDirection is compared against RECEIVE and SEND; on a side that
	// does not match, the carrier's cipher is cleared.
	cipherDirection Direction

	// regChan is sent the tunnel at the start of Run; unregChan is sent the
	// tunnel when the send goroutine terminates.
	regChan, unregChan chan interface{}
}
// NewTunnel creates a Tunnel between src and dest with a freshly generated ID
// and the given cipher direction.
//
// The registration channels are left nil. Run sends on regChan and the send
// goroutine sends on unregChan, so SetRegChan and SetUnRegChan should be
// called before Run.
func NewTunnel(src, dest net.Conn, cipherDirection Direction) *Tunnel {
	t := new(Tunnel)
	t.id = tunnelID()
	t.src, t.dest = src, dest
	t.cipherDirection = cipherDirection
	return t
}
// GetID returns the tunnel's identifier. The id argument is not used.
func (t *Tunnel) GetID(id string) string {
	return t.id
}
// SetID replaces the identifier generated by NewTunnel. Rarely used.
func (t *Tunnel) SetID(id string) {
	t.id = id
}
// SetRegChan sets the channel on which Run announces the tunnel.
func (t *Tunnel) SetRegChan(c chan interface{}) {
	t.regChan = c
}
// SetUnRegChan sets the channel on which the send goroutine announces that
// the tunnel has ended.
func (t *Tunnel) SetUnRegChan(c chan interface{}) {
	t.unregChan = c
}
// Run registers the tunnel and starts relaying in one direction, from src to
// dest.
//
// Tunnel model: [ -->>server ---- client -->> ] (the whole span is one tunnel).
//
// The tunnel is first sent on regChan, so that channel must have been set with
// SetRegChan (a send on a nil channel blocks forever). Run then builds two
// carriers and starts the receive and send goroutines; it returns without
// waiting for them.
func (t *Tunnel) Run() {
	// Register this tunnel.
	t.regChan <- t

	tid := t.id

	// Build the carriers. Both are given the same pair of caches; each gets
	// its own "XOR" cipher instance.
	queue := cache.NewUnblockingQueueCache(1)
	messages := cache.NewBlockingQueueCache(1)
	receiver := tscipher.NewCarrier(t.src, tscipher.NewCipher("XOR"), queue, messages, tid)
	sender := tscipher.NewCarrier(t.dest, tscipher.NewCipher("XOR"), queue, messages, tid)

	go t.receive(receiver)
	go t.send(sender)
}
// receive is the receiving half of the tunnel; Run starts it in its own
// goroutine.
//
// It loops forever. On each pass it calls tscipher.ReceiveData when
// cipherDirection is RECEIVE; otherwise it clears the carrier's cipher and
// calls tscipher.RowReceiveData. The elapsed time of the call is logged, a
// non-nil error is handed to log.Panic, and the byte count is logged at debug
// level.
//
// If the goroutine panics (presumably what log.Panic does on a read error;
// confirm against the log package), the deferred function recovers and
// closes both connections. receive does not send on unregChan; only send's
// deferred function does.
func (t *Tunnel) receive(revCarrier *tscipher.Carrier) {
	var (
		in        = t.src
		out       = t.dest
		direction = t.cipherDirection
		tid       = t.id
	)

	defer func() {
		if recover() == nil {
			return
		}
		// A panic ended the loop: close both ends of the tunnel.
		if in != nil {
			in.Close()
		}
		if out != nil {
			out.Close()
		}
	}()

	for {
		var (
			n   int
			err error
		)

		started := time.Now() // receive timer
		if direction == RECEIVE {
			n, err = tscipher.ReceiveData(revCarrier)
		} else {
			revCarrier.Cipher = nil
			n, err = tscipher.RowReceiveData(revCarrier)
		}
		log.Info("id %s time to receive %d", tid, time.Since(started)/1000)

		if err != nil {
			log.Panic("Read panic. Tunnel id: %s. Remote Add: %s Local: %s. Err:%s", tid, in.RemoteAddr().String(), in.LocalAddr().String(), err.Error())
		}
		log.Debug("Reived %d bytes (local add %s) from %s. Tunnel: id %s", n, in.LocalAddr().String(), in.RemoteAddr().String(), tid)
	}
}
// send is the sending half of the tunnel; Run starts it in its own goroutine.
//
// Unless cipherDirection is SEND, the carrier's cipher is cleared once before
// the loop. The loop then calls tscipher.SendData forever, logging the elapsed
// time of every call and handing any error to log.Panic.
//
// When the goroutine ends, the deferred function first sends the tunnel on
// unregChan, and only afterwards recovers a pending panic and closes both
// connections. The connections therefore stay open until the unregistration
// has been received.
func (t *Tunnel) send(sendCarrier *tscipher.Carrier) {
	in, out := t.src, t.dest
	direction := t.cipherDirection
	tid := t.id

	defer func() {
		// Unregister this tunnel.
		t.unregChan <- t

		if recover() == nil {
			return
		}
		// A panic ended the loop: close both ends of the tunnel.
		if in != nil {
			in.Close()
		}
		if out != nil {
			out.Close()
		}
	}()

	if direction != SEND {
		sendCarrier.Cipher = nil
	}

	for {
		started := time.Now() // send timer
		_, err := tscipher.SendData(sendCarrier)
		log.Info("id %s time to send %d", tid, time.Since(started)/1000)
		if err == nil {
			continue
		}
		log.Panic("Write panic. ID: %s, Err: %s, Remote Add: %s", tid, err, out.RemoteAddr().String())
	}
}
// tunnelID returns a new tunnel identifier: the hex-encoded MD5 digest of the
// current time string followed by a package-wide sequence number.
//
// The sequence number is claimed with a single atomic increment of seed, so
// concurrent callers never hash the same number and seed is never read
// without synchronization.
func tunnelID() string {
	// AddInt32 returns the incremented value; subtracting one yields the
	// value this call claimed, so numbering starts at seed's initial value.
	seq := atomic.AddInt32(&seed, 1) - 1
	nowString := time.Now().String() + strconv.Itoa(int(seq))
	md5Byte := md5.Sum(bytes.NewBufferString(nowString).Bytes())
	return hex.EncodeToString(md5Byte[:])
}