From 2c33905a79241e3f203f31ed5d9ca5a69d408c70 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 00:51:44 +0300 Subject: [PATCH 01/17] Querylog -- Implement file writing and update /querylog handler for changed structures. --- coredns_plugin/coredns_plugin.go | 10 +++ coredns_plugin/querylog.go | 124 +++++++++++++++++++------- coredns_plugin/querylog_file.go | 146 +++++++++++++++++++++++++++++++ stats.go | 7 +- 4 files changed, 255 insertions(+), 32 deletions(-) create mode 100644 coredns_plugin/querylog_file.go diff --git a/coredns_plugin/coredns_plugin.go b/coredns_plugin/coredns_plugin.go index 8af7601e..530e2c2b 100644 --- a/coredns_plugin/coredns_plugin.go +++ b/coredns_plugin/coredns_plugin.go @@ -219,6 +219,7 @@ func setup(c *caddy.Controller) error { return nil }) c.OnShutdown(p.onShutdown) + c.OnFinalShutdown(p.onFinalShutdown) return nil } @@ -250,6 +251,15 @@ func (p *plug) onShutdown() error { return nil } +func (p *plug) onFinalShutdown() error { + err := flushToFile(logBuffer) + if err != nil { + log.Printf("failed to flush to file: %s", err) + return err + } + return nil +} + type statsFunc func(ch interface{}, name string, text string, value float64, valueType prometheus.ValueType) func doDesc(ch interface{}, name string, text string, value float64, valueType prometheus.ValueType) { diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go index 3e2ae239..808450db 100644 --- a/coredns_plugin/querylog.go +++ b/coredns_plugin/querylog.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "net/http" + "os" + "path" "runtime" "strconv" "strings" @@ -13,45 +15,93 @@ import ( "github.com/AdguardTeam/AdguardDNS/dnsfilter" "github.com/coredns/coredns/plugin/pkg/response" "github.com/miekg/dns" - "github.com/zfjagann/golang-ring" ) -const logBufferCap = 10000 +const ( + logBufferCap = 1000 // maximum capacity of logBuffer before it's flushed to disk + queryLogAPI = 1000 // maximum API response for /querylog +) -var logBuffer = ring.Ring{} +var ( + logBuffer []logEntry +) type logEntry struct { - Question *dns.Msg - Answer *dns.Msg + Question []byte + Answer []byte Result dnsfilter.Result Time time.Time Elapsed time.Duration IP string } -func init() { - logBuffer.SetCapacity(logBufferCap) -} - func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, elapsed time.Duration, ip string) { + var q []byte + var a []byte + var err error + + if question != nil { + q, err = question.Pack() + if err != nil { + log.Printf("failed to pack question for querylog: %s", err) + return + } + } + if answer != nil { + a, err = answer.Pack() + if err != nil { + log.Printf("failed to pack answer for querylog: %s", err) + return + } + } + entry := logEntry{ - Question: question, - Answer: answer, + Question: q, + Answer: a, Result: result, Time: time.Now(), Elapsed: elapsed, IP: ip, } - logBuffer.Enqueue(entry) + var flushBuffer []logEntry + + logBuffer = append(logBuffer, entry) + if len(logBuffer) >= logBufferCap { + flushBuffer = logBuffer + logBuffer = nil + } + if len(flushBuffer) > 0 { + // write to file + // do it in separate goroutine -- we are stalling DNS response this whole time + go flushToFile(flushBuffer) + } + return } -func handler(w http.ResponseWriter, r *http.Request) { - values := logBuffer.Values() +func handleQueryLog(w http.ResponseWriter, r *http.Request) { + // TODO: fetch values from disk if len(logBuffer) < queryLogSize + // TODO: cache output + values := logBuffer var data = []map[string]interface{}{} - for _, value := range values { - entry, ok 
:= value.(logEntry) - if !ok { - continue + for _, entry := range values { + var q *dns.Msg + var a *dns.Msg + + if len(entry.Question) > 0 { + q = new(dns.Msg) + if err := q.Unpack(entry.Question); err != nil { + // ignore, log and move on + log.Printf("Failed to unpack dns message question: %s", err) + q = nil + } + } + if len(entry.Answer) > 0 { + a = new(dns.Msg) + if err := a.Unpack(entry.Answer); err != nil { + // ignore, log and move on + log.Printf("Failed to unpack dns message question: %s", err) + a = nil + } } jsonentry := map[string]interface{}{ @@ -60,22 +110,25 @@ func handler(w http.ResponseWriter, r *http.Request) { "time": entry.Time.Format(time.RFC3339), "client": entry.IP, } - question := map[string]interface{}{ - "host": strings.ToLower(strings.TrimSuffix(entry.Question.Question[0].Name, ".")), - "type": dns.Type(entry.Question.Question[0].Qtype).String(), - "class": dns.Class(entry.Question.Question[0].Qclass).String(), + if q != nil { + jsonentry["question"] = map[string]interface{}{ + "host": strings.ToLower(strings.TrimSuffix(q.Question[0].Name, ".")), + "type": dns.Type(q.Question[0].Qtype).String(), + "class": dns.Class(q.Question[0].Qclass).String(), + } } - jsonentry["question"] = question - status, _ := response.Typify(entry.Answer, time.Now().UTC()) - jsonentry["status"] = status.String() + if a != nil { + status, _ := response.Typify(a, time.Now().UTC()) + jsonentry["status"] = status.String() + } if len(entry.Result.Rule) > 0 { jsonentry["rule"] = entry.Result.Rule } - if entry.Answer != nil && len(entry.Answer.Answer) > 0 { + if a != nil && len(a.Answer) > 0 { var answers = []map[string]interface{}{} - for _, k := range entry.Answer.Answer { + for _, k := range a.Answer { header := k.Header() answer := map[string]interface{}{ "type": dns.TypeToString[header.Rrtype], @@ -137,17 +190,26 @@ func handler(w http.ResponseWriter, r *http.Request) { } func startQueryLogServer() { - listenAddr := "127.0.0.1:8618" // sha512sum of "querylog" then each byte summed + listenAddr := "127.0.0.1:8618" // 8618 is sha512sum of "querylog" then each byte summed - http.HandleFunc("/querylog", handler) + go periodicQueryLogRotate(queryLogRotationPeriod) + + http.HandleFunc("/querylog", handleQueryLog) if err := http.ListenAndServe(listenAddr, nil); err != nil { log.Fatalf("error in ListenAndServe: %s", err) } } -func trace(text string) { +func trace(format string, args ...interface{}) { pc := make([]uintptr, 10) // at least 1 entry needed runtime.Callers(2, pc) f := runtime.FuncForPC(pc[0]) - log.Printf("%s(): %s\n", f.Name(), text) + var buf strings.Builder + buf.WriteString(fmt.Sprintf("%s(): ", path.Base(f.Name()))) + text := fmt.Sprintf(format, args...) 
+ buf.WriteString(text) + if len(text) == 0 || text[len(text)-1] != '\n' { + buf.WriteRune('\n') + } + fmt.Fprint(os.Stderr, buf.String()) } diff --git a/coredns_plugin/querylog_file.go b/coredns_plugin/querylog_file.go new file mode 100644 index 00000000..1cfa93a7 --- /dev/null +++ b/coredns_plugin/querylog_file.go @@ -0,0 +1,146 @@ +package dnsfilter + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/go-test/deep" +) + +const ( + queryLogRotationPeriod = time.Hour * 24 // rotate the log every 24 hours + queryLogFileName = "querylog.json" // .gz added during compression +) + +var ( + fileWriteLock sync.Mutex +) + +func flushToFile(buffer []logEntry) error { + if len(buffer) == 0 { + return nil + } + start := time.Now() + + var b bytes.Buffer + e := json.NewEncoder(&b) + for _, entry := range buffer { + err := e.Encode(entry) + if err != nil { + log.Printf("Failed to marshal entry: %s", err) + return err + } + } + + elapsed := time.Since(start) + log.Printf("%d elements serialized via json in %v: %d kB, %v/entry, %v/entry", len(buffer), elapsed, b.Len()/1024, float64(b.Len())/float64(len(buffer)), elapsed/time.Duration(len(buffer))) + + err := checkBuffer(buffer, b) + if err != nil { + log.Printf("failed to check buffer: %s", err) + return err + } + + filenamegz := queryLogFileName + ".gz" + + var zb bytes.Buffer + + zw := gzip.NewWriter(&zb) + zw.Name = queryLogFileName + zw.ModTime = time.Now() + + _, err = zw.Write(b.Bytes()) + if err != nil { + log.Printf("Couldn't compress to gzip: %s", err) + zw.Close() + return err + } + + if err = zw.Close(); err != nil { + log.Printf("Couldn't close gzip writer: %s", err) + return err + } + + fileWriteLock.Lock() + defer fileWriteLock.Unlock() + f, err := os.OpenFile(filenamegz, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + log.Printf("failed to create file \"%s\": %s", filenamegz, err) + return err + } + defer f.Close() + + n, err := f.Write(zb.Bytes()) + if err != nil { + log.Printf("Couldn't write to file: %s", err) + return err + } + + log.Printf("ok \"%s\": %v bytes written", filenamegz, n) + + return nil +} + +func checkBuffer(buffer []logEntry, b bytes.Buffer) error { + l := len(buffer) + d := json.NewDecoder(&b) + + i := 0 + for d.More() { + var entry logEntry + err := d.Decode(&entry) + if err != nil { + log.Printf("Failed to decode: %s", err) + return err + } + if diff := deep.Equal(entry, buffer[i]); diff != nil { + log.Printf("decoded buffer differs: %s", diff) + return fmt.Errorf("decoded buffer differs: %s", diff) + } + i++ + } + if i != l { + err := fmt.Errorf("check fail: %d vs %d entries", l, i) + log.Print(err) + return err + } + log.Printf("check ok: %d entries", i) + + return nil +} + +func rotateQueryLog() error { + from := queryLogFileName + ".gz" + to := queryLogFileName + ".gz.1" + + if _, err := os.Stat(from); os.IsNotExist(err) { + // do nothing, file doesn't exist + return nil + } + + err := os.Rename(from, to) + if err != nil { + log.Printf("Failed to rename querylog: %s", err) + return err + } + + log.Printf("Rotated from %s to %s successfully", from, to) + + return nil +} + +func periodicQueryLogRotate(t time.Duration) { + for range time.Tick(t) { + err := rotateQueryLog() + if err != nil { + log.Printf("Failed to rotate querylog: %s", err) + // do nothing, continue rotating + } + } +} diff --git a/stats.go b/stats.go index aff78ee0..29f36fbf 100644 --- a/stats.go +++ b/stats.go @@ -62,7 +62,12 @@ type stats struct { var statistics stats 
func initPeriodicStats(periodic *periodicStats) { - *periodic = periodicStats{} + periodic.Entries = statsEntries{} + periodic.LastRotate = time.Time{} +} + +func init() { + purgeStats() } func purgeStats() { From 2244c21b765e089e49fede0cc27af7f9f76bae4d Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 00:58:59 +0300 Subject: [PATCH 02/17] Fix race conditions found by go's race detector --- config.go | 2 +- control.go | 7 +++++-- coredns_plugin/coredns_plugin.go | 21 ++++++++++++++++++--- coredns_plugin/querylog.go | 6 ++++++ stats.go | 10 ++++++++++ 5 files changed, 40 insertions(+), 6 deletions(-) diff --git a/config.go b/config.go index 8a753a49..ec0843b2 100644 --- a/config.go +++ b/config.go @@ -27,7 +27,7 @@ type configuration struct { Filters []filter `yaml:"filters"` UserRules []string `yaml:"user_rules"` - sync.Mutex `yaml:"-"` + sync.RWMutex `yaml:"-"` } type coreDNSConfig struct { diff --git a/control.go b/control.go index d60c6509..cc902f3a 100644 --- a/control.go +++ b/control.go @@ -789,10 +789,11 @@ func handleFilteringStatus(w http.ResponseWriter, r *http.Request) { "enabled": config.CoreDNS.FilteringEnabled, } + config.RLock() data["filters"] = config.Filters data["user_rules"] = config.UserRules - json, err := json.Marshal(data) + config.RUnlock() if err != nil { errortext := fmt.Sprintf("Unable to marshal status json: %s", err) @@ -1122,7 +1123,6 @@ func runFilterRefreshers() { func refreshFiltersIfNeccessary() int { now := time.Now() config.Lock() - defer config.Unlock() // deduplicate // TODO: move it somewhere else @@ -1154,6 +1154,7 @@ func refreshFiltersIfNeccessary() int { updateCount++ } } + config.Unlock() if updateCount > 0 { err := writeFilterFile() @@ -1237,6 +1238,7 @@ func writeFilterFile() error { log.Printf("Writing filter file: %s", filterpath) // TODO: check if file contents have modified data := []byte{} + config.RLock() filters := config.Filters for _, filter := range filters { if !filter.Enabled { @@ -1249,6 +1251,7 @@ func writeFilterFile() error { data = append(data, []byte(rule)...) data = append(data, '\n') } + config.RUnlock() err := ioutil.WriteFile(filterpath+".tmp", data, 0644) if err != nil { log.Printf("Couldn't write filter file: %s", err) diff --git a/coredns_plugin/coredns_plugin.go b/coredns_plugin/coredns_plugin.go index 530e2c2b..dcdb50c8 100644 --- a/coredns_plugin/coredns_plugin.go +++ b/coredns_plugin/coredns_plugin.go @@ -55,6 +55,8 @@ type plug struct { ParentalBlockHost string QueryLogEnabled bool BlockedTTL uint32 // in seconds, default 3600 + + sync.RWMutex } var defaultPlugin = plug{ @@ -246,17 +248,21 @@ func (p *plug) parseEtcHosts(text string) bool { } func (p *plug) onShutdown() error { + p.Lock() p.d.Destroy() p.d = nil + p.Unlock() return nil } func (p *plug) onFinalShutdown() error { + logBufferLock.Lock() err := flushToFile(logBuffer) if err != nil { log.Printf("failed to flush to file: %s", err) return err } + logBufferLock.Unlock() return nil } @@ -293,9 +299,11 @@ func doStatsLookup(ch interface{}, doFunc statsFunc, name string, lookupstats *d } func (p *plug) doStats(ch interface{}, doFunc statsFunc) { + p.RLock() stats := p.d.GetStats() doStatsLookup(ch, doFunc, "safebrowsing", &stats.Safebrowsing) doStatsLookup(ch, doFunc, "parental", &stats.Parental) + p.RUnlock() } // Describe is called by prometheus handler to know stat types @@ -365,12 +373,12 @@ func (p *plug) genSOA(r *dns.Msg) []dns.RR { } Ns := "fake-for-negative-caching.adguard.com." 
- soa := defaultSOA + soa := *defaultSOA soa.Hdr = header soa.Mbox = Mbox soa.Ns = Ns - soa.Serial = uint32(time.Now().Unix()) - return []dns.RR{soa} + soa.Serial = 100500 // faster than uint32(time.Now().Unix()) + return []dns.RR{&soa} } func (p *plug) writeNXdomain(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { @@ -397,13 +405,17 @@ func (p *plug) serveDNSInternal(ctx context.Context, w dns.ResponseWriter, r *dn for _, question := range r.Question { host := strings.ToLower(strings.TrimSuffix(question.Name, ".")) // is it a safesearch domain? + p.RLock() if val, ok := p.d.SafeSearchDomain(host); ok { rcode, err := p.replaceHostWithValAndReply(ctx, w, r, host, val, question) if err != nil { + p.RUnlock() return rcode, dnsfilter.Result{}, err } + p.RUnlock() return rcode, dnsfilter.Result{Reason: dnsfilter.FilteredSafeSearch}, err } + p.RUnlock() // is it in hosts? if val, ok := p.hosts[host]; ok { @@ -425,11 +437,14 @@ func (p *plug) serveDNSInternal(ctx context.Context, w dns.ResponseWriter, r *dn } // needs to be filtered instead + p.RLock() result, err := p.d.CheckHost(host) if err != nil { log.Printf("plugin/dnsfilter: %s\n", err) + p.RUnlock() return dns.RcodeServerFailure, dnsfilter.Result{}, fmt.Errorf("plugin/dnsfilter: %s", err) } + p.RUnlock() if result.IsFiltered { switch result.Reason { diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go index 808450db..eb848fb8 100644 --- a/coredns_plugin/querylog.go +++ b/coredns_plugin/querylog.go @@ -10,6 +10,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "github.com/AdguardTeam/AdguardDNS/dnsfilter" @@ -23,6 +24,7 @@ const ( ) var ( + logBufferLock sync.RWMutex logBuffer []logEntry ) @@ -65,11 +67,13 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela } var flushBuffer []logEntry + logBufferLock.Lock() logBuffer = append(logBuffer, entry) if len(logBuffer) >= logBufferCap { flushBuffer = logBuffer logBuffer = nil } + logBufferLock.Unlock() if len(flushBuffer) > 0 { // write to file // do it in separate goroutine -- we are stalling DNS response this whole time @@ -81,7 +85,9 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela func handleQueryLog(w http.ResponseWriter, r *http.Request) { // TODO: fetch values from disk if len(logBuffer) < queryLogSize // TODO: cache output + logBufferLock.RLock() values := logBuffer + logBufferLock.RUnlock() var data = []map[string]interface{}{} for _, entry := range values { var q *dns.Msg diff --git a/stats.go b/stats.go index 29f36fbf..a3231faf 100644 --- a/stats.go +++ b/stats.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "syscall" "time" ) @@ -57,6 +58,7 @@ type stats struct { PerDay periodicStats LastSeen statsEntry + sync.RWMutex } var statistics stats @@ -71,10 +73,12 @@ func init() { } func purgeStats() { + statistics.Lock() initPeriodicStats(&statistics.PerSecond) initPeriodicStats(&statistics.PerMinute) initPeriodicStats(&statistics.PerHour) initPeriodicStats(&statistics.PerDay) + statistics.Unlock() } func runStatsCollectors() { @@ -121,10 +125,12 @@ func statsRotate(periodic *periodicStats, now time.Time, rotations int64) { // called every second, accumulates stats for each second, minute, hour and day func collectStats() { now := time.Now() + statistics.Lock() statsRotate(&statistics.PerSecond, now, int64(now.Sub(statistics.PerSecond.LastRotate)/time.Second)) statsRotate(&statistics.PerMinute, now, 
int64(now.Sub(statistics.PerMinute.LastRotate)/time.Minute)) statsRotate(&statistics.PerHour, now, int64(now.Sub(statistics.PerHour.LastRotate)/time.Hour)) statsRotate(&statistics.PerDay, now, int64(now.Sub(statistics.PerDay.LastRotate)/time.Hour/24)) + statistics.Unlock() // grab HTTP from prometheus resp, err := client.Get("http://127.0.0.1:9153/metrics") @@ -191,6 +197,7 @@ func collectStats() { } // calculate delta + statistics.Lock() delta := calcDelta(entry, statistics.LastSeen) // apply delta to second/minute/hour/day @@ -201,6 +208,7 @@ func collectStats() { // save last seen statistics.LastSeen = entry + statistics.Unlock() } func calcDelta(current, seen statsEntry) statsEntry { @@ -245,7 +253,9 @@ func loadStats() error { func writeStats() error { statsFile := filepath.Join(config.ourBinaryDir, "stats.json") log.Printf("Writing JSON file: %s", statsFile) + statistics.RLock() json, err := json.MarshalIndent(statistics, "", " ") + statistics.RUnlock() if err != nil { log.Printf("Couldn't generate JSON: %s", err) return err From 656d092ad652bd296d436229c332da58a5543c3c Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 00:59:21 +0300 Subject: [PATCH 03/17] if coredns unexpectedly quits, restart it --- control.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/control.go b/control.go index cc902f3a..06adb498 100644 --- a/control.go +++ b/control.go @@ -142,9 +142,14 @@ func handleStart(w http.ResponseWriter, r *http.Request) { func childwaiter() { err := coreDNSCommand.Wait() - log.Printf("coredns terminated: %s\n", err) - err = coreDNSCommand.Process.Release() - log.Printf("coredns released: %s\n", err) + log.Printf("coredns unexpectedly died: %s\n", err) + coreDNSCommand.Process.Release() + log.Printf("restarting coredns\n", err) + err = startDNSServer() + if err != nil { + log.Printf("Couldn't restart DNS server: %s\n", err) + return + } } func handleStop(w http.ResponseWriter, r *http.Request) { From 0ee112e8a0f3f5f7fcbbd5cac9713052f2df12ef Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 01:00:00 +0300 Subject: [PATCH 04/17] querylog -- Add querylog files to gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 1acc2434..53e4b4dd 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ /coredns /Corefile /dnsfilter.txt +/querylog.json.gz +/querylog.json.gz.1 From a63fe958aeae690dba4754cd9ecedd449f2770b9 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 02:17:22 +0300 Subject: [PATCH 05/17] Querylog -- Read from querylog files when answering to /querylog API, it now survives restarts. 
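
For reviewers, a minimal, self-contained sketch of how the gzipped JSON-lines querylog can be read back. This is a sketch only: it assumes the querylog.json.gz name used in this series, persistedEntry is an illustrative subset of the logEntry struct, and the real loader added below additionally enforces a time window and an entry cap.

    package main

    import (
        "compress/gzip"
        "encoding/json"
        "fmt"
        "os"
        "time"
    )

    // persistedEntry is an illustrative subset of logEntry in querylog.go.
    type persistedEntry struct {
        Time time.Time
        IP   string
    }

    func main() {
        f, err := os.Open("querylog.json.gz")
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            return
        }
        defer f.Close()

        zr, err := gzip.NewReader(f)
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            return
        }
        defer zr.Close()

        // entries are appended oldest->newest, one JSON object per line
        d := json.NewDecoder(zr)
        for d.More() {
            var e persistedEntry
            if err := d.Decode(&e); err != nil {
                fmt.Fprintln(os.Stderr, "decode:", err)
                break
            }
            fmt.Println(e.Time.Format(time.RFC3339), e.IP)
        }
    }
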
--- coredns_plugin/querylog.go | 5 ++ coredns_plugin/querylog_file.go | 87 +++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go index eb848fb8..381ac16a 100644 --- a/coredns_plugin/querylog.go +++ b/coredns_plugin/querylog.go @@ -88,6 +88,11 @@ func handleQueryLog(w http.ResponseWriter, r *http.Request) { logBufferLock.RLock() values := logBuffer logBufferLock.RUnlock() + + if len(values) < queryLogAPI { + values = appendFromLogFile(values, queryLogAPI, time.Hour*24) + } + var data = []map[string]interface{}{} for _, entry := range values { var q *dns.Msg diff --git a/coredns_plugin/querylog_file.go b/coredns_plugin/querylog_file.go index 1cfa93a7..412817ed 100644 --- a/coredns_plugin/querylog_file.go +++ b/coredns_plugin/querylog_file.go @@ -144,3 +144,90 @@ func periodicQueryLogRotate(t time.Duration) { } } } + +func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) []logEntry { + now := time.Now() + // read from querylog files, try newest file first + files := []string{ + queryLogFileName + ".gz", + queryLogFileName + ".gz.1", + } + + a := []logEntry{} + + // read from all files + for _, file := range files { + if len(a) >= maxLen { + // previous file filled us with enough fresh entries + break + } + if _, err := os.Stat(file); os.IsNotExist(err) { + // do nothing, file doesn't exist + continue + } + + trace("Opening file %s", file) + f, err := os.Open(file) + if err != nil { + log.Printf("Failed to open file \"%s\": %s", file, err) + // try next file + continue + } + defer f.Close() + + trace("Creating gzip reader") + zr, err := gzip.NewReader(f) + if err != nil { + log.Printf("Failed to create gzip reader: %s", err) + continue + } + + trace("Creating json decoder") + d := json.NewDecoder(zr) + + i := 0 + // entries on file are in oldest->newest order + // we want maxLen newest + for d.More() { + var entry logEntry + err := d.Decode(&entry) + if err != nil { + log.Printf("Failed to decode: %s", err) + // next entry can be fine, try more + continue + } + + if now.Sub(entry.Time) > timeWindow { + trace("skipping entry") + continue + } + + i++ + a = append(a, entry) + if len(a) > maxLen { + toskip := len(a) - maxLen + a = a[toskip:] + } + } + err = zr.Close() + if err != nil { + log.Printf("Encountered error while closing gzip reader: %s", err) + } + log.Printf("file \"%s\": read %d entries", file, i) + } + + // now that we've read all eligible entries, reverse the slice to make it go from newest->oldest + for left, right := 0, len(a)-1; left < right; left, right = left+1, right-1 { + a[left], a[right] = a[right], a[left] + } + + // append it to values + values = append(values, a...) 
+ + // then cut off of it is bigger than maxLen + if len(values) > maxLen { + values = values[:maxLen] + } + + return values +} From dc1042c3e9ab8a9ae2cc2dd034f42a5dee8eff91 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 02:17:57 +0300 Subject: [PATCH 06/17] Querylog -- Omit empty fields when writing json --- coredns_plugin/querylog.go | 2 +- dnsfilter/dnsfilter.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go index 381ac16a..d064ea08 100644 --- a/coredns_plugin/querylog.go +++ b/coredns_plugin/querylog.go @@ -30,7 +30,7 @@ var ( type logEntry struct { Question []byte - Answer []byte + Answer []byte `json:",omitempty"` // sometimes empty answers happen like binerdunt.top or rev2.globalrootservers.net Result dnsfilter.Result Time time.Time Elapsed time.Duration diff --git a/dnsfilter/dnsfilter.go b/dnsfilter/dnsfilter.go index 3060b297..1344ea9f 100644 --- a/dnsfilter/dnsfilter.go +++ b/dnsfilter/dnsfilter.go @@ -136,9 +136,9 @@ var ( // Result holds state of hostname check type Result struct { - IsFiltered bool - Reason Reason - Rule string + IsFiltered bool `json:",omitempty"` + Reason Reason `json:",omitempty"` + Rule string `json:",omitempty"` } // Matched can be used to see if any match at all was found, no matter filtered or not From 3b1faa1365dc72c01728263b4dfd61c5274dbf6d Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 21:24:22 +0300 Subject: [PATCH 07/17] Fix more race conditions found by race detector --- coredns_plugin/coredns_plugin.go | 40 ++++++++++++++++++-------------- stats.go | 2 +- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/coredns_plugin/coredns_plugin.go b/coredns_plugin/coredns_plugin.go index dcdb50c8..2db776ac 100644 --- a/coredns_plugin/coredns_plugin.go +++ b/coredns_plugin/coredns_plugin.go @@ -45,21 +45,24 @@ func init() { }) } +type plugSettings struct { + SafeBrowsingBlockHost string + ParentalBlockHost string + QueryLogEnabled bool + BlockedTTL uint32 // in seconds, default 3600 +} + type plug struct { d *dnsfilter.Dnsfilter Next plugin.Handler upstream upstream.Upstream hosts map[string]net.IP - - SafeBrowsingBlockHost string - ParentalBlockHost string - QueryLogEnabled bool - BlockedTTL uint32 // in seconds, default 3600 + settings plugSettings sync.RWMutex } -var defaultPlugin = plug{ +var defaultPluginSettings = plugSettings{ SafeBrowsingBlockHost: "safebrowsing.block.dns.adguard.com", ParentalBlockHost: "family.block.dns.adguard.com", BlockedTTL: 3600, // in seconds @@ -91,10 +94,11 @@ var ( // func setupPlugin(c *caddy.Controller) (*plug, error) { // create new Plugin and copy default values - var p = new(plug) - *p = defaultPlugin - p.d = dnsfilter.New() - p.hosts = make(map[string]net.IP) + p := &plug{ + settings: defaultPluginSettings, + d: dnsfilter.New(), + hosts: make(map[string]net.IP), + } filterFileNames := []string{} for c.Next() { @@ -130,7 +134,7 @@ func setupPlugin(c *caddy.Controller) (*plug, error) { if len(c.Val()) == 0 { return nil, c.ArgErr() } - p.ParentalBlockHost = c.Val() + p.settings.ParentalBlockHost = c.Val() } case "blocked_ttl": if !c.NextArg() { @@ -140,9 +144,9 @@ func setupPlugin(c *caddy.Controller) (*plug, error) { if err != nil { return nil, c.ArgErr() } - p.BlockedTTL = uint32(blockttl) + p.settings.BlockedTTL = uint32(blockttl) case "querylog": - p.QueryLogEnabled = true + p.settings.QueryLogEnabled = true onceQueryLog.Do(func() { go startQueryLogServer() // TODO: how to handle 
errors? }) @@ -323,7 +327,7 @@ func (p *plug) replaceHostWithValAndReply(ctx context.Context, w dns.ResponseWri log.Println("Will give", val, "instead of", host) if addr != nil { // this is an IP address, return it - result, err := dns.NewRR(fmt.Sprintf("%s %d A %s", host, p.BlockedTTL, val)) + result, err := dns.NewRR(fmt.Sprintf("%s %d A %s", host, p.settings.BlockedTTL, val)) if err != nil { log.Printf("Got error %s\n", err) return dns.RcodeServerFailure, fmt.Errorf("plugin/dnsfilter: %s", err) @@ -365,7 +369,7 @@ func (p *plug) replaceHostWithValAndReply(ctx context.Context, w dns.ResponseWri // the only value that is important is TTL in header, other values like refresh, retry, expire and minttl are irrelevant func (p *plug) genSOA(r *dns.Msg) []dns.RR { zone := r.Question[0].Name - header := dns.RR_Header{Name: zone, Rrtype: dns.TypeSOA, Ttl: p.BlockedTTL, Class: dns.ClassINET} + header := dns.RR_Header{Name: zone, Rrtype: dns.TypeSOA, Ttl: p.settings.BlockedTTL, Class: dns.ClassINET} Mbox := "hostmaster." if zone[0] != '.' { @@ -450,7 +454,7 @@ func (p *plug) serveDNSInternal(ctx context.Context, w dns.ResponseWriter, r *dn switch result.Reason { case dnsfilter.FilteredSafeBrowsing: // return cname safebrowsing.block.dns.adguard.com - val := p.SafeBrowsingBlockHost + val := p.settings.SafeBrowsingBlockHost rcode, err := p.replaceHostWithValAndReply(ctx, w, r, host, val, question) if err != nil { return rcode, dnsfilter.Result{}, err @@ -458,7 +462,7 @@ func (p *plug) serveDNSInternal(ctx context.Context, w dns.ResponseWriter, r *dn return rcode, result, err case dnsfilter.FilteredParental: // return cname family.block.dns.adguard.com - val := p.ParentalBlockHost + val := p.settings.ParentalBlockHost rcode, err := p.replaceHostWithValAndReply(ctx, w, r, host, val, question) if err != nil { return rcode, dnsfilter.Result{}, err @@ -549,7 +553,7 @@ func (p *plug) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) ( } // log - if p.QueryLogEnabled { + if p.settings.QueryLogEnabled { logRequest(r, rrw.Msg, result, time.Since(start), ip) } return rcode, err diff --git a/stats.go b/stats.go index a3231faf..f06380d2 100644 --- a/stats.go +++ b/stats.go @@ -254,7 +254,7 @@ func writeStats() error { statsFile := filepath.Join(config.ourBinaryDir, "stats.json") log.Printf("Writing JSON file: %s", statsFile) statistics.RLock() - json, err := json.MarshalIndent(statistics, "", " ") + json, err := json.MarshalIndent(&statistics, "", " ") statistics.RUnlock() if err != nil { log.Printf("Couldn't generate JSON: %s", err) From a2434d4574bf5dc3ab892748776086ba565e4356 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 23:24:04 +0300 Subject: [PATCH 08/17] coredns plugin -- Calculate top for domains, clients and blocked both from querylog and running requests. This moves the functionality from frontend to coredns plugin. 
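
For reviewers, a minimal standalone sketch of the hour-bucket counting idea: requests are grouped into 24 hourly LRU counters, and each logged or replayed entry increments the counter for its hour. This is a sketch only — it reuses the gcache LRU calls that querylog_top.go builds on, the increment helper and the example key are illustrative, and the real code adds locking plus separate domain/blocked/client counters.

    package main

    import (
        "fmt"
        "time"

        "github.com/bluele/gcache"
    )

    // increment is an illustrative helper mirroring hourTop.incrementValue.
    func increment(c gcache.Cache, key string) error {
        v, err := c.Get(key)
        if err == gcache.KeyNotFoundError {
            return c.Set(key, 1)
        }
        if err != nil {
            return err
        }
        return c.Set(key, v.(int)+1)
    }

    func main() {
        // one LRU counter per hour; index 0 is the current hour, 23 the oldest
        hours := make([]gcache.Cache, 24)
        for i := range hours {
            hours[i] = gcache.New(500).LRU().Build()
        }

        now := time.Now()
        entryTime := now.Add(-90 * time.Minute) // e.g. an entry replayed from the querylog
        bucket := int(now.Sub(entryTime).Hours())
        if bucket >= 0 && bucket < 24 {
            _ = increment(hours[bucket], "example.org")
            count, _ := hours[bucket].Get("example.org")
            fmt.Printf("hour bucket %d: example.org seen %v time(s)\n", bucket, count)
        }
    }
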
--- control.go | 87 +-------- coredns_plugin/coredns_plugin.go | 17 +- coredns_plugin/querylog.go | 14 +- coredns_plugin/querylog_file.go | 49 +++-- coredns_plugin/querylog_top.go | 325 +++++++++++++++++++++++++++++++ helpers.go | 85 -------- 6 files changed, 394 insertions(+), 183 deletions(-) create mode 100644 coredns_plugin/querylog_top.go diff --git a/control.go b/control.go index 06adb498..dea887b2 100644 --- a/control.go +++ b/control.go @@ -417,9 +417,9 @@ func handleStatsReset(w http.ResponseWriter, r *http.Request) { } func handleStatsTop(w http.ResponseWriter, r *http.Request) { - resp, err := client.Get("http://127.0.0.1:8618/querylog") + resp, err := client.Get("http://127.0.0.1:8618/stats_top") if err != nil { - errortext := fmt.Sprintf("Couldn't get querylog from coredns: %T %s\n", err, err) + errortext := fmt.Sprintf("Couldn't get stats_top from coredns: %T %s\n", err, err) log.Println(errortext) http.Error(w, errortext, http.StatusBadGateway) return @@ -428,7 +428,7 @@ func handleStatsTop(w http.ResponseWriter, r *http.Request) { defer resp.Body.Close() } - // read body + // read the body entirely body, err := ioutil.ReadAll(resp.Body) if err != nil { errortext := fmt.Sprintf("Couldn't read response body: %s", err) @@ -436,85 +436,12 @@ func handleStatsTop(w http.ResponseWriter, r *http.Request) { http.Error(w, errortext, http.StatusBadGateway) return } - // empty body - if len(body) == 0 { - return - } - - values := []interface{}{} - err = json.Unmarshal(body, &values) - if err != nil { - errortext := fmt.Sprintf("Couldn't parse response body: %s", err) - log.Println(errortext) - http.Error(w, errortext, http.StatusBadGateway) - return - } - - domains := map[string]int{} - blocked := map[string]int{} - clients := map[string]int{} - now := time.Now() - timeWindow := time.Hour * 24 - notBefore := now.Add(timeWindow * -1) - - for _, value := range values { - entry, ok := value.(map[string]interface{}) - if !ok { - // ignore anything else - continue - } - host := getHost(entry) - reason := getReason(entry) - client := getClient(entry) - time := getTime(entry) - if time.Before(notBefore) { - // skip if the entry is before specified cutoff - continue - } - if len(host) > 0 { - domains[host]++ - } - if len(host) > 0 && strings.HasPrefix(reason, "Filtered") { - blocked[host]++ - } - if len(client) > 0 { - clients[client]++ - } - } - - // use manual json marshalling because we want maps to be sorted by value - json := bytes.Buffer{} - json.WriteString("{\n") - - gen := func(json *bytes.Buffer, name string, top map[string]int, addComma bool) { - json.WriteString(" \"") - json.WriteString(name) - json.WriteString("\": {\n") - sorted := sortByValue(top) - for i, key := range sorted { - json.WriteString(" \"") - json.WriteString(key) - json.WriteString("\": ") - json.WriteString(strconv.Itoa(top[key])) - if i+1 != len(sorted) { - json.WriteByte(',') - } - json.WriteByte('\n') - } - json.WriteString(" }") - if addComma { - json.WriteByte(',') - } - json.WriteByte('\n') - } - gen(&json, "top_queried_domains", domains, true) - gen(&json, "top_blocked_domains", blocked, true) - gen(&json, "top_clients", clients, true) - json.WriteString(" \"stats_period\": \"24 hours\"\n") - json.WriteString("}\n") + // forward body entirely with status code w.Header().Set("Content-Type", "application/json") - _, err = w.Write(json.Bytes()) + w.Header().Set("Content-Length", strconv.Itoa(len(body))) + w.WriteHeader(resp.StatusCode) + _, err = w.Write(body) if err != nil { errortext := 
fmt.Sprintf("Couldn't write body: %s", err) log.Println(errortext) diff --git a/coredns_plugin/coredns_plugin.go b/coredns_plugin/coredns_plugin.go index 2db776ac..474c3f22 100644 --- a/coredns_plugin/coredns_plugin.go +++ b/coredns_plugin/coredns_plugin.go @@ -147,9 +147,6 @@ func setupPlugin(c *caddy.Controller) (*plug, error) { p.settings.BlockedTTL = uint32(blockttl) case "querylog": p.settings.QueryLogEnabled = true - onceQueryLog.Do(func() { - go startQueryLogServer() // TODO: how to handle errors? - }) } } } @@ -186,7 +183,19 @@ func setupPlugin(c *caddy.Controller) (*plug, error) { } } - var err error + log.Printf("Loading top from querylog") + err := loadTopFromFiles() + if err != nil { + log.Printf("Failed to load top from querylog: %s", err) + return nil, err + } + + if p.settings.QueryLogEnabled { + onceQueryLog.Do(func() { + go startQueryLogServer() // TODO: how to handle errors? + }) + } + p.upstream, err = upstream.New(nil) if err != nil { return nil, err diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go index d064ea08..c0df50e2 100644 --- a/coredns_plugin/querylog.go +++ b/coredns_plugin/querylog.go @@ -57,11 +57,12 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela } } + now := time.Now() entry := logEntry{ Question: q, Answer: a, Result: result, - Time: time.Now(), + Time: now, Elapsed: elapsed, IP: ip, } @@ -74,6 +75,15 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela logBuffer = nil } logBufferLock.Unlock() + + // add it to running top + err = runningTop.addEntry(&entry, now) + if err != nil { + log.Printf("Failed to add entry to running top: %s", err) + // don't do failure, just log + } + + // if buffer needs to be flushed to disk, do it now if len(flushBuffer) > 0 { // write to file // do it in separate goroutine -- we are stalling DNS response this whole time @@ -204,8 +214,10 @@ func startQueryLogServer() { listenAddr := "127.0.0.1:8618" // 8618 is sha512sum of "querylog" then each byte summed go periodicQueryLogRotate(queryLogRotationPeriod) + go periodicHourlyTopRotate() http.HandleFunc("/querylog", handleQueryLog) + http.HandleFunc("/stats_top", handleStatsTop) if err := http.ListenAndServe(listenAddr, nil); err != nil { log.Fatalf("error in ListenAndServe: %s", err) } diff --git a/coredns_plugin/querylog_file.go b/coredns_plugin/querylog_file.go index 412817ed..fffd050a 100644 --- a/coredns_plugin/querylog_file.go +++ b/coredns_plugin/querylog_file.go @@ -145,7 +145,7 @@ func periodicQueryLogRotate(t time.Duration) { } } -func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) []logEntry { +func genericLoader(onEntry func(entry *logEntry) error, needMore func() bool, timeWindow time.Duration) error { now := time.Now() // read from querylog files, try newest file first files := []string{ @@ -153,12 +153,9 @@ func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) queryLogFileName + ".gz.1", } - a := []logEntry{} - // read from all files for _, file := range files { - if len(a) >= maxLen { - // previous file filled us with enough fresh entries + if !needMore() { break } if _, err := os.Stat(file); os.IsNotExist(err) { @@ -181,6 +178,7 @@ func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) log.Printf("Failed to create gzip reader: %s", err) continue } + defer zr.Close() trace("Creating json decoder") d := json.NewDecoder(zr) @@ -189,6 +187,9 @@ func appendFromLogFile(values []logEntry, maxLen int, 
timeWindow time.Duration) // entries on file are in oldest->newest order // we want maxLen newest for d.More() { + if !needMore() { + break + } var entry logEntry err := d.Decode(&entry) if err != nil { @@ -203,18 +204,40 @@ func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) } i++ - a = append(a, entry) - if len(a) > maxLen { - toskip := len(a) - maxLen - a = a[toskip:] + err = onEntry(&entry) + if err != nil { + return err } } - err = zr.Close() - if err != nil { - log.Printf("Encountered error while closing gzip reader: %s", err) - } log.Printf("file \"%s\": read %d entries", file, i) } + return nil +} + +func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) []logEntry { + a := []logEntry{} + + onEntry := func(entry *logEntry) error { + a = append(a, *entry) + if len(a) > maxLen { + toskip := len(a) - maxLen + a = a[toskip:] + } + return nil + } + + needMore := func() bool { + if len(a) >= maxLen { + return false + } + return true + } + + err := genericLoader(onEntry, needMore, timeWindow) + if err != nil { + log.Printf("Failed to load entries from querylog: %s", err) + return values + } // now that we've read all eligible entries, reverse the slice to make it go from newest->oldest for left, right := 0, len(a)-1; left < right; left, right = left+1, right-1 { diff --git a/coredns_plugin/querylog_top.go b/coredns_plugin/querylog_top.go new file mode 100644 index 00000000..2ba7bcb4 --- /dev/null +++ b/coredns_plugin/querylog_top.go @@ -0,0 +1,325 @@ +package dnsfilter + +import ( + "bytes" + "fmt" + "log" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/bluele/gcache" + "github.com/miekg/dns" +) + +// top domains/clients/blocked stats in the last 24 hours + +// on start we read the saved stats from the last 24 hours and add them to the stats + +// stats are counted using hourly LRU, rotating hourly and keeping last 24 hours + +type hourTop struct { + domains gcache.Cache + blocked gcache.Cache + clients gcache.Cache + sync.RWMutex +} + +func (top *hourTop) init() { + top.domains = gcache.New(500).LRU().Build() + top.blocked = gcache.New(500).LRU().Build() + top.clients = gcache.New(500).LRU().Build() +} + +type dayTop struct { + hours []*hourTop + sync.RWMutex // write -- rotating hourTop, read -- anything else +} + +var runningTop dayTop + +func init() { + runningTop.Lock() + for i := 0; i < 24; i++ { + hour := hourTop{} + hour.init() + runningTop.hours = append(runningTop.hours, &hour) + } + runningTop.Unlock() +} + +func rotateHourlyTop() { + log.Printf("Rotating hourly top") + hour := &hourTop{} + hour.init() + runningTop.Lock() + runningTop.hours = append([]*hourTop{hour}, runningTop.hours...) 
+ runningTop.hours = runningTop.hours[:24] + runningTop.Unlock() +} + +func periodicHourlyTopRotate() { + t := time.Hour + for range time.Tick(t) { + rotateHourlyTop() + } +} + +func (top *hourTop) incrementValue(key string, cache gcache.Cache) error { + top.Lock() + defer top.Unlock() + ivalue, err := cache.Get(key) + if err == gcache.KeyNotFoundError { + // we just set it and we're done + err = cache.Set(key, 1) + if err != nil { + log.Printf("Failed to set hourly top value: %s", err) + return err + } + return nil + } + + if err != nil { + log.Printf("gcache encountered an error during get: %s", err) + return err + } + + cachedValue, ok := ivalue.(int) + if !ok { + err = fmt.Errorf("SHOULD NOT HAPPEN: gcache has non-int as value: %v", ivalue) + log.Println(err) + return err + } + + err = cache.Set(key, cachedValue+1) + if err != nil { + log.Printf("Failed to set hourly top value: %s", err) + return err + } + return nil +} + +func (top *hourTop) incrementDomains(key string) error { + return top.incrementValue(key, top.domains) +} + +func (top *hourTop) incrementBlocked(key string) error { + return top.incrementValue(key, top.blocked) +} + +func (top *hourTop) incrementClients(key string) error { + return top.incrementValue(key, top.clients) +} + +// if does not exist -- return 0 +func (top *hourTop) lockedGetValue(key string, cache gcache.Cache) (int, error) { + ivalue, err := cache.Get(key) + if err == gcache.KeyNotFoundError { + return 0, nil + } + + if err != nil { + log.Printf("gcache encountered an error during get: %s", err) + return 0, err + } + + value, ok := ivalue.(int) + if !ok { + err := fmt.Errorf("SHOULD NOT HAPPEN: gcache has non-int as value: %v", ivalue) + log.Println(err) + return 0, err + } + + return value, nil +} + +func (top *hourTop) lockedGetDomains(key string) (int, error) { + return top.lockedGetValue(key, top.domains) +} + +func (top *hourTop) lockedGetBlocked(key string) (int, error) { + return top.lockedGetValue(key, top.blocked) +} + +func (top *hourTop) lockedGetClients(key string) (int, error) { + return top.lockedGetValue(key, top.clients) +} + +func (r *dayTop) addEntry(entry *logEntry, now time.Time) error { + if len(entry.Question) == 0 { + log.Printf("entry question is absent, skipping") + return nil + } + + if entry.Time.After(now) { + log.Printf("t %v vs %v is in the future, ignoring", entry.Time, now) + return nil + } + // figure out which hour bucket it belongs to + hour := int(now.Sub(entry.Time).Hours()) + if hour >= 24 { + log.Printf("t %v is >24 hours ago, ignoring", entry.Time) + return nil + } + + q := new(dns.Msg) + if err := q.Unpack(entry.Question); err != nil { + log.Printf("failed to unpack dns message question: %s", err) + return err + } + + if len(q.Question) != 1 { + log.Printf("malformed dns message, has no questions, skipping") + return nil + } + + hostname := strings.ToLower(strings.TrimSuffix(q.Question[0].Name, ".")) + + // get value, if not set, crate one + runningTop.RLock() + defer runningTop.RUnlock() + err := runningTop.hours[hour].incrementDomains(hostname) + if err != nil { + log.Printf("Failed to increment value: %s", err) + return err + } + + if entry.Result.IsFiltered { + err := runningTop.hours[hour].incrementBlocked(hostname) + if err != nil { + log.Printf("Failed to increment value: %s", err) + return err + } + } + + if len(entry.IP) > 0 { + err := runningTop.hours[hour].incrementClients(entry.IP) + if err != nil { + log.Printf("Failed to increment value: %s", err) + return err + } + } + + return nil +} + +func 
loadTopFromFiles() error { + now := time.Now() + runningTop.RLock() + defer runningTop.RUnlock() + onEntry := func(entry *logEntry) error { + err := runningTop.addEntry(entry, now) + if err != nil { + log.Printf("Failed to add entry to running top: %s", err) + return err + } + return nil + } + + needMore := func() bool { return true } + err := genericLoader(onEntry, needMore, time.Hour*24) + if err != nil { + log.Printf("Failed to load entries from querylog: %s", err) + return err + } + + return nil +} + +func handleStatsTop(w http.ResponseWriter, r *http.Request) { + domains := map[string]int{} + blocked := map[string]int{} + clients := map[string]int{} + + do := func(keys []interface{}, getter func(key string) (int, error), result map[string]int) { + for _, ikey := range keys { + key, ok := ikey.(string) + if !ok { + continue + } + value, err := getter(key) + if err != nil { + log.Printf("Failed to get top domains value for %v: %s", key, err) + return + } + result[key] += value + } + } + + runningTop.RLock() + for hour := 0; hour < 24; hour++ { + runningTop.hours[hour].RLock() + do(runningTop.hours[hour].domains.Keys(), runningTop.hours[hour].lockedGetDomains, domains) + do(runningTop.hours[hour].blocked.Keys(), runningTop.hours[hour].lockedGetBlocked, blocked) + do(runningTop.hours[hour].clients.Keys(), runningTop.hours[hour].lockedGetClients, clients) + runningTop.hours[hour].RUnlock() + } + runningTop.RUnlock() + + // use manual json marshalling because we want maps to be sorted by value + json := bytes.Buffer{} + json.WriteString("{\n") + + gen := func(json *bytes.Buffer, name string, top map[string]int, addComma bool) { + json.WriteString(" \"") + json.WriteString(name) + json.WriteString("\": {\n") + sorted := sortByValue(top) + for i, key := range sorted { + // no more than 50 entries + if i >= 50 { + break + } + json.WriteString(" \"") + json.WriteString(key) + json.WriteString("\": ") + json.WriteString(strconv.Itoa(top[key])) + if i+1 != len(sorted) { + json.WriteByte(',') + } + json.WriteByte('\n') + } + json.WriteString(" }") + if addComma { + json.WriteByte(',') + } + json.WriteByte('\n') + } + gen(&json, "top_queried_domains", domains, true) + gen(&json, "top_blocked_domains", blocked, true) + gen(&json, "top_clients", clients, true) + json.WriteString(" \"stats_period\": \"24 hours\"\n") + json.WriteString("}\n") + + w.Header().Set("Content-Type", "application/json") + _, err := w.Write(json.Bytes()) + if err != nil { + errortext := fmt.Sprintf("Couldn't write body: %s", err) + log.Println(errortext) + http.Error(w, errortext, http.StatusInternalServerError) + } +} + +// helper function for querylog API +func sortByValue(m map[string]int) []string { + type kv struct { + k string + v int + } + var ss []kv + for k, v := range m { + ss = append(ss, kv{k, v}) + } + sort.Slice(ss, func(l, r int) bool { + return ss[l].v > ss[r].v + }) + + sorted := []string{} + for _, v := range ss { + sorted = append(sorted, v.k) + } + return sorted +} diff --git a/helpers.go b/helpers.go index a35a5f1c..1bbca87c 100644 --- a/helpers.go +++ b/helpers.go @@ -9,9 +9,7 @@ import ( "os" "path" "runtime" - "sort" "strings" - "time" ) func clamp(value, low, high int) int { @@ -133,89 +131,6 @@ func generateMapFromStats(stats *periodicStats, start int, end int) map[string]i return result } -// ------------------------------------- -// helper functions for querylog parsing -// ------------------------------------- -func sortByValue(m map[string]int) []string { - type kv struct { - k string - v int - } 
- var ss []kv - for k, v := range m { - ss = append(ss, kv{k, v}) - } - sort.Slice(ss, func(l, r int) bool { - return ss[l].v > ss[r].v - }) - - sorted := []string{} - for _, v := range ss { - sorted = append(sorted, v.k) - } - return sorted -} - -func getHost(entry map[string]interface{}) string { - q, ok := entry["question"] - if !ok { - return "" - } - question, ok := q.(map[string]interface{}) - if !ok { - return "" - } - h, ok := question["host"] - if !ok { - return "" - } - host, ok := h.(string) - if !ok { - return "" - } - return host -} - -func getReason(entry map[string]interface{}) string { - r, ok := entry["reason"] - if !ok { - return "" - } - reason, ok := r.(string) - if !ok { - return "" - } - return reason -} - -func getClient(entry map[string]interface{}) string { - c, ok := entry["client"] - if !ok { - return "" - } - client, ok := c.(string) - if !ok { - return "" - } - return client -} - -func getTime(entry map[string]interface{}) time.Time { - t, ok := entry["time"] - if !ok { - return time.Time{} - } - tstr, ok := t.(string) - if !ok { - return time.Time{} - } - value, err := time.Parse(time.RFC3339, tstr) - if err != nil { - return time.Time{} - } - return value -} - // ------------------------------------------------- // helper functions for parsing parameters from body // ------------------------------------------------- From 4ecb84f9ad776b504dd707e5e0390223e61dfb27 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Sun, 7 Oct 2018 23:43:24 +0300 Subject: [PATCH 09/17] Fix some lint warnings --- control.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/control.go b/control.go index dea887b2..a1b88900 100644 --- a/control.go +++ b/control.go @@ -144,7 +144,7 @@ func childwaiter() { err := coreDNSCommand.Wait() log.Printf("coredns unexpectedly died: %s\n", err) coreDNSCommand.Process.Release() - log.Printf("restarting coredns\n", err) + log.Printf("restarting coredns") err = startDNSServer() if err != nil { log.Printf("Couldn't restart DNS server: %s\n", err) @@ -510,7 +510,7 @@ func handleTestUpstreamDNS(w http.ResponseWriter, r *http.Request) { result := map[string]string{} for _, host := range hosts { - err := checkDNS(host) + err = checkDNS(host) if err != nil { log.Println(err) result[host] = err.Error() From ea1125f57d93dd9456f513d8a8531cc8d6e3e796 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 04:24:37 +0300 Subject: [PATCH 10/17] coredns plugin -- don't reload from querylog on SIGUSR, we already have it in memory --- coredns_plugin/querylog_top.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/coredns_plugin/querylog_top.go b/coredns_plugin/querylog_top.go index 2ba7bcb4..82ee1130 100644 --- a/coredns_plugin/querylog_top.go +++ b/coredns_plugin/querylog_top.go @@ -36,6 +36,7 @@ func (top *hourTop) init() { type dayTop struct { hours []*hourTop + loaded bool sync.RWMutex // write -- rotating hourTop, read -- anything else } @@ -209,6 +210,9 @@ func (r *dayTop) addEntry(entry *logEntry, now time.Time) error { func loadTopFromFiles() error { now := time.Now() runningTop.RLock() + if runningTop.loaded { + return nil + } defer runningTop.RUnlock() onEntry := func(entry *logEntry) error { err := runningTop.addEntry(entry, now) @@ -226,6 +230,8 @@ func loadTopFromFiles() error { return err } + runningTop.loaded = true + return nil } From 182fa37e5fd1acea810ef091d3e50f29694f3baa Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 05:07:02 +0300 Subject: [PATCH 11/17] querylog API -- when manually 
generating json, don't forget to escape strings --- coredns_plugin/querylog_top.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/coredns_plugin/querylog_top.go b/coredns_plugin/querylog_top.go index 82ee1130..bcc12b3a 100644 --- a/coredns_plugin/querylog_top.go +++ b/coredns_plugin/querylog_top.go @@ -270,18 +270,18 @@ func handleStatsTop(w http.ResponseWriter, r *http.Request) { json.WriteString("{\n") gen := func(json *bytes.Buffer, name string, top map[string]int, addComma bool) { - json.WriteString(" \"") - json.WriteString(name) - json.WriteString("\": {\n") + json.WriteString(" ") + json.WriteString(fmt.Sprintf("%q", name)) + json.WriteString(": {\n") sorted := sortByValue(top) + // no more than 50 entries + if len(sorted) > 50 { + sorted = sorted[:50] + } for i, key := range sorted { - // no more than 50 entries - if i >= 50 { - break - } - json.WriteString(" \"") - json.WriteString(key) - json.WriteString("\": ") + json.WriteString(" ") + json.WriteString(fmt.Sprintf("%q", key)) + json.WriteString(": ") json.WriteString(strconv.Itoa(top[key])) if i+1 != len(sorted) { json.WriteByte(',') From 0440ef016afb5b3ef9619481df1288fc36257529 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 05:55:33 +0300 Subject: [PATCH 12/17] stats -- Clamp number of rotations to sane value and prevent from going into (very long) loop --- stats.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/stats.go b/stats.go index f06380d2..c3b69126 100644 --- a/stats.go +++ b/stats.go @@ -107,6 +107,9 @@ func isConnRefused(err error) bool { } func statsRotate(periodic *periodicStats, now time.Time, rotations int64) { + if rotations > statsHistoryElements { + rotations = statsHistoryElements + } // calculate how many times we should rotate for r := int64(0); r < rotations; r++ { for key, values := range periodic.Entries { From 2c84cd6448f2187c953578e6a286857c18842e30 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 14:18:19 +0300 Subject: [PATCH 13/17] coredns plugin -- Fix deadlock during coredns reload --- coredns_plugin/querylog_top.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coredns_plugin/querylog_top.go b/coredns_plugin/querylog_top.go index bcc12b3a..70ec0034 100644 --- a/coredns_plugin/querylog_top.go +++ b/coredns_plugin/querylog_top.go @@ -210,10 +210,10 @@ func (r *dayTop) addEntry(entry *logEntry, now time.Time) error { func loadTopFromFiles() error { now := time.Now() runningTop.RLock() + defer runningTop.RUnlock() if runningTop.loaded { return nil } - defer runningTop.RUnlock() onEntry := func(entry *logEntry) error { err := runningTop.addEntry(entry, now) if err != nil { From 3109529dbb0a3cd2cb01ef58875fb68ff35358fa Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 17:14:11 +0300 Subject: [PATCH 14/17] coredns plugin -- change rlock to lock when loading top stats to avoid doing it in parallel --- coredns_plugin/querylog_top.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coredns_plugin/querylog_top.go b/coredns_plugin/querylog_top.go index 70ec0034..56eea744 100644 --- a/coredns_plugin/querylog_top.go +++ b/coredns_plugin/querylog_top.go @@ -209,8 +209,8 @@ func (r *dayTop) addEntry(entry *logEntry, now time.Time) error { func loadTopFromFiles() error { now := time.Now() - runningTop.RLock() - defer runningTop.RUnlock() + runningTop.Lock() // not rlock because we set it at the end of the function + defer runningTop.Unlock() if runningTop.loaded { return nil 
} From 763dcc46e96393282864c914321007d8810ed223 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 17:49:08 +0300 Subject: [PATCH 15/17] coredns plugin -- Final fix for deadlock during coredns reload --- coredns_plugin/querylog_top.go | 56 +++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/coredns_plugin/querylog_top.go b/coredns_plugin/querylog_top.go index 56eea744..89426e14 100644 --- a/coredns_plugin/querylog_top.go +++ b/coredns_plugin/querylog_top.go @@ -25,7 +25,8 @@ type hourTop struct { domains gcache.Cache blocked gcache.Cache clients gcache.Cache - sync.RWMutex + + mutex sync.RWMutex } func (top *hourTop) init() { @@ -35,31 +36,33 @@ func (top *hourTop) init() { } type dayTop struct { - hours []*hourTop - loaded bool - sync.RWMutex // write -- rotating hourTop, read -- anything else + hours []*hourTop + hoursLock sync.RWMutex // writelock this lock ONLY WHEN rotating or intializing hours! + + loaded bool + loadedLock sync.Mutex } var runningTop dayTop func init() { - runningTop.Lock() + runningTop.hoursWriteLock() for i := 0; i < 24; i++ { hour := hourTop{} hour.init() runningTop.hours = append(runningTop.hours, &hour) } - runningTop.Unlock() + runningTop.hoursWriteUnlock() } func rotateHourlyTop() { log.Printf("Rotating hourly top") hour := &hourTop{} hour.init() - runningTop.Lock() + runningTop.hoursWriteLock() runningTop.hours = append([]*hourTop{hour}, runningTop.hours...) runningTop.hours = runningTop.hours[:24] - runningTop.Unlock() + runningTop.hoursWriteUnlock() } func periodicHourlyTopRotate() { @@ -180,8 +183,8 @@ func (r *dayTop) addEntry(entry *logEntry, now time.Time) error { hostname := strings.ToLower(strings.TrimSuffix(q.Question[0].Name, ".")) // get value, if not set, crate one - runningTop.RLock() - defer runningTop.RUnlock() + runningTop.hoursReadLock() + defer runningTop.hoursReadUnlock() err := runningTop.hours[hour].incrementDomains(hostname) if err != nil { log.Printf("Failed to increment value: %s", err) @@ -209,8 +212,8 @@ func (r *dayTop) addEntry(entry *logEntry, now time.Time) error { func loadTopFromFiles() error { now := time.Now() - runningTop.Lock() // not rlock because we set it at the end of the function - defer runningTop.Unlock() + runningTop.loadedWriteLock() + defer runningTop.loadedWriteUnlock() if runningTop.loaded { return nil } @@ -255,7 +258,7 @@ func handleStatsTop(w http.ResponseWriter, r *http.Request) { } } - runningTop.RLock() + runningTop.hoursReadLock() for hour := 0; hour < 24; hour++ { runningTop.hours[hour].RLock() do(runningTop.hours[hour].domains.Keys(), runningTop.hours[hour].lockedGetDomains, domains) @@ -263,7 +266,7 @@ func handleStatsTop(w http.ResponseWriter, r *http.Request) { do(runningTop.hours[hour].clients.Keys(), runningTop.hours[hour].lockedGetClients, clients) runningTop.hours[hour].RUnlock() } - runningTop.RUnlock() + runningTop.hoursReadUnlock() // use manual json marshalling because we want maps to be sorted by value json := bytes.Buffer{} @@ -329,3 +332,28 @@ func sortByValue(m map[string]int) []string { } return sorted } + +func (d *dayTop) hoursWriteLock() { tracelock(); d.hoursLock.Lock() } +func (d *dayTop) hoursWriteUnlock() { tracelock(); d.hoursLock.Unlock() } +func (d *dayTop) hoursReadLock() { tracelock(); d.hoursLock.RLock() } +func (d *dayTop) hoursReadUnlock() { tracelock(); d.hoursLock.RUnlock() } +func (d *dayTop) loadedWriteLock() { tracelock(); d.loadedLock.Lock() } +func (d *dayTop) loadedWriteUnlock() { tracelock(); 
d.loadedLock.Unlock() } + +// func (d *dayTop) loadedReadLock() { tracelock(); d.loadedLock.RLock() } +// func (d *dayTop) loadedReadUnlock() { tracelock(); d.loadedLock.RUnlock() } + +func (h *hourTop) Lock() { tracelock(); h.mutex.Lock() } +func (h *hourTop) RLock() { tracelock(); h.mutex.RLock() } +func (h *hourTop) RUnlock() { tracelock(); h.mutex.RUnlock() } +func (h *hourTop) Unlock() { tracelock(); h.mutex.Unlock() } + +func tracelock() { + /* + pc := make([]uintptr, 10) // at least 1 entry needed + runtime.Callers(2, pc) + f := path.Base(runtime.FuncForPC(pc[1]).Name()) + lockf := path.Base(runtime.FuncForPC(pc[0]).Name()) + fmt.Fprintf(os.Stderr, "%s(): %s\n", f, lockf) + */ +} From a15c59e24eaa6c8544fe74193f4cacaf0b5f3fbb Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 19:51:43 +0300 Subject: [PATCH 16/17] coredns plugin -- Cache /querylog API result --- coredns_plugin/querylog.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go index c0df50e2..0ff676f8 100644 --- a/coredns_plugin/querylog.go +++ b/coredns_plugin/querylog.go @@ -19,13 +19,18 @@ import ( ) const ( - logBufferCap = 1000 // maximum capacity of logBuffer before it's flushed to disk - queryLogAPI = 1000 // maximum API response for /querylog + logBufferCap = 5000 // maximum capacity of logBuffer before it's flushed to disk + queryLogCacheSize = 1000 // maximum API response for /querylog + queryLogCacheTime = time.Minute ) var ( logBufferLock sync.RWMutex logBuffer []logEntry + + queryLogCache []logEntry + queryLogLock sync.RWMutex + queryLogTime time.Time ) type logEntry struct { @@ -93,14 +98,26 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela } func handleQueryLog(w http.ResponseWriter, r *http.Request) { - // TODO: fetch values from disk if len(logBuffer) < queryLogSize - // TODO: cache output - logBufferLock.RLock() - values := logBuffer - logBufferLock.RUnlock() + now := time.Now() - if len(values) < queryLogAPI { - values = appendFromLogFile(values, queryLogAPI, time.Hour*24) + queryLogLock.RLock() + values := queryLogCache + needRefresh := now.Sub(queryLogTime) >= queryLogCacheTime + queryLogLock.RUnlock() + + if needRefresh { + // need to get fresh data + logBufferLock.RLock() + values := logBuffer + logBufferLock.RUnlock() + + if len(values) < queryLogCacheSize { + values = appendFromLogFile(values, queryLogCacheSize, time.Hour*24) + } + queryLogLock.Lock() + queryLogCache = values + queryLogTime = now + queryLogLock.Unlock() } var data = []map[string]interface{}{} From a15f21ca1cd0acabf77a4e9750dd77b2b870a6f4 Mon Sep 17 00:00:00 2001 From: Eugene Bujak Date: Mon, 8 Oct 2018 20:02:09 +0300 Subject: [PATCH 17/17] code review -- move constants into named constants --- coredns_plugin/querylog.go | 20 +++++++++++++------- coredns_plugin/querylog_file.go | 9 ++------- coredns_plugin/querylog_top.go | 18 +++++++++--------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go index 0ff676f8..9e45315f 100644 --- a/coredns_plugin/querylog.go +++ b/coredns_plugin/querylog.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "net" "net/http" "os" "path" @@ -19,9 +20,14 @@ import ( ) const ( - logBufferCap = 5000 // maximum capacity of logBuffer before it's flushed to disk - queryLogCacheSize = 1000 // maximum API response for /querylog - queryLogCacheTime = time.Minute + logBufferCap = 5000 
From a15f21ca1cd0acabf77a4e9750dd77b2b870a6f4 Mon Sep 17 00:00:00 2001
From: Eugene Bujak
Date: Mon, 8 Oct 2018 20:02:09 +0300
Subject: [PATCH 17/17] code review -- move constants into named constants

---
 coredns_plugin/querylog.go      | 20 +++++++++++++-------
 coredns_plugin/querylog_file.go |  9 ++-------
 coredns_plugin/querylog_top.go  | 18 +++++++++---------
 3 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/coredns_plugin/querylog.go b/coredns_plugin/querylog.go
index 0ff676f8..9e45315f 100644
--- a/coredns_plugin/querylog.go
+++ b/coredns_plugin/querylog.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"net"
 	"net/http"
 	"os"
 	"path"
@@ -19,9 +20,14 @@ import (
 )
 
 const (
-	logBufferCap      = 5000 // maximum capacity of logBuffer before it's flushed to disk
-	queryLogCacheSize = 1000 // maximum API response for /querylog
-	queryLogCacheTime = time.Minute
+	logBufferCap           = 5000            // maximum capacity of logBuffer before it's flushed to disk
+	queryLogTimeLimit      = time.Hour * 24  // how far in the past we care about querylogs
+	queryLogRotationPeriod = time.Hour * 24  // rotate the log every 24 hours
+	queryLogFileName       = "querylog.json" // .gz added during compression
+	queryLogCacheSize      = 1000            // maximum API response for /querylog
+	queryLogCacheTime      = time.Minute     // if requested more often than this, give out cached response
+	queryLogTopSize        = 500             // Keep in memory only top N values
+	queryLogAPIPort        = "8618"          // 8618 is sha512sum of "querylog" then each byte summed
 )
 
 var (
@@ -112,7 +118,7 @@ func handleQueryLog(w http.ResponseWriter, r *http.Request) {
 		logBufferLock.RUnlock()
 
 		if len(values) < queryLogCacheSize {
-			values = appendFromLogFile(values, queryLogCacheSize, time.Hour*24)
+			values = appendFromLogFile(values, queryLogCacheSize, queryLogTimeLimit)
 		}
 		queryLogLock.Lock()
 		queryLogCache = values
@@ -223,14 +229,14 @@ func handleQueryLog(w http.ResponseWriter, r *http.Request) {
 	if err != nil {
 		errortext := fmt.Sprintf("Unable to write response json: %s", err)
 		log.Println(errortext)
-		http.Error(w, errortext, 500)
+		http.Error(w, errortext, http.StatusInternalServerError)
 	}
 }
 
 func startQueryLogServer() {
-	listenAddr := "127.0.0.1:8618" // 8618 is sha512sum of "querylog" then each byte summed
+	listenAddr := net.JoinHostPort("127.0.0.1", queryLogAPIPort)
 
-	go periodicQueryLogRotate(queryLogRotationPeriod)
+	go periodicQueryLogRotate()
 	go periodicHourlyTopRotate()
 
 	http.HandleFunc("/querylog", handleQueryLog)
diff --git a/coredns_plugin/querylog_file.go b/coredns_plugin/querylog_file.go
index fffd050a..7025fcd3 100644
--- a/coredns_plugin/querylog_file.go
+++ b/coredns_plugin/querylog_file.go
@@ -13,11 +13,6 @@ import (
 	"github.com/go-test/deep"
 )
 
-const (
-	queryLogRotationPeriod = time.Hour * 24  // rotate the log every 24 hours
-	queryLogFileName       = "querylog.json" // .gz added during compression
-)
-
 var (
 	fileWriteLock sync.Mutex
 )
@@ -135,8 +130,8 @@ func rotateQueryLog() error {
 	return nil
 }
 
-func periodicQueryLogRotate(t time.Duration) {
-	for range time.Tick(t) {
+func periodicQueryLogRotate() {
+	for range time.Tick(queryLogRotationPeriod) {
 		err := rotateQueryLog()
 		if err != nil {
 			log.Printf("Failed to rotate querylog: %s", err)
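One note on the rotation loop, separate from the patch: time.Tick is convenient here because the goroutine lives as long as the process, but the ticker it creates can never be stopped. If the plugin ever needed to shut this loop down cleanly, a stoppable variant built on time.NewTicker could look like the sketch below; the stop channel and the function name are hypothetical and do not exist in the plugin.

func periodicQueryLogRotateStoppable(stop <-chan struct{}) {
	ticker := time.NewTicker(queryLogRotationPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Same work as periodicQueryLogRotate does on every tick.
			if err := rotateQueryLog(); err != nil {
				log.Printf("Failed to rotate querylog: %s", err)
			}
		case <-stop:
			return
		}
	}
}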
diff --git a/coredns_plugin/querylog_top.go b/coredns_plugin/querylog_top.go
index 89426e14..e2bfa53e 100644
--- a/coredns_plugin/querylog_top.go
+++ b/coredns_plugin/querylog_top.go
@@ -5,6 +5,9 @@ import (
 	"fmt"
 	"log"
 	"net/http"
+	"os"
+	"path"
+	"runtime"
 	"sort"
 	"strconv"
 	"strings"
@@ -30,9 +33,9 @@ type hourTop struct {
 }
 
 func (top *hourTop) init() {
-	top.domains = gcache.New(500).LRU().Build()
-	top.blocked = gcache.New(500).LRU().Build()
-	top.clients = gcache.New(500).LRU().Build()
+	top.domains = gcache.New(queryLogTopSize).LRU().Build()
+	top.blocked = gcache.New(queryLogTopSize).LRU().Build()
+	top.clients = gcache.New(queryLogTopSize).LRU().Build()
 }
 
 type dayTop struct {
@@ -227,7 +230,7 @@ func loadTopFromFiles() error {
 	}
 	needMore := func() bool { return true }
 
-	err := genericLoader(onEntry, needMore, time.Hour*24)
+	err := genericLoader(onEntry, needMore, queryLogTimeLimit)
 	if err != nil {
 		log.Printf("Failed to load entries from querylog: %s", err)
 		return err
@@ -340,20 +343,17 @@ func (d *dayTop) hoursReadUnlock() { tracelock(); d.hoursLock.RUnlock() }
 func (d *dayTop) loadedWriteLock() { tracelock(); d.loadedLock.Lock() }
 func (d *dayTop) loadedWriteUnlock() { tracelock(); d.loadedLock.Unlock() }
 
-// func (d *dayTop) loadedReadLock() { tracelock(); d.loadedLock.RLock() }
-// func (d *dayTop) loadedReadUnlock() { tracelock(); d.loadedLock.RUnlock() }
-
 func (h *hourTop) Lock() { tracelock(); h.mutex.Lock() }
 func (h *hourTop) RLock() { tracelock(); h.mutex.RLock() }
 func (h *hourTop) RUnlock() { tracelock(); h.mutex.RUnlock() }
 func (h *hourTop) Unlock() { tracelock(); h.mutex.Unlock() }
 
 func tracelock() {
-	/*
+	if false { // not commented out, so the compiler still checks this code
 		pc := make([]uintptr, 10) // at least 1 entry needed
 		runtime.Callers(2, pc)
 		f := path.Base(runtime.FuncForPC(pc[1]).Name())
 		lockf := path.Base(runtime.FuncForPC(pc[0]).Name())
 		fmt.Fprintf(os.Stderr, "%s(): %s\n", f, lockf)
-	*/
+	}
 }
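A closing note on the if false guard above, separate from the patches themselves: unlike a /* */ block, the body stays visible to the compiler, which is why the os, path and runtime imports are now required and why the code cannot silently rot. A hedged variation is to hang the same body off a named boolean constant, which documents the switch in one place and is still stripped as dead code when the constant is false. The names lockTracingEnabled and tracelockConst below are hypothetical and not part of the plugin.

const lockTracingEnabled = false // flip to true to trace lock usage on stderr

func tracelockConst() {
	if lockTracingEnabled {
		pc := make([]uintptr, 10) // at least 1 entry needed
		runtime.Callers(2, pc)
		f := path.Base(runtime.FuncForPC(pc[1]).Name())     // function that called the lock helper
		lockf := path.Base(runtime.FuncForPC(pc[0]).Name()) // the lock helper itself
		fmt.Fprintf(os.Stderr, "%s(): %s\n", f, lockf)
	}
}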