/usr/share/gocode/src/github.com/ngaut/pools/roundrobin.go is in golang-github-ngaut-pools-dev 0.0~git20141008.0.6352e00-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 | // Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pools
import (
"fmt"
"sync"
"time"
)
// RoundRobin allows you to use a pool of resources in a round robin fashion.
//
// Deprecated: Use ResourcePool instead.
type RoundRobin struct {
	// mu is the mutex underlying available; nearly all methods hold it
	// while touching the fields below.
	mu sync.Mutex
	// available is the condition variable goroutines wait on when the pool
	// is full (get) or when resources are still checked out (Close).
	available *sync.Cond
	// resources buffers the idle resources; its channel capacity doubles as
	// the pool capacity.
	resources chan fifoWrapper
	// size counts the resources the pool currently accounts for, including
	// a slot reserved while a resource is being created.
	size int64
	// factory creates new resources. A nil factory means the pool is closed.
	factory Factory
	// idleTimeout, when positive, is how long a resource may sit idle in
	// the pool before get discards it.
	idleTimeout time.Duration

	// stats

	// waitCount is the number of times get had to block for a resource.
	waitCount int64
	// waitTime is the cumulative time spent blocked in get.
	waitTime time.Duration
}
// fifoWrapper is the element type of the pool's channel: an idle resource
// paired with a timestamp used for idle-timeout checks.
//
// The field order matters: Put builds this struct with a positional literal.
type fifoWrapper struct {
	// resource is the pooled resource itself.
	resource Resource
	// timeUsed is the time at which the resource was handed back via Put.
	timeUsed time.Time
}
// NewRoundRobin creates a new RoundRobin pool.
//
// capacity is the maximum number of resources RoundRobin will create.
// A resource that stays unused for longer than idleTimeout is discarded the
// next time it is pulled from the pool; a non-positive idleTimeout disables
// that check.
//
// The returned pool has no factory yet: call Open to supply one before
// requesting resources.
func NewRoundRobin(capacity int, idleTimeout time.Duration) *RoundRobin {
	pool := new(RoundRobin)
	pool.resources = make(chan fifoWrapper, capacity)
	pool.idleTimeout = idleTimeout
	// The condition variable shares the pool's own mutex.
	pool.available = sync.NewCond(&pool.mu)
	return pool
}
// Open installs factory as the function used to create resources, which
// starts allowing the creation of resources.
func (rr *RoundRobin) Open(factory Factory) {
	rr.mu.Lock()
	rr.factory = factory
	rr.mu.Unlock()
}
// Close empties the pool, calling Close on each of its resources.
// It blocks until every resource has been returned (Put), and then clears
// the factory so that IsClosed reports true.
func (rr *RoundRobin) Close() {
	rr.mu.Lock()
	defer rr.mu.Unlock()

	for rr.size > 0 {
		select {
		case idle := <-rr.resources:
			// An idle resource is available: drop it from the accounting
			// and close it in the background.
			rr.size--
			go idle.resource.Close()
		default:
			// Everything that is left is checked out. Wait releases the
			// lock until some other goroutine signals the condition.
			rr.available.Wait()
		}
	}
	rr.factory = nil
}
// IsClosed reports whether the pool currently has no factory, which is the
// case before Open is called and after Close completes.
func (rr *RoundRobin) IsClosed() bool {
	// Open and Close write rr.factory while holding rr.mu, so the read must
	// hold the same lock to avoid a data race.
	rr.mu.Lock()
	defer rr.mu.Unlock()
	return rr.factory == nil
}
// Get returns the next available resource. If none is available and the
// capacity has not been reached, it creates a new one using the factory.
// Otherwise it waits indefinitely until a resource becomes available.
func (rr *RoundRobin) Get() (resource Resource, err error) {
	resource, err = rr.get(true)
	return resource, err
}
// TryGet returns the next available resource. If none is available and the
// capacity has not been reached, it creates a new one using the factory.
// Otherwise it returns immediately with a nil resource and a nil error.
func (rr *RoundRobin) TryGet() (resource Resource, err error) {
	resource, err = rr.get(false)
	return resource, err
}
// get is the shared implementation behind Get and TryGet.
//
// It hands out an idle resource if one is queued, discarding any that have
// exceeded the idle timeout along the way. If the queue is empty and the pool
// is below capacity, it creates a resource through waitForCreate. If the pool
// is full, it blocks until signalled when wait is true, or returns (nil, nil)
// when wait is false. An error from resource creation is returned as is.
func (rr *RoundRobin) get(wait bool) (resource Resource, err error) {
	rr.mu.Lock()
	defer rr.mu.Unlock()

	// Any waits in this loop release the lock, and reacquire it before
	// they return.
	for {
		select {
		case fw := <-rr.resources:
			// An idle resource was queued; check whether it is still fresh.
			expired := rr.idleTimeout > 0 && fw.timeUsed.Add(rr.idleTimeout).Before(time.Now())
			if !expired {
				return fw.resource, nil
			}
			// Idle for too long: discard it and try the next one.
			go fw.resource.Close()
			rr.size--
			// Nobody else should be waiting, but signal anyway.
			rr.available.Signal()

		default:
			// The queue is empty.
			if rr.size < int64(cap(rr.resources)) {
				// Below capacity: create a resource.
				created, createErr := rr.waitForCreate()
				if createErr != nil {
					// The reserved slot was given back, and somebody
					// could be waiting for it.
					rr.available.Signal()
					return nil, createErr
				}
				// Creation succeeded; account for the new resource.
				rr.size++
				return created, nil
			}

			// The pool is full.
			if !wait {
				return nil, nil
			}
			begin := time.Now()
			rr.available.Wait()
			rr.recordWait(begin)
		}
	}
}
// recordWait updates the wait statistics for one blocked call that began
// waiting at start: it bumps the wait counter and adds the elapsed time to
// the running total.
func (rr *RoundRobin) recordWait(start time.Time) {
	elapsed := time.Since(start)
	rr.waitTime += elapsed
	rr.waitCount++
}
// waitForCreate runs the factory to build one resource. It must be called
// with rr.mu held; the lock is released while the factory runs and is held
// again when waitForCreate returns.
//
// While the factory is running, size is temporarily incremented so that
// concurrent callers see the slot as taken (this prevents a thundering herd).
// The increment is undone before returning, so the caller is responsible for
// the final accounting on success.
//
// If the pool has no factory (it is closed or was never opened), an error is
// returned and neither size nor the lock is touched.
func (rr *RoundRobin) waitForCreate() (resource Resource, err error) {
	// Snapshot the factory while the lock is still held. Open and Close
	// write rr.factory under rr.mu, so reading it after Unlock would be a
	// data race, and calling a nil factory would panic.
	factory := rr.factory
	if factory == nil {
		return nil, fmt.Errorf("round robin pool is closed")
	}

	rr.size++
	rr.mu.Unlock()
	defer func() {
		rr.mu.Lock()
		rr.size--
	}()
	return factory()
}
// Put hands a resource back to the pool. You MUST return every resource you
// obtained, even one that has been closed; for a closed resource, call
// Put(nil) so the pool can release its slot.
//
// If the pool holds more resources than its current capacity, the returned
// resource is closed instead of being queued. Put panics if the idle queue is
// already full. One waiter is signalled after the lock is released.
func (rr *RoundRobin) Put(resource Resource) {
	rr.mu.Lock()
	defer func() {
		rr.mu.Unlock()
		rr.available.Signal()
	}()

	switch {
	case rr.size > int64(cap(rr.resources)):
		// The pool is over capacity: drop this resource.
		if resource != nil {
			go resource.Close()
		}
		rr.size--
	case resource == nil:
		// The caller reports the resource as closed; free its slot.
		rr.size--
	case len(rr.resources) == cap(rr.resources):
		panic("unexpected")
	default:
		rr.resources <- fifoWrapper{resource, time.Now()}
	}
}
// SetCapacity changes the capacity of the pool.
// You can use it to expand or shrink.
//
// Idle resources are moved into a new queue of the requested capacity; any
// that do not fit are closed and removed from the pool's accounting. All
// waiters are woken afterwards so they can re-evaluate the new limits.
//
// It returns an error, leaving the pool untouched, if capacity is negative.
func (rr *RoundRobin) SetCapacity(capacity int) error {
	// A negative channel size would make make() panic; report it instead.
	if capacity < 0 {
		return fmt.Errorf("capacity %d is out of range", capacity)
	}

	rr.mu.Lock()
	defer rr.available.Broadcast()
	defer rr.mu.Unlock()

	nr := make(chan fifoWrapper, capacity)
	// This loop transfers resources from the old channel to the new one,
	// until the old one runs out. Extras that do not fit are discarded.
drain:
	for {
		select {
		case fw := <-rr.resources:
			if len(nr) < cap(nr) {
				nr <- fw
			} else {
				go fw.resource.Close()
				rr.size--
			}
		default:
			break drain
		}
	}
	rr.resources = nr
	return nil
}
// SetIdleTimeout replaces the pool's idle timeout. The new value is used by
// subsequent idle checks when resources are pulled from the pool.
func (rr *RoundRobin) SetIdleTimeout(idleTimeout time.Duration) {
	rr.mu.Lock()
	rr.idleTimeout = idleTimeout
	rr.mu.Unlock()
}
// StatsJSON renders the values reported by Stats as a JSON object string.
// WaitTime and IdleTimeout are emitted as plain integers (the int64 value of
// the time.Duration).
func (rr *RoundRobin) StatsJSON() string {
	size, capacity, available, waitCount, waitTime, idleTimeout := rr.Stats()
	const layout = "{\"Size\": %v, \"Capacity\": %v, \"Available\": %v, \"WaitCount\": %v, \"WaitTime\": %v, \"IdleTimeout\": %v}"
	return fmt.Sprintf(layout, size, capacity, available, waitCount, int64(waitTime), int64(idleTimeout))
}
// Stats returns a snapshot of the pool's counters, taken under the lock:
// the current size, the capacity and number of queued idle resources (the
// cap and len of the resource channel), how many times callers had to wait,
// the total time they waited, and the idle timeout.
func (rr *RoundRobin) Stats() (size, capacity, available, waitCount int64, waitTime, idleTimeout time.Duration) {
	rr.mu.Lock()
	defer rr.mu.Unlock()

	size = rr.size
	capacity = int64(cap(rr.resources))
	available = int64(len(rr.resources))
	waitCount = rr.waitCount
	waitTime = rr.waitTime
	idleTimeout = rr.idleTimeout
	return size, capacity, available, waitCount, waitTime, idleTimeout
}
|