diff options
Diffstat (limited to 'vendor/github.com/golang/groupcache/groupcache.go')
-rw-r--r-- | vendor/github.com/golang/groupcache/groupcache.go | 489 |
1 files changed, 489 insertions, 0 deletions
diff --git a/vendor/github.com/golang/groupcache/groupcache.go b/vendor/github.com/golang/groupcache/groupcache.go new file mode 100644 index 000000000..40410a0cc --- /dev/null +++ b/vendor/github.com/golang/groupcache/groupcache.go @@ -0,0 +1,489 @@ +/* +Copyright 2012 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package groupcache provides a data loading mechanism with caching +// and de-duplication that works across a set of peer processes. +// +// Each data Get first consults its local cache, otherwise delegates +// to the requested key's canonical owner, which then checks its cache +// or finally gets the data. In the common case, many concurrent +// cache misses across a set of peers for the same key result in just +// one cache fill. +package groupcache + +import ( + "errors" + "math/rand" + "strconv" + "sync" + "sync/atomic" + + pb "github.com/golang/groupcache/groupcachepb" + "github.com/golang/groupcache/lru" + "github.com/golang/groupcache/singleflight" +) + +// A Getter loads data for a key. +type Getter interface { + // Get returns the value identified by key, populating dest. + // + // The returned data must be unversioned. That is, key must + // uniquely describe the loaded data, without an implicit + // current time, and without relying on cache expiration + // mechanisms. + Get(ctx Context, key string, dest Sink) error +} + +// A GetterFunc implements Getter with a function. +type GetterFunc func(ctx Context, key string, dest Sink) error + +func (f GetterFunc) Get(ctx Context, key string, dest Sink) error { + return f(ctx, key, dest) +} + +var ( + mu sync.RWMutex + groups = make(map[string]*Group) + + initPeerServerOnce sync.Once + initPeerServer func() +) + +// GetGroup returns the named group previously created with NewGroup, or +// nil if there's no such group. +func GetGroup(name string) *Group { + mu.RLock() + g := groups[name] + mu.RUnlock() + return g +} + +// NewGroup creates a coordinated group-aware Getter from a Getter. +// +// The returned Getter tries (but does not guarantee) to run only one +// Get call at once for a given key across an entire set of peer +// processes. Concurrent callers both in the local process and in +// other processes receive copies of the answer once the original Get +// completes. +// +// The group name must be unique for each getter. +func NewGroup(name string, cacheBytes int64, getter Getter) *Group { + return newGroup(name, cacheBytes, getter, nil) +} + +// If peers is nil, the peerPicker is called via a sync.Once to initialize it. +func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group { + if getter == nil { + panic("nil Getter") + } + mu.Lock() + defer mu.Unlock() + initPeerServerOnce.Do(callInitPeerServer) + if _, dup := groups[name]; dup { + panic("duplicate registration of group " + name) + } + g := &Group{ + name: name, + getter: getter, + peers: peers, + cacheBytes: cacheBytes, + loadGroup: &singleflight.Group{}, + } + if fn := newGroupHook; fn != nil { + fn(g) + } + groups[name] = g + return g +} + +// newGroupHook, if non-nil, is called right after a new group is created. +var newGroupHook func(*Group) + +// RegisterNewGroupHook registers a hook that is run each time +// a group is created. +func RegisterNewGroupHook(fn func(*Group)) { + if newGroupHook != nil { + panic("RegisterNewGroupHook called more than once") + } + newGroupHook = fn +} + +// RegisterServerStart registers a hook that is run when the first +// group is created. +func RegisterServerStart(fn func()) { + if initPeerServer != nil { + panic("RegisterServerStart called more than once") + } + initPeerServer = fn +} + +func callInitPeerServer() { + if initPeerServer != nil { + initPeerServer() + } +} + +// A Group is a cache namespace and associated data loaded spread over +// a group of 1 or more machines. +type Group struct { + name string + getter Getter + peersOnce sync.Once + peers PeerPicker + cacheBytes int64 // limit for sum of mainCache and hotCache size + + // mainCache is a cache of the keys for which this process + // (amongst its peers) is authorative. That is, this cache + // contains keys which consistent hash on to this process's + // peer number. + mainCache cache + + // hotCache contains keys/values for which this peer is not + // authorative (otherwise they would be in mainCache), but + // are popular enough to warrant mirroring in this process to + // avoid going over the network to fetch from a peer. Having + // a hotCache avoids network hotspotting, where a peer's + // network card could become the bottleneck on a popular key. + // This cache is used sparingly to maximize the total number + // of key/value pairs that can be stored globally. + hotCache cache + + // loadGroup ensures that each key is only fetched once + // (either locally or remotely), regardless of the number of + // concurrent callers. + loadGroup flightGroup + + // Stats are statistics on the group. + Stats Stats +} + +// flightGroup is defined as an interface which flightgroup.Group +// satisfies. We define this so that we may test with an alternate +// implementation. +type flightGroup interface { + // Done is called when Do is done. + Do(key string, fn func() (interface{}, error)) (interface{}, error) +} + +// Stats are per-group statistics. +type Stats struct { + Gets AtomicInt // any Get request, including from peers + CacheHits AtomicInt // either cache was good + PeerLoads AtomicInt // either remote load or remote cache hit (not an error) + PeerErrors AtomicInt + Loads AtomicInt // (gets - cacheHits) + LoadsDeduped AtomicInt // after singleflight + LocalLoads AtomicInt // total good local loads + LocalLoadErrs AtomicInt // total bad local loads + ServerRequests AtomicInt // gets that came over the network from peers +} + +// Name returns the name of the group. +func (g *Group) Name() string { + return g.name +} + +func (g *Group) initPeers() { + if g.peers == nil { + g.peers = getPeers() + } +} + +func (g *Group) Get(ctx Context, key string, dest Sink) error { + g.peersOnce.Do(g.initPeers) + g.Stats.Gets.Add(1) + if dest == nil { + return errors.New("groupcache: nil dest Sink") + } + value, cacheHit := g.lookupCache(key) + + if cacheHit { + g.Stats.CacheHits.Add(1) + return setSinkView(dest, value) + } + + // Optimization to avoid double unmarshalling or copying: keep + // track of whether the dest was already populated. One caller + // (if local) will set this; the losers will not. The common + // case will likely be one caller. + destPopulated := false + value, destPopulated, err := g.load(ctx, key, dest) + if err != nil { + return err + } + if destPopulated { + return nil + } + return setSinkView(dest, value) +} + +// load loads key either by invoking the getter locally or by sending it to another machine. +func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { + g.Stats.Loads.Add(1) + viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { + // Check the cache again because singleflight can only dedup calls + // that overlap concurrently. It's possible for 2 concurrent + // requests to miss the cache, resulting in 2 load() calls. An + // unfortunate goroutine scheduling would result in this callback + // being run twice, serially. If we don't check the cache again, + // cache.nbytes would be incremented below even though there will + // be only one entry for this key. + // + // Consider the following serialized event ordering for two + // goroutines in which this callback gets called twice for hte + // same key: + // 1: Get("key") + // 2: Get("key") + // 1: lookupCache("key") + // 2: lookupCache("key") + // 1: load("key") + // 2: load("key") + // 1: loadGroup.Do("key", fn) + // 1: fn() + // 2: loadGroup.Do("key", fn) + // 2: fn() + if value, cacheHit := g.lookupCache(key); cacheHit { + g.Stats.CacheHits.Add(1) + return value, nil + } + g.Stats.LoadsDeduped.Add(1) + var value ByteView + var err error + if peer, ok := g.peers.PickPeer(key); ok { + value, err = g.getFromPeer(ctx, peer, key) + if err == nil { + g.Stats.PeerLoads.Add(1) + return value, nil + } + g.Stats.PeerErrors.Add(1) + // TODO(bradfitz): log the peer's error? keep + // log of the past few for /groupcachez? It's + // probably boring (normal task movement), so not + // worth logging I imagine. + } + value, err = g.getLocally(ctx, key, dest) + if err != nil { + g.Stats.LocalLoadErrs.Add(1) + return nil, err + } + g.Stats.LocalLoads.Add(1) + destPopulated = true // only one caller of load gets this return value + g.populateCache(key, value, &g.mainCache) + return value, nil + }) + if err == nil { + value = viewi.(ByteView) + } + return +} + +func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) { + err := g.getter.Get(ctx, key, dest) + if err != nil { + return ByteView{}, err + } + return dest.view() +} + +func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) { + req := &pb.GetRequest{ + Group: &g.name, + Key: &key, + } + res := &pb.GetResponse{} + err := peer.Get(ctx, req, res) + if err != nil { + return ByteView{}, err + } + value := ByteView{b: res.Value} + // TODO(bradfitz): use res.MinuteQps or something smart to + // conditionally populate hotCache. For now just do it some + // percentage of the time. + if rand.Intn(10) == 0 { + g.populateCache(key, value, &g.hotCache) + } + return value, nil +} + +func (g *Group) lookupCache(key string) (value ByteView, ok bool) { + if g.cacheBytes <= 0 { + return + } + value, ok = g.mainCache.get(key) + if ok { + return + } + value, ok = g.hotCache.get(key) + return +} + +func (g *Group) populateCache(key string, value ByteView, cache *cache) { + if g.cacheBytes <= 0 { + return + } + cache.add(key, value) + + // Evict items from cache(s) if necessary. + for { + mainBytes := g.mainCache.bytes() + hotBytes := g.hotCache.bytes() + if mainBytes+hotBytes <= g.cacheBytes { + return + } + + // TODO(bradfitz): this is good-enough-for-now logic. + // It should be something based on measurements and/or + // respecting the costs of different resources. + victim := &g.mainCache + if hotBytes > mainBytes/8 { + victim = &g.hotCache + } + victim.removeOldest() + } +} + +// CacheType represents a type of cache. +type CacheType int + +const ( + // The MainCache is the cache for items that this peer is the + // owner for. + MainCache CacheType = iota + 1 + + // The HotCache is the cache for items that seem popular + // enough to replicate to this node, even though it's not the + // owner. + HotCache +) + +// CacheStats returns stats about the provided cache within the group. +func (g *Group) CacheStats(which CacheType) CacheStats { + switch which { + case MainCache: + return g.mainCache.stats() + case HotCache: + return g.hotCache.stats() + default: + return CacheStats{} + } +} + +// cache is a wrapper around an *lru.Cache that adds synchronization, +// makes values always be ByteView, and counts the size of all keys and +// values. +type cache struct { + mu sync.RWMutex + nbytes int64 // of all keys and values + lru *lru.Cache + nhit, nget int64 + nevict int64 // number of evictions +} + +func (c *cache) stats() CacheStats { + c.mu.RLock() + defer c.mu.RUnlock() + return CacheStats{ + Bytes: c.nbytes, + Items: c.itemsLocked(), + Gets: c.nget, + Hits: c.nhit, + Evictions: c.nevict, + } +} + +func (c *cache) add(key string, value ByteView) { + c.mu.Lock() + defer c.mu.Unlock() + if c.lru == nil { + c.lru = &lru.Cache{ + OnEvicted: func(key lru.Key, value interface{}) { + val := value.(ByteView) + c.nbytes -= int64(len(key.(string))) + int64(val.Len()) + c.nevict++ + }, + } + } + c.lru.Add(key, value) + c.nbytes += int64(len(key)) + int64(value.Len()) +} + +func (c *cache) get(key string) (value ByteView, ok bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.nget++ + if c.lru == nil { + return + } + vi, ok := c.lru.Get(key) + if !ok { + return + } + c.nhit++ + return vi.(ByteView), true +} + +func (c *cache) removeOldest() { + c.mu.Lock() + defer c.mu.Unlock() + if c.lru != nil { + c.lru.RemoveOldest() + } +} + +func (c *cache) bytes() int64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.nbytes +} + +func (c *cache) items() int64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.itemsLocked() +} + +func (c *cache) itemsLocked() int64 { + if c.lru == nil { + return 0 + } + return int64(c.lru.Len()) +} + +// An AtomicInt is an int64 to be accessed atomically. +type AtomicInt int64 + +// Add atomically adds n to i. +func (i *AtomicInt) Add(n int64) { + atomic.AddInt64((*int64)(i), n) +} + +// Get atomically gets the value of i. +func (i *AtomicInt) Get() int64 { + return atomic.LoadInt64((*int64)(i)) +} + +func (i *AtomicInt) String() string { + return strconv.FormatInt(i.Get(), 10) +} + +// CacheStats are returned by stats accessors on Group. +type CacheStats struct { + Bytes int64 + Items int64 + Gets int64 + Hits int64 + Evictions int64 +} |