Rewrite timers and related state machines
This commit is contained in:
134
send.go
134
send.go
@@ -6,6 +6,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"golang.org/x/crypto/chacha20poly1305"
|
||||
"golang.org/x/net/ipv4"
|
||||
@@ -46,21 +47,10 @@ type QueueOutboundElement struct {
|
||||
buffer *[MaxMessageSize]byte // slice holding the packet data
|
||||
packet []byte // slice of "buffer" (always!)
|
||||
nonce uint64 // nonce for encryption
|
||||
keyPair *KeyPair // key-pair for encryption
|
||||
keyPair *Keypair // key-pair for encryption
|
||||
peer *Peer // related peer
|
||||
}
|
||||
|
||||
func (peer *Peer) flushNonceQueue() {
|
||||
elems := len(peer.queue.nonce)
|
||||
for i := 0; i < elems; i++ {
|
||||
select {
|
||||
case <-peer.queue.nonce:
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (device *Device) NewOutboundElement() *QueueOutboundElement {
|
||||
return &QueueOutboundElement{
|
||||
dropped: AtomicFalse,
|
||||
@@ -114,6 +104,73 @@ func addToEncryptionQueue(
|
||||
}
|
||||
}
|
||||
|
||||
/* Queues a keepalive if no packets are queued for peer
|
||||
*/
|
||||
func (peer *Peer) SendKeepalive() bool {
|
||||
if len(peer.queue.nonce) != 0 || peer.queue.packetInNonceQueueIsAwaitingKey {
|
||||
return false
|
||||
}
|
||||
elem := peer.device.NewOutboundElement()
|
||||
elem.packet = nil
|
||||
select {
|
||||
case peer.queue.nonce <- elem:
|
||||
peer.device.log.Debug.Println(peer, ": Sending keepalive packet")
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/* Sends a new handshake initiation message to the peer (endpoint)
|
||||
*/
|
||||
func (peer *Peer) SendHandshakeInitiation(isRetry bool) error {
|
||||
if !isRetry {
|
||||
peer.timers.handshakeAttempts = 0
|
||||
}
|
||||
|
||||
if time.Now().Sub(peer.timers.lastSentHandshake) < RekeyTimeout {
|
||||
return nil
|
||||
}
|
||||
peer.timers.lastSentHandshake = time.Now() //TODO: locking for this variable?
|
||||
|
||||
// create initiation message
|
||||
|
||||
msg, err := peer.device.CreateMessageInitiation(peer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
peer.device.log.Debug.Println(peer, ": Sending handshake initiation")
|
||||
|
||||
// marshal handshake message
|
||||
|
||||
var buff [MessageInitiationSize]byte
|
||||
writer := bytes.NewBuffer(buff[:0])
|
||||
binary.Write(writer, binary.LittleEndian, msg)
|
||||
packet := writer.Bytes()
|
||||
peer.mac.AddMacs(packet)
|
||||
|
||||
// send to endpoint
|
||||
|
||||
peer.timersAnyAuthenticatedPacketTraversal()
|
||||
peer.timersHandshakeInitiated()
|
||||
return peer.SendBuffer(packet)
|
||||
}
|
||||
|
||||
/* Called when a new authenticated message has been send
|
||||
*
|
||||
*/
|
||||
func (peer *Peer) keepKeyFreshSending() {
|
||||
kp := peer.keyPairs.Current()
|
||||
if kp == nil {
|
||||
return
|
||||
}
|
||||
nonce := atomic.LoadUint64(&kp.sendNonce)
|
||||
if nonce > RekeyAfterMessages || (kp.isInitiator && time.Now().Sub(kp.created) > RekeyAfterTime) {
|
||||
peer.SendHandshakeInitiation(false)
|
||||
}
|
||||
}
|
||||
|
||||
/* Reads packets from the TUN and inserts
|
||||
* into nonce queue for peer
|
||||
*
|
||||
@@ -180,13 +237,22 @@ func (device *Device) RoutineReadFromTUN() {
|
||||
// insert into nonce/pre-handshake queue
|
||||
|
||||
if peer.isRunning.Get() {
|
||||
peer.event.handshakePushDeadline.Fire()
|
||||
if peer.queue.packetInNonceQueueIsAwaitingKey {
|
||||
peer.SendHandshakeInitiation(false)
|
||||
}
|
||||
addToOutboundQueue(peer.queue.nonce, elem)
|
||||
elem = device.NewOutboundElement()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (peer *Peer) FlushNonceQueue() {
|
||||
select {
|
||||
case peer.signals.flushNonceQueue <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
/* Queues packets when there is no handshake.
|
||||
* Then assigns nonces to packets sequentially
|
||||
* and creates "work" structs for workers
|
||||
@@ -194,13 +260,14 @@ func (device *Device) RoutineReadFromTUN() {
|
||||
* Obs. A single instance per peer
|
||||
*/
|
||||
func (peer *Peer) RoutineNonce() {
|
||||
var keyPair *KeyPair
|
||||
var keyPair *Keypair
|
||||
|
||||
device := peer.device
|
||||
logDebug := device.log.Debug
|
||||
|
||||
defer func() {
|
||||
logDebug.Println(peer, ": Routine: nonce worker - stopped")
|
||||
peer.queue.packetInNonceQueueIsAwaitingKey = false
|
||||
peer.routines.stopping.Done()
|
||||
}()
|
||||
|
||||
@@ -209,8 +276,7 @@ func (peer *Peer) RoutineNonce() {
|
||||
|
||||
for {
|
||||
NextPacket:
|
||||
|
||||
peer.event.flushNonceQueue.Clear()
|
||||
peer.queue.packetInNonceQueueIsAwaitingKey = false
|
||||
|
||||
select {
|
||||
case <-peer.routines.stop:
|
||||
@@ -225,34 +291,48 @@ func (peer *Peer) RoutineNonce() {
|
||||
// wait for key pair
|
||||
|
||||
for {
|
||||
|
||||
peer.event.newKeyPair.Clear()
|
||||
|
||||
keyPair = peer.keyPairs.Current()
|
||||
if keyPair != nil && keyPair.sendNonce < RejectAfterMessages {
|
||||
if time.Now().Sub(keyPair.created) < RejectAfterTime {
|
||||
break
|
||||
}
|
||||
}
|
||||
peer.queue.packetInNonceQueueIsAwaitingKey = true
|
||||
|
||||
peer.event.handshakeBegin.Fire()
|
||||
select {
|
||||
case <-peer.signals.newKeypairArrived:
|
||||
default:
|
||||
}
|
||||
|
||||
peer.SendHandshakeInitiation(false)
|
||||
|
||||
logDebug.Println(peer, ": Awaiting key-pair")
|
||||
|
||||
select {
|
||||
case <-peer.event.newKeyPair.C:
|
||||
case <-peer.signals.newKeypairArrived:
|
||||
logDebug.Println(peer, ": Obtained awaited key-pair")
|
||||
case <-peer.event.flushNonceQueue.C:
|
||||
goto NextPacket
|
||||
case <-peer.signals.flushNonceQueue:
|
||||
for {
|
||||
select {
|
||||
case <-peer.queue.nonce:
|
||||
default:
|
||||
goto NextPacket
|
||||
}
|
||||
}
|
||||
case <-peer.routines.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
peer.queue.packetInNonceQueueIsAwaitingKey = false
|
||||
|
||||
// populate work element
|
||||
|
||||
elem.peer = peer
|
||||
elem.nonce = atomic.AddUint64(&keyPair.sendNonce, 1) - 1
|
||||
// double check in case of race condition added by future code
|
||||
if elem.nonce >= RejectAfterMessages {
|
||||
goto NextPacket
|
||||
}
|
||||
elem.keyPair = keyPair
|
||||
elem.dropped = AtomicFalse
|
||||
elem.mutex.Lock()
|
||||
@@ -288,7 +368,7 @@ func (device *Device) RoutineEncryption() {
|
||||
// fetch next element
|
||||
|
||||
select {
|
||||
case <-device.signal.stop.Wait():
|
||||
case <-device.signals.stop:
|
||||
return
|
||||
|
||||
case elem, ok := <-device.queue.encryption:
|
||||
@@ -389,11 +469,11 @@ func (peer *Peer) RoutineSequentialSender() {
|
||||
|
||||
// update timers
|
||||
|
||||
peer.event.anyAuthenticatedPacketTraversal.Fire()
|
||||
peer.timersAnyAuthenticatedPacketTraversal()
|
||||
if len(elem.packet) != MessageKeepaliveSize {
|
||||
peer.event.dataSent.Fire()
|
||||
peer.timersDataSent()
|
||||
}
|
||||
peer.KeepKeyFreshSending()
|
||||
peer.keepKeyFreshSending()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user