/usr/share/gocode/src/github.com/hashicorp/serf/serf/ping_delegate.go is in golang-github-hashicorp-serf-dev 0.7.0~ds1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package serf
import (
"bytes"
"log"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/coordinate"
)
// pingDelegate is notified when memberlist successfully completes a direct ping
// of a peer node. We use this to update our estimated network coordinate, as
// well as cache the coordinate of the peer.
type pingDelegate struct {
// serf is the Serf instance whose coordinate client and coordinate cache
// this delegate reads and updates.
serf *Serf
}
// PingVersion is an internal version number for the ping message, layered on
// top of the versioning already provided by the protocol version. It allows
// small changes to the ping message format without a full protocol bump. It
// is written as the first byte of every ack payload and checked on receipt.
const PingVersion = 1
// AckPayload builds the payload that is sent back in response to a ping
// request. The payload is a single version byte (PingVersion) followed by the
// msgpack encoding of this node's current coordinate. If encoding fails the
// error is logged and whatever was written so far is still returned.
func (p *pingDelegate) AckPayload() []byte {
	var out bytes.Buffer

	// Header: one byte carrying the ping message version.
	out.WriteByte(PingVersion)

	// Body: the serialized coordinate, appended directly after the header.
	encoder := codec.NewEncoder(&out, &codec.MsgpackHandle{})
	coord := p.serf.coordClient.GetCoordinate()
	err := encoder.Encode(coord)
	if err != nil {
		log.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err)
	}

	return out.Bytes()
}
// NotifyPingComplete is called when this node successfully completes a direct
// ping of a peer node.
//
// other is the peer that was pinged, rtt is the observed round-trip time, and
// payload is the ack payload returned by the peer: a single version byte
// followed by a msgpack-encoded coordinate (see AckPayload). Empty payloads
// are ignored silently. Payloads with an unsupported version, payloads that
// fail to decode, and coordinates that are incompatible with ours are logged
// and dropped without touching the local coordinate or the cache.
func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Duration, payload []byte) {
	// len of a nil slice is 0, so this covers both nil and empty payloads.
	if len(payload) == 0 {
		return
	}

	// Verify ping version in the header.
	version := payload[0]
	if version != PingVersion {
		log.Printf("[ERR] serf: Unsupported ping version: %v", version)
		return
	}

	// Process the remainder of the message as a coordinate.
	r := bytes.NewReader(payload[1:])
	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
	var coord coordinate.Coordinate
	if err := dec.Decode(&coord); err != nil {
		log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
		// Bail out: coord may be zero-valued or only partially populated,
		// and must not be fed into the coordinate update or the cache.
		return
	}

	// Since this is a coordinate coming from some place else we harden this
	// and look for dimensionality problems proactively.
	before := p.serf.coordClient.GetCoordinate()
	if !before.IsCompatibleWith(&coord) {
		log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
		return
	}

	// Apply the update.
	after := p.serf.coordClient.Update(other.Name, &coord, rtt)

	// Publish some metrics to give us an idea of how much we are
	// adjusting each time we update (reported in milliseconds).
	d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
	metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)

	// Cache the coordinate for the other node, and add our own
	// to the cache as well since it just got updated. This lets
	// users call GetCachedCoordinate with our node name, which is
	// more friendly.
	p.serf.coordCacheLock.Lock()
	p.serf.coordCache[other.Name] = &coord
	p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
	p.serf.coordCacheLock.Unlock()
}
|