AdGuardHome/internal/querylog/search.go
Ainar Garipov 313fd7107f Pull request: all: imp code, err handling
Closes #2571.

Squashed commit of the following:

commit a5b50ee011a995f4ab3d93314acd6f0ca82d99cf
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Mon Mar 15 14:05:25 2021 +0300

    all: imp code

commit bc610f8f438549e8c6b04c8a213b5422dda2aff5
Author: Ainar Garipov <A.Garipov@AdGuard.COM>
Date:   Fri Mar 12 20:00:14 2021 +0300

    all: imp code, err handling
2021-03-15 14:19:04 +03:00

181 lines
4.6 KiB
Go

package querylog
import (
"io"
"sort"
"time"
"github.com/AdguardTeam/golibs/log"
)
// search looks up the query log entries that satisfy params, consulting the
// log files first and the in-memory buffer second.
//
// It returns the matching entries sorted from the newest to the oldest, with
// params.offset entries skipped and no more than params.limit entries kept,
// together with the time of the oldest entry.  The returned time is the zero
// value when params.limit is zero or when the offset skips every entry found.
func (l *queryLog) search(params *searchParams) ([]*logEntry, time.Time) {
	start := time.Now()

	if params.limit == 0 {
		return []*logEntry{}, time.Time{}
	}

	// Entries stored in the log files.
	fromFiles, oldest, total := l.searchFiles(params)

	// Entries still sitting in the memory buffer.  The buffer is walked
	// backwards, i.e. from NEWER to OLDER.
	fromMemory := make([]*logEntry, 0)
	l.bufferLock.Lock()
	bufLen := len(l.buffer)
	total += bufLen
	for n := 0; n < bufLen; n++ {
		candidate := l.buffer[bufLen-1-n]
		if params.match(candidate) {
			fromMemory = append(fromMemory, candidate)
		}
	}
	l.bufferLock.Unlock()

	// The skipped entries have to be collected as well, hence the sum.
	maxCount := params.offset + params.limit

	// Merge both sources, memory entries first, and drop the surplus.
	merged := append(fromMemory, fromFiles...)
	if len(merged) > maxCount {
		merged = merged[:maxCount]
	}

	// Resort entries on start time to partially mitigate query log looking
	// weird on the frontend.
	//
	// See https://github.com/AdguardTeam/AdGuardHome/issues/2293.
	sort.SliceStable(merged, func(a, b int) bool {
		return merged[a].Time.After(merged[b].Time)
	})

	switch {
	case params.offset <= 0:
		// Nothing to skip.
	case len(merged) > params.offset:
		merged = merged[params.offset:]
	default:
		// The offset points past everything that has been found.
		merged = []*logEntry{}
		oldest = time.Time{}
	}

	if count := len(merged); count > 0 && count <= maxCount {
		// Update oldest after merging in the memory buffer.
		oldest = merged[count-1].Time
	}

	log.Debug("QueryLog: prepared data (%d/%d) older than %s in %s",
		len(merged), total, params.olderThan, time.Since(start))

	return merged, oldest
}
// searchFiles reads log entries from all log files and applies the specified
// search criteria.
//
// IMPORTANT: this method does not scan more than params.maxFileScanEntries
// entries per call, unless that value is zero or negative, so you may need to
// call it many times.
//
// It returns:
//   - the log entries that matched the search criteria;
//   - the time of the oldest processed entry (even if it was discarded), or
//     the zero time if no entry was processed;
//   - the total number of processed entries (including discarded ones).
//
// If the reader cannot be opened or positioned, the problem is logged and an
// empty result is returned.  If reading fails in the middle of the scan with
// anything other than io.EOF, the error is logged and whatever has been
// collected up to that point is returned.
func (l *queryLog) searchFiles(params *searchParams) ([]*logEntry, time.Time, int) {
	entries := make([]*logEntry, 0)
	oldest := time.Time{}

	files := []string{
		l.logFile + ".1",
		l.logFile,
	}

	r, err := NewQLogReader(files)
	if err != nil {
		log.Error("querylog: failed to open qlog reader: %s", err)

		return entries, oldest, 0
	}
	defer r.Close()

	if params.olderThan.IsZero() {
		err = r.SeekStart()
	} else {
		err = r.SeekTS(params.olderThan.UnixNano())
		if err == nil {
			// Read to the next record right away.  The one that was
			// specified in the olderThan parameter is not needed, only
			// the one next to it is.
			_, err = r.ReadNext()
		}
	}

	if err != nil {
		log.Debug("Cannot SeekTS() to %v: %v", params.olderThan, err)

		return entries, oldest, 0
	}

	totalLimit := params.offset + params.limit
	total := 0
	oldestNano := int64(0)

	// By default, we do not scan more than maxFileScanEntries at once.  The
	// idea is to make search calls faster so that the UI could handle it and
	// show something.  This behavior can be overridden if maxFileScanEntries
	// is set to 0.
	for total < params.maxFileScanEntries || params.maxFileScanEntries <= 0 {
		var entry *logEntry
		var ts int64

		entry, ts, err = l.readNextEntry(r, params)
		if err != nil {
			if err != io.EOF {
				// Do not let a failed read reset the oldest timestamp
				// or be counted as a processed entry, and do not keep
				// retrying: with an unlimited scan a persistent error
				// would otherwise make this loop spin forever.
				log.Error("querylog: reading next log entry: %s", err)
			}

			// Either there is nothing to read anymore or the reader is
			// broken; stop in both cases.
			break
		}

		oldestNano = ts
		total++

		if entry == nil {
			// The record was processed but did not match the criteria.
			continue
		}

		entries = append(entries, entry)
		if len(entries) == totalLimit {
			// Do not read more than totalLimit records at once.
			break
		}
	}

	if oldestNano != 0 {
		oldest = time.Unix(0, oldestNano)
	}

	return entries, oldest, total
}
// readNextEntry reads the next line from r and checks it against the search
// criteria in params.
//
// It returns:
//   - the decoded log entry if it matches the criteria, or nil if the record
//     was discarded or could not be read;
//   - the timestamp of the processed record (zero if reading failed);
//   - the error returned by the reader when there is nothing more to read.
func (l *queryLog) readNextEntry(r *QLogReader, params *searchParams) (*logEntry, int64, error) {
	rawLine, readErr := r.ReadNext()
	if readErr != nil {
		return nil, 0, readErr
	}

	// The timestamp is extracted before any filtering so that it is reported
	// for discarded records too.
	ts := readQLogTimestamp(rawLine)

	// Cheap pre-filter on the raw line, which avoids deserializing records
	// that cannot match.
	if !params.quickMatch(rawLine) {
		return nil, ts, nil
	}

	decoded := &logEntry{}
	decodeLogEntry(decoded, rawLine)

	// Full check against the deserialized entry.
	if params.match(decoded) {
		return decoded, ts, nil
	}

	return nil, ts, nil
}