From d103ed6ca97ca5a2669f6cf5fe4b3d2a9c945f26 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Wed, 17 May 2017 16:51:25 -0400 Subject: Upgrading server dependancies (#6431) --- vendor/github.com/armon/go-metrics/.gitignore | 24 ++ vendor/github.com/armon/go-metrics/LICENSE | 20 ++ vendor/github.com/armon/go-metrics/README.md | 74 ++++++ .../armon/go-metrics/circonus/circonus.go | 92 ++++++++ .../armon/go-metrics/circonus/circonus_test.go | 153 ++++++++++++ vendor/github.com/armon/go-metrics/const_unix.go | 12 + .../github.com/armon/go-metrics/const_windows.go | 13 + .../armon/go-metrics/datadog/dogstatsd.go | 125 ++++++++++ .../armon/go-metrics/datadog/dogstatsd_test.go | 147 ++++++++++++ vendor/github.com/armon/go-metrics/inmem.go | 247 +++++++++++++++++++ vendor/github.com/armon/go-metrics/inmem_signal.go | 100 ++++++++ .../armon/go-metrics/inmem_signal_test.go | 46 ++++ vendor/github.com/armon/go-metrics/inmem_test.go | 107 +++++++++ vendor/github.com/armon/go-metrics/metrics.go | 115 +++++++++ vendor/github.com/armon/go-metrics/metrics_test.go | 262 +++++++++++++++++++++ .../armon/go-metrics/prometheus/prometheus.go | 89 +++++++ vendor/github.com/armon/go-metrics/sink.go | 52 ++++ vendor/github.com/armon/go-metrics/sink_test.go | 120 ++++++++++ vendor/github.com/armon/go-metrics/start.go | 95 ++++++++ vendor/github.com/armon/go-metrics/start_test.go | 110 +++++++++ vendor/github.com/armon/go-metrics/statsd.go | 154 ++++++++++++ vendor/github.com/armon/go-metrics/statsd_test.go | 105 +++++++++ vendor/github.com/armon/go-metrics/statsite.go | 142 +++++++++++ .../github.com/armon/go-metrics/statsite_test.go | 101 ++++++++ 24 files changed, 2505 insertions(+) create mode 100755 vendor/github.com/armon/go-metrics/.gitignore create mode 100644 vendor/github.com/armon/go-metrics/LICENSE create mode 100644 vendor/github.com/armon/go-metrics/README.md create mode 100644 vendor/github.com/armon/go-metrics/circonus/circonus.go create mode 100644 vendor/github.com/armon/go-metrics/circonus/circonus_test.go create mode 100644 vendor/github.com/armon/go-metrics/const_unix.go create mode 100644 vendor/github.com/armon/go-metrics/const_windows.go create mode 100644 vendor/github.com/armon/go-metrics/datadog/dogstatsd.go create mode 100644 vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go create mode 100644 vendor/github.com/armon/go-metrics/inmem.go create mode 100644 vendor/github.com/armon/go-metrics/inmem_signal.go create mode 100644 vendor/github.com/armon/go-metrics/inmem_signal_test.go create mode 100644 vendor/github.com/armon/go-metrics/inmem_test.go create mode 100755 vendor/github.com/armon/go-metrics/metrics.go create mode 100644 vendor/github.com/armon/go-metrics/metrics_test.go create mode 100644 vendor/github.com/armon/go-metrics/prometheus/prometheus.go create mode 100755 vendor/github.com/armon/go-metrics/sink.go create mode 100755 vendor/github.com/armon/go-metrics/sink_test.go create mode 100755 vendor/github.com/armon/go-metrics/start.go create mode 100755 vendor/github.com/armon/go-metrics/start_test.go create mode 100644 vendor/github.com/armon/go-metrics/statsd.go create mode 100644 vendor/github.com/armon/go-metrics/statsd_test.go create mode 100755 vendor/github.com/armon/go-metrics/statsite.go create mode 100755 vendor/github.com/armon/go-metrics/statsite_test.go (limited to 'vendor/github.com/armon/go-metrics') diff --git a/vendor/github.com/armon/go-metrics/.gitignore b/vendor/github.com/armon/go-metrics/.gitignore new file mode 100755 index 000000000..8c03ec112 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe + +/metrics.out diff --git a/vendor/github.com/armon/go-metrics/LICENSE b/vendor/github.com/armon/go-metrics/LICENSE new file mode 100644 index 000000000..106569e54 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 Armon Dadgar + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/armon/go-metrics/README.md b/vendor/github.com/armon/go-metrics/README.md new file mode 100644 index 000000000..a7399cddf --- /dev/null +++ b/vendor/github.com/armon/go-metrics/README.md @@ -0,0 +1,74 @@ +go-metrics +========== + +This library provides a `metrics` package which can be used to instrument code, +expose application metrics, and profile runtime performance in a flexible manner. + +Current API: [![GoDoc](https://godoc.org/github.com/armon/go-metrics?status.svg)](https://godoc.org/github.com/armon/go-metrics) + +Sinks +===== + +The `metrics` package makes use of a `MetricSink` interface to support delivery +to any type of backend. Currently the following sinks are provided: + +* StatsiteSink : Sinks to a [statsite](https://github.com/armon/statsite/) instance (TCP) +* StatsdSink: Sinks to a [StatsD](https://github.com/etsy/statsd/) / statsite instance (UDP) +* PrometheusSink: Sinks to a [Prometheus](http://prometheus.io/) metrics endpoint (exposed via HTTP for scrapes) +* InmemSink : Provides in-memory aggregation, can be used to export stats +* FanoutSink : Sinks to multiple sinks. Enables writing to multiple statsite instances for example. +* BlackholeSink : Sinks to nowhere + +In addition to the sinks, the `InmemSignal` can be used to catch a signal, +and dump a formatted output of recent metrics. For example, when a process gets +a SIGUSR1, it can dump to stderr recent performance metrics for debugging. + +Examples +======== + +Here is an example of using the package: + +```go +func SlowMethod() { + // Profiling the runtime of a method + defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now()) +} + +// Configure a statsite sink as the global metrics sink +sink, _ := metrics.NewStatsiteSink("statsite:8125") +metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink) + +// Emit a Key/Value pair +metrics.EmitKey([]string{"questions", "meaning of life"}, 42) +``` + +Here is an example of setting up a signal handler: + +```go +// Setup the inmem sink and signal handler +inm := metrics.NewInmemSink(10*time.Second, time.Minute) +sig := metrics.DefaultInmemSignal(inm) +metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm) + +// Run some code +inm.SetGauge([]string{"foo"}, 42) +inm.EmitKey([]string{"bar"}, 30) + +inm.IncrCounter([]string{"baz"}, 42) +inm.IncrCounter([]string{"baz"}, 1) +inm.IncrCounter([]string{"baz"}, 80) + +inm.AddSample([]string{"method", "wow"}, 42) +inm.AddSample([]string{"method", "wow"}, 100) +inm.AddSample([]string{"method", "wow"}, 22) + +.... +``` + +When a signal comes in, output like the following will be dumped to stderr: + + [2014-01-28 14:57:33.04 -0800 PST][G] 'foo': 42.000 + [2014-01-28 14:57:33.04 -0800 PST][P] 'bar': 30.000 + [2014-01-28 14:57:33.04 -0800 PST][C] 'baz': Count: 3 Min: 1.000 Mean: 41.000 Max: 80.000 Stddev: 39.509 + [2014-01-28 14:57:33.04 -0800 PST][S] 'method.wow': Count: 3 Min: 22.000 Mean: 54.667 Max: 100.000 Stddev: 40.513 + diff --git a/vendor/github.com/armon/go-metrics/circonus/circonus.go b/vendor/github.com/armon/go-metrics/circonus/circonus.go new file mode 100644 index 000000000..c6e3974b5 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/circonus/circonus.go @@ -0,0 +1,92 @@ +// Circonus Metrics Sink + +package circonus + +import ( + "strings" + + cgm "github.com/circonus-labs/circonus-gometrics" +) + +// CirconusSink provides an interface to forward metrics to Circonus with +// automatic check creation and metric management +type CirconusSink struct { + metrics *cgm.CirconusMetrics +} + +// Config options for CirconusSink +// See https://github.com/circonus-labs/circonus-gometrics for configuration options +type Config cgm.Config + +// NewCirconusSink - create new metric sink for circonus +// +// one of the following must be supplied: +// - API Token - search for an existing check or create a new check +// - API Token + Check Id - the check identified by check id will be used +// - API Token + Check Submission URL - the check identified by the submission url will be used +// - Check Submission URL - the check identified by the submission url will be used +// metric management will be *disabled* +// +// Note: If submission url is supplied w/o an api token, the public circonus ca cert will be used +// to verify the broker for metrics submission. +func NewCirconusSink(cc *Config) (*CirconusSink, error) { + cfg := cgm.Config{} + if cc != nil { + cfg = cgm.Config(*cc) + } + + metrics, err := cgm.NewCirconusMetrics(&cfg) + if err != nil { + return nil, err + } + + return &CirconusSink{ + metrics: metrics, + }, nil +} + +// Start submitting metrics to Circonus (flush every SubmitInterval) +func (s *CirconusSink) Start() { + s.metrics.Start() +} + +// Flush manually triggers metric submission to Circonus +func (s *CirconusSink) Flush() { + s.metrics.Flush() +} + +// SetGauge sets value for a gauge metric +func (s *CirconusSink) SetGauge(key []string, val float32) { + flatKey := s.flattenKey(key) + s.metrics.SetGauge(flatKey, int64(val)) +} + +// EmitKey is not implemented in circonus +func (s *CirconusSink) EmitKey(key []string, val float32) { + // NOP +} + +// IncrCounter increments a counter metric +func (s *CirconusSink) IncrCounter(key []string, val float32) { + flatKey := s.flattenKey(key) + s.metrics.IncrementByValue(flatKey, uint64(val)) +} + +// AddSample adds a sample to a histogram metric +func (s *CirconusSink) AddSample(key []string, val float32) { + flatKey := s.flattenKey(key) + s.metrics.RecordValue(flatKey, float64(val)) +} + +// Flattens key to Circonus metric name +func (s *CirconusSink) flattenKey(parts []string) string { + joined := strings.Join(parts, "`") + return strings.Map(func(r rune) rune { + switch r { + case ' ': + return '_' + default: + return r + } + }, joined) +} diff --git a/vendor/github.com/armon/go-metrics/circonus/circonus_test.go b/vendor/github.com/armon/go-metrics/circonus/circonus_test.go new file mode 100644 index 000000000..234a3cb89 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/circonus/circonus_test.go @@ -0,0 +1,153 @@ +package circonus + +import ( + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" +) + +func TestNewCirconusSink(t *testing.T) { + + // test with invalid config (nil) + expectedError := errors.New("Invalid check manager configuration (no API token AND no submission url).") + _, err := NewCirconusSink(nil) + if err == nil || err.Error() != expectedError.Error() { + t.Errorf("Expected an '%#v' error, got '%#v'", expectedError, err) + } + + // test w/submission url and w/o token + cfg := &Config{} + cfg.CheckManager.Check.SubmissionURL = "http://127.0.0.1:43191/" + _, err = NewCirconusSink(cfg) + if err != nil { + t.Errorf("Expected no error, got '%v'", err) + } + + // note: a test with a valid token is *not* done as it *will* create a + // check resulting in testing the api more than the circonus sink + // see circonus-gometrics/checkmgr/checkmgr_test.go for testing of api token +} + +func TestFlattenKey(t *testing.T) { + var testKeys = []struct { + input []string + expected string + }{ + {[]string{"a", "b", "c"}, "a`b`c"}, + {[]string{"a-a", "b_b", "c/c"}, "a-a`b_b`c/c"}, + {[]string{"spaces must", "flatten", "to", "underscores"}, "spaces_must`flatten`to`underscores"}, + } + + c := &CirconusSink{} + + for _, test := range testKeys { + if actual := c.flattenKey(test.input); actual != test.expected { + t.Fatalf("Flattening %v failed, expected '%s' got '%s'", test.input, test.expected, actual) + } + } +} + +func fakeBroker(q chan string) *httptest.Server { + handler := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + q <- err.Error() + fmt.Fprintln(w, err.Error()) + } else { + q <- string(body) + fmt.Fprintln(w, `{"stats":1}`) + } + } + + return httptest.NewServer(http.HandlerFunc(handler)) +} + +func TestSetGauge(t *testing.T) { + q := make(chan string) + + server := fakeBroker(q) + defer server.Close() + + cfg := &Config{} + cfg.CheckManager.Check.SubmissionURL = server.URL + + cs, err := NewCirconusSink(cfg) + if err != nil { + t.Errorf("Expected no error, got '%v'", err) + } + + go func() { + cs.SetGauge([]string{"foo", "bar"}, 1) + cs.Flush() + }() + + expect := "{\"foo`bar\":{\"_type\":\"n\",\"_value\":1}}" + actual := <-q + + if actual != expect { + t.Errorf("Expected '%s', got '%s'", expect, actual) + + } +} + +func TestIncrCounter(t *testing.T) { + q := make(chan string) + + server := fakeBroker(q) + defer server.Close() + + cfg := &Config{} + cfg.CheckManager.Check.SubmissionURL = server.URL + + cs, err := NewCirconusSink(cfg) + if err != nil { + t.Errorf("Expected no error, got '%v'", err) + } + + go func() { + cs.IncrCounter([]string{"foo", "bar"}, 1) + cs.Flush() + }() + + expect := "{\"foo`bar\":{\"_type\":\"n\",\"_value\":1}}" + actual := <-q + + if actual != expect { + t.Errorf("Expected '%s', got '%s'", expect, actual) + + } +} + +func TestAddSample(t *testing.T) { + q := make(chan string) + + server := fakeBroker(q) + defer server.Close() + + cfg := &Config{} + cfg.CheckManager.Check.SubmissionURL = server.URL + + cs, err := NewCirconusSink(cfg) + if err != nil { + t.Errorf("Expected no error, got '%v'", err) + } + + go func() { + cs.AddSample([]string{"foo", "bar"}, 1) + cs.Flush() + }() + + expect := "{\"foo`bar\":{\"_type\":\"n\",\"_value\":[\"H[1.0e+00]=1\"]}}" + actual := <-q + + if actual != expect { + t.Errorf("Expected '%s', got '%s'", expect, actual) + + } +} diff --git a/vendor/github.com/armon/go-metrics/const_unix.go b/vendor/github.com/armon/go-metrics/const_unix.go new file mode 100644 index 000000000..31098dd57 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/const_unix.go @@ -0,0 +1,12 @@ +// +build !windows + +package metrics + +import ( + "syscall" +) + +const ( + // DefaultSignal is used with DefaultInmemSignal + DefaultSignal = syscall.SIGUSR1 +) diff --git a/vendor/github.com/armon/go-metrics/const_windows.go b/vendor/github.com/armon/go-metrics/const_windows.go new file mode 100644 index 000000000..38136af3e --- /dev/null +++ b/vendor/github.com/armon/go-metrics/const_windows.go @@ -0,0 +1,13 @@ +// +build windows + +package metrics + +import ( + "syscall" +) + +const ( + // DefaultSignal is used with DefaultInmemSignal + // Windows has no SIGUSR1, use SIGBREAK + DefaultSignal = syscall.Signal(21) +) diff --git a/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go b/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go new file mode 100644 index 000000000..aaba9fe0e --- /dev/null +++ b/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go @@ -0,0 +1,125 @@ +package datadog + +import ( + "fmt" + "strings" + + "github.com/DataDog/datadog-go/statsd" +) + +// DogStatsdSink provides a MetricSink that can be used +// with a dogstatsd server. It utilizes the Dogstatsd client at github.com/DataDog/datadog-go/statsd +type DogStatsdSink struct { + client *statsd.Client + hostName string + propagateHostname bool +} + +// NewDogStatsdSink is used to create a new DogStatsdSink with sane defaults +func NewDogStatsdSink(addr string, hostName string) (*DogStatsdSink, error) { + client, err := statsd.New(addr) + if err != nil { + return nil, err + } + sink := &DogStatsdSink{ + client: client, + hostName: hostName, + propagateHostname: false, + } + return sink, nil +} + +// SetTags sets common tags on the Dogstatsd Client that will be sent +// along with all dogstatsd packets. +// Ref: http://docs.datadoghq.com/guides/dogstatsd/#tags +func (s *DogStatsdSink) SetTags(tags []string) { + s.client.Tags = tags +} + +// EnableHostnamePropagation forces a Dogstatsd `host` tag with the value specified by `s.HostName` +// Since the go-metrics package has its own mechanism for attaching a hostname to metrics, +// setting the `propagateHostname` flag ensures that `s.HostName` overrides the host tag naively set by the DogStatsd server +func (s *DogStatsdSink) EnableHostNamePropagation() { + s.propagateHostname = true +} + +func (s *DogStatsdSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Map(func(r rune) rune { + switch r { + case ':': + fallthrough + case ' ': + return '_' + default: + return r + } + }, joined) +} + +func (s *DogStatsdSink) parseKey(key []string) ([]string, []string) { + // Since DogStatsd supports dimensionality via tags on metric keys, this sink's approach is to splice the hostname out of the key in favor of a `host` tag + // The `host` tag is either forced here, or set downstream by the DogStatsd server + + var tags []string + hostName := s.hostName + + //Splice the hostname out of the key + for i, el := range key { + if el == hostName { + key = append(key[:i], key[i+1:]...) + } + } + + if s.propagateHostname { + tags = append(tags, fmt.Sprintf("host:%s", hostName)) + } + return key, tags +} + +// Implementation of methods in the MetricSink interface + +func (s *DogStatsdSink) SetGauge(key []string, val float32) { + s.SetGaugeWithTags(key, val, []string{}) +} + +func (s *DogStatsdSink) IncrCounter(key []string, val float32) { + s.IncrCounterWithTags(key, val, []string{}) +} + +// EmitKey is not implemented since DogStatsd does not provide a metric type that holds an +// arbitrary number of values +func (s *DogStatsdSink) EmitKey(key []string, val float32) { +} + +func (s *DogStatsdSink) AddSample(key []string, val float32) { + s.AddSampleWithTags(key, val, []string{}) +} + +// The following ...WithTags methods correspond to Datadog's Tag extension to Statsd. +// http://docs.datadoghq.com/guides/dogstatsd/#tags + +func (s *DogStatsdSink) SetGaugeWithTags(key []string, val float32, tags []string) { + flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags) + rate := 1.0 + s.client.Gauge(flatKey, float64(val), tags, rate) +} + +func (s *DogStatsdSink) IncrCounterWithTags(key []string, val float32, tags []string) { + flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags) + rate := 1.0 + s.client.Count(flatKey, int64(val), tags, rate) +} + +func (s *DogStatsdSink) AddSampleWithTags(key []string, val float32, tags []string) { + flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags) + rate := 1.0 + s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) +} + +func (s *DogStatsdSink) getFlatkeyAndCombinedTags(key []string, tags []string) (flattenedKey string, combinedTags []string) { + key, hostTags := s.parseKey(key) + flatKey := s.flattenKey(key) + tags = append(tags, hostTags...) + return flatKey, tags +} diff --git a/vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go b/vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go new file mode 100644 index 000000000..0ec51e3f1 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go @@ -0,0 +1,147 @@ +package datadog + +import ( + "fmt" + "net" + "reflect" + "testing" +) + +var EmptyTags []string + +const ( + DogStatsdAddr = "127.0.0.1:7254" + HostnameEnabled = true + HostnameDisabled = false + TestHostname = "test_hostname" +) + +func MockGetHostname() string { + return TestHostname +} + +var ParseKeyTests = []struct { + KeyToParse []string + Tags []string + PropagateHostname bool + ExpectedKey []string + ExpectedTags []string +}{ + {[]string{"a", MockGetHostname(), "b", "c"}, EmptyTags, HostnameDisabled, []string{"a", "b", "c"}, EmptyTags}, + {[]string{"a", "b", "c"}, EmptyTags, HostnameDisabled, []string{"a", "b", "c"}, EmptyTags}, + {[]string{"a", "b", "c"}, EmptyTags, HostnameEnabled, []string{"a", "b", "c"}, []string{fmt.Sprintf("host:%s", MockGetHostname())}}, +} + +var FlattenKeyTests = []struct { + KeyToFlatten []string + Expected string +}{ + {[]string{"a", "b", "c"}, "a.b.c"}, + {[]string{"spaces must", "flatten", "to", "underscores"}, "spaces_must.flatten.to.underscores"}, +} + +var MetricSinkTests = []struct { + Method string + Metric []string + Value interface{} + Tags []string + PropagateHostname bool + Expected string +}{ + {"SetGauge", []string{"foo", "bar"}, float32(42), EmptyTags, HostnameDisabled, "foo.bar:42.000000|g"}, + {"SetGauge", []string{"foo", "bar", "baz"}, float32(42), EmptyTags, HostnameDisabled, "foo.bar.baz:42.000000|g"}, + {"AddSample", []string{"sample", "thing"}, float32(4), EmptyTags, HostnameDisabled, "sample.thing:4.000000|ms"}, + {"IncrCounter", []string{"count", "me"}, float32(3), EmptyTags, HostnameDisabled, "count.me:3|c"}, + + {"SetGauge", []string{"foo", "baz"}, float32(42), []string{"my_tag:my_value"}, HostnameDisabled, "foo.baz:42.000000|g|#my_tag:my_value"}, + {"SetGauge", []string{"foo", "bar"}, float32(42), []string{"my_tag:my_value", "other_tag:other_value"}, HostnameDisabled, "foo.bar:42.000000|g|#my_tag:my_value,other_tag:other_value"}, + {"SetGauge", []string{"foo", "bar"}, float32(42), []string{"my_tag:my_value", "other_tag:other_value"}, HostnameEnabled, "foo.bar:42.000000|g|#my_tag:my_value,other_tag:other_value,host:test_hostname"}, +} + +func mockNewDogStatsdSink(addr string, tags []string, tagWithHostname bool) *DogStatsdSink { + dog, _ := NewDogStatsdSink(addr, MockGetHostname()) + dog.SetTags(tags) + if tagWithHostname { + dog.EnableHostNamePropagation() + } + + return dog +} + +func setupTestServerAndBuffer(t *testing.T) (*net.UDPConn, []byte) { + udpAddr, err := net.ResolveUDPAddr("udp", DogStatsdAddr) + if err != nil { + t.Fatal(err) + } + server, err := net.ListenUDP("udp", udpAddr) + if err != nil { + t.Fatal(err) + } + return server, make([]byte, 1024) +} + +func TestParseKey(t *testing.T) { + for _, tt := range ParseKeyTests { + dog := mockNewDogStatsdSink(DogStatsdAddr, tt.Tags, tt.PropagateHostname) + key, tags := dog.parseKey(tt.KeyToParse) + + if !reflect.DeepEqual(key, tt.ExpectedKey) { + t.Fatalf("Key Parsing failed for %v", tt.KeyToParse) + } + + if !reflect.DeepEqual(tags, tt.ExpectedTags) { + t.Fatalf("Tag Parsing Failed for %v", tt.KeyToParse) + } + } +} + +func TestFlattenKey(t *testing.T) { + dog := mockNewDogStatsdSink(DogStatsdAddr, EmptyTags, HostnameDisabled) + for _, tt := range FlattenKeyTests { + if !reflect.DeepEqual(dog.flattenKey(tt.KeyToFlatten), tt.Expected) { + t.Fatalf("Flattening %v failed", tt.KeyToFlatten) + } + } +} + +func TestMetricSink(t *testing.T) { + server, buf := setupTestServerAndBuffer(t) + defer server.Close() + + for _, tt := range MetricSinkTests { + dog := mockNewDogStatsdSink(DogStatsdAddr, tt.Tags, tt.PropagateHostname) + method := reflect.ValueOf(dog).MethodByName(tt.Method) + method.Call([]reflect.Value{ + reflect.ValueOf(tt.Metric), + reflect.ValueOf(tt.Value)}) + assertServerMatchesExpected(t, server, buf, tt.Expected) + } +} + +func TestTaggableMetrics(t *testing.T) { + server, buf := setupTestServerAndBuffer(t) + defer server.Close() + + dog := mockNewDogStatsdSink(DogStatsdAddr, EmptyTags, HostnameDisabled) + + dog.AddSampleWithTags([]string{"sample", "thing"}, float32(4), []string{"tagkey:tagvalue"}) + assertServerMatchesExpected(t, server, buf, "sample.thing:4.000000|ms|#tagkey:tagvalue") + + dog.SetGaugeWithTags([]string{"sample", "thing"}, float32(4), []string{"tagkey:tagvalue"}) + assertServerMatchesExpected(t, server, buf, "sample.thing:4.000000|g|#tagkey:tagvalue") + + dog.IncrCounterWithTags([]string{"sample", "thing"}, float32(4), []string{"tagkey:tagvalue"}) + assertServerMatchesExpected(t, server, buf, "sample.thing:4|c|#tagkey:tagvalue") + + dog = mockNewDogStatsdSink(DogStatsdAddr, []string{"global"}, HostnameEnabled) // with hostname, global tags + dog.IncrCounterWithTags([]string{"sample", "thing"}, float32(4), []string{"tagkey:tagvalue"}) + assertServerMatchesExpected(t, server, buf, "sample.thing:4|c|#global,tagkey:tagvalue,host:test_hostname") +} + +func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte, expected string) { + n, _ := server.Read(buf) + msg := buf[:n] + if string(msg) != expected { + t.Fatalf("Line %s does not match expected: %s", string(msg), expected) + } +} diff --git a/vendor/github.com/armon/go-metrics/inmem.go b/vendor/github.com/armon/go-metrics/inmem.go new file mode 100644 index 000000000..83fb6bba0 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem.go @@ -0,0 +1,247 @@ +package metrics + +import ( + "fmt" + "math" + "strings" + "sync" + "time" +) + +// InmemSink provides a MetricSink that does in-memory aggregation +// without sending metrics over a network. It can be embedded within +// an application to provide profiling information. +type InmemSink struct { + // How long is each aggregation interval + interval time.Duration + + // Retain controls how many metrics interval we keep + retain time.Duration + + // maxIntervals is the maximum length of intervals. + // It is retain / interval. + maxIntervals int + + // intervals is a slice of the retained intervals + intervals []*IntervalMetrics + intervalLock sync.RWMutex + + rateDenom float64 +} + +// IntervalMetrics stores the aggregated metrics +// for a specific interval +type IntervalMetrics struct { + sync.RWMutex + + // The start time of the interval + Interval time.Time + + // Gauges maps the key to the last set value + Gauges map[string]float32 + + // Points maps the string to the list of emitted values + // from EmitKey + Points map[string][]float32 + + // Counters maps the string key to a sum of the counter + // values + Counters map[string]*AggregateSample + + // Samples maps the key to an AggregateSample, + // which has the rolled up view of a sample + Samples map[string]*AggregateSample +} + +// NewIntervalMetrics creates a new IntervalMetrics for a given interval +func NewIntervalMetrics(intv time.Time) *IntervalMetrics { + return &IntervalMetrics{ + Interval: intv, + Gauges: make(map[string]float32), + Points: make(map[string][]float32), + Counters: make(map[string]*AggregateSample), + Samples: make(map[string]*AggregateSample), + } +} + +// AggregateSample is used to hold aggregate metrics +// about a sample +type AggregateSample struct { + Count int // The count of emitted pairs + Rate float64 // The count of emitted pairs per time unit (usually 1 second) + Sum float64 // The sum of values + SumSq float64 // The sum of squared values + Min float64 // Minimum value + Max float64 // Maximum value + LastUpdated time.Time // When value was last updated +} + +// Computes a Stddev of the values +func (a *AggregateSample) Stddev() float64 { + num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2) + div := float64(a.Count * (a.Count - 1)) + if div == 0 { + return 0 + } + return math.Sqrt(num / div) +} + +// Computes a mean of the values +func (a *AggregateSample) Mean() float64 { + if a.Count == 0 { + return 0 + } + return a.Sum / float64(a.Count) +} + +// Ingest is used to update a sample +func (a *AggregateSample) Ingest(v float64, rateDenom float64) { + a.Count++ + a.Sum += v + a.SumSq += (v * v) + if v < a.Min || a.Count == 1 { + a.Min = v + } + if v > a.Max || a.Count == 1 { + a.Max = v + } + a.Rate = float64(a.Count)/rateDenom + a.LastUpdated = time.Now() +} + +func (a *AggregateSample) String() string { + if a.Count == 0 { + return "Count: 0" + } else if a.Stddev() == 0 { + return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated) + } else { + return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s", + a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated) + } +} + +// NewInmemSink is used to construct a new in-memory sink. +// Uses an aggregation interval and maximum retention period. +func NewInmemSink(interval, retain time.Duration) *InmemSink { + rateTimeUnit := time.Second + i := &InmemSink{ + interval: interval, + retain: retain, + maxIntervals: int(retain / interval), + rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()), + } + i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) + return i +} + +func (i *InmemSink) SetGauge(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + intv.Gauges[k] = val +} + +func (i *InmemSink) EmitKey(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + vals := intv.Points[k] + intv.Points[k] = append(vals, val) +} + +func (i *InmemSink) IncrCounter(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + + agg := intv.Counters[k] + if agg == nil { + agg = &AggregateSample{} + intv.Counters[k] = agg + } + agg.Ingest(float64(val), i.rateDenom) +} + +func (i *InmemSink) AddSample(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + + agg := intv.Samples[k] + if agg == nil { + agg = &AggregateSample{} + intv.Samples[k] = agg + } + agg.Ingest(float64(val), i.rateDenom) +} + +// Data is used to retrieve all the aggregated metrics +// Intervals may be in use, and a read lock should be acquired +func (i *InmemSink) Data() []*IntervalMetrics { + // Get the current interval, forces creation + i.getInterval() + + i.intervalLock.RLock() + defer i.intervalLock.RUnlock() + + intervals := make([]*IntervalMetrics, len(i.intervals)) + copy(intervals, i.intervals) + return intervals +} + +func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics { + i.intervalLock.RLock() + defer i.intervalLock.RUnlock() + + n := len(i.intervals) + if n > 0 && i.intervals[n-1].Interval == intv { + return i.intervals[n-1] + } + return nil +} + +func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics { + i.intervalLock.Lock() + defer i.intervalLock.Unlock() + + // Check for an existing interval + n := len(i.intervals) + if n > 0 && i.intervals[n-1].Interval == intv { + return i.intervals[n-1] + } + + // Add the current interval + current := NewIntervalMetrics(intv) + i.intervals = append(i.intervals, current) + n++ + + // Truncate the intervals if they are too long + if n >= i.maxIntervals { + copy(i.intervals[0:], i.intervals[n-i.maxIntervals:]) + i.intervals = i.intervals[:i.maxIntervals] + } + return current +} + +// getInterval returns the current interval to write to +func (i *InmemSink) getInterval() *IntervalMetrics { + intv := time.Now().Truncate(i.interval) + if m := i.getExistingInterval(intv); m != nil { + return m + } + return i.createInterval(intv) +} + +// Flattens the key for formatting, removes spaces +func (i *InmemSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Replace(joined, " ", "_", -1) +} diff --git a/vendor/github.com/armon/go-metrics/inmem_signal.go b/vendor/github.com/armon/go-metrics/inmem_signal.go new file mode 100644 index 000000000..95d08ee10 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem_signal.go @@ -0,0 +1,100 @@ +package metrics + +import ( + "bytes" + "fmt" + "io" + "os" + "os/signal" + "sync" + "syscall" +) + +// InmemSignal is used to listen for a given signal, and when received, +// to dump the current metrics from the InmemSink to an io.Writer +type InmemSignal struct { + signal syscall.Signal + inm *InmemSink + w io.Writer + sigCh chan os.Signal + + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// NewInmemSignal creates a new InmemSignal which listens for a given signal, +// and dumps the current metrics out to a writer +func NewInmemSignal(inmem *InmemSink, sig syscall.Signal, w io.Writer) *InmemSignal { + i := &InmemSignal{ + signal: sig, + inm: inmem, + w: w, + sigCh: make(chan os.Signal, 1), + stopCh: make(chan struct{}), + } + signal.Notify(i.sigCh, sig) + go i.run() + return i +} + +// DefaultInmemSignal returns a new InmemSignal that responds to SIGUSR1 +// and writes output to stderr. Windows uses SIGBREAK +func DefaultInmemSignal(inmem *InmemSink) *InmemSignal { + return NewInmemSignal(inmem, DefaultSignal, os.Stderr) +} + +// Stop is used to stop the InmemSignal from listening +func (i *InmemSignal) Stop() { + i.stopLock.Lock() + defer i.stopLock.Unlock() + + if i.stop { + return + } + i.stop = true + close(i.stopCh) + signal.Stop(i.sigCh) +} + +// run is a long running routine that handles signals +func (i *InmemSignal) run() { + for { + select { + case <-i.sigCh: + i.dumpStats() + case <-i.stopCh: + return + } + } +} + +// dumpStats is used to dump the data to output writer +func (i *InmemSignal) dumpStats() { + buf := bytes.NewBuffer(nil) + + data := i.inm.Data() + // Skip the last period which is still being aggregated + for i := 0; i < len(data)-1; i++ { + intv := data[i] + intv.RLock() + for name, val := range intv.Gauges { + fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val) + } + for name, vals := range intv.Points { + for _, val := range vals { + fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val) + } + } + for name, agg := range intv.Counters { + fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg) + } + for name, agg := range intv.Samples { + fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg) + } + intv.RUnlock() + } + + // Write out the bytes + i.w.Write(buf.Bytes()) +} diff --git a/vendor/github.com/armon/go-metrics/inmem_signal_test.go b/vendor/github.com/armon/go-metrics/inmem_signal_test.go new file mode 100644 index 000000000..9bbca5f25 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem_signal_test.go @@ -0,0 +1,46 @@ +package metrics + +import ( + "bytes" + "os" + "strings" + "syscall" + "testing" + "time" +) + +func TestInmemSignal(t *testing.T) { + buf := bytes.NewBuffer(nil) + inm := NewInmemSink(10*time.Millisecond, 50*time.Millisecond) + sig := NewInmemSignal(inm, syscall.SIGUSR1, buf) + defer sig.Stop() + + inm.SetGauge([]string{"foo"}, 42) + inm.EmitKey([]string{"bar"}, 42) + inm.IncrCounter([]string{"baz"}, 42) + inm.AddSample([]string{"wow"}, 42) + + // Wait for period to end + time.Sleep(15 * time.Millisecond) + + // Send signal! + syscall.Kill(os.Getpid(), syscall.SIGUSR1) + + // Wait for flush + time.Sleep(10 * time.Millisecond) + + // Check the output + out := string(buf.Bytes()) + if !strings.Contains(out, "[G] 'foo': 42") { + t.Fatalf("bad: %v", out) + } + if !strings.Contains(out, "[P] 'bar': 42") { + t.Fatalf("bad: %v", out) + } + if !strings.Contains(out, "[C] 'baz': Count: 1 Sum: 42") { + t.Fatalf("bad: %v", out) + } + if !strings.Contains(out, "[S] 'wow': Count: 1 Sum: 42") { + t.Fatalf("bad: %v", out) + } +} diff --git a/vendor/github.com/armon/go-metrics/inmem_test.go b/vendor/github.com/armon/go-metrics/inmem_test.go new file mode 100644 index 000000000..1c2455114 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem_test.go @@ -0,0 +1,107 @@ +package metrics + +import ( + "math" + "testing" + "time" +) + +func TestInmemSink(t *testing.T) { + inm := NewInmemSink(10*time.Millisecond, 50*time.Millisecond) + + data := inm.Data() + if len(data) != 1 { + t.Fatalf("bad: %v", data) + } + + // Add data points + inm.SetGauge([]string{"foo", "bar"}, 42) + inm.EmitKey([]string{"foo", "bar"}, 42) + inm.IncrCounter([]string{"foo", "bar"}, 20) + inm.IncrCounter([]string{"foo", "bar"}, 22) + inm.AddSample([]string{"foo", "bar"}, 20) + inm.AddSample([]string{"foo", "bar"}, 22) + + data = inm.Data() + if len(data) != 1 { + t.Fatalf("bad: %v", data) + } + + intvM := data[0] + intvM.RLock() + + if time.Now().Sub(intvM.Interval) > 10*time.Millisecond { + t.Fatalf("interval too old") + } + if intvM.Gauges["foo.bar"] != 42 { + t.Fatalf("bad val: %v", intvM.Gauges) + } + if intvM.Points["foo.bar"][0] != 42 { + t.Fatalf("bad val: %v", intvM.Points) + } + + agg := intvM.Counters["foo.bar"] + if agg.Count != 2 { + t.Fatalf("bad val: %v", agg) + } + if agg.Rate != 200 { + t.Fatalf("bad val: %v", agg.Rate) + } + if agg.Sum != 42 { + t.Fatalf("bad val: %v", agg) + } + if agg.SumSq != 884 { + t.Fatalf("bad val: %v", agg) + } + if agg.Min != 20 { + t.Fatalf("bad val: %v", agg) + } + if agg.Max != 22 { + t.Fatalf("bad val: %v", agg) + } + if agg.Mean() != 21 { + t.Fatalf("bad val: %v", agg) + } + if agg.Stddev() != math.Sqrt(2) { + t.Fatalf("bad val: %v", agg) + } + + if agg.LastUpdated.IsZero() { + t.Fatalf("agg.LastUpdated is not set: %v", agg) + } + + diff := time.Now().Sub(agg.LastUpdated).Seconds() + if diff > 1 { + t.Fatalf("time diff too great: %f", diff) + } + + if agg = intvM.Samples["foo.bar"]; agg == nil { + t.Fatalf("missing sample") + } + + intvM.RUnlock() + + for i := 1; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + inm.SetGauge([]string{"foo", "bar"}, 42) + data = inm.Data() + if len(data) != min(i+1, 5) { + t.Fatalf("bad: %v", data) + } + } + + // Should not exceed 5 intervals! + time.Sleep(10 * time.Millisecond) + inm.SetGauge([]string{"foo", "bar"}, 42) + data = inm.Data() + if len(data) != 5 { + t.Fatalf("bad: %v", data) + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/vendor/github.com/armon/go-metrics/metrics.go b/vendor/github.com/armon/go-metrics/metrics.go new file mode 100755 index 000000000..b818e4182 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/metrics.go @@ -0,0 +1,115 @@ +package metrics + +import ( + "runtime" + "time" +) + +func (m *Metrics) SetGauge(key []string, val float32) { + if m.HostName != "" && m.EnableHostname { + key = insert(0, m.HostName, key) + } + if m.EnableTypePrefix { + key = insert(0, "gauge", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.SetGauge(key, val) +} + +func (m *Metrics) EmitKey(key []string, val float32) { + if m.EnableTypePrefix { + key = insert(0, "kv", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.EmitKey(key, val) +} + +func (m *Metrics) IncrCounter(key []string, val float32) { + if m.EnableTypePrefix { + key = insert(0, "counter", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.IncrCounter(key, val) +} + +func (m *Metrics) AddSample(key []string, val float32) { + if m.EnableTypePrefix { + key = insert(0, "sample", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.AddSample(key, val) +} + +func (m *Metrics) MeasureSince(key []string, start time.Time) { + if m.EnableTypePrefix { + key = insert(0, "timer", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + now := time.Now() + elapsed := now.Sub(start) + msec := float32(elapsed.Nanoseconds()) / float32(m.TimerGranularity) + m.sink.AddSample(key, msec) +} + +// Periodically collects runtime stats to publish +func (m *Metrics) collectStats() { + for { + time.Sleep(m.ProfileInterval) + m.emitRuntimeStats() + } +} + +// Emits various runtime statsitics +func (m *Metrics) emitRuntimeStats() { + // Export number of Goroutines + numRoutines := runtime.NumGoroutine() + m.SetGauge([]string{"runtime", "num_goroutines"}, float32(numRoutines)) + + // Export memory stats + var stats runtime.MemStats + runtime.ReadMemStats(&stats) + m.SetGauge([]string{"runtime", "alloc_bytes"}, float32(stats.Alloc)) + m.SetGauge([]string{"runtime", "sys_bytes"}, float32(stats.Sys)) + m.SetGauge([]string{"runtime", "malloc_count"}, float32(stats.Mallocs)) + m.SetGauge([]string{"runtime", "free_count"}, float32(stats.Frees)) + m.SetGauge([]string{"runtime", "heap_objects"}, float32(stats.HeapObjects)) + m.SetGauge([]string{"runtime", "total_gc_pause_ns"}, float32(stats.PauseTotalNs)) + m.SetGauge([]string{"runtime", "total_gc_runs"}, float32(stats.NumGC)) + + // Export info about the last few GC runs + num := stats.NumGC + + // Handle wrap around + if num < m.lastNumGC { + m.lastNumGC = 0 + } + + // Ensure we don't scan more than 256 + if num-m.lastNumGC >= 256 { + m.lastNumGC = num - 255 + } + + for i := m.lastNumGC; i < num; i++ { + pause := stats.PauseNs[i%256] + m.AddSample([]string{"runtime", "gc_pause_ns"}, float32(pause)) + } + m.lastNumGC = num +} + +// Inserts a string value at an index into the slice +func insert(i int, v string, s []string) []string { + s = append(s, "") + copy(s[i+1:], s[i:]) + s[i] = v + return s +} diff --git a/vendor/github.com/armon/go-metrics/metrics_test.go b/vendor/github.com/armon/go-metrics/metrics_test.go new file mode 100644 index 000000000..f5b2a4c79 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/metrics_test.go @@ -0,0 +1,262 @@ +package metrics + +import ( + "reflect" + "runtime" + "testing" + "time" +) + +func mockMetric() (*MockSink, *Metrics) { + m := &MockSink{} + met := &Metrics{sink: m} + return m, met +} + +func TestMetrics_SetGauge(t *testing.T) { + m, met := mockMetric() + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.HostName = "test" + met.EnableHostname = true + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "test" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "gauge" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } +} + +func TestMetrics_EmitKey(t *testing.T) { + m, met := mockMetric() + met.EmitKey([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.EmitKey([]string{"key"}, float32(1)) + if m.keys[0][0] != "kv" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.EmitKey([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } +} + +func TestMetrics_IncrCounter(t *testing.T) { + m, met := mockMetric() + met.IncrCounter([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.IncrCounter([]string{"key"}, float32(1)) + if m.keys[0][0] != "counter" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.IncrCounter([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } +} + +func TestMetrics_AddSample(t *testing.T) { + m, met := mockMetric() + met.AddSample([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.AddSample([]string{"key"}, float32(1)) + if m.keys[0][0] != "sample" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.AddSample([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } +} + +func TestMetrics_MeasureSince(t *testing.T) { + m, met := mockMetric() + met.TimerGranularity = time.Millisecond + n := time.Now() + met.MeasureSince([]string{"key"}, n) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] > 0.1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.TimerGranularity = time.Millisecond + met.EnableTypePrefix = true + met.MeasureSince([]string{"key"}, n) + if m.keys[0][0] != "timer" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] > 0.1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.TimerGranularity = time.Millisecond + met.ServiceName = "service" + met.MeasureSince([]string{"key"}, n) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] > 0.1 { + t.Fatalf("") + } +} + +func TestMetrics_EmitRuntimeStats(t *testing.T) { + runtime.GC() + m, met := mockMetric() + met.emitRuntimeStats() + + if m.keys[0][0] != "runtime" || m.keys[0][1] != "num_goroutines" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[0] <= 1 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[1][0] != "runtime" || m.keys[1][1] != "alloc_bytes" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[1] <= 40000 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[2][0] != "runtime" || m.keys[2][1] != "sys_bytes" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[2] <= 100000 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[3][0] != "runtime" || m.keys[3][1] != "malloc_count" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[3] <= 100 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[4][0] != "runtime" || m.keys[4][1] != "free_count" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[4] <= 100 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[5][0] != "runtime" || m.keys[5][1] != "heap_objects" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[5] <= 100 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[6][0] != "runtime" || m.keys[6][1] != "total_gc_pause_ns" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[6] <= 100000 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[7][0] != "runtime" || m.keys[7][1] != "total_gc_runs" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[7] < 1 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[8][0] != "runtime" || m.keys[8][1] != "gc_pause_ns" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[8] <= 1000 { + t.Fatalf("bad val: %v", m.vals) + } +} + +func TestInsert(t *testing.T) { + k := []string{"hi", "bob"} + exp := []string{"hi", "there", "bob"} + out := insert(1, "there", k) + if !reflect.DeepEqual(exp, out) { + t.Fatalf("bad insert %v %v", exp, out) + } +} diff --git a/vendor/github.com/armon/go-metrics/prometheus/prometheus.go b/vendor/github.com/armon/go-metrics/prometheus/prometheus.go new file mode 100644 index 000000000..b26d27981 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/prometheus/prometheus.go @@ -0,0 +1,89 @@ +// +build go1.3 +package prometheus + +import ( + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type PrometheusSink struct { + mu sync.Mutex + gauges map[string]prometheus.Gauge + summaries map[string]prometheus.Summary + counters map[string]prometheus.Counter +} + +func NewPrometheusSink() (*PrometheusSink, error) { + return &PrometheusSink{ + gauges: make(map[string]prometheus.Gauge), + summaries: make(map[string]prometheus.Summary), + counters: make(map[string]prometheus.Counter), + }, nil +} + +func (p *PrometheusSink) flattenKey(parts []string) string { + joined := strings.Join(parts, "_") + joined = strings.Replace(joined, " ", "_", -1) + joined = strings.Replace(joined, ".", "_", -1) + joined = strings.Replace(joined, "-", "_", -1) + joined = strings.Replace(joined, "=", "_", -1) + return joined +} + +func (p *PrometheusSink) SetGauge(parts []string, val float32) { + p.mu.Lock() + defer p.mu.Unlock() + key := p.flattenKey(parts) + g, ok := p.gauges[key] + if !ok { + g = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: key, + Help: key, + }) + prometheus.MustRegister(g) + p.gauges[key] = g + } + g.Set(float64(val)) +} + +func (p *PrometheusSink) AddSample(parts []string, val float32) { + p.mu.Lock() + defer p.mu.Unlock() + key := p.flattenKey(parts) + g, ok := p.summaries[key] + if !ok { + g = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: key, + Help: key, + MaxAge: 10 * time.Second, + }) + prometheus.MustRegister(g) + p.summaries[key] = g + } + g.Observe(float64(val)) +} + +// EmitKey is not implemented. Prometheus doesn’t offer a type for which an +// arbitrary number of values is retained, as Prometheus works with a pull +// model, rather than a push model. +func (p *PrometheusSink) EmitKey(key []string, val float32) { +} + +func (p *PrometheusSink) IncrCounter(parts []string, val float32) { + p.mu.Lock() + defer p.mu.Unlock() + key := p.flattenKey(parts) + g, ok := p.counters[key] + if !ok { + g = prometheus.NewCounter(prometheus.CounterOpts{ + Name: key, + Help: key, + }) + prometheus.MustRegister(g) + p.counters[key] = g + } + g.Add(float64(val)) +} diff --git a/vendor/github.com/armon/go-metrics/sink.go b/vendor/github.com/armon/go-metrics/sink.go new file mode 100755 index 000000000..0c240c2c4 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/sink.go @@ -0,0 +1,52 @@ +package metrics + +// The MetricSink interface is used to transmit metrics information +// to an external system +type MetricSink interface { + // A Gauge should retain the last value it is set to + SetGauge(key []string, val float32) + + // Should emit a Key/Value pair for each call + EmitKey(key []string, val float32) + + // Counters should accumulate values + IncrCounter(key []string, val float32) + + // Samples are for timing information, where quantiles are used + AddSample(key []string, val float32) +} + +// BlackholeSink is used to just blackhole messages +type BlackholeSink struct{} + +func (*BlackholeSink) SetGauge(key []string, val float32) {} +func (*BlackholeSink) EmitKey(key []string, val float32) {} +func (*BlackholeSink) IncrCounter(key []string, val float32) {} +func (*BlackholeSink) AddSample(key []string, val float32) {} + +// FanoutSink is used to sink to fanout values to multiple sinks +type FanoutSink []MetricSink + +func (fh FanoutSink) SetGauge(key []string, val float32) { + for _, s := range fh { + s.SetGauge(key, val) + } +} + +func (fh FanoutSink) EmitKey(key []string, val float32) { + for _, s := range fh { + s.EmitKey(key, val) + } +} + +func (fh FanoutSink) IncrCounter(key []string, val float32) { + for _, s := range fh { + s.IncrCounter(key, val) + } +} + +func (fh FanoutSink) AddSample(key []string, val float32) { + for _, s := range fh { + s.AddSample(key, val) + } +} diff --git a/vendor/github.com/armon/go-metrics/sink_test.go b/vendor/github.com/armon/go-metrics/sink_test.go new file mode 100755 index 000000000..15c5d771a --- /dev/null +++ b/vendor/github.com/armon/go-metrics/sink_test.go @@ -0,0 +1,120 @@ +package metrics + +import ( + "reflect" + "testing" +) + +type MockSink struct { + keys [][]string + vals []float32 +} + +func (m *MockSink) SetGauge(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} +func (m *MockSink) EmitKey(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} +func (m *MockSink) IncrCounter(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} +func (m *MockSink) AddSample(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} + +func TestFanoutSink_Gauge(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := []string{"test"} + v := float32(42.0) + fh.SetGauge(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func TestFanoutSink_Key(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := []string{"test"} + v := float32(42.0) + fh.EmitKey(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func TestFanoutSink_Counter(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := []string{"test"} + v := float32(42.0) + fh.IncrCounter(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func TestFanoutSink_Sample(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := []string{"test"} + v := float32(42.0) + fh.AddSample(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} diff --git a/vendor/github.com/armon/go-metrics/start.go b/vendor/github.com/armon/go-metrics/start.go new file mode 100755 index 000000000..44113f100 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/start.go @@ -0,0 +1,95 @@ +package metrics + +import ( + "os" + "time" +) + +// Config is used to configure metrics settings +type Config struct { + ServiceName string // Prefixed with keys to seperate services + HostName string // Hostname to use. If not provided and EnableHostname, it will be os.Hostname + EnableHostname bool // Enable prefixing gauge values with hostname + EnableRuntimeMetrics bool // Enables profiling of runtime metrics (GC, Goroutines, Memory) + EnableTypePrefix bool // Prefixes key with a type ("counter", "gauge", "timer") + TimerGranularity time.Duration // Granularity of timers. + ProfileInterval time.Duration // Interval to profile runtime metrics +} + +// Metrics represents an instance of a metrics sink that can +// be used to emit +type Metrics struct { + Config + lastNumGC uint32 + sink MetricSink +} + +// Shared global metrics instance +var globalMetrics *Metrics + +func init() { + // Initialize to a blackhole sink to avoid errors + globalMetrics = &Metrics{sink: &BlackholeSink{}} +} + +// DefaultConfig provides a sane default configuration +func DefaultConfig(serviceName string) *Config { + c := &Config{ + ServiceName: serviceName, // Use client provided service + HostName: "", + EnableHostname: true, // Enable hostname prefix + EnableRuntimeMetrics: true, // Enable runtime profiling + EnableTypePrefix: false, // Disable type prefix + TimerGranularity: time.Millisecond, // Timers are in milliseconds + ProfileInterval: time.Second, // Poll runtime every second + } + + // Try to get the hostname + name, _ := os.Hostname() + c.HostName = name + return c +} + +// New is used to create a new instance of Metrics +func New(conf *Config, sink MetricSink) (*Metrics, error) { + met := &Metrics{} + met.Config = *conf + met.sink = sink + + // Start the runtime collector + if conf.EnableRuntimeMetrics { + go met.collectStats() + } + return met, nil +} + +// NewGlobal is the same as New, but it assigns the metrics object to be +// used globally as well as returning it. +func NewGlobal(conf *Config, sink MetricSink) (*Metrics, error) { + metrics, err := New(conf, sink) + if err == nil { + globalMetrics = metrics + } + return metrics, err +} + +// Proxy all the methods to the globalMetrics instance +func SetGauge(key []string, val float32) { + globalMetrics.SetGauge(key, val) +} + +func EmitKey(key []string, val float32) { + globalMetrics.EmitKey(key, val) +} + +func IncrCounter(key []string, val float32) { + globalMetrics.IncrCounter(key, val) +} + +func AddSample(key []string, val float32) { + globalMetrics.AddSample(key, val) +} + +func MeasureSince(key []string, start time.Time) { + globalMetrics.MeasureSince(key, start) +} diff --git a/vendor/github.com/armon/go-metrics/start_test.go b/vendor/github.com/armon/go-metrics/start_test.go new file mode 100755 index 000000000..8b3210c15 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/start_test.go @@ -0,0 +1,110 @@ +package metrics + +import ( + "reflect" + "testing" + "time" +) + +func TestDefaultConfig(t *testing.T) { + conf := DefaultConfig("service") + if conf.ServiceName != "service" { + t.Fatalf("Bad name") + } + if conf.HostName == "" { + t.Fatalf("missing hostname") + } + if !conf.EnableHostname || !conf.EnableRuntimeMetrics { + t.Fatalf("expect true") + } + if conf.EnableTypePrefix { + t.Fatalf("expect false") + } + if conf.TimerGranularity != time.Millisecond { + t.Fatalf("bad granularity") + } + if conf.ProfileInterval != time.Second { + t.Fatalf("bad interval") + } +} + +func Test_GlobalMetrics_SetGauge(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + SetGauge(k, v) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_EmitKey(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + EmitKey(k, v) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_IncrCounter(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + IncrCounter(k, v) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_AddSample(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + AddSample(k, v) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_MeasureSince(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + globalMetrics.TimerGranularity = time.Millisecond + + k := []string{"test"} + now := time.Now() + MeasureSince(k, now) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if m.vals[0] > 0.1 { + t.Fatalf("val too large %v", m.vals[0]) + } +} diff --git a/vendor/github.com/armon/go-metrics/statsd.go b/vendor/github.com/armon/go-metrics/statsd.go new file mode 100644 index 000000000..65a5021a0 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsd.go @@ -0,0 +1,154 @@ +package metrics + +import ( + "bytes" + "fmt" + "log" + "net" + "strings" + "time" +) + +const ( + // statsdMaxLen is the maximum size of a packet + // to send to statsd + statsdMaxLen = 1400 +) + +// StatsdSink provides a MetricSink that can be used +// with a statsite or statsd metrics server. It uses +// only UDP packets, while StatsiteSink uses TCP. +type StatsdSink struct { + addr string + metricQueue chan string +} + +// NewStatsdSink is used to create a new StatsdSink +func NewStatsdSink(addr string) (*StatsdSink, error) { + s := &StatsdSink{ + addr: addr, + metricQueue: make(chan string, 4096), + } + go s.flushMetrics() + return s, nil +} + +// Close is used to stop flushing to statsd +func (s *StatsdSink) Shutdown() { + close(s.metricQueue) +} + +func (s *StatsdSink) SetGauge(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) +} + +func (s *StatsdSink) EmitKey(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val)) +} + +func (s *StatsdSink) IncrCounter(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) +} + +func (s *StatsdSink) AddSample(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) +} + +// Flattens the key for formatting, removes spaces +func (s *StatsdSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Map(func(r rune) rune { + switch r { + case ':': + fallthrough + case ' ': + return '_' + default: + return r + } + }, joined) +} + +// Does a non-blocking push to the metrics queue +func (s *StatsdSink) pushMetric(m string) { + select { + case s.metricQueue <- m: + default: + } +} + +// Flushes metrics +func (s *StatsdSink) flushMetrics() { + var sock net.Conn + var err error + var wait <-chan time.Time + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + +CONNECT: + // Create a buffer + buf := bytes.NewBuffer(nil) + + // Attempt to connect + sock, err = net.Dial("udp", s.addr) + if err != nil { + log.Printf("[ERR] Error connecting to statsd! Err: %s", err) + goto WAIT + } + + for { + select { + case metric, ok := <-s.metricQueue: + // Get a metric from the queue + if !ok { + goto QUIT + } + + // Check if this would overflow the packet size + if len(metric)+buf.Len() > statsdMaxLen { + _, err := sock.Write(buf.Bytes()) + buf.Reset() + if err != nil { + log.Printf("[ERR] Error writing to statsd! Err: %s", err) + goto WAIT + } + } + + // Append to the buffer + buf.WriteString(metric) + + case <-ticker.C: + if buf.Len() == 0 { + continue + } + + _, err := sock.Write(buf.Bytes()) + buf.Reset() + if err != nil { + log.Printf("[ERR] Error flushing to statsd! Err: %s", err) + goto WAIT + } + } + } + +WAIT: + // Wait for a while + wait = time.After(time.Duration(5) * time.Second) + for { + select { + // Dequeue the messages to avoid backlog + case _, ok := <-s.metricQueue: + if !ok { + goto QUIT + } + case <-wait: + goto CONNECT + } + } +QUIT: + s.metricQueue = nil +} diff --git a/vendor/github.com/armon/go-metrics/statsd_test.go b/vendor/github.com/armon/go-metrics/statsd_test.go new file mode 100644 index 000000000..622eb5d3a --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsd_test.go @@ -0,0 +1,105 @@ +package metrics + +import ( + "bufio" + "bytes" + "net" + "testing" + "time" +) + +func TestStatsd_Flatten(t *testing.T) { + s := &StatsdSink{} + flat := s.flattenKey([]string{"a", "b", "c", "d"}) + if flat != "a.b.c.d" { + t.Fatalf("Bad flat") + } +} + +func TestStatsd_PushFullQueue(t *testing.T) { + q := make(chan string, 1) + q <- "full" + + s := &StatsdSink{metricQueue: q} + s.pushMetric("omit") + + out := <-q + if out != "full" { + t.Fatalf("bad val %v", out) + } + + select { + case v := <-q: + t.Fatalf("bad val %v", v) + default: + } +} + +func TestStatsd_Conn(t *testing.T) { + addr := "127.0.0.1:7524" + done := make(chan bool) + go func() { + list, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 7524}) + if err != nil { + panic(err) + } + defer list.Close() + buf := make([]byte, 1500) + n, err := list.Read(buf) + if err != nil { + panic(err) + } + buf = buf[:n] + reader := bufio.NewReader(bytes.NewReader(buf)) + + line, err := reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "gauge.val:1.000000|g\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "key.other:2.000000|kv\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "counter.me:3.000000|c\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "sample.slow_thingy:4.000000|ms\n" { + t.Fatalf("bad line %s", line) + } + + done <- true + }() + s, err := NewStatsdSink(addr) + if err != nil { + t.Fatalf("bad error") + } + + s.SetGauge([]string{"gauge", "val"}, float32(1)) + s.EmitKey([]string{"key", "other"}, float32(2)) + s.IncrCounter([]string{"counter", "me"}, float32(3)) + s.AddSample([]string{"sample", "slow thingy"}, float32(4)) + + select { + case <-done: + s.Shutdown() + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } +} diff --git a/vendor/github.com/armon/go-metrics/statsite.go b/vendor/github.com/armon/go-metrics/statsite.go new file mode 100755 index 000000000..68730139a --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsite.go @@ -0,0 +1,142 @@ +package metrics + +import ( + "bufio" + "fmt" + "log" + "net" + "strings" + "time" +) + +const ( + // We force flush the statsite metrics after this period of + // inactivity. Prevents stats from getting stuck in a buffer + // forever. + flushInterval = 100 * time.Millisecond +) + +// StatsiteSink provides a MetricSink that can be used with a +// statsite metrics server +type StatsiteSink struct { + addr string + metricQueue chan string +} + +// NewStatsiteSink is used to create a new StatsiteSink +func NewStatsiteSink(addr string) (*StatsiteSink, error) { + s := &StatsiteSink{ + addr: addr, + metricQueue: make(chan string, 4096), + } + go s.flushMetrics() + return s, nil +} + +// Close is used to stop flushing to statsite +func (s *StatsiteSink) Shutdown() { + close(s.metricQueue) +} + +func (s *StatsiteSink) SetGauge(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) +} + +func (s *StatsiteSink) EmitKey(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val)) +} + +func (s *StatsiteSink) IncrCounter(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) +} + +func (s *StatsiteSink) AddSample(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) +} + +// Flattens the key for formatting, removes spaces +func (s *StatsiteSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Map(func(r rune) rune { + switch r { + case ':': + fallthrough + case ' ': + return '_' + default: + return r + } + }, joined) +} + +// Does a non-blocking push to the metrics queue +func (s *StatsiteSink) pushMetric(m string) { + select { + case s.metricQueue <- m: + default: + } +} + +// Flushes metrics +func (s *StatsiteSink) flushMetrics() { + var sock net.Conn + var err error + var wait <-chan time.Time + var buffered *bufio.Writer + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + +CONNECT: + // Attempt to connect + sock, err = net.Dial("tcp", s.addr) + if err != nil { + log.Printf("[ERR] Error connecting to statsite! Err: %s", err) + goto WAIT + } + + // Create a buffered writer + buffered = bufio.NewWriter(sock) + + for { + select { + case metric, ok := <-s.metricQueue: + // Get a metric from the queue + if !ok { + goto QUIT + } + + // Try to send to statsite + _, err := buffered.Write([]byte(metric)) + if err != nil { + log.Printf("[ERR] Error writing to statsite! Err: %s", err) + goto WAIT + } + case <-ticker.C: + if err := buffered.Flush(); err != nil { + log.Printf("[ERR] Error flushing to statsite! Err: %s", err) + goto WAIT + } + } + } + +WAIT: + // Wait for a while + wait = time.After(time.Duration(5) * time.Second) + for { + select { + // Dequeue the messages to avoid backlog + case _, ok := <-s.metricQueue: + if !ok { + goto QUIT + } + case <-wait: + goto CONNECT + } + } +QUIT: + s.metricQueue = nil +} diff --git a/vendor/github.com/armon/go-metrics/statsite_test.go b/vendor/github.com/armon/go-metrics/statsite_test.go new file mode 100755 index 000000000..d9c744f41 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsite_test.go @@ -0,0 +1,101 @@ +package metrics + +import ( + "bufio" + "net" + "testing" + "time" +) + +func acceptConn(addr string) net.Conn { + ln, _ := net.Listen("tcp", addr) + conn, _ := ln.Accept() + return conn +} + +func TestStatsite_Flatten(t *testing.T) { + s := &StatsiteSink{} + flat := s.flattenKey([]string{"a", "b", "c", "d"}) + if flat != "a.b.c.d" { + t.Fatalf("Bad flat") + } +} + +func TestStatsite_PushFullQueue(t *testing.T) { + q := make(chan string, 1) + q <- "full" + + s := &StatsiteSink{metricQueue: q} + s.pushMetric("omit") + + out := <-q + if out != "full" { + t.Fatalf("bad val %v", out) + } + + select { + case v := <-q: + t.Fatalf("bad val %v", v) + default: + } +} + +func TestStatsite_Conn(t *testing.T) { + addr := "localhost:7523" + done := make(chan bool) + go func() { + conn := acceptConn(addr) + reader := bufio.NewReader(conn) + + line, err := reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "gauge.val:1.000000|g\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "key.other:2.000000|kv\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "counter.me:3.000000|c\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "sample.slow_thingy:4.000000|ms\n" { + t.Fatalf("bad line %s", line) + } + + conn.Close() + done <- true + }() + s, err := NewStatsiteSink(addr) + if err != nil { + t.Fatalf("bad error") + } + + s.SetGauge([]string{"gauge", "val"}, float32(1)) + s.EmitKey([]string{"key", "other"}, float32(2)) + s.IncrCounter([]string{"counter", "me"}, float32(3)) + s.AddSample([]string{"sample", "slow thingy"}, float32(4)) + + select { + case <-done: + s.Shutdown() + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } +} -- cgit v1.2.3-1-g7c22