transx/model/tunnel.go

142 lines
3.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package model
import (
// "fmt"
"net"
"time"
"github.com/TransX/cache"
"github.com/TransX/communicator"
"github.com/TransX/constant"
"github.com/TransX/log"
"github.com/TransX/tscipher"
"github.com/spf13/viper"
)
type Tunnel struct {
id string
src net.Conn
dest net.Conn
cipherDirection constant.Direction
regChan chan interface{}
unregChan chan interface{}
unregistered bool
}
func NewTunnel(id string, src, dest net.Conn, cipherDirection constant.Direction) *Tunnel {
return &Tunnel{
id: id,
src: src,
dest: dest,
cipherDirection: cipherDirection,
unregistered: false,
}
}
func (this *Tunnel) GetID() string {
return this.id
}
func (this *Tunnel) SetID(id string) { //rarely used
this.id = id
}
func (this *Tunnel) SetRegChan(c chan interface{}) {
this.regChan = c
}
func (this *Tunnel) SetUnRegChan(c chan interface{}) {
this.unregChan = c
}
//tunnel model : [ -->>server ---- client -->> ](this is a tunnel)
func (this *Tunnel) Run() { //单向的从src发送到dest
//进行注册
this.regChan <- this
src := this.src
dest := this.dest
id := this.id
//构建Carrier
queLength := viper.GetInt("queueLength")
queCache := cache.NewBlockingQueueCache(queLength)
cacheSize := viper.GetInt("cacheSize")
for i := 0; i < queLength; i++ {
queCache.Put(make([]byte, cacheSize), 0)
}
msg := cache.NewBlockingQueueCache(queLength)
revCarrier := tscipher.NewCarrier(src, tscipher.NewCipher("XOR"), queCache, msg, id)
sendCarrier := tscipher.NewCarrier(dest, tscipher.NewCipher("XOR"), queCache, msg, id)
go this.receive(revCarrier)
go this.send(sendCarrier)
}
func (this *Tunnel) onError() {
src := this.src
dest := this.dest
//注销
if !this.unregistered { // 应该不存在异步问题
this.unregChan <- this
this.unregistered = true
}
if r := recover(); r != nil {
if src != nil {
src.Close()
}
if dest != nil {
dest.Close()
}
}
}
func (this *Tunnel) receive(revCarrier *tscipher.Carrier) {
src := this.src
cipherDirection := this.cipherDirection
id := this.id
defer this.onError()
var n int
var err error
var receiver communicator.Receiver
if cipherDirection != constant.RECEIVE {
// revCarrier.Cipher = nil
// n, err = tscipher.RowReceiveData(revCarrier)
receiver = communicator.NewDirectReceiver(revCarrier)
} else {
receiver = communicator.NewNormalReceiver(revCarrier)
}
for {
rTimer := time.Now() //receive timer
n, err = receiver.Receive()
log.Info("id %s time to receive %d", id, time.Since(rTimer)/1000)
if err != nil {
log.Panic("Read panic. Tunnel id: %s. Remote Add: %s Local: %s. Err:%s", id, src.RemoteAddr().String(), src.LocalAddr().String(), err.Error())
}
log.Debug("Reived %d bytes (local add %s) from %s. Tunnel: id %s", n, src.LocalAddr().String(), src.RemoteAddr().String(), id)
}
}
func (this *Tunnel) send(sendCarrier *tscipher.Carrier) {
dest := this.dest
cipherDirection := this.cipherDirection
id := this.id
var sender communicator.Sender
defer this.onError()
if cipherDirection != constant.SEND {
// sendCarrier.Cipher = nil
sender = communicator.NewDirectSender(sendCarrier)
} else {
sender = communicator.NewNormalSender(sendCarrier)
}
for {
sTimer := time.Now() //send timer
n, err := sender.Send()
log.Info("id %s time to send %d", id, time.Since(sTimer)/1000)
if err != nil {
log.Panic("Write panic. ID: %s, Err: %s, Remote Add: %s", id, err, dest.RemoteAddr().String())
}
log.Info("id %s Send %d bytes (local add %s),to %s", id, n, dest.LocalAddr().String(), dest.RemoteAddr().String())
}
}