Pull request 1795: 5661-imp-querylog-logging

Updates #5661.

Squashed commit of the following:

commit 3fac63fb4ac906e61f6bb2d2af5408771d93a206
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Mar 31 17:24:00 2023 +0300

    querylog: imp locks even more

commit bf14ab9a5ea1be83ac156dc5f3d7f3717d50767b
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Mar 31 17:09:25 2023 +0300

    querylog: imp locks more

commit 40e885f39b4c38dcd387d4799529b0fa150ed520
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Mar 31 16:26:15 2023 +0300

    querylog: imp locks, logs
This commit is contained in:
Ainar Garipov 2023-03-31 17:39:04 +03:00
parent 2eb3bf6ea5
commit 1731ce9c63
8 changed files with 125 additions and 89 deletions

View file

@ -31,7 +31,7 @@ NOTE: Add new changes BELOW THIS COMMENT.
is described in `openapi/openapi.yaml`. The duration of this pause could is described in `openapi/openapi.yaml`. The duration of this pause could
also be set with the config field `protection_disabled_until` in `dns` also be set with the config field `protection_disabled_until` in `dns`
section of the YAML configuration file. section of the YAML configuration file.
- Ability to create a static DHCP lease from a dynamic one more easily - The ability to create a static DHCP lease from a dynamic one more easily
([#3459]). ([#3459]).
- Two new HTTP APIs, `PUT /control/stats/config/update` and `GET - Two new HTTP APIs, `PUT /control/stats/config/update` and `GET
control/stats/config`, which can be used to set and receive the query log control/stats/config`, which can be used to set and receive the query log
@ -103,10 +103,10 @@ In this release, the schema version has changed from 17 to 20.
- The `POST /control/safesearch/disable` HTTP API is deprecated. Use the new - The `POST /control/safesearch/disable` HTTP API is deprecated. Use the new
`PUT /control/safesearch/settings` API `PUT /control/safesearch/settings` API
- The `safesearch_enabled` field is deprecated in the following HTTP APIs: - The `safesearch_enabled` field is deprecated in the following HTTP APIs:
- `GET /control/clients` - `GET /control/clients`;
- `POST /control/clients/add` - `POST /control/clients/add`;
- `POST /control/clients/update` - `POST /control/clients/update`;
- `GET /control/clients/find?ip0=...&ip1=...&ip2=...` - `GET /control/clients/find?ip0=...&ip1=...&ip2=...`.
Check `openapi/openapi.yaml` for more details. Check `openapi/openapi.yaml` for more details.
- The `GET /control/stats_info` HTTP API; use the new `GET - The `GET /control/stats_info` HTTP API; use the new `GET

4
go.mod
View file

@ -63,7 +63,3 @@ require (
golang.org/x/text v0.8.0 // indirect golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.7.0 // indirect golang.org/x/tools v0.7.0 // indirect
) )
// TODO(a.garipov): Remove this and update github.com/ameshkov/dnscrypt when
// it's released.
replace github.com/ameshkov/dnscrypt/v2 => github.com/ainar-g/dnscrypt/v2 v2.0.1-0.20230315131826-cdb2bf61bda8

4
go.sum
View file

@ -15,8 +15,8 @@ github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmH
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSiWQsof+nXEI9bUVUyV6F53Fp89EuCh2EAA= github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSiWQsof+nXEI9bUVUyV6F53Fp89EuCh2EAA=
github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 h1:52m0LGchQBBVqJRyYYufQuIbVqRawmubW3OFGqK1ekw= github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 h1:52m0LGchQBBVqJRyYYufQuIbVqRawmubW3OFGqK1ekw=
github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635/go.mod h1:lmLxL+FV291OopO93Bwf9fQLQeLyt33VJRUg5VJ30us= github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635/go.mod h1:lmLxL+FV291OopO93Bwf9fQLQeLyt33VJRUg5VJ30us=
github.com/ainar-g/dnscrypt/v2 v2.0.1-0.20230315131826-cdb2bf61bda8 h1:jc3/aOQ01Yy3vxB/uqcuUi4F+6E/FKWaTCHq5J1ef2o= github.com/ameshkov/dnscrypt/v2 v2.2.6 h1:rE7AFbPWebq7me7RVS66Cipd1m7ef1yf2+C8QzjQXXE=
github.com/ainar-g/dnscrypt/v2 v2.0.1-0.20230315131826-cdb2bf61bda8/go.mod h1:qPWhwz6FdSmuK7W4sMyvogrez4MWdtzosdqlr0Rg3ow= github.com/ameshkov/dnscrypt/v2 v2.2.6/go.mod h1:qPWhwz6FdSmuK7W4sMyvogrez4MWdtzosdqlr0Rg3ow=
github.com/ameshkov/dnsstamps v1.0.3 h1:Srzik+J9mivH1alRACTbys2xOxs0lRH9qnTA7Y1OYVo= github.com/ameshkov/dnsstamps v1.0.3 h1:Srzik+J9mivH1alRACTbys2xOxs0lRH9qnTA7Y1OYVo=
github.com/ameshkov/dnsstamps v1.0.3/go.mod h1:Ii3eUu73dx4Vw5O4wjzmT5+lkCwovjzaEZZ4gKyIH5A= github.com/ameshkov/dnsstamps v1.0.3/go.mod h1:Ii3eUu73dx4Vw5O4wjzmT5+lkCwovjzaEZZ4gKyIH5A=
github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM= github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM=

View file

@ -60,41 +60,48 @@ type getConfigResp struct {
// Register web handlers // Register web handlers
func (l *queryLog) initWeb() { func (l *queryLog) initWeb() {
l.conf.HTTPRegister(http.MethodGet, "/control/querylog", l.handleQueryLog) l.conf.HTTPRegister(http.MethodGet, "/control/querylog", l.handleQueryLog)
l.conf.HTTPRegister(http.MethodGet, "/control/querylog_info", l.handleQueryLogInfo)
l.conf.HTTPRegister(http.MethodPost, "/control/querylog_clear", l.handleQueryLogClear) l.conf.HTTPRegister(http.MethodPost, "/control/querylog_clear", l.handleQueryLogClear)
l.conf.HTTPRegister(http.MethodPost, "/control/querylog_config", l.handleQueryLogConfig)
l.conf.HTTPRegister(http.MethodGet, "/control/querylog/config", l.handleGetQueryLogConfig) l.conf.HTTPRegister(http.MethodGet, "/control/querylog/config", l.handleGetQueryLogConfig)
l.conf.HTTPRegister( l.conf.HTTPRegister(
http.MethodPut, http.MethodPut,
"/control/querylog/config/update", "/control/querylog/config/update",
l.handlePutQueryLogConfig, l.handlePutQueryLogConfig,
) )
// Deprecated handlers.
l.conf.HTTPRegister(http.MethodGet, "/control/querylog_info", l.handleQueryLogInfo)
l.conf.HTTPRegister(http.MethodPost, "/control/querylog_config", l.handleQueryLogConfig)
} }
// handleQueryLog is the handler for the GET /control/querylog HTTP API.
func (l *queryLog) handleQueryLog(w http.ResponseWriter, r *http.Request) { func (l *queryLog) handleQueryLog(w http.ResponseWriter, r *http.Request) {
l.lock.Lock() params, err := parseSearchParams(r)
defer l.lock.Unlock()
params, err := l.parseSearchParams(r)
if err != nil { if err != nil {
aghhttp.Error(r, w, http.StatusBadRequest, "failed to parse params: %s", err) aghhttp.Error(r, w, http.StatusBadRequest, "parsing params: %s", err)
return return
} }
entries, oldest := l.search(params) var resp map[string]any
data := l.entriesToJSON(entries, oldest) func() {
l.lock.Lock()
defer l.lock.Unlock()
_ = aghhttp.WriteJSONResponse(w, r, data) entries, oldest := l.search(params)
resp = l.entriesToJSON(entries, oldest)
}()
_ = aghhttp.WriteJSONResponse(w, r, resp)
} }
// handleQueryLogClear is the handler for the POST /control/querylog/clear HTTP
// API.
func (l *queryLog) handleQueryLogClear(_ http.ResponseWriter, _ *http.Request) { func (l *queryLog) handleQueryLogClear(_ http.ResponseWriter, _ *http.Request) {
l.clear() l.clear()
} }
// handleQueryLogInfo handles requests to the GET /control/querylog_info // handleQueryLogInfo is the handler for the GET /control/querylog_info HTTP
// endpoint. // API.
// //
// Deprecated: Remove it when migration to the new API is over. // Deprecated: Remove it when migration to the new API is over.
func (l *queryLog) handleQueryLogInfo(w http.ResponseWriter, r *http.Request) { func (l *queryLog) handleQueryLogInfo(w http.ResponseWriter, r *http.Request) {
@ -116,20 +123,25 @@ func (l *queryLog) handleQueryLogInfo(w http.ResponseWriter, r *http.Request) {
}) })
} }
// handleGetQueryLogConfig handles requests to the GET /control/querylog/config // handleGetQueryLogConfig is the handler for the GET /control/querylog/config
// endpoint. // HTTP API.
func (l *queryLog) handleGetQueryLogConfig(w http.ResponseWriter, r *http.Request) { func (l *queryLog) handleGetQueryLogConfig(w http.ResponseWriter, r *http.Request) {
l.lock.Lock() var resp *getConfigResp
defer l.lock.Unlock() func() {
l.lock.Lock()
defer l.lock.Unlock()
ignored := l.conf.Ignored.Values() resp = &getConfigResp{
slices.Sort(ignored) Interval: float64(l.conf.RotationIvl.Milliseconds()),
_ = aghhttp.WriteJSONResponse(w, r, getConfigResp{ Enabled: aghalg.BoolToNullBool(l.conf.Enabled),
Ignored: ignored, AnonymizeClientIP: aghalg.BoolToNullBool(l.conf.AnonymizeClientIP),
Interval: float64(l.conf.RotationIvl.Milliseconds()), Ignored: l.conf.Ignored.Values(),
Enabled: aghalg.BoolToNullBool(l.conf.Enabled), }
AnonymizeClientIP: aghalg.BoolToNullBool(l.conf.AnonymizeClientIP),
}) slices.Sort(resp.Ignored)
}()
_ = aghhttp.WriteJSONResponse(w, r, resp)
} }
// AnonymizeIP masks ip to anonymize the client if the ip is a valid one. // AnonymizeIP masks ip to anonymize the client if the ip is a valid one.
@ -146,7 +158,8 @@ func AnonymizeIP(ip net.IP) {
} }
} }
// handleQueryLogConfig handles the POST /control/querylog_config queries. // handleQueryLogConfig is the handler for the POST /control/querylog_config
// HTTP API.
// //
// Deprecated: Remove it when migration to the new API is over. // Deprecated: Remove it when migration to the new API is over.
func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request) { func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request) {
@ -198,8 +211,8 @@ func (l *queryLog) handleQueryLogConfig(w http.ResponseWriter, r *http.Request)
l.conf = &conf l.conf = &conf
} }
// handlePutQueryLogConfig handles the PUT /control/querylog/config/update // handlePutQueryLogConfig is the handler for the PUT
// queries. // /control/querylog/config/update HTTP API.
func (l *queryLog) handlePutQueryLogConfig(w http.ResponseWriter, r *http.Request) { func (l *queryLog) handlePutQueryLogConfig(w http.ResponseWriter, r *http.Request) {
newConf := &getConfigResp{} newConf := &getConfigResp{}
err := json.NewDecoder(r.Body).Decode(newConf) err := json.NewDecoder(r.Body).Decode(newConf)
@ -268,7 +281,7 @@ func getDoubleQuotesEnclosedValue(s *string) bool {
} }
// parseSearchCriterion parses a search criterion from the query parameter. // parseSearchCriterion parses a search criterion from the query parameter.
func (l *queryLog) parseSearchCriterion(q url.Values, name string, ct criterionType) ( func parseSearchCriterion(q url.Values, name string, ct criterionType) (
ok bool, ok bool,
sc searchCriterion, sc searchCriterion,
err error, err error,
@ -317,8 +330,9 @@ func (l *queryLog) parseSearchCriterion(q url.Values, name string, ct criterionT
return true, sc, nil return true, sc, nil
} }
// parseSearchParams - parses "searchParams" from the HTTP request's query string // parseSearchParams parses search parameters from the HTTP request's query
func (l *queryLog) parseSearchParams(r *http.Request) (p *searchParams, err error) { // string.
func parseSearchParams(r *http.Request) (p *searchParams, err error) {
p = newSearchParams() p = newSearchParams()
q := r.URL.Query() q := r.URL.Query()
@ -356,7 +370,7 @@ func (l *queryLog) parseSearchParams(r *http.Request) (p *searchParams, err erro
}} { }} {
var ok bool var ok bool
var c searchCriterion var c searchCriterion
ok, c, err = l.parseSearchCriterion(q, v.urlField, v.ct) ok, c, err = parseSearchCriterion(q, v.urlField, v.ct)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -155,10 +155,13 @@ func (l *queryLog) clear() {
l.fileFlushLock.Lock() l.fileFlushLock.Lock()
defer l.fileFlushLock.Unlock() defer l.fileFlushLock.Unlock()
l.bufferLock.Lock() func() {
l.buffer = nil l.bufferLock.Lock()
l.flushPending = false defer l.bufferLock.Unlock()
l.bufferLock.Unlock()
l.buffer = nil
l.flushPending = false
}()
oldLogFile := l.logFile + ".1" oldLogFile := l.logFile + ".1"
err := os.Remove(oldLogFile) err := os.Remove(oldLogFile)
@ -241,26 +244,30 @@ func (l *queryLog) Add(params *AddParams) {
entry.OrigAnswer = a entry.OrigAnswer = a
} }
l.bufferLock.Lock()
l.buffer = append(l.buffer, &entry)
needFlush := false needFlush := false
func() {
l.bufferLock.Lock()
defer l.bufferLock.Unlock()
if !l.conf.FileEnabled { l.buffer = append(l.buffer, &entry)
if len(l.buffer) > int(l.conf.MemSize) {
// writing to file is disabled - just remove the oldest entry from array if !l.conf.FileEnabled {
// if len(l.buffer) > int(l.conf.MemSize) {
// TODO(a.garipov): This should be replaced by a proper ring buffer, // Writing to file is disabled, so just remove the oldest entry
// but it's currently difficult to do that. // from the slices.
l.buffer[0] = nil //
l.buffer = l.buffer[1:] // TODO(a.garipov): This should be replaced by a proper ring
// buffer, but it's currently difficult to do that.
l.buffer[0] = nil
l.buffer = l.buffer[1:]
}
} else if !l.flushPending {
needFlush = len(l.buffer) >= int(l.conf.MemSize)
if needFlush {
l.flushPending = true
}
} }
} else if !l.flushPending { }()
needFlush = len(l.buffer) >= int(l.conf.MemSize)
if needFlush {
l.flushPending = true
}
}
l.bufferLock.Unlock()
// if buffer needs to be flushed to disk, do it now // if buffer needs to be flushed to disk, do it now
if needFlush { if needFlush {

View file

@ -106,6 +106,7 @@ func (r *QLogReader) SeekStart() error {
r.currentFile = len(r.qFiles) - 1 r.currentFile = len(r.qFiles) - 1
_, err := r.qFiles[r.currentFile].SeekStart() _, err := r.qFiles[r.currentFile].SeekStart()
return err return err
} }

View file

@ -11,8 +11,9 @@ import (
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/log"
) )
// flushLogBuffer flushes the current buffer to file and resets the current buffer // flushLogBuffer flushes the current buffer to file and resets the current
func (l *queryLog) flushLogBuffer(fullFlush bool) error { // buffer.
func (l *queryLog) flushLogBuffer(fullFlush bool) (err error) {
if !l.conf.FileEnabled { if !l.conf.FileEnabled {
return nil return nil
} }
@ -20,31 +21,42 @@ func (l *queryLog) flushLogBuffer(fullFlush bool) error {
l.fileFlushLock.Lock() l.fileFlushLock.Lock()
defer l.fileFlushLock.Unlock() defer l.fileFlushLock.Unlock()
// flush remainder to file // Flush the remainder to file.
l.bufferLock.Lock() var flushBuffer []*logEntry
needFlush := len(l.buffer) >= int(l.conf.MemSize) needFlush := fullFlush
if !needFlush && !fullFlush { func() {
l.bufferLock.Unlock() l.bufferLock.Lock()
defer l.bufferLock.Unlock()
needFlush = needFlush || len(l.buffer) >= int(l.conf.MemSize)
if needFlush {
flushBuffer = l.buffer
l.buffer = nil
l.flushPending = false
}
}()
if !needFlush {
return nil return nil
} }
flushBuffer := l.buffer
l.buffer = nil err = l.flushToFile(flushBuffer)
l.flushPending = false
l.bufferLock.Unlock()
err := l.flushToFile(flushBuffer)
if err != nil { if err != nil {
log.Error("Saving querylog to file failed: %s", err) log.Error("querylog: writing to file: %s", err)
return err return err
} }
return nil return nil
} }
// flushToFile saves the specified log entries to the query log file // flushToFile saves the specified log entries to the query log file
func (l *queryLog) flushToFile(buffer []*logEntry) (err error) { func (l *queryLog) flushToFile(buffer []*logEntry) (err error) {
if len(buffer) == 0 { if len(buffer) == 0 {
log.Debug("querylog: there's nothing to write to a file") log.Debug("querylog: nothing to write to a file")
return nil return nil
} }
start := time.Now() start := time.Now()
var b bytes.Buffer var b bytes.Buffer

View file

@ -76,15 +76,20 @@ func (l *queryLog) searchMemory(params *searchParams, cache clientCache) (entrie
// search - searches log entries in the query log using specified parameters // search - searches log entries in the query log using specified parameters
// returns the list of entries found + time of the oldest entry // returns the list of entries found + time of the oldest entry
func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest time.Time) { func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest time.Time) {
now := time.Now() start := time.Now()
if params.limit == 0 { if params.limit == 0 {
return []*logEntry{}, time.Time{} return []*logEntry{}, time.Time{}
} }
cache := clientCache{} cache := clientCache{}
fileEntries, oldest, total := l.searchFiles(params, cache)
memoryEntries, bufLen := l.searchMemory(params, cache) memoryEntries, bufLen := l.searchMemory(params, cache)
log.Debug("querylog: got %d entries from memory", len(memoryEntries))
fileEntries, oldest, total := l.searchFiles(params, cache)
log.Debug("querylog: got %d entries from files", len(fileEntries))
total += bufLen total += bufLen
totalLimit := params.offset + params.limit totalLimit := params.offset + params.limit
@ -123,7 +128,7 @@ func (l *queryLog) search(params *searchParams) (entries []*logEntry, oldest tim
len(entries), len(entries),
total, total,
params.olderThan, params.olderThan,
time.Since(now), time.Since(start),
) )
return entries, oldest return entries, oldest
@ -145,13 +150,14 @@ func (l *queryLog) searchFiles(
r, err := NewQLogReader(files) r, err := NewQLogReader(files)
if err != nil { if err != nil {
log.Error("querylog: failed to open qlog reader: %s", err) log.Error("querylog: opening qlog reader: %s", err)
return entries, oldest, 0 return entries, oldest, 0
} }
defer func() { defer func() {
derr := r.Close() closeErr := r.Close()
if derr != nil { if closeErr != nil {
log.Error("querylog: closing file: %s", err) log.Error("querylog: closing file: %s", err)
} }
}() }()
@ -161,8 +167,8 @@ func (l *queryLog) searchFiles(
} else { } else {
err = r.seekTS(params.olderThan.UnixNano()) err = r.seekTS(params.olderThan.UnixNano())
if err == nil { if err == nil {
// Read to the next record, because we only need the one // Read to the next record, because we only need the one that goes
// that goes after it. // after it.
_, err = r.ReadNext() _, err = r.ReadNext()
} }
} }
@ -176,9 +182,9 @@ func (l *queryLog) searchFiles(
totalLimit := params.offset + params.limit totalLimit := params.offset + params.limit
oldestNano := int64(0) oldestNano := int64(0)
// By default, we do not scan more than maxFileScanEntries at once. // By default, we do not scan more than maxFileScanEntries at once. The
// The idea is to make search calls faster so that the UI could handle // idea is to make search calls faster so that the UI could handle it and
// it and show something quicker. This behavior can be overridden if // show something quicker. This behavior can be overridden if
// maxFileScanEntries is set to 0. // maxFileScanEntries is set to 0.
for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 { for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
var e *logEntry var e *logEntry