Handshake negotiation functioning
This commit is contained in:
44
src/send.go
44
src/send.go
@@ -27,6 +27,7 @@ import (
|
||||
* workers release lock when they have completed work on the packet.
|
||||
*/
|
||||
type QueueOutboundElement struct {
|
||||
state uint32
|
||||
mutex sync.Mutex
|
||||
packet []byte
|
||||
nonce uint64
|
||||
@@ -59,6 +60,14 @@ func (peer *Peer) InsertOutbound(elem *QueueOutboundElement) {
|
||||
}
|
||||
}
|
||||
|
||||
func (elem *QueueOutboundElement) Drop() {
|
||||
atomic.StoreUint32(&elem.state, ElementStateDropped)
|
||||
}
|
||||
|
||||
func (elem *QueueOutboundElement) IsDropped() bool {
|
||||
return atomic.LoadUint32(&elem.state) == ElementStateDropped
|
||||
}
|
||||
|
||||
/* Reads packets from the TUN and inserts
|
||||
* into nonce queue for peer
|
||||
*
|
||||
@@ -162,6 +171,8 @@ func (peer *Peer) RoutineNonce() {
|
||||
}
|
||||
}
|
||||
|
||||
logger.Println("PACKET:", packet)
|
||||
|
||||
// wait for key pair
|
||||
|
||||
for {
|
||||
@@ -176,6 +187,7 @@ func (peer *Peer) RoutineNonce() {
|
||||
break
|
||||
}
|
||||
}
|
||||
logger.Println("Key pair:", keyPair)
|
||||
|
||||
sendSignal(peer.signal.handshakeBegin)
|
||||
logger.Println("Waiting for key-pair, peer", peer.id)
|
||||
@@ -205,10 +217,12 @@ func (peer *Peer) RoutineNonce() {
|
||||
work := new(QueueOutboundElement) // TODO: profile, maybe use pool
|
||||
work.keyPair = keyPair
|
||||
work.packet = packet
|
||||
work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1)
|
||||
work.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
|
||||
work.peer = peer
|
||||
work.mutex.Lock()
|
||||
|
||||
logger.Println("WORK:", work)
|
||||
|
||||
packet = nil
|
||||
|
||||
// drop packets until there is space
|
||||
@@ -219,9 +233,11 @@ func (peer *Peer) RoutineNonce() {
|
||||
case peer.device.queue.encryption <- work:
|
||||
return
|
||||
default:
|
||||
drop := <-peer.device.queue.encryption
|
||||
drop.packet = nil
|
||||
drop.mutex.Unlock()
|
||||
select {
|
||||
case elem := <-peer.device.queue.encryption:
|
||||
elem.Drop()
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -241,18 +257,22 @@ func (peer *Peer) RoutineNonce() {
|
||||
func (device *Device) RoutineEncryption() {
|
||||
var nonce [chacha20poly1305.NonceSize]byte
|
||||
for work := range device.queue.encryption {
|
||||
if work.IsDropped() {
|
||||
continue
|
||||
}
|
||||
|
||||
// pad packet
|
||||
|
||||
padding := device.mtu - len(work.packet)
|
||||
if padding < 0 {
|
||||
// drop
|
||||
work.packet = nil
|
||||
work.mutex.Unlock()
|
||||
work.Drop()
|
||||
continue
|
||||
}
|
||||
|
||||
for n := 0; n < padding; n += 1 {
|
||||
work.packet = append(work.packet, 0)
|
||||
}
|
||||
device.log.Debug.Println(work.packet)
|
||||
|
||||
// encrypt
|
||||
|
||||
@@ -288,6 +308,9 @@ func (peer *Peer) RoutineSequentialSender() {
|
||||
logger.Println("Routine, sequential sender, stopped for peer", peer.id)
|
||||
return
|
||||
case work := <-peer.queue.outbound:
|
||||
if work.IsDropped() {
|
||||
continue
|
||||
}
|
||||
work.mutex.Lock()
|
||||
func() {
|
||||
if work.packet == nil {
|
||||
@@ -310,10 +333,12 @@ func (peer *Peer) RoutineSequentialSender() {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Println("Sending packet for peer", peer.id, work.packet)
|
||||
logger.Println(work.packet)
|
||||
|
||||
_, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint)
|
||||
logger.Println("SEND:", peer.endpoint, err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
atomic.AddUint64(&peer.tx_bytes, uint64(len(work.packet)))
|
||||
|
||||
// shift keep-alive timer
|
||||
@@ -323,7 +348,6 @@ func (peer *Peer) RoutineSequentialSender() {
|
||||
peer.timer.sendKeepalive.Reset(interval)
|
||||
}
|
||||
}()
|
||||
work.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user