all: slog stats

This commit is contained in:
Stanislav Chzhen 2024-09-02 19:47:11 +03:00
parent 76344f9785
commit 2c0ffd91fd
7 changed files with 70 additions and 45 deletions

View file

@ -20,6 +20,7 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/stats" "github.com/AdguardTeam/AdGuardHome/internal/stats"
"github.com/AdguardTeam/golibs/errors" "github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/logutil/slogutil"
"github.com/AdguardTeam/golibs/netutil" "github.com/AdguardTeam/golibs/netutil"
"github.com/ameshkov/dnscrypt/v2" "github.com/ameshkov/dnscrypt/v2"
yaml "gopkg.in/yaml.v3" yaml "gopkg.in/yaml.v3"
@ -54,6 +55,7 @@ func initDNS(l *slog.Logger) (err error) {
} }
statsConf := stats.Config{ statsConf := stats.Config{
Logger: l.With(slogutil.KeyPrefix, "stats"),
Filename: filepath.Join(statsDir, "stats.db"), Filename: filepath.Join(statsDir, "stats.db"),
Limit: config.Stats.Interval.Duration, Limit: config.Stats.Interval.Duration,
ConfigModified: onConfigModified, ConfigModified: onConfigModified,

View file

@ -10,7 +10,6 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/aghalg" "github.com/AdguardTeam/AdGuardHome/internal/aghalg"
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp" "github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
"github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/AdGuardHome/internal/aghnet"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/timeutil" "github.com/AdguardTeam/golibs/timeutil"
) )
@ -62,7 +61,7 @@ func (s *StatsCtx) handleStats(w http.ResponseWriter, r *http.Request) {
resp, ok = s.getData(uint32(s.limit.Hours())) resp, ok = s.getData(uint32(s.limit.Hours()))
}() }()
log.Debug("stats: prepared data in %v", time.Since(start)) s.logger.Debug("prepared data", "elapsed", time.Since(start))
if !ok { if !ok {
// Don't bring the message to the lower case since it's a part of UI // Don't bring the message to the lower case since it's a part of UI

View file

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/AdguardTeam/AdGuardHome/internal/aghalg" "github.com/AdguardTeam/AdGuardHome/internal/aghalg"
"github.com/AdguardTeam/golibs/logutil/slogutil"
"github.com/AdguardTeam/golibs/testutil" "github.com/AdguardTeam/golibs/testutil"
"github.com/AdguardTeam/golibs/timeutil" "github.com/AdguardTeam/golibs/timeutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -24,6 +25,7 @@ func TestHandleStatsConfig(t *testing.T) {
) )
conf := Config{ conf := Config{
Logger: slogutil.NewDiscardLogger(),
UnitID: func() (id uint32) { return 0 }, UnitID: func() (id uint32) { return 0 },
ConfigModified: func() {}, ConfigModified: func() {},
ShouldCountClient: func([]string) bool { return true }, ShouldCountClient: func([]string) bool { return true },

View file

@ -5,6 +5,7 @@ package stats
import ( import (
"fmt" "fmt"
"io" "io"
"log/slog"
"net/netip" "net/netip"
"os" "os"
"sync" "sync"
@ -14,7 +15,7 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/aghhttp" "github.com/AdguardTeam/AdGuardHome/internal/aghhttp"
"github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/AdGuardHome/internal/aghnet"
"github.com/AdguardTeam/golibs/errors" "github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/logutil/slogutil"
"github.com/AdguardTeam/golibs/timeutil" "github.com/AdguardTeam/golibs/timeutil"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
@ -43,6 +44,10 @@ func validateIvl(ivl time.Duration) (err error) {
// //
// Do not alter any fields of this structure after using it. // Do not alter any fields of this structure after using it.
type Config struct { type Config struct {
// Logger is used for logging the operation of the statistics management.
// It must not be nil.
Logger *slog.Logger
// UnitID is the function to generate the identifier for current unit. If // UnitID is the function to generate the identifier for current unit. If
// nil, the default function is used, see newUnitID. // nil, the default function is used, see newUnitID.
UnitID UnitIDGenFunc UnitID UnitIDGenFunc
@ -96,6 +101,10 @@ type Interface interface {
// StatsCtx collects the statistics and flushes it to the database. Its default // StatsCtx collects the statistics and flushes it to the database. Its default
// flushing interval is one hour. // flushing interval is one hour.
type StatsCtx struct { type StatsCtx struct {
// logger is used for logging the operation of the statistics management.
// It must not be nil.
logger *slog.Logger
// currMu protects curr. // currMu protects curr.
currMu *sync.RWMutex currMu *sync.RWMutex
// curr is the actual statistics collection result. // curr is the actual statistics collection result.
@ -150,6 +159,7 @@ func New(conf Config) (s *StatsCtx, err error) {
} }
s = &StatsCtx{ s = &StatsCtx{
logger: conf.Logger,
currMu: &sync.RWMutex{}, currMu: &sync.RWMutex{},
httpRegister: conf.HTTPRegister, httpRegister: conf.HTTPRegister,
configModified: conf.ConfigModified, configModified: conf.ConfigModified,
@ -181,18 +191,18 @@ func New(conf Config) (s *StatsCtx, err error) {
return nil, fmt.Errorf("stats: opening a transaction: %w", err) return nil, fmt.Errorf("stats: opening a transaction: %w", err)
} }
deleted := deleteOldUnits(tx, id-uint32(s.limit.Hours())-1) deleted := s.deleteOldUnits(tx, id-uint32(s.limit.Hours())-1)
udb = loadUnitFromDB(tx, id) udb = s.loadUnitFromDB(tx, id)
err = finishTxn(tx, deleted > 0) err = finishTxn(tx, deleted > 0)
if err != nil { if err != nil {
log.Error("stats: %s", err) s.logger.Error("finishing transacation", slogutil.KeyError, err)
} }
s.curr = newUnit(id) s.curr = newUnit(id)
s.curr.deserialize(udb) s.curr.deserialize(udb)
log.Debug("stats: initialized") s.logger.Debug("initialized")
return s, nil return s, nil
} }
@ -237,7 +247,7 @@ func (s *StatsCtx) Close() (err error) {
defer func() { defer func() {
cerr := db.Close() cerr := db.Close()
if cerr == nil { if cerr == nil {
log.Debug("stats: database closed") s.logger.Debug("database closed")
} }
err = errors.WithDeferred(err, cerr) err = errors.WithDeferred(err, cerr)
@ -254,7 +264,7 @@ func (s *StatsCtx) Close() (err error) {
udb := s.curr.serialize() udb := s.curr.serialize()
return udb.flushUnitToDB(tx, s.curr.id) return s.flushUnitToDB(udb, tx, s.curr.id)
} }
// Update implements the [Interface] interface for *StatsCtx. e must not be // Update implements the [Interface] interface for *StatsCtx. e must not be
@ -269,7 +279,7 @@ func (s *StatsCtx) Update(e *Entry) {
err := e.validate() err := e.validate()
if err != nil { if err != nil {
log.Debug("stats: updating: validating entry: %s", err) s.logger.Debug("validating entry", slogutil.KeyError, err)
return return
} }
@ -278,7 +288,7 @@ func (s *StatsCtx) Update(e *Entry) {
defer s.currMu.Unlock() defer s.currMu.Unlock()
if s.curr == nil { if s.curr == nil {
log.Error("stats: current unit is nil") s.logger.Error("current unit is nil")
return return
} }
@ -333,8 +343,8 @@ func (s *StatsCtx) TopClientsIP(maxCount uint) (ips []netip.Addr) {
// deleteOldUnits walks the buckets available to tx and deletes old units. It // deleteOldUnits walks the buckets available to tx and deletes old units. It
// returns the number of deletions performed. // returns the number of deletions performed.
func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) { func (s *StatsCtx) deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
log.Debug("stats: deleting old units until id %d", firstID) s.logger.Debug("deleting old units up to", "unit", firstID)
// TODO(a.garipov): See if this is actually necessary. Looks like a rather // TODO(a.garipov): See if this is actually necessary. Looks like a rather
// bizarre solution. // bizarre solution.
@ -348,12 +358,12 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
err = tx.DeleteBucket(name) err = tx.DeleteBucket(name)
if err != nil { if err != nil {
log.Debug("stats: deleting bucket: %s", err) s.logger.Debug("deleting bucket", slogutil.KeyError, err)
return nil return nil
} }
log.Debug("stats: deleted unit %d (name %x)", nameID, name) s.logger.Debug("deleted unit", "name_id", nameID, "name", fmt.Sprintf("%x", name))
deleted++ deleted++
@ -362,7 +372,7 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
err := tx.ForEach(walk) err := tx.ForEach(walk)
if err != nil && !errors.Is(err, errStop) { if err != nil && !errors.Is(err, errStop) {
log.Debug("stats: deleting units: %s", err) s.logger.Debug("deleting units", slogutil.KeyError, err)
} }
return deleted return deleted
@ -371,20 +381,19 @@ func deleteOldUnits(tx *bbolt.Tx, firstID uint32) (deleted int) {
// openDB returns an error if the database can't be opened from the specified // openDB returns an error if the database can't be opened from the specified
// file. It's safe for concurrent use. // file. It's safe for concurrent use.
func (s *StatsCtx) openDB() (err error) { func (s *StatsCtx) openDB() (err error) {
log.Debug("stats: opening database") s.logger.Debug("opening database")
var db *bbolt.DB var db *bbolt.DB
db, err = bbolt.Open(s.filename, 0o644, nil) db, err = bbolt.Open(s.filename, 0o644, nil)
if err != nil { if err != nil {
if err.Error() == "invalid argument" { if err.Error() == "invalid argument" {
log.Error("AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations") s.logger.Error("AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations")
} }
return err return err
} }
// Use defer to unlock the mutex as soon as possible. defer s.logger.Debug("database opened")
defer log.Debug("stats: database opened")
s.db.Store(db) s.db.Store(db)
@ -424,21 +433,22 @@ func (s *StatsCtx) flushDB(id, limit uint32, ptr *unit) (cont bool, sleepFor tim
isCommitable := true isCommitable := true
tx, err := db.Begin(true) tx, err := db.Begin(true)
if err != nil { if err != nil {
log.Error("stats: opening transaction: %s", err) s.logger.Error("opening transaction", slogutil.KeyError, err)
return true, 0 return true, 0
} }
defer func() { defer func() {
if err = finishTxn(tx, isCommitable); err != nil { if err = finishTxn(tx, isCommitable); err != nil {
log.Error("stats: %s", err) s.logger.Error("finishing transaction", slogutil.KeyError, err)
} }
}() }()
s.curr = newUnit(id) s.curr = newUnit(id)
flushErr := ptr.serialize().flushUnitToDB(tx, ptr.id) udb := ptr.serialize()
flushErr := s.flushUnitToDB(udb, tx, ptr.id)
if flushErr != nil { if flushErr != nil {
log.Error("stats: flushing unit: %s", flushErr) s.logger.Error("flushing unit", slogutil.KeyError, flushErr)
isCommitable = false isCommitable = false
} }
@ -446,11 +456,12 @@ func (s *StatsCtx) flushDB(id, limit uint32, ptr *unit) (cont bool, sleepFor tim
if delErr != nil { if delErr != nil {
// TODO(e.burkov): Improve the algorithm of deleting the oldest bucket // TODO(e.burkov): Improve the algorithm of deleting the oldest bucket
// to avoid the error. // to avoid the error.
msg := "deleting bucket"
if errors.Is(delErr, bbolt.ErrBucketNotFound) { if errors.Is(delErr, bbolt.ErrBucketNotFound) {
log.Debug("stats: warning: deleting unit: %s", delErr) s.logger.Warn(msg, slogutil.KeyError, delErr)
} else { } else {
isCommitable = false isCommitable = false
log.Error("stats: deleting unit: %s", delErr) s.logger.Error(msg, slogutil.KeyError, delErr)
} }
} }
@ -467,7 +478,7 @@ func (s *StatsCtx) periodicFlush() {
cont, sleepFor = s.flush() cont, sleepFor = s.flush()
} }
log.Debug("periodic flushing finished") s.logger.Debug("periodic flushing finished")
} }
// setLimit sets the limit. s.lock is expected to be locked. // setLimit sets the limit. s.lock is expected to be locked.
@ -477,16 +488,16 @@ func (s *StatsCtx) setLimit(limit time.Duration) {
if limit != 0 { if limit != 0 {
s.enabled = true s.enabled = true
s.limit = limit s.limit = limit
log.Debug("stats: set limit: %d days", limit/timeutil.Day) s.logger.Debug("setting limit in days", "num", limit/timeutil.Day)
return return
} }
s.enabled = false s.enabled = false
log.Debug("stats: disabled") s.logger.Debug("disabled")
if err := s.clear(); err != nil { if err := s.clear(); err != nil {
log.Error("stats: %s", err) s.logger.Error("clearing", slogutil.KeyError, err)
} }
} }
@ -499,7 +510,7 @@ func (s *StatsCtx) clear() (err error) {
var tx *bbolt.Tx var tx *bbolt.Tx
tx, err = db.Begin(true) tx, err = db.Begin(true)
if err != nil { if err != nil {
log.Error("stats: opening a transaction: %s", err) s.logger.Error("opening transaction", slogutil.KeyError, err)
} else if err = finishTxn(tx, false); err != nil { } else if err = finishTxn(tx, false); err != nil {
// Don't wrap the error since it's informative enough as is. // Don't wrap the error since it's informative enough as is.
return err return err
@ -513,21 +524,21 @@ func (s *StatsCtx) clear() (err error) {
} }
// All active transactions are now closed. // All active transactions are now closed.
log.Debug("stats: database closed") s.logger.Debug("database closed")
} }
err = os.Remove(s.filename) err = os.Remove(s.filename)
if err != nil { if err != nil {
log.Error("stats: %s", err) s.logger.Error("removing", slogutil.KeyError, err)
} }
err = s.openDB() err = s.openDB()
if err != nil { if err != nil {
log.Error("stats: opening database: %s", err) s.logger.Error("opening database", slogutil.KeyError, err)
} }
// Use defer to unlock the mutex as soon as possible. // Use defer to unlock the mutex as soon as possible.
defer log.Debug("stats: cleared") defer s.logger.Debug("cleared")
s.currMu.Lock() s.currMu.Lock()
defer s.currMu.Unlock() defer s.currMu.Unlock()
@ -548,7 +559,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
// taken into account. // taken into account.
tx, err := db.Begin(true) tx, err := db.Begin(true)
if err != nil { if err != nil {
log.Error("stats: opening transaction: %s", err) s.logger.Error("opening transaction", slogutil.KeyError, err)
return nil, 0 return nil, 0
} }
@ -568,7 +579,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
units = make([]*unitDB, 0, limit) units = make([]*unitDB, 0, limit)
firstID := curID - limit + 1 firstID := curID - limit + 1
for i := firstID; i != curID; i++ { for i := firstID; i != curID; i++ {
u := loadUnitFromDB(tx, i) u := s.loadUnitFromDB(tx, i)
if u == nil { if u == nil {
u = &unitDB{NResult: make([]uint64, resultLast)} u = &unitDB{NResult: make([]uint64, resultLast)}
} }
@ -577,7 +588,7 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
err = finishTxn(tx, false) err = finishTxn(tx, false)
if err != nil { if err != nil {
log.Error("stats: %s", err) s.logger.Error("finishing transaction", slogutil.KeyError, err)
} }
if cur != nil { if cur != nil {
@ -585,7 +596,9 @@ func (s *StatsCtx) loadUnits(limit uint32) (units []*unitDB, curID uint32) {
} }
if unitsLen := len(units); unitsLen != int(limit) { if unitsLen := len(units); unitsLen != int(limit) {
log.Fatalf("loaded %d units whilst the desired number is %d", unitsLen, limit) // Should not happen.
s.logger.Error("number of loaded units not equal to limit", "loaded", unitsLen, "limit", limit)
units = units[:limit]
} }
return units, curID return units, curID

View file

@ -8,6 +8,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/AdguardTeam/golibs/logutil/slogutil"
"github.com/AdguardTeam/golibs/testutil" "github.com/AdguardTeam/golibs/testutil"
"github.com/AdguardTeam/golibs/timeutil" "github.com/AdguardTeam/golibs/timeutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -18,6 +19,7 @@ func TestStats_races(t *testing.T) {
var r uint32 var r uint32
idGen := func() (id uint32) { return atomic.LoadUint32(&r) } idGen := func() (id uint32) { return atomic.LoadUint32(&r) }
conf := Config{ conf := Config{
Logger: slogutil.NewDiscardLogger(),
ShouldCountClient: func([]string) bool { return true }, ShouldCountClient: func([]string) bool { return true },
UnitID: idGen, UnitID: idGen,
Filename: filepath.Join(t.TempDir(), "./stats.db"), Filename: filepath.Join(t.TempDir(), "./stats.db"),
@ -94,6 +96,7 @@ func TestStatsCtx_FillCollectedStats_daily(t *testing.T) {
) )
s, err := New(Config{ s, err := New(Config{
Logger: slogutil.NewDiscardLogger(),
ShouldCountClient: func([]string) bool { return true }, ShouldCountClient: func([]string) bool { return true },
Filename: filepath.Join(t.TempDir(), "./stats.db"), Filename: filepath.Join(t.TempDir(), "./stats.db"),
Limit: time.Hour, Limit: time.Hour,
@ -151,6 +154,7 @@ func TestStatsCtx_DataFromUnits_month(t *testing.T) {
const hoursInMonth = 720 const hoursInMonth = 720
s, err := New(Config{ s, err := New(Config{
Logger: slogutil.NewDiscardLogger(),
ShouldCountClient: func([]string) bool { return true }, ShouldCountClient: func([]string) bool { return true },
Filename: filepath.Join(t.TempDir(), "./stats.db"), Filename: filepath.Join(t.TempDir(), "./stats.db"),
Limit: time.Hour, Limit: time.Hour,

View file

@ -13,6 +13,7 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/AdGuardHome/internal/aghnet"
"github.com/AdguardTeam/AdGuardHome/internal/stats" "github.com/AdguardTeam/AdGuardHome/internal/stats"
"github.com/AdguardTeam/golibs/logutil/slogutil"
"github.com/AdguardTeam/golibs/netutil" "github.com/AdguardTeam/golibs/netutil"
"github.com/AdguardTeam/golibs/testutil" "github.com/AdguardTeam/golibs/testutil"
"github.com/AdguardTeam/golibs/timeutil" "github.com/AdguardTeam/golibs/timeutil"
@ -55,6 +56,7 @@ func TestStats(t *testing.T) {
handlers := map[string]http.Handler{} handlers := map[string]http.Handler{}
conf := stats.Config{ conf := stats.Config{
Logger: slogutil.NewDiscardLogger(),
ShouldCountClient: func([]string) bool { return true }, ShouldCountClient: func([]string) bool { return true },
Filename: filepath.Join(t.TempDir(), "stats.db"), Filename: filepath.Join(t.TempDir(), "stats.db"),
Limit: timeutil.Day, Limit: timeutil.Day,
@ -171,6 +173,7 @@ func TestLargeNumbers(t *testing.T) {
handlers := map[string]http.Handler{} handlers := map[string]http.Handler{}
conf := stats.Config{ conf := stats.Config{
Logger: slogutil.NewDiscardLogger(),
ShouldCountClient: func([]string) bool { return true }, ShouldCountClient: func([]string) bool { return true },
Filename: filepath.Join(t.TempDir(), "stats.db"), Filename: filepath.Join(t.TempDir(), "stats.db"),
Limit: timeutil.Day, Limit: timeutil.Day,
@ -222,6 +225,7 @@ func TestShouldCount(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
s, err := stats.New(stats.Config{ s, err := stats.New(stats.Config{
Logger: slogutil.NewDiscardLogger(),
Enabled: true, Enabled: true,
Filename: filepath.Join(t.TempDir(), "stats.db"), Filename: filepath.Join(t.TempDir(), "stats.db"),
Limit: timeutil.Day, Limit: timeutil.Day,

View file

@ -10,7 +10,7 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/aghnet" "github.com/AdguardTeam/AdGuardHome/internal/aghnet"
"github.com/AdguardTeam/golibs/errors" "github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/logutil/slogutil"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
) )
@ -277,13 +277,14 @@ func (u *unit) serialize() (udb *unitDB) {
} }
} }
func loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) { // loadUnitFromDB loads unit by id from the database.
func (s *StatsCtx) loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) {
bkt := tx.Bucket(idToUnitName(id)) bkt := tx.Bucket(idToUnitName(id))
if bkt == nil { if bkt == nil {
return nil return nil
} }
log.Tracef("Loading unit %d", id) s.logger.Debug("loading unit", "id", id)
var buf bytes.Buffer var buf bytes.Buffer
buf.Write(bkt.Get([]byte{0})) buf.Write(bkt.Get([]byte{0}))
@ -291,7 +292,7 @@ func loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) {
err := gob.NewDecoder(&buf).Decode(udb) err := gob.NewDecoder(&buf).Decode(udb)
if err != nil { if err != nil {
log.Error("gob Decode: %s", err) s.logger.Error("gob decode", slogutil.KeyError, err)
return nil return nil
} }
@ -339,8 +340,8 @@ func (u *unit) add(e *Entry) {
} }
// flushUnitToDB puts udb to the database at id. // flushUnitToDB puts udb to the database at id.
func (udb *unitDB) flushUnitToDB(tx *bbolt.Tx, id uint32) (err error) { func (s *StatsCtx) flushUnitToDB(udb *unitDB, tx *bbolt.Tx, id uint32) (err error) {
log.Debug("stats: flushing unit with id %d and total of %d", id, udb.NTotal) s.logger.Debug("flushing unit", "id", id, "req_num", udb.NTotal)
bkt, err := tx.CreateBucketIfNotExists(idToUnitName(id)) bkt, err := tx.CreateBucketIfNotExists(idToUnitName(id))
if err != nil { if err != nil {