summaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/gopkg.in/throttled/throttled.v1/interval.go
blob: 628a5593e852744bf399478329b1fa097102a95f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package throttled

import (
	"net/http"
	"sync"
	"time"

	"github.com/golang/groupcache/lru"
)

// Compile-time assertions that both interval limiter implementations
// satisfy the Limiter interface.
var (
	_ Limiter = (*intervalVaryByLimiter)(nil)
	_ Limiter = (*intervalLimiter)(nil)
)

// Interval returns a Throttler that lets requests through at a constant
// pace. The pace is given by the delay argument; the convenience types such
// as PerSec express it as a rate, e.g. PerSec(10) means 10 requests per
// second (one request every 100ms) and PerMin(30) means 30 requests per
// minute (one request every 2s).
//
// bursts is the number of excess requests that may be queued while waiting
// for their turn. Requests that overflow the queue are dropped and sent to
// the denied handler, which may be set on the Throttler and defaults to the
// package-global DefaultDeniedHandler.
//
// vary is the criteria used to group requests so that the interval applies
// per group (e.g. per path or per remote IP address). When vary is nil the
// interval applies to every request going through the throttler.
//
// maxKeys is the maximum number of groups kept in memory when vary is set;
// values below 1 are raised to 1. An LRU policy removes the older keys.
func Interval(delay Delayer, bursts int, vary *VaryBy, maxKeys int) *Throttler {
	// No vary-by criteria: a single shared queue serves all requests.
	if vary == nil {
		return &Throttler{
			limiter: &intervalLimiter{
				delay:  delay.Delay(),
				bursts: bursts,
			},
		}
	}

	// The key cache must be able to hold at least one entry.
	if maxKeys < 1 {
		maxKeys = 1
	}
	return &Throttler{
		limiter: &intervalVaryByLimiter{
			delay:   delay.Delay(),
			bursts:  bursts,
			vary:    vary,
			maxKeys: maxKeys,
		},
	}
}

// The intervalLimiter struct implements an interval limiter with no vary-by
// criteria: every request shares the same queue.
type intervalLimiter struct {
	// delay is the interval handed to the processing goroutine, which waits
	// that long after releasing a request before releasing the next one.
	delay time.Duration
	// bursts is the capacity of the bucket queue. Start raises negative
	// values to 0.
	bursts int

	// bucket queues the per-request reply channels. It is created by Start,
	// filled by Limit and drained by the goroutine Start launches.
	bucket chan chan bool
}

// Start prepares the limiter for use: it normalizes a negative bursts value
// to 0, allocates the request queue with that capacity and launches the
// goroutine that releases queued requests at the configured interval.
func (il *intervalLimiter) Start() {
	if il.bursts < 0 {
		il.bursts = 0
	}
	queue := make(chan chan bool, il.bursts)
	il.bucket = queue
	go process(queue, il.delay)
}

// Limit is invoked for every request reaching the throttled handler. It
// attempts to enqueue the request so it is released at the configured
// interval. The returned channel eventually yields true when the request may
// proceed; if the queue is full it already holds false, meaning the request
// is denied. The returned error is always nil.
func (il *intervalLimiter) Limit(w http.ResponseWriter, r *http.Request) (<-chan bool, error) {
	// Buffered so that neither the denial below nor the processing goroutine
	// blocks when delivering the verdict.
	reply := make(chan bool, 1)
	select {
	case il.bucket <- reply:
		// Queued; the processing goroutine will answer later.
	default:
		// No room in the queue: deny right away.
		reply <- false
	}
	return reply, nil
}

