AdGuardHome/internal/querylog/qlogfile.go

package querylog

import (
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/AdguardTeam/golibs/errors"
	"github.com/AdguardTeam/golibs/log"
)

const (
	// Timestamp not found errors.
	errTSNotFound errors.Error = "ts not found"
	errTSTooLate  errors.Error = "ts too late"
	errTSTooEarly errors.Error = "ts too early"

	// maxEntrySize is the maximum size of a single entry.
	//
	// TODO: Find a way to grow the buffer instead of relying on this value
	// when reading strings.
	maxEntrySize = 16 * 1024

	// bufferSize should be large enough to hold at least 100 entries of the
	// maximum size.
	bufferSize = 100 * maxEntrySize
)

// qLogFile represents a single query log file. It allows reading from the
// file in reverse order.
//
// Note that this is a stateful object. Internally, it contains a pointer to
// a specific position in the file, and it reads lines in reverse order
// starting from that position.
type qLogFile struct {
	// file is the query log file.
	file *os.File

	// buffer that we've read from the file.
	buffer []byte

	// lock is a mutex to make it thread-safe.
	lock sync.Mutex

	// position is the position in the file.
	position int64

	// bufferStart is the start of the buffer (in the file).
	bufferStart int64

	// bufferLen is the length of the buffer.
	bufferLen int
}

// newQLogFile initializes a new instance of the qLogFile.
func newQLogFile(path string) (qf *qLogFile, err error) {
	f, err := os.OpenFile(path, os.O_RDONLY, 0o644)
	if err != nil {
		return nil, err
	}

	return &qLogFile{file: f}, nil
}
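
// A minimal usage sketch, assuming a plain-text query log with one JSON
// record per line (the path below is hypothetical):
//
//	qf, err := newQLogFile("/var/lib/AdGuardHome/querylog.json")
//	if err != nil {
//		// Handle the error.
//	}
//	defer func() { _ = qf.Close() }()
//
//	// Reading starts at the end of the file, since records are returned in
//	// reverse chronological order.
//	if _, err = qf.SeekStart(); err != nil {
//		// Handle the error.
//	}
//
//	for {
//		line, readErr := qf.ReadNext()
//		if readErr == io.EOF {
//			break
//		} else if readErr != nil {
//			// Handle the error.
//		}
//
//		_ = line // A single JSON query log record.
//	}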

// validateQLogLineIdx returns an error if the line index is not valid for
// continuing the search.
func (q *qLogFile) validateQLogLineIdx(lineIdx, lastProbeLineIdx, ts, fSize int64) (err error) {
	if lineIdx == lastProbeLineIdx {
		if lineIdx == 0 {
			return errTSTooEarly
		}

		// If we're testing the same line twice then most likely the scope is
		// too narrow and we won't find anything anymore in any other file.
		return fmt.Errorf("looking up timestamp %d in %q: %w", ts, q.file.Name(), errTSNotFound)
	} else if lineIdx == fSize {
		return errTSTooLate
	}

	return nil
}

// seekTS performs a binary search in the query log file looking for a record
// with the specified timestamp. Once the record is found, it sets "position"
// so that the next ReadNext call returns that record.
//
// The algorithm is rather simple:
//  1. It starts with the position in the middle of the file.
//  2. Shifts back to the beginning of the line.
//  3. Checks the log record timestamp.
//  4. If it is lower than the timestamp we are looking for, it shifts the
//     seek position to 3/4 of the file. Otherwise, to 1/4 of the file.
//  5. It performs the search again, halving the search scope every time.
//
// It returns:
//   - the position of the line with the sought timestamp, so that the next
//     ReadNext call returns that line;
//   - the depth of the search (how many times we compared timestamps);
//   - one of the errors described above if the record could not be found.
func (q *qLogFile) seekTS(timestamp int64) (pos int64, depth int, err error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Empty the buffer.
	q.buffer = nil

	// First of all, check the file size.
	fileInfo, err := q.file.Stat()
	if err != nil {
		return 0, 0, err
	}

	// Define the search scope.

	// Start of the search interval (position in the file).
	start := int64(0)
	// End of the search interval (position in the file).
	end := fileInfo.Size()
	// Probe is the approximate index of the line we'll try to check.
	probe := (end - start) / 2

	var line string
	// Index of the probe line in the file.
	var lineIdx int64
	var lineEndIdx int64
	// Index of the last probe line.
	var lastProbeLineIdx int64
	lastProbeLineIdx = -1

	// Count seek depth in order to detect mistakes. If the depth is too
	// large, we should stop the search.
	for {
		// Get the line at the specified position.
		line, lineIdx, lineEndIdx, err = q.readProbeLine(probe)
		if err != nil {
			return 0, depth, err
		}

		// Check if the line index is invalid.
		err = q.validateQLogLineIdx(lineIdx, lastProbeLineIdx, timestamp, fileInfo.Size())
		if err != nil {
			return 0, depth, err
		}

		// Save the last found index.
		lastProbeLineIdx = lineIdx

		// Get the timestamp from the query log record.
		ts := readQLogTimestamp(line)
		if ts == 0 {
			return 0, depth, fmt.Errorf(
				"looking up timestamp %d in %q: record %q has empty timestamp",
				timestamp,
				q.file.Name(),
				line,
			)
		}

		if ts == timestamp {
			// Hurray, returning the result.
			break
		}

		// Narrow the scope and repeat the search.
		if ts > timestamp {
			// If the timestamp we're looking for is OLDER than what we found,
			// then the line is somewhere on the LEFT side of the current
			// probe position.
			end = lineIdx
		} else {
			// If the timestamp we're looking for is NEWER than what we found,
			// then the line is somewhere on the RIGHT side of the current
			// probe position.
			start = lineEndIdx
		}
		probe = start + (end-start)/2

		depth++
		if depth >= 100 {
			return 0, depth, fmt.Errorf(
				"looking up timestamp %d in %q: depth %d too high: %w",
				timestamp,
				q.file.Name(),
				depth,
				errTSNotFound,
			)
		}
	}

	q.position = lineIdx + int64(len(line))

	return q.position, depth, nil
}
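
// A minimal seekTS usage sketch, assuming records in the file are sorted by
// timestamp and that a record with this exact timestamp exists (the value
// below is hypothetical):
//
//	ts := time.Date(2023, time.May, 24, 15, 0, 0, 0, time.UTC).UnixNano()
//	if _, _, err := qf.seekTS(ts); err != nil {
//		// errTSTooEarly, errTSTooLate, and errTSNotFound mean the record is
//		// not in this file; other errors indicate I/O failures.
//	}
//
//	// The next ReadNext call returns the record with that timestamp.
//	line, _ := qf.ReadNext()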

// SeekStart changes the current position to the end of the file. Note that
// the query log is read in reverse order, which is why the log's start is
// actually the end of the file.
//
// Returns a nil error if the current position has been changed successfully,
// and a non-nil error in any other case.
func (q *qLogFile) SeekStart() (int64, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Empty the buffer.
	q.buffer = nil

	// First of all, check the file size.
	fileInfo, err := q.file.Stat()
	if err != nil {
		return 0, err
	}

	// Place the position at the very end of the file.
	q.position = fileInfo.Size() - 1
	if q.position < 0 {
		q.position = 0
	}

	return q.position, nil
}

// ReadNext reads the next line (in reverse order) from the file and shifts
// the current position left to the next (actually, previous) line.
//
// Returns io.EOF if there's nothing more to read.
func (q *qLogFile) ReadNext() (string, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.position == 0 {
		return "", io.EOF
	}

	line, lineIdx, err := q.readNextLine(q.position)
	if err != nil {
		return "", err
	}

	// Shift the position.
	if lineIdx == 0 {
		q.position = 0
	} else {
		// There's usually a line break before the line, so we should shift
		// one more char left from the line "\nline".
		q.position = lineIdx - 1
	}

	return line, err
}

// Close frees the underlying resources.
func (q *qLogFile) Close() error {
	return q.file.Close()
}

// readNextLine reads the next line from the specified position. The line
// actually has to end at that position.
//
// The algorithm is:
//  1. Check if the buffer is initialized.
//  2. If it is, scan it and look for the line there.
//  3. If the line cannot be found there, read the previous chunk into the
//     buffer.
//  4. Read the line from the buffer.
func (q *qLogFile) readNextLine(position int64) (string, int64, error) {
	relativePos := position - q.bufferStart
	if q.buffer == nil || (relativePos < maxEntrySize && q.bufferStart != 0) {
		// Time to re-init the buffer.
		err := q.initBuffer(position)
		if err != nil {
			return "", 0, err
		}

		relativePos = position - q.bufferStart
	}

	// Look for the end of the previous line, this is where we'll read from.
	startLine := int64(0)
	for i := relativePos - 1; i >= 0; i-- {
		if q.buffer[i] == '\n' {
			startLine = i + 1

			break
		}
	}

	line := string(q.buffer[startLine:relativePos])
	lineIdx := q.bufferStart + startLine

	return line, lineIdx, nil
}

// initBuffer initializes the qLogFile buffer. The goal is to read a chunk of
// the file that includes the line at the specified position.
func (q *qLogFile) initBuffer(position int64) error {
	q.bufferStart = int64(0)
	if position > bufferSize {
		q.bufferStart = position - bufferSize
	}

	// Seek to this position.
	_, err := q.file.Seek(q.bufferStart, io.SeekStart)
	if err != nil {
		return err
	}

	if q.buffer == nil {
		q.buffer = make([]byte, bufferSize)
	}

	q.bufferLen, err = q.file.Read(q.buffer)

	return err
}
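
// A worked example of the buffer window maintained by initBuffer, assuming a
// hypothetical read position of 2,000,000 bytes: bufferSize is
// 100*maxEntrySize = 1,638,400 bytes, and since position > bufferSize, the
// chunk read into the buffer covers the byte range [361,600, 2,000,000) of
// the file, that is, bufferStart = 2,000,000 - 1,638,400 = 361,600.
// Consecutive reverse reads then stay within this window until the relative
// position drops below maxEntrySize, at which point readNextLine
// re-initializes the buffer around the new, smaller position.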

// readProbeLine reads a line that includes the specified position. This
// method is supposed to be used for the binary search in seekTS. In the case
// of consecutive reads, use readNextLine, because it uses a better buffer.
func (q *qLogFile) readProbeLine(position int64) (string, int64, int64, error) {
	// First of all, we should read a buffer that will include the query log
	// line. In order to do this, we'll define the boundaries.
	seekPosition := int64(0)
	// Position relative to the buffer we're going to read.
	relativePos := position
	if position > maxEntrySize {
		seekPosition = position - maxEntrySize
		relativePos = maxEntrySize
	}

	// Seek to this position.
	_, err := q.file.Seek(seekPosition, io.SeekStart)
	if err != nil {
		return "", 0, 0, err
	}

	// The buffer size is 2*maxEntrySize.
	buffer := make([]byte, maxEntrySize*2)
	bufferLen, err := q.file.Read(buffer)
	if err != nil {
		return "", 0, 0, err
	}

	// Now start looking for the newline character starting from the
	// relativePos and going left.
	startLine := int64(0)
	for i := relativePos - 1; i >= 0; i-- {
		if buffer[i] == '\n' {
			startLine = i + 1

			break
		}
	}

	// Look for the end of the line now.
	endLine := int64(bufferLen)
	lineEndIdx := endLine + seekPosition
	for i := relativePos; i < int64(bufferLen); i++ {
		if buffer[i] == '\n' {
			endLine = i
			lineEndIdx = endLine + seekPosition + 1

			break
		}
	}

	// Finally, we can return the string we were looking for.
	lineIdx := startLine + seekPosition

	return string(buffer[startLine:endLine]), lineIdx, lineEndIdx, nil
}
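
// For illustration, given the hypothetical file contents "aaa\nbbb\nccc\n",
// readProbeLine(5) returns the line "bbb", its start index 4, and its end
// index 8 (the position right after the terminating newline).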

// readJSONValue reads a JSON string value in the form of '"key":"value"'.
// prefix must be of the form '"key":"' to generate less garbage.
func readJSONValue(s, prefix string) string {
	i := strings.Index(s, prefix)
	if i == -1 {
		return ""
	}

	start := i + len(prefix)
	i = strings.IndexByte(s[start:], '"')
	if i == -1 {
		return ""
	}

	end := start + i

	return s[start:end]
}
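
// A minimal readJSONValue usage sketch (the record below is a hypothetical
// query log line):
//
//	line := `{"T":"2023-05-24T15:26:55.000Z","QH":"example.org","QT":"A"}`
//	host := readJSONValue(line, `"QH":"`) // "example.org"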

// readQLogTimestamp reads the timestamp field from the query log line.
func readQLogTimestamp(str string) int64 {
	val := readJSONValue(str, `"T":"`)
	if len(val) == 0 {
		val = readJSONValue(str, `"Time":"`)
	}

	if len(val) == 0 {
		log.Error("Couldn't find timestamp: %s", str)

		return 0
	}

	tm, err := time.Parse(time.RFC3339Nano, val)
	if err != nil {
		log.Error("Couldn't parse timestamp: %s", val)

		return 0
	}

	return tm.UnixNano()
}
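
// A minimal readQLogTimestamp usage sketch (the record below is a
// hypothetical query log line):
//
//	line := `{"T":"2023-05-24T15:26:55.123456789+03:00","QH":"example.org"}`
//	ts := readQLogTimestamp(line)
//	// ts holds the Unix nanosecond timestamp of the record; 0 means the
//	// timestamp could not be found or parsed.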