-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathinfluxparser.go
61 lines (52 loc) · 1.55 KB
/
influxparser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main
import (
"bytes"
"fmt"
"time"
)
// InfluxParser is an object for Parsing incoming metrics.
type InfluxParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string
}
func (p *InfluxParser) ParseWithDefaultTimePrecision(buf []byte, t time.Time, precision string) ([]Metric, error) {
if !bytes.HasSuffix(buf, []byte("\n")) {
buf = append(buf, '\n')
}
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
metrics, err := ParseWithDefaultTimePrecision(buf, t, precision)
if len(p.DefaultTags) > 0 {
for _, m := range metrics {
for k, v := range p.DefaultTags {
// only set the default tag if it doesn't already exist:
if !m.HasTag(k) {
m.AddTag(k, v)
}
}
}
}
return metrics, err
}
// Parse returns a slice of Metrics from a text representation of a
// metric (in line-protocol format)
// with each metric separated by newlines. If any metrics fail to parse,
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func (p *InfluxParser) Parse(buf []byte) ([]Metric, error) {
return p.ParseWithDefaultTimePrecision(buf, time.Now(), "")
}
func (p *InfluxParser) ParseLine(line string) (Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: influx ", line)
}
return metrics[0], nil
}
func (p *InfluxParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}