// The intervalVaryByLimiter struct implements an interval limiter with a vary-by
// criteria: each distinct key gets its own queue and processing goroutine.
type intervalVaryByLimiter struct {
	// delay is the interval handed to each key's processing goroutine.
	delay time.Duration
	// bursts is the capacity of each key's bucket queue. Start raises
	// negative values to 0.
	bursts int
	// vary computes the grouping key for a request (via its Key method).
	vary *VaryBy

	// lock guards access to keys.
	lock sync.RWMutex
	// keys maps a vary-by key to its bucket (a chan chan bool). It is
	// created by Start, which also registers stopProcess as the eviction
	// callback.
	keys *lru.Cache
	// maxKeys is the size passed to lru.New when the cache is created.
	maxKeys int
}

// Start prepares the limiter for use: it normalizes a negative bursts value
// to 0 and creates the LRU cache of per-key buckets, registering stopProcess
// as the callback invoked when a key is evicted.
func (il *intervalVaryByLimiter) Start() {
	if il.bursts < 0 {
		il.bursts = 0
	}
	cache := lru.New(il.maxKeys)
	cache.OnEvicted = il.stopProcess
	il.keys = cache
}

// Limit is called for each request to the throttled handler. It tries to
// queue the request on the bucket of its vary-by key so that it runs at the
// given interval. The returned channel eventually yields true when the
// request may proceed; if the key's queue is full it already holds false,
// meaning the request is denied. The returned error is always nil.
//
// The whole lookup/create/enqueue sequence runs under the exclusive lock:
//   - lru.Cache is not safe for concurrent access, and even Get updates the
//     recency order, so a shared read lock is not sufficient;
//   - looking up and creating the bucket in one critical section prevents two
//     goroutines from both creating a bucket for the same key, which would
//     orphan one of the processing goroutines;
//   - enqueueing while still holding the lock guarantees the bucket cannot be
//     evicted (and therefore closed) between lookup and send, which would
//     panic with a send on a closed channel.
//
// The enqueue is a non-blocking send, so the lock is only held briefly.
func (il *intervalVaryByLimiter) Limit(w http.ResponseWriter, r *http.Request) (<-chan bool, error) {
	// Buffered so that neither the denial below nor the processing goroutine
	// blocks when delivering the verdict.
	ch := make(chan bool, 1)
	// Compute the key before taking the lock to keep the critical section short.
	key := il.vary.Key(r)

	il.lock.Lock()
	defer il.lock.Unlock()

	var bucket chan chan bool
	if item, ok := il.keys.Get(key); ok {
		bucket = item.(chan chan bool)
	} else {
		// First request for this key: create its bucket, register it (which
		// may evict the least recently used key and close that key's bucket
		// through the eviction callback) and start its processing goroutine.
		bucket = make(chan chan bool, il.bursts)
		il.keys.Add(key, bucket)
		go process(bucket, il.delay)
	}

	select {
	case bucket <- ch:
		// Queued; the processing goroutine will answer later.
	default:
		// No room in this key's queue: deny right away.
		ch <- false
	}
	return ch, nil
}

// process loops through the queued requests of a bucket and lets them
// through one at a time, waiting at least delay between two consecutive
// releases. The first request is released without waiting. It returns once
// the bucket has been closed and fully drained.
//
// Each value received from bucket is a reply channel on which true is sent
// when the request may proceed. The reply channels created by the limiters
// in this file are buffered, so the send does not block.
//
// A single timer is reused for every wait and stopped on return, so no timer
// is left pending after the bucket is closed.
func process(bucket chan chan bool, delay time.Duration) {
	// A zero-duration timer fires right away, releasing the first request
	// immediately.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for v := range bucket {
		// Wait until the interval since the previous release has elapsed.
		<-timer.C
		// Let the request go through.
		v <- true
		// The timer has fired and its channel has just been drained, so it
		// can safely be re-armed for the next request.
		timer.Reset(delay)
	}
}

// stopProcess is the LRU eviction callback. It closes the bucket of the key
// being removed so that the goroutine ranging over that bucket terminates
// once the remaining queued requests have been handled.
func (il *intervalVaryByLimiter) stopProcess(key lru.Key, value interface{}) {
	bucket := value.(chan chan bool)
	close(bucket)
}