/usr/share/gocode/src/github.com/hashicorp/serf/command/agent/ipc_query_response_stream.go is in golang-github-hashicorp-serf-dev 0.7.0~ds1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package agent
import (
"github.com/hashicorp/serf/serf"
"log"
"time"
)
// queryResponseStream is used to stream the query results back to a client.
// Each record it emits is sent through client and tagged with seq in the
// response header.
type queryResponseStream struct {
	// client is the destination that every ack, response and done record
	// is sent to.
	client streamClient
	// logger receives an "[ERR] agent.ipc" line when a send to client fails.
	logger *log.Logger
	// seq is copied into the Seq field of every response header sent.
	seq uint64
}
// newQueryResponseStream builds a queryResponseStream that sends records to
// client, tags each response header with seq, and reports send failures to
// logger. It only stores its arguments; nothing is sent until Stream is
// called.
func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream {
	return &queryResponseStream{
		seq:    seq,
		client: client,
		logger: logger,
	}
}
// Stream is a long running routine used to stream the results of a query
// back to a client. Every ack and response received from resp is forwarded
// to the client until the query deadline passes, at which point a final
// "done" record is sent and Stream returns.
//
// If an ack or response cannot be sent, the error is logged and Stream
// returns immediately without sending the done record. A failure to send the
// done record itself is logged as well.
func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) {
	// Setup a timer for the query ending
	remaining := resp.Deadline().Sub(time.Now())
	done := time.After(remaining)

	ackCh := resp.AckCh()
	respCh := resp.ResponseCh()
	for {
		select {
		case a, ok := <-ackCh:
			if !ok {
				// A closed channel is always ready and only yields zero
				// values, while a nil channel is never ready. Disable this
				// case so we neither spin nor forward empty acks while
				// waiting for the deadline.
				ackCh = nil
				continue
			}
			if err := qs.sendAck(a); err != nil {
				qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err)
				return
			}

		case r, ok := <-respCh:
			if !ok {
				// Same reasoning as for the ack channel: stop selecting on
				// a closed channel instead of forwarding empty responses.
				respCh = nil
				continue
			}
			if err := qs.sendResponse(r.From, r.Payload); err != nil {
				qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err)
				return
			}

		case <-done:
			// Deadline reached: tell the client no more records will follow.
			if err := qs.sendDone(); err != nil {
				qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err)
			}
			return
		}
	}
}
// sendAck delivers a single ack record to the client. from is stored in the
// record's From field; the header carries this stream's sequence number and
// an empty error string. The error from the client's Send is returned as is.
func (qs *queryResponseStream) sendAck(from string) error {
	ack := &queryRecord{
		Type: queryRecordAck,
		From: from,
	}
	hdr := &responseHeader{Seq: qs.seq, Error: ""}
	return qs.client.Send(hdr, ack)
}
// sendResponse delivers a single response record to the client. from and
// payload are stored in the record's From and Payload fields; the header
// carries this stream's sequence number and an empty error string. The error
// from the client's Send is returned as is.
func (qs *queryResponseStream) sendResponse(from string, payload []byte) error {
	var hdr responseHeader
	hdr.Seq = qs.seq
	hdr.Error = ""

	var record queryRecord
	record.Type = queryRecordResponse
	record.From = from
	record.Payload = payload

	return qs.client.Send(&hdr, &record)
}
// sendDone signals the end of the stream by sending a record of type
// queryRecordDone with no other fields set. The header carries this stream's
// sequence number and an empty error string. The error from the client's
// Send is returned as is.
func (qs *queryResponseStream) sendDone() error {
	return qs.client.Send(
		&responseHeader{Seq: qs.seq, Error: ""},
		&queryRecord{Type: queryRecordDone},
	)
}
|