This file is indexed.

/usr/share/gocode/src/github.com/hashicorp/serf/command/agent/ipc_query_response_stream.go is in golang-github-hashicorp-serf-dev 0.7.0~ds1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package agent

import (
	"github.com/hashicorp/serf/serf"
	"log"
	"time"
)

// queryResponseStream is used to stream the query results back to a client.
// Every record it sends carries the same sequence number in its header.
type queryResponseStream struct {
	// client is the destination that headers and records are sent to via Send.
	client streamClient
	// logger receives an error line whenever sending to the client fails.
	logger *log.Logger
	// seq is copied into the Seq field of every response header sent.
	seq    uint64
}

// newQueryResponseStream builds a queryResponseStream that sends to client,
// stamps every response header with seq, and logs send failures to logger.
func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream {
	return &queryResponseStream{
		seq:    seq,
		logger: logger,
		client: client,
	}
}

// Stream is a long running routine used to stream the results of a query back
// to a client. It forwards every ack and response received on resp's channels
// until resp's deadline passes, then sends a final "done" record and returns.
// If an ack or response cannot be sent, the error is logged and streaming
// stops immediately without sending the done record.
func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) {
	// Setup a timer for the query ending. An explicit timer (rather than
	// time.After) can be stopped, so it is released promptly when we bail
	// out early on a send error instead of lingering until the deadline.
	remaining := resp.Deadline().Sub(time.Now())
	timer := time.NewTimer(remaining)
	defer timer.Stop()

	ackCh := resp.AckCh()
	respCh := resp.ResponseCh()
	for {
		select {
		case a, ok := <-ackCh:
			if !ok {
				// A closed channel is always ready and yields zero values;
				// without this guard we would spin sending empty acks.
				// A nil channel never becomes ready, which disables the case.
				ackCh = nil
				continue
			}
			if err := qs.sendAck(a); err != nil {
				qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err)
				return
			}
		case r, ok := <-respCh:
			if !ok {
				// Same reasoning as for ackCh: stop selecting on a closed
				// channel rather than streaming empty responses.
				respCh = nil
				continue
			}
			if err := qs.sendResponse(r.From, r.Payload); err != nil {
				qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err)
				return
			}
		case <-timer.C:
			// Deadline reached: tell the client the query is over. A failure
			// here is only logged since we are returning either way.
			if err := qs.sendDone(); err != nil {
				qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err)
			}
			return
		}
	}
}

// sendAck sends one ack record whose From field is set to from, under a
// header carrying this stream's sequence number and an empty error string.
// It returns the error reported by the client's Send.
func (qs *queryResponseStream) sendAck(from string) error {
	hdr := &responseHeader{Seq: qs.seq}
	ack := &queryRecord{From: from, Type: queryRecordAck}
	return qs.client.Send(hdr, ack)
}

// sendResponse sends one response record carrying from and payload, under a
// header carrying this stream's sequence number and an empty error string.
// It returns the error reported by the client's Send.
func (qs *queryResponseStream) sendResponse(from string, payload []byte) error {
	hdr := &responseHeader{Seq: qs.seq}
	record := &queryRecord{
		Payload: payload,
		From:    from,
		Type:    queryRecordResponse,
	}
	return qs.client.Send(hdr, record)
}

// sendDone sends the terminating "done" record, which has only its Type set,
// under a header carrying this stream's sequence number and an empty error
// string. It returns the error reported by the client's Send.
func (qs *queryResponseStream) sendDone() error {
	hdr := &responseHeader{Seq: qs.seq}
	finished := &queryRecord{Type: queryRecordDone}
	return qs.client.Send(hdr, finished)
}