summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/golang/groupcache/groupcache.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/golang/groupcache/groupcache.go')
-rw-r--r--vendor/github.com/golang/groupcache/groupcache.go489
1 files changed, 489 insertions, 0 deletions
diff --git a/vendor/github.com/golang/groupcache/groupcache.go b/vendor/github.com/golang/groupcache/groupcache.go
new file mode 100644
index 000000000..40410a0cc
--- /dev/null
+++ b/vendor/github.com/golang/groupcache/groupcache.go
@@ -0,0 +1,489 @@
+/*
+Copyright 2012 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package groupcache provides a data loading mechanism with caching
+// and de-duplication that works across a set of peer processes.
+//
+// Each data Get first consults its local cache, otherwise delegates
+// to the requested key's canonical owner, which then checks its cache
+// or finally gets the data. In the common case, many concurrent
+// cache misses across a set of peers for the same key result in just
+// one cache fill.
+package groupcache
+
+import (
+ "errors"
+ "math/rand"
+ "strconv"
+ "sync"
+ "sync/atomic"
+
+ pb "github.com/golang/groupcache/groupcachepb"
+ "github.com/golang/groupcache/lru"
+ "github.com/golang/groupcache/singleflight"
+)
+
+// A Getter loads data for a key.
+type Getter interface {
+ // Get returns the value identified by key, populating dest.
+ //
+ // The returned data must be unversioned. That is, key must
+ // uniquely describe the loaded data, without an implicit
+ // current time, and without relying on cache expiration
+ // mechanisms.
+ Get(ctx Context, key string, dest Sink) error
+}
+
+// A GetterFunc implements Getter with a function.
+type GetterFunc func(ctx Context, key string, dest Sink) error
+
+func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
+ return f(ctx, key, dest)
+}
+
+var (
+ mu sync.RWMutex
+ groups = make(map[string]*Group)
+
+ initPeerServerOnce sync.Once
+ initPeerServer func()
+)
+
+// GetGroup returns the named group previously created with NewGroup, or
+// nil if there's no such group.
+func GetGroup(name string) *Group {
+ mu.RLock()
+ g := groups[name]
+ mu.RUnlock()
+ return g
+}
+
+// NewGroup creates a coordinated group-aware Getter from a Getter.
+//
+// The returned Getter tries (but does not guarantee) to run only one
+// Get call at once for a given key across an entire set of peer
+// processes. Concurrent callers both in the local process and in
+// other processes receive copies of the answer once the original Get
+// completes.
+//
+// The group name must be unique for each getter.
+func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
+ return newGroup(name, cacheBytes, getter, nil)
+}
+
+// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
+func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
+ if getter == nil {
+ panic("nil Getter")
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ initPeerServerOnce.Do(callInitPeerServer)
+ if _, dup := groups[name]; dup {
+ panic("duplicate registration of group " + name)
+ }
+ g := &Group{
+ name: name,
+ getter: getter,
+ peers: peers,
+ cacheBytes: cacheBytes,
+ loadGroup: &singleflight.Group{},
+ }
+ if fn := newGroupHook; fn != nil {
+ fn(g)
+ }
+ groups[name] = g
+ return g
+}
+
+// newGroupHook, if non-nil, is called right after a new group is created.
+var newGroupHook func(*Group)
+
+// RegisterNewGroupHook registers a hook that is run each time
+// a group is created.
+func RegisterNewGroupHook(fn func(*Group)) {
+ if newGroupHook != nil {
+ panic("RegisterNewGroupHook called more than once")
+ }
+ newGroupHook = fn
+}
+
+// RegisterServerStart registers a hook that is run when the first
+// group is created.
+func RegisterServerStart(fn func()) {
+ if initPeerServer != nil {
+ panic("RegisterServerStart called more than once")
+ }
+ initPeerServer = fn
+}
+
+func callInitPeerServer() {
+ if initPeerServer != nil {
+ initPeerServer()
+ }
+}
+
+// A Group is a cache namespace and associated data loaded spread over
+// a group of 1 or more machines.
+type Group struct {
+ name string
+ getter Getter
+ peersOnce sync.Once
+ peers PeerPicker
+ cacheBytes int64 // limit for sum of mainCache and hotCache size
+
+ // mainCache is a cache of the keys for which this process
+ // (amongst its peers) is authorative. That is, this cache
+ // contains keys which consistent hash on to this process's
+ // peer number.
+ mainCache cache
+
+ // hotCache contains keys/values for which this peer is not
+ // authorative (otherwise they would be in mainCache), but
+ // are popular enough to warrant mirroring in this process to
+ // avoid going over the network to fetch from a peer. Having
+ // a hotCache avoids network hotspotting, where a peer's
+ // network card could become the bottleneck on a popular key.
+ // This cache is used sparingly to maximize the total number
+ // of key/value pairs that can be stored globally.
+ hotCache cache
+
+ // loadGroup ensures that each key is only fetched once
+ // (either locally or remotely), regardless of the number of
+ // concurrent callers.
+ loadGroup flightGroup
+
+ // Stats are statistics on the group.
+ Stats Stats
+}
+
+// flightGroup is defined as an interface which flightgroup.Group
+// satisfies. We define this so that we may test with an alternate
+// implementation.
+type flightGroup interface {
+ // Done is called when Do is done.
+ Do(key string, fn func() (interface{}, error)) (interface{}, error)
+}
+
+// Stats are per-group statistics.
+type Stats struct {
+ Gets AtomicInt // any Get request, including from peers
+ CacheHits AtomicInt // either cache was good
+ PeerLoads AtomicInt // either remote load or remote cache hit (not an error)
+ PeerErrors AtomicInt
+ Loads AtomicInt // (gets - cacheHits)
+ LoadsDeduped AtomicInt // after singleflight
+ LocalLoads AtomicInt // total good local loads
+ LocalLoadErrs AtomicInt // total bad local loads
+ ServerRequests AtomicInt // gets that came over the network from peers
+}
+
+// Name returns the name of the group.
+func (g *Group) Name() string {
+ return g.name
+}
+
+func (g *Group) initPeers() {
+ if g.peers == nil {
+ g.peers = getPeers()
+ }
+}
+
+func (g *Group) Get(ctx Context, key string, dest Sink) error {
+ g.peersOnce.Do(g.initPeers)
+ g.Stats.Gets.Add(1)
+ if dest == nil {
+ return errors.New("groupcache: nil dest Sink")
+ }
+ value, cacheHit := g.lookupCache(key)
+
+ if cacheHit {
+ g.Stats.CacheHits.Add(1)
+ return setSinkView(dest, value)
+ }
+
+ // Optimization to avoid double unmarshalling or copying: keep
+ // track of whether the dest was already populated. One caller
+ // (if local) will set this; the losers will not. The common
+ // case will likely be one caller.
+ destPopulated := false
+ value, destPopulated, err := g.load(ctx, key, dest)
+ if err != nil {
+ return err
+ }
+ if destPopulated {
+ return nil
+ }
+ return setSinkView(dest, value)
+}
+
+// load loads key either by invoking the getter locally or by sending it to another machine.
+func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
+ g.Stats.Loads.Add(1)
+ viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
+ // Check the cache again because singleflight can only dedup calls
+ // that overlap concurrently. It's possible for 2 concurrent
+ // requests to miss the cache, resulting in 2 load() calls. An
+ // unfortunate goroutine scheduling would result in this callback
+ // being run twice, serially. If we don't check the cache again,
+ // cache.nbytes would be incremented below even though there will
+ // be only one entry for this key.
+ //
+ // Consider the following serialized event ordering for two
+ // goroutines in which this callback gets called twice for hte
+ // same key:
+ // 1: Get("key")
+ // 2: Get("key")
+ // 1: lookupCache("key")
+ // 2: lookupCache("key")
+ // 1: load("key")
+ // 2: load("key")
+ // 1: loadGroup.Do("key", fn)
+ // 1: fn()
+ // 2: loadGroup.Do("key", fn)
+ // 2: fn()
+ if value, cacheHit := g.lookupCache(key); cacheHit {
+ g.Stats.CacheHits.Add(1)
+ return value, nil
+ }
+ g.Stats.LoadsDeduped.Add(1)
+ var value ByteView
+ var err error
+ if peer, ok := g.peers.PickPeer(key); ok {
+ value, err = g.getFromPeer(ctx, peer, key)
+ if err == nil {
+ g.Stats.PeerLoads.Add(1)
+ return value, nil
+ }
+ g.Stats.PeerErrors.Add(1)
+ // TODO(bradfitz): log the peer's error? keep
+ // log of the past few for /groupcachez? It's
+ // probably boring (normal task movement), so not
+ // worth logging I imagine.
+ }
+ value, err = g.getLocally(ctx, key, dest)
+ if err != nil {
+ g.Stats.LocalLoadErrs.Add(1)
+ return nil, err
+ }
+ g.Stats.LocalLoads.Add(1)
+ destPopulated = true // only one caller of load gets this return value
+ g.populateCache(key, value, &g.mainCache)
+ return value, nil
+ })
+ if err == nil {
+ value = viewi.(ByteView)
+ }
+ return
+}
+
+func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
+ err := g.getter.Get(ctx, key, dest)
+ if err != nil {
+ return ByteView{}, err
+ }
+ return dest.view()
+}
+
+func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
+ req := &pb.GetRequest{
+ Group: &g.name,
+ Key: &key,
+ }
+ res := &pb.GetResponse{}
+ err := peer.Get(ctx, req, res)
+ if err != nil {
+ return ByteView{}, err
+ }
+ value := ByteView{b: res.Value}
+ // TODO(bradfitz): use res.MinuteQps or something smart to
+ // conditionally populate hotCache. For now just do it some
+ // percentage of the time.
+ if rand.Intn(10) == 0 {
+ g.populateCache(key, value, &g.hotCache)
+ }
+ return value, nil
+}
+
+func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
+ if g.cacheBytes <= 0 {
+ return
+ }
+ value, ok = g.mainCache.get(key)
+ if ok {
+ return
+ }
+ value, ok = g.hotCache.get(key)
+ return
+}
+
+func (g *Group) populateCache(key string, value ByteView, cache *cache) {
+ if g.cacheBytes <= 0 {
+ return
+ }
+ cache.add(key, value)
+
+ // Evict items from cache(s) if necessary.
+ for {
+ mainBytes := g.mainCache.bytes()
+ hotBytes := g.hotCache.bytes()
+ if mainBytes+hotBytes <= g.cacheBytes {
+ return
+ }
+
+ // TODO(bradfitz): this is good-enough-for-now logic.
+ // It should be something based on measurements and/or
+ // respecting the costs of different resources.
+ victim := &g.mainCache
+ if hotBytes > mainBytes/8 {
+ victim = &g.hotCache
+ }
+ victim.removeOldest()
+ }
+}
+
+// CacheType represents a type of cache.
+type CacheType int
+
+const (
+ // The MainCache is the cache for items that this peer is the
+ // owner for.
+ MainCache CacheType = iota + 1
+
+ // The HotCache is the cache for items that seem popular
+ // enough to replicate to this node, even though it's not the
+ // owner.
+ HotCache
+)
+
+// CacheStats returns stats about the provided cache within the group.
+func (g *Group) CacheStats(which CacheType) CacheStats {
+ switch which {
+ case MainCache:
+ return g.mainCache.stats()
+ case HotCache:
+ return g.hotCache.stats()
+ default:
+ return CacheStats{}
+ }
+}
+
+// cache is a wrapper around an *lru.Cache that adds synchronization,
+// makes values always be ByteView, and counts the size of all keys and
+// values.
+type cache struct {
+ mu sync.RWMutex
+ nbytes int64 // of all keys and values
+ lru *lru.Cache
+ nhit, nget int64
+ nevict int64 // number of evictions
+}
+
+func (c *cache) stats() CacheStats {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return CacheStats{
+ Bytes: c.nbytes,
+ Items: c.itemsLocked(),
+ Gets: c.nget,
+ Hits: c.nhit,
+ Evictions: c.nevict,
+ }
+}
+
+func (c *cache) add(key string, value ByteView) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.lru == nil {
+ c.lru = &lru.Cache{
+ OnEvicted: func(key lru.Key, value interface{}) {
+ val := value.(ByteView)
+ c.nbytes -= int64(len(key.(string))) + int64(val.Len())
+ c.nevict++
+ },
+ }
+ }
+ c.lru.Add(key, value)
+ c.nbytes += int64(len(key)) + int64(value.Len())
+}
+
+func (c *cache) get(key string) (value ByteView, ok bool) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.nget++
+ if c.lru == nil {
+ return
+ }
+ vi, ok := c.lru.Get(key)
+ if !ok {
+ return
+ }
+ c.nhit++
+ return vi.(ByteView), true
+}
+
+func (c *cache) removeOldest() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.lru != nil {
+ c.lru.RemoveOldest()
+ }
+}
+
+func (c *cache) bytes() int64 {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.nbytes
+}
+
+func (c *cache) items() int64 {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.itemsLocked()
+}
+
+func (c *cache) itemsLocked() int64 {
+ if c.lru == nil {
+ return 0
+ }
+ return int64(c.lru.Len())
+}
+
+// An AtomicInt is an int64 to be accessed atomically.
+type AtomicInt int64
+
+// Add atomically adds n to i.
+func (i *AtomicInt) Add(n int64) {
+ atomic.AddInt64((*int64)(i), n)
+}
+
+// Get atomically gets the value of i.
+func (i *AtomicInt) Get() int64 {
+ return atomic.LoadInt64((*int64)(i))
+}
+
+func (i *AtomicInt) String() string {
+ return strconv.FormatInt(i.Get(), 10)
+}
+
+// CacheStats are returned by stats accessors on Group.
+type CacheStats struct {
+ Bytes int64
+ Items int64
+ Gets int64
+ Hits int64
+ Evictions int64
+}