transx/model/tunnel.go

136 lines
3.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package model
import (
// "fmt"
"github.com/TransX/cache"
"github.com/TransX/constant"
"github.com/TransX/log"
"github.com/TransX/tscipher"
"net"
"time"
)
// Tunnel describes a one-way data path between two connections: data is
// received from src and sent to dest by the goroutines started in Run.
type Tunnel struct {
	// id identifies the tunnel; it is returned by GetID, handed to both
	// carriers, and included in log lines.
	id string
	// src is the connection wrapped by the receiving carrier, dest the one
	// wrapped by the sending carrier.
	src, dest net.Conn
	// cipherDirection is compared against constant.RECEIVE and constant.SEND
	// to decide which of the two carriers keeps its cipher.
	cipherDirection constant.Direction
	// regChan receives the tunnel when Run starts; unregChan receives it when
	// the send loop terminates. Both stay nil until their setters are called.
	regChan, unregChan chan interface{}
}
// NewTunnel returns a Tunnel with the given id that links src to dest and
// uses cipherDirection to choose which side applies the cipher. The register
// and unregister channels are left nil until SetRegChan and SetUnRegChan are
// called.
func NewTunnel(id string, src, dest net.Conn, cipherDirection constant.Direction) *Tunnel {
	t := new(Tunnel)
	t.id = id
	t.src, t.dest = src, dest
	t.cipherDirection = cipherDirection
	return t
}
// GetID returns the tunnel's identifier.
func (t *Tunnel) GetID() string {
	return t.id
}
// SetID replaces the tunnel's identifier. It is rarely used.
func (t *Tunnel) SetID(id string) {
	t.id = id
}
// SetRegChan sets the channel on which Run sends the tunnel itself before it
// starts forwarding data.
func (t *Tunnel) SetRegChan(c chan interface{}) {
	t.regChan = c
}
// SetUnRegChan sets the channel on which the tunnel sends itself when its
// send loop terminates.
func (t *Tunnel) SetUnRegChan(c chan interface{}) {
	t.unregChan = c
}
// Run registers the tunnel and starts forwarding data one way, from src to
// dest. Tunnel model: [ -->> server ---- client -->> ] (this is a tunnel).
//
// Run does not wait for the transfer: it launches one goroutine running
// receive on a carrier built around src and one running send on a carrier
// built around dest, then returns. Both carriers are created with an "XOR"
// cipher and share the same buffer queue and the same message queue.
func (t *Tunnel) Run() {
	// Register the tunnel. A send on a nil channel blocks forever, so skip
	// registration when no register channel has been set instead of hanging
	// the caller.
	if t.regChan != nil {
		t.regChan <- t
	}

	// Build the carriers. The buffer queue is seeded with a single 4 KiB
	// buffer that both carriers draw from.
	queCache := cache.NewBlockingQueueCache(1)
	queCache.Put(make([]byte, 1024*4), 0)
	msg := cache.NewBlockingQueueCache(1)

	revCarrier := tscipher.NewCarrier(t.src, tscipher.NewCipher("XOR"), queCache, msg, t.id)
	sendCarrier := tscipher.NewCarrier(t.dest, tscipher.NewCipher("XOR"), queCache, msg, t.id)

	go t.receive(revCarrier)
	go t.send(sendCarrier)
}
// receive reads from the source connection through revCarrier in an endless
// loop, logging the time taken and the byte count of every read.
//
// When the tunnel's cipher direction is not constant.RECEIVE, the carrier's
// cipher is cleared and tscipher.RowReceiveData is used; otherwise
// tscipher.ReceiveData is used. A read error is reported through log.Panic
// (presumably panicking — confirm against the log package); the deferred
// handler recovers any panic and closes both connections. receive does not
// unregister the tunnel.
func (t *Tunnel) receive(revCarrier *tscipher.Carrier) {
	src, dest := t.src, t.dest
	id := t.id

	defer func() {
		// Only a panic ends the loop below; on the way out, release both ends.
		if r := recover(); r != nil {
			if src != nil {
				src.Close()
			}
			if dest != nil {
				dest.Close()
			}
		}
	}()

	// The direction never changes while the loop runs, so decide the receive
	// mode and clear the cipher once up front, the same way send does.
	raw := t.cipherDirection != constant.RECEIVE
	if raw {
		revCarrier.Cipher = nil
	}

	var (
		n   int
		err error
	)
	for {
		started := time.Now() // receive timer
		if raw {
			n, err = tscipher.RowReceiveData(revCarrier)
		} else {
			n, err = tscipher.ReceiveData(revCarrier)
		}
		log.Info("id %s time to receive %d", id, time.Since(started)/1000)
		if err != nil {
			log.Panic("Read panic. Tunnel id: %s. Remote Add: %s Local: %s. Err:%s", id, src.RemoteAddr().String(), src.LocalAddr().String(), err.Error())
		}
		log.Debug("Reived %d bytes (local add %s) from %s. Tunnel: id %s", n, src.LocalAddr().String(), src.RemoteAddr().String(), id)
	}
}
// send writes to the destination connection through sendCarrier in an
// endless loop, logging the time taken and the byte count of every write.
//
// When the tunnel's cipher direction is not constant.SEND, the carrier's
// cipher is cleared before the loop starts. A write error is reported through
// log.Panic (presumably panicking — confirm against the log package). On the
// way out the deferred handler unregisters the tunnel, recovers any panic and
// closes both connections.
func (t *Tunnel) send(sendCarrier *tscipher.Carrier) {
	src, dest := t.src, t.dest
	id := t.id

	defer func() {
		// Unregister. A send on a nil channel blocks forever, which would
		// keep the recover and the Close calls below from ever running, so
		// skip it when no unregister channel has been set.
		if t.unregChan != nil {
			t.unregChan <- t
		}
		if r := recover(); r != nil {
			if src != nil {
				src.Close()
			}
			if dest != nil {
				dest.Close()
			}
		}
	}()

	if t.cipherDirection != constant.SEND {
		sendCarrier.Cipher = nil
	}

	for {
		started := time.Now() // send timer
		n, err := tscipher.SendData(sendCarrier)
		log.Info("id %s time to send %d", id, time.Since(started)/1000)
		if err != nil {
			log.Panic("Write panic. ID: %s, Err: %s, Remote Add: %s", id, err, dest.RemoteAddr().String())
		}
		log.Info("id %s Send %d bytes (local add %s),to %s", id, n, dest.LocalAddr().String(), dest.RemoteAddr().String())
	}
}