This file is indexed.

/usr/share/gocode/src/github.com/hashicorp/serf/serf/serf.go is in golang-github-hashicorp-serf-dev 0.7.0~ds1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

package serf

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/go-msgpack/codec"
	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/coordinate"
)

// These are the protocol versions that Serf can _understand_. These are
// Serf-level protocol versions that are passed down as the delegate
// version to memberlist below.
const (
	ProtocolVersionMin uint8 = 2
	ProtocolVersionMax       = 4
)

const (
	// Used to detect if the meta data is tags
	// or if it is a raw role
	tagMagicByte uint8 = 255
)

var (
	// FeatureNotSupported is returned if a feature cannot be used
	// due to an older protocol version being used.
	FeatureNotSupported = fmt.Errorf("Feature not supported")
)

func init() {
	// Seed the random number generator
	rand.Seed(time.Now().UnixNano())
}

// Serf is a single node that is part of a single cluster that gets
// events about joins/leaves/failures/etc. It is created with the Create
// method.
//
// All functions on the Serf structure are safe to call concurrently.
type Serf struct {
	// The clocks for different purposes. These MUST be the first things
	// in this struct due to Golang issue #599.
	clock      LamportClock
	eventClock LamportClock
	queryClock LamportClock

	broadcasts    *memberlist.TransmitLimitedQueue
	config        *Config
	failedMembers []*memberState
	leftMembers   []*memberState
	memberlist    *memberlist.Memberlist
	memberLock    sync.RWMutex
	members       map[string]*memberState

	// Circular buffers for recent intents, used
	// in case we get the intent before the relevant event
	recentLeave      []nodeIntent
	recentLeaveIndex int
	recentJoin       []nodeIntent
	recentJoinIndex  int

	eventBroadcasts *memberlist.TransmitLimitedQueue
	eventBuffer     []*userEvents
	eventJoinIgnore bool
	eventMinTime    LamportTime
	eventLock       sync.RWMutex

	queryBroadcasts *memberlist.TransmitLimitedQueue
	queryBuffer     []*queries
	queryMinTime    LamportTime
	queryResponse   map[LamportTime]*QueryResponse
	queryLock       sync.RWMutex

	logger     *log.Logger
	joinLock   sync.Mutex
	stateLock  sync.Mutex
	state      SerfState
	shutdownCh chan struct{}

	snapshotter *Snapshotter
	keyManager  *KeyManager

	coordClient    *coordinate.Client
	coordCache     map[string]*coordinate.Coordinate
	coordCacheLock sync.RWMutex
}

// SerfState is the state of the Serf instance.
type SerfState int

const (
	SerfAlive SerfState = iota
	SerfLeaving
	SerfLeft
	SerfShutdown
)

func (s SerfState) String() string {
	switch s {
	case SerfAlive:
		return "alive"
	case SerfLeaving:
		return "leaving"
	case SerfLeft:
		return "left"
	case SerfShutdown:
		return "shutdown"
	default:
		return "unknown"
	}
}

// Member is a single member of the Serf cluster.
type Member struct {
	Name   string
	Addr   net.IP
	Port   uint16
	Tags   map[string]string
	Status MemberStatus

	// The minimum, maximum, and current values of the protocol versions
	// and delegate (Serf) protocol versions that each member can understand
	// or is speaking.
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}

// MemberStatus is the state that a member is in.
type MemberStatus int

const (
	StatusNone MemberStatus = iota
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
)

func (s MemberStatus) String() string {
	switch s {
	case StatusNone:
		return "none"
	case StatusAlive:
		return "alive"
	case StatusLeaving:
		return "leaving"
	case StatusLeft:
		return "left"
	case StatusFailed:
		return "failed"
	default:
		panic(fmt.Sprintf("unknown MemberStatus: %d", s))
	}
}

// memberState is used to track members that are no longer active due to
// leaving, failing, partitioning, etc. It tracks the member along with
// when that member was marked as leaving.
type memberState struct {
	Member
	statusLTime LamportTime // lamport clock time of last received message
	leaveTime   time.Time   // wall clock time of leave
}

// nodeIntent is used to buffer intents for out-of-order deliveries
type nodeIntent struct {
	LTime LamportTime
	Node  string
}

// userEvent is used to buffer events to prevent re-delivery
type userEvent struct {
	Name    string
	Payload []byte
}

func (ue *userEvent) Equals(other *userEvent) bool {
	if ue.Name != other.Name {
		return false
	}
	if bytes.Compare(ue.Payload, other.Payload) != 0 {
		return false
	}
	return true
}

// userEvents stores all the user events at a specific time
type userEvents struct {
	LTime  LamportTime
	Events []userEvent
}

// queries stores all the query ids at a specific time
type queries struct {
	LTime    LamportTime
	QueryIDs []uint32
}

const (
	UserEventSizeLimit = 512        // Maximum byte size for event name and payload
	snapshotSizeLimit  = 128 * 1024 // Maximum 128 KB snapshot
)

// Create creates a new Serf instance, starting all the background tasks
// to maintain cluster membership information.
//
// After calling this function, the configuration should no longer be used
// or modified by the caller.
func Create(conf *Config) (*Serf, error) {
	conf.Init()
	if conf.ProtocolVersion < ProtocolVersionMin {
		return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	} else if conf.ProtocolVersion > ProtocolVersionMax {
		return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	}

	serf := &Serf{
		config:        conf,
		logger:        log.New(conf.LogOutput, "", log.LstdFlags),
		members:       make(map[string]*memberState),
		queryResponse: make(map[LamportTime]*QueryResponse),
		shutdownCh:    make(chan struct{}),
		state:         SerfAlive,
	}

	// Check that the meta data length is okay
	if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
		return nil, fmt.Errorf("Encoded length of tags exceeds limit of %d bytes", memberlist.MetaMaxSize)
	}

	// Check if serf member event coalescing is enabled
	if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil {
		c := &memberEventCoalescer{
			lastEvents:   make(map[string]EventType),
			latestEvents: make(map[string]coalesceEvent),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.CoalescePeriod, conf.QuiescentPeriod, c)
	}

	// Check if user event coalescing is enabled
	if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil {
		c := &userEventCoalescer{
			events: make(map[string]*latestUserEvents),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.UserCoalescePeriod, conf.UserQuiescentPeriod, c)
	}

	// Listen for internal Serf queries. This is set up before the snapshotter, since
	// we want to capture the query-time, but the internal listener does not pass
	// the queries through.
	outCh, err := newSerfQueries(serf, serf.logger, conf.EventCh, serf.shutdownCh)
	if err != nil {
		return nil, fmt.Errorf("Failed to setup serf query handler: %v", err)
	}
	conf.EventCh = outCh

	// Set up network coordinate client.
	if !conf.DisableCoordinates {
		serf.coordClient, err = coordinate.NewClient(coordinate.DefaultConfig())
		if err != nil {
			return nil, fmt.Errorf("Failed to create coordinate client: %v", err)
		}
	}

	// Try to access the snapshot
	var oldClock, oldEventClock, oldQueryClock LamportTime
	var prev []*PreviousNode
	if conf.SnapshotPath != "" {
		eventCh, snap, err := NewSnapshotter(
			conf.SnapshotPath,
			snapshotSizeLimit,
			conf.RejoinAfterLeave,
			serf.logger,
			&serf.clock,
			serf.coordClient,
			conf.EventCh,
			serf.shutdownCh)
		if err != nil {
			return nil, fmt.Errorf("Failed to setup snapshot: %v", err)
		}
		serf.snapshotter = snap
		conf.EventCh = eventCh
		prev = snap.AliveNodes()
		oldClock = snap.LastClock()
		oldEventClock = snap.LastEventClock()
		oldQueryClock = snap.LastQueryClock()
		serf.eventMinTime = oldEventClock + 1
		serf.queryMinTime = oldQueryClock + 1
	}

	// Set up the coordinate cache. We do this after we read the snapshot to
	// make sure we get a good initial value from there, if we got one.
	if !conf.DisableCoordinates {
		serf.coordCache = make(map[string]*coordinate.Coordinate)
		serf.coordCache[conf.NodeName] = serf.coordClient.GetCoordinate()
	}

	// Setup the various broadcast queues, which we use to send our own
	// custom broadcasts along the gossip channel.
	serf.broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}

	// Create the buffer for recent intents
	serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer)
	serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer)

	// Create a buffer for events and queries
	serf.eventBuffer = make([]*userEvents, conf.EventBuffer)
	serf.queryBuffer = make([]*queries, conf.QueryBuffer)

	// Ensure our lamport clock is at least 1, so that the default
	// join LTime of 0 does not cause issues
	serf.clock.Increment()
	serf.eventClock.Increment()
	serf.queryClock.Increment()

	// Restore the clock from snap if we have one
	serf.clock.Witness(oldClock)
	serf.eventClock.Witness(oldEventClock)
	serf.queryClock.Witness(oldQueryClock)

	// Modify the memberlist configuration with keys that we set
	conf.MemberlistConfig.Events = &eventDelegate{serf: serf}
	conf.MemberlistConfig.Conflict = &conflictDelegate{serf: serf}
	conf.MemberlistConfig.Delegate = &delegate{serf: serf}
	conf.MemberlistConfig.DelegateProtocolVersion = conf.ProtocolVersion
	conf.MemberlistConfig.DelegateProtocolMin = ProtocolVersionMin
	conf.MemberlistConfig.DelegateProtocolMax = ProtocolVersionMax
	conf.MemberlistConfig.Name = conf.NodeName
	conf.MemberlistConfig.ProtocolVersion = ProtocolVersionMap[conf.ProtocolVersion]
	if !conf.DisableCoordinates {
		conf.MemberlistConfig.Ping = &pingDelegate{serf: serf}
	}

	// Setup a merge delegate if necessary
	if conf.Merge != nil {
		md := &mergeDelegate{serf: serf}
		conf.MemberlistConfig.Merge = md
		conf.MemberlistConfig.Alive = md
	}

	// Create the underlying memberlist that will manage membership
	// and failure detection for the Serf instance.
	memberlist, err := memberlist.Create(conf.MemberlistConfig)
	if err != nil {
		return nil, fmt.Errorf("Failed to create memberlist: %v", err)
	}

	serf.memberlist = memberlist

	// Create a key manager for handling all encryption key changes
	serf.keyManager = &KeyManager{serf: serf}

	// Start the background tasks. See the documentation above each method
	// for more information on their role.
	go serf.handleReap()
	go serf.handleReconnect()
	go serf.checkQueueDepth("Intent", serf.broadcasts)
	go serf.checkQueueDepth("Event", serf.eventBroadcasts)
	go serf.checkQueueDepth("Query", serf.queryBroadcasts)

	// Attempt to re-join the cluster if we have known nodes
	if len(prev) != 0 {
		go serf.handleRejoin(prev)
	}

	return serf, nil
}
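
// NOTE: The sketch below is illustrative only and is not part of the
// upstream serf.go. It assumes the DefaultConfig helper defined in
// config.go and shows one plausible way to stand up a node with Create.
func exampleCreateSketch() (*Serf, error) {
	conf := DefaultConfig() // sane defaults from config.go
	conf.NodeName = "node-a" // must be unique within the cluster (placeholder name)
	conf.Tags = map[string]string{"role": "web"}

	// Create starts the background tasks (reaper, reconnect, queue checks)
	// and returns a ready-to-use instance.
	s, err := Create(conf)
	if err != nil {
		return nil, fmt.Errorf("Failed to create serf: %v", err)
	}
	return s, nil
}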

// ProtocolVersion returns the current protocol version in use by Serf.
// This is the Serf protocol version, not the memberlist protocol version.
func (s *Serf) ProtocolVersion() uint8 {
	return s.config.ProtocolVersion
}

// EncryptionEnabled is a predicate that determines whether or not encryption
// is enabled, which is possible in one of two cases:
//   - Single encryption key passed at agent start (no persistence)
//   - Keyring file provided at agent start
func (s *Serf) EncryptionEnabled() bool {
	return s.config.MemberlistConfig.Keyring != nil
}

// KeyManager returns the key manager for the current Serf instance.
func (s *Serf) KeyManager() *KeyManager {
	return s.keyManager
}

// UserEvent is used to broadcast a custom user event with a given
// name and payload. The events must be fairly small, and if the
// size limit is exceeded an error will be returned. If coalesce is enabled,
// nodes are allowed to coalesce this event. Coalescing is only available
// starting in v0.2
func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
	// Check the size limit
	if len(name)+len(payload) > UserEventSizeLimit {
		return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit)
	}

	// Create a message
	msg := messageUserEvent{
		LTime:   s.eventClock.Time(),
		Name:    name,
		Payload: payload,
		CC:      coalesce,
	}
	s.eventClock.Increment()

	// Process update locally
	s.handleUserEvent(&msg)

	// Start broadcasting the event
	raw, err := encodeMessage(messageUserEventType, &msg)
	if err != nil {
		return err
	}
	s.eventBroadcasts.QueueBroadcast(&broadcast{
		msg: raw,
	})
	return nil
}
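
// NOTE: Illustrative sketch, not part of the upstream file. It broadcasts a
// small user event, checking UserEventSizeLimit up front the same way
// UserEvent itself does; the event name and payload are placeholders.
func exampleUserEventSketch(s *Serf) error {
	name, payload := "deploy", []byte("v1.2.3")
	if len(name)+len(payload) > UserEventSizeLimit {
		return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit)
	}
	// The final argument allows receivers to coalesce duplicate events.
	return s.UserEvent(name, payload, true)
}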

// Query is used to broadcast a new query. The query must be fairly small,
// and an error will be returned if the size limit is exceeded. This is only
// available with protocol version 4 and newer. Query parameters are optional,
// and if not provided, a sane set of defaults will be used.
func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryResponse, error) {
	// Check that the latest protocol is in use
	if s.ProtocolVersion() < 4 {
		return nil, FeatureNotSupported
	}

	// Provide default parameters if none given
	if params == nil {
		params = s.DefaultQueryParams()
	} else if params.Timeout == 0 {
		params.Timeout = s.DefaultQueryTimeout()
	}

	// Get the local node
	local := s.memberlist.LocalNode()

	// Encode the filters
	filters, err := params.encodeFilters()
	if err != nil {
		return nil, fmt.Errorf("Failed to format filters: %v", err)
	}

	// Setup the flags
	var flags uint32
	if params.RequestAck {
		flags |= queryFlagAck
	}

	// Create a message
	q := messageQuery{
		LTime:   s.queryClock.Time(),
		ID:      uint32(rand.Int31()),
		Addr:    local.Addr,
		Port:    local.Port,
		Filters: filters,
		Flags:   flags,
		Timeout: params.Timeout,
		Name:    name,
		Payload: payload,
	}

	// Encode the query
	raw, err := encodeMessage(messageQueryType, &q)
	if err != nil {
		return nil, err
	}

	// Check the size
	if len(raw) > s.config.QuerySizeLimit {
		return nil, fmt.Errorf("query exceeds limit of %d bytes", s.config.QuerySizeLimit)
	}

	// Register QueryResponse to track acks and responses
	resp := newQueryResponse(s.memberlist.NumMembers(), &q)
	s.registerQueryResponse(params.Timeout, resp)

	// Process query locally
	s.handleQuery(&q)

	// Start broadcasting the event
	s.queryBroadcasts.QueueBroadcast(&broadcast{
		msg: raw,
	})
	return resp, nil
}
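
// NOTE: Illustrative sketch, not part of the upstream file. It issues a
// query with default parameters and drains the response channel, assuming
// the ResponseCh accessor on QueryResponse from query.go (also used by
// resolveNodeConflict below). The query name is a placeholder.
func exampleQuerySketch(s *Serf) error {
	params := s.DefaultQueryParams()
	params.Timeout = 5 * time.Second

	resp, err := s.Query("uptime", nil, params)
	if err != nil {
		return err
	}

	// The response channel is closed once the timeout fires, so ranging
	// over it collects every answer received within the deadline.
	for r := range resp.ResponseCh() {
		s.logger.Printf("[DEBUG] serf: example query response from %s: %q", r.From, r.Payload)
	}
	return nil
}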

// registerQueryResponse is used to setup the listeners for the query,
// and to schedule closing the query after the timeout.
func (s *Serf) registerQueryResponse(timeout time.Duration, resp *QueryResponse) {
	s.queryLock.Lock()
	defer s.queryLock.Unlock()

	// Map the LTime to the QueryResponse. This is necessarily 1-to-1,
	// since we increment the time for each new query.
	s.queryResponse[resp.lTime] = resp

	// Setup a timer to close the response and deregister after the timeout
	time.AfterFunc(timeout, func() {
		s.queryLock.Lock()
		delete(s.queryResponse, resp.lTime)
		resp.Close()
		s.queryLock.Unlock()
	})
}

// SetTags is used to dynamically update the tags associated with
// the local node. This will propagate the change to the rest of
// the cluster. Blocks until the message is broadcast out.
func (s *Serf) SetTags(tags map[string]string) error {
	// Check that the meta data length is okay
	if len(s.encodeTags(tags)) > memberlist.MetaMaxSize {
		return fmt.Errorf("Encoded length of tags exceeds limit of %d bytes",
			memberlist.MetaMaxSize)
	}

	// Update the config
	s.config.Tags = tags

	// Trigger a memberlist update
	return s.memberlist.UpdateNode(s.config.BroadcastTimeout)
}
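
// NOTE: Illustrative sketch, not part of the upstream file. It copies the
// local member's current tags, changes one of them, and lets SetTags gossip
// the update; the tag key and value are placeholders.
func exampleSetTagsSketch(s *Serf) error {
	tags := make(map[string]string)
	for k, v := range s.LocalMember().Tags {
		tags[k] = v
	}
	tags["version"] = "2"
	return s.SetTags(tags)
}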

// Join joins an existing Serf cluster. Returns the number of nodes
// successfully contacted. The returned error will be non-nil only in the
// case that no nodes could be contacted. If ignoreOld is true, then any
// user messages sent prior to the join will be ignored.
func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
	// Do a quick state check
	if s.State() != SerfAlive {
		return 0, fmt.Errorf("Serf can't Join after Leave or Shutdown")
	}

	// Hold the joinLock, this is to make eventJoinIgnore safe
	s.joinLock.Lock()
	defer s.joinLock.Unlock()

	// Ignore any events from a potential join. This is safe since we hold
	// the joinLock and nobody else can be doing a Join
	if ignoreOld {
		s.eventJoinIgnore = true
		defer func() {
			s.eventJoinIgnore = false
		}()
	}

	// Have memberlist attempt to join
	num, err := s.memberlist.Join(existing)

	// If we joined any nodes, broadcast the join message
	if num > 0 {
		// Start broadcasting the update
		if err := s.broadcastJoin(s.clock.Time()); err != nil {
			return num, err
		}
	}

	return num, err
}
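
// NOTE: Illustrative sketch, not part of the upstream file. The seed
// addresses are placeholders for whatever discovery mechanism the caller
// uses; Join only needs one of them to answer.
func exampleJoinSketch(s *Serf) error {
	seeds := []string{"10.0.0.1:7946", "10.0.0.2:7946"}

	// ignoreOld=true suppresses replay of user events sent before the join.
	n, err := s.Join(seeds, true)
	if err != nil {
		return fmt.Errorf("Failed to join cluster (contacted %d nodes): %v", n, err)
	}
	s.logger.Printf("[DEBUG] serf: example join contacted %d nodes", n)
	return nil
}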

// broadcastJoin broadcasts a new join intent with a
// given clock value. It is used on either join, or if
// we need to refute an older leave intent. Cannot be called
// with the memberLock held.
func (s *Serf) broadcastJoin(ltime LamportTime) error {
	// Construct message to update our lamport clock
	msg := messageJoin{
		LTime: ltime,
		Node:  s.config.NodeName,
	}
	s.clock.Witness(ltime)

	// Process update locally
	s.handleNodeJoinIntent(&msg)

	// Start broadcasting the update
	if err := s.broadcast(messageJoinType, &msg, nil); err != nil {
		s.logger.Printf("[WARN] serf: Failed to broadcast join intent: %v", err)
		return err
	}
	return nil
}

// Leave gracefully exits the cluster. It is safe to call this multiple
// times.
func (s *Serf) Leave() error {
	// Check the current state
	s.stateLock.Lock()
	if s.state == SerfLeft {
		s.stateLock.Unlock()
		return nil
	} else if s.state == SerfLeaving {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave already in progress")
	} else if s.state == SerfShutdown {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave called after Shutdown")
	}
	s.state = SerfLeaving
	s.stateLock.Unlock()

	// If we have a snapshot, mark we are leaving
	if s.snapshotter != nil {
		s.snapshotter.Leave()
	}

	// Construct the message for the graceful leave
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  s.config.NodeName,
	}
	s.clock.Increment()

	// Process the leave locally
	s.handleNodeLeaveIntent(&msg)

	// Only broadcast the leave message if there is at least one
	// other node alive.
	if s.hasAliveMembers() {
		notifyCh := make(chan struct{})
		if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
			return err
		}

		select {
		case <-notifyCh:
		case <-time.After(s.config.BroadcastTimeout):
			return errors.New("timeout while waiting for graceful leave")
		}
	}

	// Attempt the memberlist leave
	err := s.memberlist.Leave(s.config.BroadcastTimeout)
	if err != nil {
		return err
	}

	// Transition to Left only if we are not already shutdown
	s.stateLock.Lock()
	if s.state != SerfShutdown {
		s.state = SerfLeft
	}
	s.stateLock.Unlock()
	return nil
}

// hasAliveMembers is called to check for any alive members other than
// ourself.
func (s *Serf) hasAliveMembers() bool {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()

	hasAlive := false
	for _, m := range s.members {
		// Skip ourself, we want to know if OTHER members are alive
		if m.Name == s.config.NodeName {
			continue
		}

		if m.Status == StatusAlive {
			hasAlive = true
			break
		}
	}
	return hasAlive
}

// LocalMember returns the Member information for the local node
func (s *Serf) LocalMember() Member {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()
	return s.members[s.config.NodeName].Member
}

// Members returns a point-in-time snapshot of the members of this cluster.
func (s *Serf) Members() []Member {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()

	members := make([]Member, 0, len(s.members))
	for _, m := range s.members {
		members = append(members, m.Member)
	}

	return members
}

// RemoveFailedNode forcibly removes a failed node from the cluster
// immediately, instead of waiting for the reaper to eventually reclaim it.
// This also has the effect that Serf will no longer attempt to reconnect
// to this node.
func (s *Serf) RemoveFailedNode(node string) error {
	// Construct the message to broadcast
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  node,
	}
	s.clock.Increment()

	// Process our own event
	s.handleNodeLeaveIntent(&msg)

	// If we have no members, then we don't need to broadcast
	if !s.hasAliveMembers() {
		return nil
	}

	// Broadcast the remove
	notifyCh := make(chan struct{})
	if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
		return err
	}

	// Wait for the broadcast
	select {
	case <-notifyCh:
	case <-time.After(s.config.BroadcastTimeout):
		return fmt.Errorf("timed out broadcasting node removal")
	}

	return nil
}
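
// NOTE: Illustrative sketch, not part of the upstream file. It force-removes
// every member currently marked as failed instead of waiting for the reaper.
func exampleRemoveFailedSketch(s *Serf) {
	for _, m := range s.Members() {
		if m.Status != StatusFailed {
			continue
		}
		if err := s.RemoveFailedNode(m.Name); err != nil {
			s.logger.Printf("[WARN] serf: example removal of %s failed: %v", m.Name, err)
		}
	}
}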

// Shutdown forcefully shuts down the Serf instance, stopping all network
// activity and background maintenance associated with the instance.
//
// This is not a graceful shutdown, and should be preceded by a call
// to Leave. Otherwise, other nodes in the cluster will detect this node's
// exit as a node failure.
//
// It is safe to call this method multiple times.
func (s *Serf) Shutdown() error {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.state == SerfShutdown {
		return nil
	}

	if s.state != SerfLeft {
		s.logger.Printf("[WARN] serf: Shutdown without a Leave")
	}

	s.state = SerfShutdown
	close(s.shutdownCh)

	err := s.memberlist.Shutdown()
	if err != nil {
		return err
	}

	// Wait for the snapshotter to finish if we have one
	if s.snapshotter != nil {
		s.snapshotter.Wait()
	}

	return nil
}

// ShutdownCh returns a channel that can be used to wait for
// Serf to shutdown.
func (s *Serf) ShutdownCh() <-chan struct{} {
	return s.shutdownCh
}
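
// NOTE: Illustrative sketch, not part of the upstream file. It shows the
// usual termination sequence: a graceful Leave so peers see a leave rather
// than a failure, then Shutdown, then waiting on the shutdown channel so the
// background goroutines have fully stopped.
func exampleShutdownSketch(s *Serf) error {
	if err := s.Leave(); err != nil {
		s.logger.Printf("[WARN] serf: example graceful leave failed: %v", err)
	}
	if err := s.Shutdown(); err != nil {
		return err
	}
	<-s.ShutdownCh()
	return nil
}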

// Memberlist is used to get access to the underlying Memberlist instance
func (s *Serf) Memberlist() *memberlist.Memberlist {
	return s.memberlist
}

// State is the current state of this Serf instance.
func (s *Serf) State() SerfState {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()
	return s.state
}

// broadcast takes a Serf message type, encodes it for the wire, and queues
// the broadcast. If a notify channel is given, this channel will be closed
// when the broadcast is sent.
func (s *Serf) broadcast(t messageType, msg interface{}, notify chan<- struct{}) error {
	raw, err := encodeMessage(t, msg)
	if err != nil {
		return err
	}

	s.broadcasts.QueueBroadcast(&broadcast{
		msg:    raw,
		notify: notify,
	})
	return nil
}

// handleNodeJoin is called when a node join event is received
// from memberlist.
func (s *Serf) handleNodeJoin(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	var oldStatus MemberStatus
	member, ok := s.members[n.Name]
	if !ok {
		oldStatus = StatusNone
		member = &memberState{
			Member: Member{
				Name:   n.Name,
				Addr:   net.IP(n.Addr),
				Port:   n.Port,
				Tags:   s.decodeTags(n.Meta),
				Status: StatusAlive,
			},
		}

		// Check if we have a join intent and use the LTime
		if join := recentIntent(s.recentJoin, n.Name); join != nil {
			member.statusLTime = join.LTime
		}

		// Check if we have a leave intent
		if leave := recentIntent(s.recentLeave, n.Name); leave != nil {
			if leave.LTime > member.statusLTime {
				member.Status = StatusLeaving
				member.statusLTime = leave.LTime
			}
		}

		s.members[n.Name] = member
	} else {
		oldStatus = member.Status
		member.Status = StatusAlive
		member.leaveTime = time.Time{}
		member.Addr = net.IP(n.Addr)
		member.Port = n.Port
		member.Tags = s.decodeTags(n.Meta)
	}

	// Update the protocol versions every time we get an event
	member.ProtocolMin = n.PMin
	member.ProtocolMax = n.PMax
	member.ProtocolCur = n.PCur
	member.DelegateMin = n.DMin
	member.DelegateMax = n.DMax
	member.DelegateCur = n.DCur

	// If node was previously in a failed state, then clean up some
	// internal accounting.
	// TODO(mitchellh): needs tests to verify not reaped
	if oldStatus == StatusFailed || oldStatus == StatusLeft {
		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
		s.leftMembers = removeOldMember(s.leftMembers, member.Name)
	}

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", "join"}, 1)

	// Send an event along
	s.logger.Printf("[INFO] serf: EventMemberJoin: %s %s",
		member.Member.Name, member.Member.Addr)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    EventMemberJoin,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeLeave is called when a node leave event is received
// from memberlist.
func (s *Serf) handleNodeLeave(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is supposedly
		// leaving. Just ignore it completely.
		return
	}

	switch member.Status {
	case StatusLeaving:
		member.Status = StatusLeft
		member.leaveTime = time.Now()
		s.leftMembers = append(s.leftMembers, member)
	case StatusAlive:
		member.Status = StatusFailed
		member.leaveTime = time.Now()
		s.failedMembers = append(s.failedMembers, member)
	default:
		// Unknown state that it was in? Just don't do anything
		s.logger.Printf("[WARN] serf: Bad state when leave: %d", member.Status)
		return
	}

	// Send an event along
	event := EventMemberLeave
	eventStr := "EventMemberLeave"
	if member.Status != StatusLeft {
		event = EventMemberFailed
		eventStr = "EventMemberFailed"
	}

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", member.Status.String()}, 1)

	s.logger.Printf("[INFO] serf: %s: %s %s",
		eventStr, member.Member.Name, member.Member.Addr)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    event,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeUpdate is called when a node meta data update
// has taken place
func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is updating.
		// Just ignore it completely.
		return
	}

	// Update the member attributes
	member.Addr = net.IP(n.Addr)
	member.Port = n.Port
	member.Tags = s.decodeTags(n.Meta)

	// Snag the latest versions. NOTE - the current memberlist code will NOT
	// fire an update event if the metadata (for Serf, tags) stays the same
	// and only the protocol versions change. If we make any Serf-level
	// protocol changes where we want to get this event under those
	// circumstances, we will need to update memberlist to do a check of
	// versions as well as the metadata.
	member.ProtocolMin = n.PMin
	member.ProtocolMax = n.PMax
	member.ProtocolCur = n.PCur
	member.DelegateMin = n.DMin
	member.DelegateMax = n.DMax
	member.DelegateCur = n.DCur

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", "update"}, 1)

	// Send an event along
	s.logger.Printf("[INFO] serf: EventMemberUpdate: %s", member.Member.Name)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    EventMemberUpdate,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeLeaveIntent is called when an intent to leave is received.
func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
	// Witness a potentially newer time
	s.clock.Witness(leaveMsg.LTime)

	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[leaveMsg.Node]
	if !ok {
		// If we've already seen this message don't rebroadcast
		if recentIntent(s.recentLeave, leaveMsg.Node) != nil {
			return false
		}

		// We don't know this member so store it in a buffer for now
		s.recentLeave[s.recentLeaveIndex] = nodeIntent{
			LTime: leaveMsg.LTime,
			Node:  leaveMsg.Node,
		}
		s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave)
		return true
	}

	// If the message is old, then it is irrelevant and we can skip it
	if leaveMsg.LTime <= member.statusLTime {
		return false
	}

	// Refute us leaving if we are in the alive state
	// Must be done in another goroutine since we have the memberLock
	if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive {
		s.logger.Printf("[DEBUG] serf: Refuting an older leave intent")
		go s.broadcastJoin(s.clock.Time())
		return false
	}

	// State transition depends on current state
	switch member.Status {
	case StatusAlive:
		member.Status = StatusLeaving
		member.statusLTime = leaveMsg.LTime
		return true
	case StatusFailed:
		member.Status = StatusLeft
		member.statusLTime = leaveMsg.LTime

		// Remove from the failed list and add to the left list. We add
		// to the left list so that when we do a sync, other nodes will
		// remove it from their failed list.
		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
		s.leftMembers = append(s.leftMembers, member)

		// We must push a message indicating the node has now
		// left to allow higher-level applications to handle the
		// graceful leave.
		s.logger.Printf("[INFO] serf: EventMemberLeave (forced): %s %s",
			member.Member.Name, member.Member.Addr)
		if s.config.EventCh != nil {
			s.config.EventCh <- MemberEvent{
				Type:    EventMemberLeave,
				Members: []Member{member.Member},
			}
		}
		return true
	default:
		return false
	}
}

// handleNodeJoinIntent is called when a node broadcasts a
// join message to set the lamport time of its join
func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
	// Witness a potentially newer time
	s.clock.Witness(joinMsg.LTime)

	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[joinMsg.Node]
	if !ok {
		// If we've already seen this message don't rebroadcast
		if recentIntent(s.recentJoin, joinMsg.Node) != nil {
			return false
		}

		// We don't know this member so store it in a buffer for now
		s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node}
		s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin)
		return true
	}

	// Check if this time is newer than what we have
	if joinMsg.LTime <= member.statusLTime {
		return false
	}

	// Update the LTime
	member.statusLTime = joinMsg.LTime

	// If we are in the leaving state, we should go back to alive,
	// since the leaving message must have been for an older time
	if member.Status == StatusLeaving {
		member.Status = StatusAlive
	}
	return true
}

// handleUserEvent is called when a user event broadcast is
// received. Returns whether the message should be rebroadcast.
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
	// Witness a potentially newer time
	s.eventClock.Witness(eventMsg.LTime)

	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	// Ignore if it is before our minimum event time
	if eventMsg.LTime < s.eventMinTime {
		return false
	}

	// Check if this message is too old
	curTime := s.eventClock.Time()
	if curTime > LamportTime(len(s.eventBuffer)) &&
		eventMsg.LTime < curTime-LamportTime(len(s.eventBuffer)) {
		s.logger.Printf(
			"[WARN] serf: received old event %s from time %d (current: %d)",
			eventMsg.Name,
			eventMsg.LTime,
			s.eventClock.Time())
		return false
	}

	// Check if we've already seen this
	idx := eventMsg.LTime % LamportTime(len(s.eventBuffer))
	seen := s.eventBuffer[idx]
	userEvent := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}
	if seen != nil && seen.LTime == eventMsg.LTime {
		for _, previous := range seen.Events {
			if previous.Equals(&userEvent) {
				return false
			}
		}
	} else {
		seen = &userEvents{LTime: eventMsg.LTime}
		s.eventBuffer[idx] = seen
	}

	// Add to recent events
	seen.Events = append(seen.Events, userEvent)

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "events"}, 1)
	metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)

	if s.config.EventCh != nil {
		s.config.EventCh <- UserEvent{
			LTime:    eventMsg.LTime,
			Name:     eventMsg.Name,
			Payload:  eventMsg.Payload,
			Coalesce: eventMsg.CC,
		}
	}
	return true
}
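
// NOTE: Illustrative sketch, not part of the upstream file. It isolates the
// ring-buffer lookup used by handleUserEvent above: events are indexed by
// Lamport time modulo the buffer size, and a slot only counts as "seen"
// when it still holds entries for that exact LTime.
func exampleEventBufferSlot(buffer []*userEvents, ltime LamportTime) *userEvents {
	idx := ltime % LamportTime(len(buffer))
	seen := buffer[idx]
	if seen != nil && seen.LTime == ltime {
		return seen // same logical time: candidates for duplicate suppression
	}
	return nil // slot empty, or recycled for a different time
}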

// handleQuery is called when a query broadcast is
// received. Returns whether the message should be rebroadcast.
func (s *Serf) handleQuery(query *messageQuery) bool {
	// Witness a potentially newer time
	s.queryClock.Witness(query.LTime)

	s.queryLock.Lock()
	defer s.queryLock.Unlock()

	// Ignore if it is before our minimum query time
	if query.LTime < s.queryMinTime {
		return false
	}

	// Check if this message is too old
	curTime := s.queryClock.Time()
	if curTime > LamportTime(len(s.queryBuffer)) &&
		query.LTime < curTime-LamportTime(len(s.queryBuffer)) {
		s.logger.Printf(
			"[WARN] serf: received old query %s from time %d (current: %d)",
			query.Name,
			query.LTime,
			s.queryClock.Time())
		return false
	}

	// Check if we've already seen this
	idx := query.LTime % LamportTime(len(s.queryBuffer))
	seen := s.queryBuffer[idx]
	if seen != nil && seen.LTime == query.LTime {
		for _, previous := range seen.QueryIDs {
			if previous == query.ID {
				// Seen this ID already
				return false
			}
		}
	} else {
		seen = &queries{LTime: query.LTime}
		s.queryBuffer[idx] = seen
	}

	// Add to recent queries
	seen.QueryIDs = append(seen.QueryIDs, query.ID)

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "queries"}, 1)
	metrics.IncrCounter([]string{"serf", "queries", query.Name}, 1)

	// Check if we should rebroadcast, this may be disabled by a flag
	rebroadcast := true
	if query.NoBroadcast() {
		rebroadcast = false
	}

	// Filter the query
	if !s.shouldProcessQuery(query.Filters) {
		// Even if we don't process it further, we should rebroadcast,
		// since it is the first time we've seen this.
		return rebroadcast
	}

	// Send ack if requested, without waiting for client to Respond()
	if query.Ack() {
		ack := messageQueryResponse{
			LTime: query.LTime,
			ID:    query.ID,
			From:  s.config.NodeName,
			Flags: queryFlagAck,
		}
		raw, err := encodeMessage(messageQueryResponseType, &ack)
		if err != nil {
			s.logger.Printf("[ERR] serf: failed to format ack: %v", err)
		} else {
			addr := net.UDPAddr{IP: query.Addr, Port: int(query.Port)}
			if err := s.memberlist.SendTo(&addr, raw); err != nil {
				s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
			}
		}
	}

	if s.config.EventCh != nil {
		s.config.EventCh <- &Query{
			LTime:    query.LTime,
			Name:     query.Name,
			Payload:  query.Payload,
			serf:     s,
			id:       query.ID,
			addr:     query.Addr,
			port:     query.Port,
			deadline: time.Now().Add(query.Timeout),
		}
	}
	return rebroadcast
}

// handleQueryResponse is called when a query response is
// received.
func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
	// Look for a corresponding QueryResponse
	s.queryLock.RLock()
	query, ok := s.queryResponse[resp.LTime]
	s.queryLock.RUnlock()
	if !ok {
		s.logger.Printf("[WARN] serf: reply for non-running query (LTime: %d, ID: %d) From: %s",
			resp.LTime, resp.ID, resp.From)
		return
	}

	// Verify the ID matches
	if query.id != resp.ID {
		s.logger.Printf("[WARN] serf: query reply ID mismatch (Local: %d, Response: %d)",
			query.id, resp.ID)
		return
	}

	// Check if the query is closed
	if query.Finished() {
		return
	}

	// Process each type of response
	if resp.Ack() {
		metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
		select {
		case query.ackCh <- resp.From:
		default:
			s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping")
		}
	} else {
		metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
		select {
		case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
		default:
			s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping")
		}
	}
}

// handleNodeConflict is invoked when a join detects a conflict over a name.
// This means two different nodes (IP/Port) are claiming the same name. Memberlist
// will reject the "new" node mapping, but we can still be notified
func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) {
	// Log a basic warning if the node is not us...
	if existing.Name != s.config.NodeName {
		s.logger.Printf("[WARN] serf: Name conflict for '%s' both %s:%d and %s:%d are claiming",
			existing.Name, existing.Addr, existing.Port, other.Addr, other.Port)
		return
	}

	// The current node is conflicting! This is an error
	s.logger.Printf("[ERR] serf: Node name conflicts with another node at %s:%d. Names must be unique! (Resolution enabled: %v)",
		other.Addr, other.Port, s.config.EnableNameConflictResolution)

	// If automatic resolution is enabled, kick off the resolution
	if s.config.EnableNameConflictResolution {
		go s.resolveNodeConflict()
	}
}

// resolveNodeConflict is used to determine which node should remain during
// a name conflict. This is done by running an internal query.
func (s *Serf) resolveNodeConflict() {
	// Get the local node
	local := s.memberlist.LocalNode()

	// Start a name resolution query
	qName := internalQueryName(conflictQuery)
	payload := []byte(s.config.NodeName)
	resp, err := s.Query(qName, payload, nil)
	if err != nil {
		s.logger.Printf("[ERR] serf: Failed to start name resolution query: %v", err)
		return
	}

	// Counter to determine winner
	var responses, matching int

	// Gather responses
	respCh := resp.ResponseCh()
	for r := range respCh {
		// Decode the response
		if len(r.Payload) < 1 || messageType(r.Payload[0]) != messageConflictResponseType {
			s.logger.Printf("[ERR] serf: Invalid conflict query response type: %v", r.Payload)
			continue
		}
		var member Member
		if err := decodeMessage(r.Payload[1:], &member); err != nil {
			s.logger.Printf("[ERR] serf: Failed to decode conflict query response: %v", err)
			continue
		}

		// Update the counters
		responses++
		if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
			matching++
		}
	}

	// Query over, determine if we should live
	majority := (responses / 2) + 1
	if matching >= majority {
		s.logger.Printf("[INFO] serf: majority in name conflict resolution [%d / %d]",
			matching, responses)
		return
	}

	// Since we lost the vote, we need to exit
	s.logger.Printf("[WARN] serf: minority in name conflict resolution, quiting [%d / %d]",
		matching, responses)
	if err := s.Shutdown(); err != nil {
		s.logger.Printf("[ERR] serf: Failed to shutdown: %v", err)
	}
}
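
// NOTE: Illustrative sketch, not part of the upstream file. It isolates the
// survival rule used by resolveNodeConflict above: the local node keeps
// running only if a strict majority of conflict-query responses point at
// its own address and port.
func exampleConflictMajority(matching, responses int) bool {
	majority := (responses / 2) + 1
	return matching >= majority
}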

// handleReap periodically reaps the list of failed and left members.
func (s *Serf) handleReap() {
	for {
		select {
		case <-time.After(s.config.ReapInterval):
			s.memberLock.Lock()
			s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout)
			s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout)
			s.memberLock.Unlock()
		case <-s.shutdownCh:
			return
		}
	}
}

// handleReconnect attempts to reconnect to recently failed nodes
// on configured intervals.
func (s *Serf) handleReconnect() {
	for {
		select {
		case <-time.After(s.config.ReconnectInterval):
			s.reconnect()
		case <-s.shutdownCh:
			return
		}
	}
}

// reap is called with a list of old members and a timeout, and removes
// members that have exceeded the timeout. The members are removed from
// both the old list and the members map itself. Locking is left to the caller.
func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState {
	now := time.Now()
	n := len(old)
	for i := 0; i < n; i++ {
		m := old[i]

		// Skip if the timeout is not yet reached
		if now.Sub(m.leaveTime) <= timeout {
			continue
		}

		// Delete from the list
		old[i], old[n-1] = old[n-1], nil
		old = old[:n-1]
		n--
		i--

		// Delete from members
		delete(s.members, m.Name)

		// Tell the coordinate client the node has gone away and delete
		// its cached coordinates.
		if !s.config.DisableCoordinates {
			s.coordClient.ForgetNode(m.Name)

			s.coordCacheLock.Lock()
			delete(s.coordCache, m.Name)
			s.coordCacheLock.Unlock()
		}

		// Send an event along
		s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name)
		if s.config.EventCh != nil {
			s.config.EventCh <- MemberEvent{
				Type:    EventMemberReap,
				Members: []Member{m.Member},
			}
		}
	}

	return old
}

// reconnect attempts to reconnect to recently failed nodes.
func (s *Serf) reconnect() {
	s.memberLock.RLock()

	// Nothing to do if there are no failed members
	n := len(s.failedMembers)
	if n == 0 {
		s.memberLock.RUnlock()
		return
	}

	// Probability we should attempt to reconnect is given
	// by num failed / (num members - num failed - num left)
	// This means that we probabilistically expect the cluster
	// to attempt to connect to each failed member once per
	// reconnect interval
	numFailed := float32(len(s.failedMembers))
	numAlive := float32(len(s.members) - len(s.failedMembers) - len(s.leftMembers))
	if numAlive == 0 {
		numAlive = 1 // guard against zero divide
	}
	prob := numFailed / numAlive
	if rand.Float32() > prob {
		s.memberLock.RUnlock()
		s.logger.Printf("[DEBUG] serf: forgoing reconnect for random throttling")
		return
	}

	// Select a random member to try and join
	idx := int(rand.Uint32() % uint32(n))
	mem := s.failedMembers[idx]
	s.memberLock.RUnlock()

	// Format the addr
	addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)}
	s.logger.Printf("[INFO] serf: attempting reconnect to %v %s", mem.Name, addr.String())

	// Attempt to join at the memberlist level
	s.memberlist.Join([]string{addr.String()})
}
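
// NOTE: Illustrative sketch, not part of the upstream file. It isolates the
// throttling math used by reconnect above: with F failed and A alive members,
// each pass attempts a reconnect with probability F/A, so the cluster as a
// whole is expected to try each failed member about once per interval.
func exampleReconnectProbability(numFailed, numAlive int) bool {
	if numAlive == 0 {
		numAlive = 1 // guard against zero divide
	}
	prob := float32(numFailed) / float32(numAlive)
	return rand.Float32() <= prob
}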

// checkQueueDepth periodically checks the size of a queue to see if
// it is too large
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
	for {
		select {
		case <-time.After(time.Second):
			numq := queue.NumQueued()
			metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
			if numq >= s.config.QueueDepthWarning {
				s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
			}
			if numq > s.config.MaxQueueDepth {
				s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
					name, numq, s.config.MaxQueueDepth)
				queue.Prune(s.config.MaxQueueDepth)
			}
		case <-s.shutdownCh:
			return
		}
	}
}

// removeOldMember is used to remove an old member from a list of old
// members.
func removeOldMember(old []*memberState, name string) []*memberState {
	for i, m := range old {
		if m.Name == name {
			n := len(old)
			old[i], old[n-1] = old[n-1], nil
			return old[:n-1]
		}
	}

	return old
}

// recentIntent checks the recent intent buffer for a matching
// entry for a given node, and either returns the intent or nil
func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) {
	for i := 0; i < len(recent); i++ {
		// Break fast if we hit a zero entry
		if recent[i].LTime == 0 {
			break
		}

		// Check for a node match
		if recent[i].Node == node {
			// Take the most recent entry
			if intent == nil || recent[i].LTime > intent.LTime {
				intent = &recent[i]
			}
		}
	}
	return
}

// handleRejoin attempts to reconnect to previously known alive nodes
func (s *Serf) handleRejoin(previous []*PreviousNode) {
	for _, prev := range previous {
		// Do not attempt to join ourself
		if prev.Name == s.config.NodeName {
			continue
		}

		s.logger.Printf("[INFO] serf: Attempting re-join to previously known node: %s", prev)
		_, err := s.memberlist.Join([]string{prev.Addr})
		if err == nil {
			s.logger.Printf("[INFO] serf: Re-joined to previously known node: %s", prev)
			return
		}
	}
	s.logger.Printf("[WARN] serf: Failed to re-join any previously known node")
}

// encodeTags is used to encode a tag map
func (s *Serf) encodeTags(tags map[string]string) []byte {
	// Support role-only backwards compatibility
	if s.ProtocolVersion() < 3 {
		role := tags["role"]
		return []byte(role)
	}

	// Use a magic byte prefix and msgpack encode the tags
	var buf bytes.Buffer
	buf.WriteByte(tagMagicByte)
	enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
	if err := enc.Encode(tags); err != nil {
		panic(fmt.Sprintf("Failed to encode tags: %v", err))
	}
	return buf.Bytes()
}

// decodeTags is used to decode a tag map
func (s *Serf) decodeTags(buf []byte) map[string]string {
	tags := make(map[string]string)

	// Backwards compatibility mode
	if len(buf) == 0 || buf[0] != tagMagicByte {
		tags["role"] = string(buf)
		return tags
	}

	// Decode the tags
	r := bytes.NewReader(buf[1:])
	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
	if err := dec.Decode(&tags); err != nil {
		s.logger.Printf("[ERR] serf: Failed to decode tags: %v", err)
	}
	return tags
}
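
// NOTE: Illustrative sketch, not part of the upstream file. It round-trips a
// tag map through encodeTags/decodeTags: on protocol version 3+ the payload
// is the magic byte followed by msgpack, while on older versions only the
// "role" tag survives as a raw string.
func exampleTagRoundTrip(s *Serf) map[string]string {
	encoded := s.encodeTags(map[string]string{"role": "web", "dc": "east"})
	return s.decodeTags(encoded) // identical map when the protocol is >= 3
}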

// Stats is used to provide operator debugging information
func (s *Serf) Stats() map[string]string {
	toString := func(v uint64) string {
		return strconv.FormatUint(v, 10)
	}
	stats := map[string]string{
		"members":      toString(uint64(len(s.members))),
		"failed":       toString(uint64(len(s.failedMembers))),
		"left":         toString(uint64(len(s.leftMembers))),
		"member_time":  toString(uint64(s.clock.Time())),
		"event_time":   toString(uint64(s.eventClock.Time())),
		"query_time":   toString(uint64(s.queryClock.Time())),
		"intent_queue": toString(uint64(s.broadcasts.NumQueued())),
		"event_queue":  toString(uint64(s.eventBroadcasts.NumQueued())),
		"query_queue":  toString(uint64(s.queryBroadcasts.NumQueued())),
		"encrypted":    fmt.Sprintf("%v", s.EncryptionEnabled()),
	}
	return stats
}
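
// NOTE: Illustrative sketch, not part of the upstream file. It dumps the
// Stats map through the instance's logger, e.g. for a periodic operator
// report.
func exampleStatsSketch(s *Serf) {
	for k, v := range s.Stats() {
		s.logger.Printf("[DEBUG] serf: stat %s=%s", k, v)
	}
}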

// writeKeyringFile will serialize the current keyring and save it to a file.
func (s *Serf) writeKeyringFile() error {
	if len(s.config.KeyringFile) == 0 {
		return nil
	}

	keyring := s.config.MemberlistConfig.Keyring
	keysRaw := keyring.GetKeys()
	keysEncoded := make([]string, len(keysRaw))

	for i, key := range keysRaw {
		keysEncoded[i] = base64.StdEncoding.EncodeToString(key)
	}

	encodedKeys, err := json.MarshalIndent(keysEncoded, "", "  ")
	if err != nil {
		return fmt.Errorf("Failed to encode keys: %s", err)
	}

	// Use 0600 for permissions because key data is sensitive
	if err = ioutil.WriteFile(s.config.KeyringFile, encodedKeys, 0600); err != nil {
		return fmt.Errorf("Failed to write keyring file: %s", err)
	}

	// Success!
	return nil
}

// GetCoordinate returns the network coordinate of the local node.
func (s *Serf) GetCoordinate() (*coordinate.Coordinate, error) {
	if !s.config.DisableCoordinates {
		return s.coordClient.GetCoordinate(), nil
	}

	return nil, fmt.Errorf("Coordinates are disabled")
}

// GetCachedCoordinate returns the network coordinate for the node with the given
// name. This will only be valid if DisableCoordinates is set to false.
func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool) {
	if !s.config.DisableCoordinates {
		s.coordCacheLock.RLock()
		defer s.coordCacheLock.RUnlock()
		if coord, ok = s.coordCache[name]; ok {
			return coord, true
		}

		return nil, false
	}

	return nil, false
}
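
// NOTE: Illustrative sketch, not part of the upstream file. It estimates the
// round-trip time to a peer from network coordinates, assuming the DistanceTo
// method on coordinate.Coordinate from the coordinate package; the peer name
// is a placeholder.
func exampleCoordinateRTT(s *Serf, peer string) (time.Duration, error) {
	local, err := s.GetCoordinate()
	if err != nil {
		return 0, err // coordinates are disabled
	}
	remote, ok := s.GetCachedCoordinate(peer)
	if !ok {
		return 0, fmt.Errorf("no cached coordinate for %q", peer)
	}
	return local.DistanceTo(remote), nil
}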