Completed initial version of outbound flow
This commit is contained in:
218
src/send.go
218
src/send.go
@@ -5,6 +5,8 @@ import (
|
||||
"golang.org/x/crypto/chacha20poly1305"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
/* Handles outbound flow
|
||||
@@ -29,6 +31,7 @@ type QueueOutboundElement struct {
|
||||
packet []byte
|
||||
nonce uint64
|
||||
keyPair *KeyPair
|
||||
peer *Peer
|
||||
}
|
||||
|
||||
func (peer *Peer) FlushNonceQueue() {
|
||||
@@ -46,6 +49,7 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) {
|
||||
for {
|
||||
select {
|
||||
case peer.queue.outbound <- elem:
|
||||
return
|
||||
default:
|
||||
select {
|
||||
case <-peer.queue.outbound:
|
||||
@@ -61,11 +65,15 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) {
|
||||
* Obs. Single instance per TUN device
|
||||
*/
|
||||
func (device *Device) RoutineReadFromTUN(tun TUNDevice) {
|
||||
if tun.MTU() == 0 {
|
||||
// Dummy
|
||||
return
|
||||
}
|
||||
|
||||
device.log.Debug.Println("Routine, TUN Reader: started")
|
||||
for {
|
||||
// read packet
|
||||
|
||||
device.log.Debug.Println("Read")
|
||||
packet := make([]byte, 1<<16) // TODO: Fix & avoid dynamic allocation
|
||||
size, err := tun.Read(packet)
|
||||
if err != nil {
|
||||
@@ -94,13 +102,16 @@ func (device *Device) RoutineReadFromTUN(tun TUNDevice) {
|
||||
|
||||
default:
|
||||
device.log.Debug.Println("Receieved packet with unknown IP version")
|
||||
return
|
||||
}
|
||||
|
||||
if peer == nil {
|
||||
device.log.Debug.Println("No peer configured for IP")
|
||||
continue
|
||||
}
|
||||
if peer.endpoint == nil {
|
||||
device.log.Debug.Println("No known endpoint for peer", peer.id)
|
||||
continue
|
||||
}
|
||||
|
||||
// insert into nonce/pre-handshake queue
|
||||
|
||||
@@ -131,69 +142,95 @@ func (peer *Peer) RoutineNonce() {
|
||||
var packet []byte
|
||||
var keyPair *KeyPair
|
||||
|
||||
for {
|
||||
device := peer.device
|
||||
logger := device.log.Debug
|
||||
|
||||
// wait for packet
|
||||
logger.Println("Routine, nonce worker, started for peer", peer.id)
|
||||
|
||||
if packet == nil {
|
||||
select {
|
||||
case packet = <-peer.queue.nonce:
|
||||
case <-peer.signal.stopSending:
|
||||
close(peer.queue.outbound)
|
||||
return
|
||||
func() {
|
||||
|
||||
for {
|
||||
NextPacket:
|
||||
|
||||
// wait for packet
|
||||
|
||||
if packet == nil {
|
||||
select {
|
||||
case packet = <-peer.queue.nonce:
|
||||
case <-peer.signal.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wait for key pair
|
||||
// wait for key pair
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-peer.signal.newKeyPair:
|
||||
default:
|
||||
}
|
||||
|
||||
for keyPair == nil {
|
||||
peer.signal.newHandshake <- true
|
||||
select {
|
||||
case <-peer.keyPairs.newKeyPair:
|
||||
keyPair = peer.keyPairs.Current()
|
||||
continue
|
||||
case <-peer.signal.flushNonceQueue:
|
||||
peer.FlushNonceQueue()
|
||||
packet = nil
|
||||
continue
|
||||
case <-peer.signal.stopSending:
|
||||
close(peer.queue.outbound)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// process current packet
|
||||
|
||||
if packet != nil {
|
||||
|
||||
// create work element
|
||||
|
||||
work := new(QueueOutboundElement) // TODO: profile, maybe use pool
|
||||
work.keyPair = keyPair
|
||||
work.packet = packet
|
||||
work.nonce = keyPair.sendNonce
|
||||
work.mutex.Lock()
|
||||
|
||||
packet = nil
|
||||
keyPair.sendNonce += 1
|
||||
|
||||
// drop packets until there is space
|
||||
|
||||
func() {
|
||||
for {
|
||||
select {
|
||||
case peer.device.queue.encryption <- work:
|
||||
return
|
||||
default:
|
||||
drop := <-peer.device.queue.encryption
|
||||
drop.packet = nil
|
||||
drop.mutex.Unlock()
|
||||
if keyPair != nil && keyPair.sendNonce < RejectAfterMessages {
|
||||
if time.Now().Sub(keyPair.created) < RejectAfterTime {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
peer.queue.outbound <- work
|
||||
|
||||
sendSignal(peer.signal.handshakeBegin)
|
||||
logger.Println("Waiting for key-pair, peer", peer.id)
|
||||
|
||||
select {
|
||||
case <-peer.signal.newKeyPair:
|
||||
logger.Println("Key-pair negotiated for peer", peer.id)
|
||||
goto NextPacket
|
||||
|
||||
case <-peer.signal.flushNonceQueue:
|
||||
logger.Println("Clearing queue for peer", peer.id)
|
||||
peer.FlushNonceQueue()
|
||||
packet = nil
|
||||
goto NextPacket
|
||||
|
||||
case <-peer.signal.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// process current packet
|
||||
|
||||
if packet != nil {
|
||||
|
||||
// create work element
|
||||
|
||||
work := new(QueueOutboundElement) // TODO: profile, maybe use pool
|
||||
work.keyPair = keyPair
|
||||
work.packet = packet
|
||||
work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1)
|
||||
work.peer = peer
|
||||
work.mutex.Lock()
|
||||
|
||||
packet = nil
|
||||
|
||||
// drop packets until there is space
|
||||
|
||||
func() {
|
||||
for {
|
||||
select {
|
||||
case peer.device.queue.encryption <- work:
|
||||
return
|
||||
default:
|
||||
drop := <-peer.device.queue.encryption
|
||||
drop.packet = nil
|
||||
drop.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
peer.queue.outbound <- work
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Println("Routine, nonce worker, stopped for peer", peer.id)
|
||||
}
|
||||
|
||||
/* Encrypts the elements in the queue
|
||||
@@ -227,6 +264,10 @@ func (device *Device) RoutineEncryption() {
|
||||
nil,
|
||||
)
|
||||
work.mutex.Unlock()
|
||||
|
||||
// initiate new handshake
|
||||
|
||||
work.peer.KeepKeyFreshSending()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -235,21 +276,54 @@ func (device *Device) RoutineEncryption() {
|
||||
* Obs. Single instance per peer.
|
||||
* The routine terminates then the outbound queue is closed.
|
||||
*/
|
||||
func (peer *Peer) RoutineSequential() {
|
||||
for work := range peer.queue.outbound {
|
||||
work.mutex.Lock()
|
||||
func() {
|
||||
peer.mutex.RLock()
|
||||
defer peer.mutex.RUnlock()
|
||||
if work.packet == nil {
|
||||
return
|
||||
}
|
||||
if peer.endpoint == nil {
|
||||
return
|
||||
}
|
||||
peer.device.conn.WriteToUDP(work.packet, peer.endpoint)
|
||||
peer.timer.sendKeepalive.Reset(peer.persistentKeepaliveInterval)
|
||||
}()
|
||||
work.mutex.Unlock()
|
||||
func (peer *Peer) RoutineSequentialSender() {
|
||||
logger := peer.device.log.Debug
|
||||
logger.Println("Routine, sequential sender, started for peer", peer.id)
|
||||
|
||||
device := peer.device
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-peer.signal.stop:
|
||||
logger.Println("Routine, sequential sender, stopped for peer", peer.id)
|
||||
return
|
||||
case work := <-peer.queue.outbound:
|
||||
work.mutex.Lock()
|
||||
func() {
|
||||
if work.packet == nil {
|
||||
return
|
||||
}
|
||||
|
||||
peer.mutex.RLock()
|
||||
defer peer.mutex.RUnlock()
|
||||
|
||||
if peer.endpoint == nil {
|
||||
logger.Println("No endpoint for peer:", peer.id)
|
||||
return
|
||||
}
|
||||
|
||||
device.net.mutex.RLock()
|
||||
defer device.net.mutex.RUnlock()
|
||||
|
||||
if device.net.conn == nil {
|
||||
logger.Println("No source for device")
|
||||
return
|
||||
}
|
||||
|
||||
logger.Println("Sending packet for peer", peer.id, work.packet)
|
||||
|
||||
_, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint)
|
||||
logger.Println("SEND:", peer.endpoint, err)
|
||||
atomic.AddUint64(&peer.tx_bytes, uint64(len(work.packet)))
|
||||
|
||||
// shift keep-alive timer
|
||||
|
||||
if peer.persistentKeepaliveInterval != 0 {
|
||||
interval := time.Duration(peer.persistentKeepaliveInterval) * time.Second
|
||||
peer.timer.sendKeepalive.Reset(interval)
|
||||
}
|
||||
}()
|
||||
work.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user