/usr/share/gocode/src/github.com/Graylog2/go-gelf/gelf/reader.go is in golang-github-graylog2-go-gelf-dev 0.0~git20160329.0.76d60fc-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// Copyright 2012 SocialCode. All rights reserved.
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.
package gelf
import (
"bytes"
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync"
)
// Reader receives GELF messages from a network connection. Values are
// created with NewReader, which binds the connection to a UDP listener.
type Reader struct {
// mu is not locked by any method visible in this part of the file.
// NOTE(review): presumably intended to serialize reads — confirm.
mu sync.Mutex
// conn is the connection that ReadMessage reads datagrams from and whose
// local address Addr reports.
conn net.Conn
}
// NewReader resolves addr as a UDP address, starts listening on it and
// returns a Reader that reads from the resulting socket.
//
// It returns a nil Reader and a descriptive error when the address cannot
// be resolved or the listener cannot be opened.
func NewReader(addr string) (*Reader, error) {
	resolved, resolveErr := net.ResolveUDPAddr("udp", addr)
	if resolveErr != nil {
		return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, resolveErr)
	}

	sock, listenErr := net.ListenUDP("udp", resolved)
	if listenErr != nil {
		return nil, fmt.Errorf("ListenUDP: %s", listenErr)
	}

	return &Reader{conn: sock}, nil
}
// Addr reports, in string form, the local address of the connection the
// Reader is reading from.
func (r *Reader) Addr() string {
	local := r.conn.LocalAddr()
	return local.String()
}
// Read receives one message via ReadMessage and copies its text into p.
// The message's Full text is used when it is non-empty, otherwise its Short
// text. It returns the number of bytes copied.
//
// On failure it returns 0 and the error from ReadMessage; the byte count is
// never negative, as the io.Reader contract requires. When the selected text
// is empty the underlying strings.Reader reports io.EOF.
//
// FIXME: this will discard data if p isn't big enough to hold the
// full message.
func (r *Reader) Read(p []byte) (int, error) {
	msg, err := r.ReadMessage()
	if err != nil {
		// io.Reader demands 0 <= n <= len(p); a negative count makes
		// wrappers such as bufio.Reader panic.
		return 0, err
	}
	if msg == nil {
		// Defensive: never dereference a missing message.
		return 0, fmt.Errorf("nil message")
	}

	data := msg.Full
	if data == "" {
		data = msg.Short
	}

	return strings.NewReader(data).Read(p)
}
// ReadMessage receives one GELF message from the connection and decodes it.
//
// A datagram that starts with the chunk magic bytes is treated as one piece
// of a chunked message: pieces are stored by their sequence number, so they
// may arrive in any order, until as many datagrams as the announced chunk
// count have been read; the pieces are then concatenated. Any other datagram
// is taken as a complete message. The payload is decompressed when it starts
// with a gzip or zlib header, otherwise it is used as is, and is finally
// parsed as JSON into a Message.
//
// An error is returned when reading from the connection fails, when a chunk
// header is truncated or inconsistent (zero chunk count, chunk count that
// changes mid-message, sequence number outside the chunk count), when a
// datagram that does not belong to the chunked message in progress arrives,
// when the decompressor cannot be set up, or when JSON decoding fails.
//
// Limitations: completion is judged by the number of datagrams received, so
// a duplicated chunk masks a missing one, and at most 128 datagrams are read
// per call.
func (r *Reader) ReadMessage() (*Message, error) {
	// Bytes indexed while parsing a chunk header: 2 magic bytes, 8-byte
	// message id, 1-byte sequence number, 1-byte chunk count.
	const chunkFields = 2 + 8 + 1 + 1

	cBuf := make([]byte, ChunkSize)
	var (
		err        error
		n, length  int
		cid, ocid  []byte
		seq, total uint8
		cHead      []byte
		cReader    io.Reader
		chunks     [][]byte
	)

	for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
		// Restore the full buffer before every read; otherwise a short
		// datagram would shrink the buffer and truncate all later ones.
		cBuf = cBuf[:cap(cBuf)]
		if n, err = r.conn.Read(cBuf); err != nil {
			return nil, fmt.Errorf("Read: %s", err)
		}
		cBuf = cBuf[:n]

		// Only look at bytes that were actually received.
		cHead = nil
		if n >= 2 {
			cHead = cBuf[:2]
		}

		if !bytes.Equal(cHead, magicChunked) {
			// Not chunked: valid only if no chunked message is in progress.
			if total > 0 {
				return nil, fmt.Errorf("out-of-band message (not chunked)")
			}
			break
		}

		// The datagram comes from the network; validate before indexing.
		if n < chunkFields || n < chunkedHeaderLen {
			return nil, fmt.Errorf("short chunk header (%d bytes)", n)
		}
		cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
		if total == 0 {
			return nil, fmt.Errorf("chunked message announces zero chunks")
		}

		if ocid == nil {
			// Copy the id: cid aliases cBuf, which the next read overwrites.
			ocid = append([]byte(nil), cid...)
			chunks = make([][]byte, total)
		} else if !bytes.Equal(cid, ocid) {
			return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
		}
		if int(total) != len(chunks) {
			return nil, fmt.Errorf("chunk count changed from %d to %d", len(chunks), total)
		}
		if int(seq) >= len(chunks) {
			return nil, fmt.Errorf("chunk sequence %d out of range (total %d)", seq, total)
		}

		n = len(cBuf) - chunkedHeaderLen
		chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
		length += n
	}

	if length > 0 {
		// Reassemble the chunk payloads in sequence order.
		if cap(cBuf) < length {
			cBuf = make([]byte, 0, length)
		} else {
			cBuf = cBuf[:0]
		}
		for _, chunk := range chunks {
			cBuf = append(cBuf, chunk...)
		}
		cHead = nil
		if len(cBuf) >= 2 {
			cHead = cBuf[:2]
		}
	}

	// the data we get from the wire is compressed
	switch {
	case bytes.Equal(cHead, magicGzip):
		cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
	case len(cHead) == 2 && cHead[0] == magicZlib[0] &&
		(int(cHead[0])*256+int(cHead[1]))%31 == 0:
		// zlib is slightly more complicated, but correct: the two header
		// bytes read as a big-endian number must be a multiple of 31.
		cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
	default:
		// compliance with https://github.com/Graylog2/graylog2-server
		// treating all messages as uncompressed if they are not gzip, zlib or
		// chunked
		cReader = bytes.NewReader(cBuf)
	}
	if err != nil {
		return nil, fmt.Errorf("NewReader: %s", err)
	}

	// Decode into the struct itself rather than into a pointer to the
	// pointer, so a JSON null payload cannot reset msg to nil and produce a
	// nil message with a nil error.
	msg := new(Message)
	if err := json.NewDecoder(cReader).Decode(msg); err != nil {
		return nil, fmt.Errorf("json.Unmarshal: %s", err)
	}

	return msg, nil
}
|