AdGuardHome/internal/stats/unit.go
Stanislav Chzhen 6409011510 Pull request 2164: 6712-hourly-graphs
Updates #6712.

Squashed commit of the following:

commit dd4c822dfea04cdedf2f3fd5ea7d492ef1c5f7d1
Merge: 73207860f 28a6b9f30
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Tue Mar 12 12:38:19 2024 +0300

    Merge branch 'master' into 6712-hourly-graphs

commit 73207860f6428ad2c7d3afe7e2d654cfeefcffdc
Merge: ca29ee8e4 5388ad55a
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Mar 7 16:25:49 2024 +0300

    Merge branch 'master' into 6712-hourly-graphs

commit ca29ee8e41bf34b7fa4d261a2c9d2a809b97f874
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Mar 7 14:41:45 2024 +0300

    all: upd chlog

commit 9d6154aef8f0c4db24c50bee6aaedc4b98e26b3c
Author: Stanislav Chzhen <s.chzhen@adguard.com>
Date:   Thu Mar 7 14:10:32 2024 +0300

    all: hourly graphs
2024-03-12 13:13:59 +03:00

633 lines
17 KiB
Go

package stats
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"slices"
"time"
"github.com/AdguardTeam/AdGuardHome/internal/aghnet"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"go.etcd.io/bbolt"
"golang.org/x/exp/maps"
)
const (
// maxDomains is the max number of top domains to return.
maxDomains = 100
// maxClients is the max number of top clients to return.
maxClients = 100
// maxUpstreams is the max number of top upstreams to return.
maxUpstreams = 100
)
// UnitIDGenFunc is the signature of a function that generates a unique ID for
// the statistics unit.
type UnitIDGenFunc func() (id uint32)
// Supported values of [StatsResp.TimeUnits].
const (
timeUnitsHours = "hours"
timeUnitsDays = "days"
)
// Result is the resulting code of processing the DNS request.
type Result int
// Supported Result values.
//
// TODO(e.burkov): Think about better naming.
const (
RNotFiltered Result = iota + 1
RFiltered
RSafeBrowsing
RSafeSearch
RParental
resultLast = RParental + 1
)
// Entry is a statistics data entry.
type Entry struct {
// Clients is the client's primary ID.
//
// TODO(a.garipov): Make this a {net.IP, string} enum?
Client string
// Domain is the domain name requested.
Domain string
// Upstream is the upstream DNS server.
Upstream string
// Result is the result of processing the request.
Result Result
// ProcessingTime is the duration of the request processing from the start
// of the request including timeouts.
ProcessingTime time.Duration
// UpstreamTime is the duration of the successful request to the upstream.
UpstreamTime time.Duration
}
// validate returns an error if entry is not valid.
func (e *Entry) validate() (err error) {
switch {
case e.Result == 0:
return errors.Error("result code is not set")
case e.Result >= resultLast:
return fmt.Errorf("unknown result code %d", e.Result)
case e.Domain == "":
return errors.Error("domain is empty")
case e.Client == "":
return errors.Error("client is empty")
default:
return nil
}
}
// unit collects the statistics data for a specific period of time.
type unit struct {
// domains stores the number of requests for each domain.
domains map[string]uint64
// blockedDomains stores the number of requests for each domain that has
// been blocked.
blockedDomains map[string]uint64
// clients stores the number of requests from each client.
clients map[string]uint64
// upstreamsResponses stores the number of responses from each upstream.
upstreamsResponses map[string]uint64
// upstreamsTimeSum stores the sum of durations of successful queries in
// microseconds to each upstream.
upstreamsTimeSum map[string]uint64
// nResult stores the number of requests grouped by it's result.
nResult []uint64
// id is the unique unit's identifier. It's set to an absolute hour number
// since the beginning of UNIX time by the default ID generating function.
//
// Must not be rewritten after creating to be accessed concurrently without
// using mu.
id uint32
// nTotal stores the total number of requests.
nTotal uint64
// timeSum stores the sum of processing time in microseconds of each request
// written by the unit.
timeSum uint64
}
// newUnit allocates the new *unit.
func newUnit(id uint32) (u *unit) {
return &unit{
domains: map[string]uint64{},
blockedDomains: map[string]uint64{},
clients: map[string]uint64{},
upstreamsResponses: map[string]uint64{},
upstreamsTimeSum: map[string]uint64{},
nResult: make([]uint64, resultLast),
id: id,
}
}
// countPair is a single name-number pair for deserializing statistics data into
// the database.
type countPair struct {
Name string
Count uint64
}
// unitDB is the structure for serializing statistics data into the database.
//
// NOTE: Do not change the names or types of fields, as this structure is used
// for GOB encoding.
type unitDB struct {
// NResult is the number of requests by the result's kind.
NResult []uint64
// Domains is the number of requests for each domain name.
Domains []countPair
// BlockedDomains is the number of requests blocked for each domain name.
BlockedDomains []countPair
// Clients is the number of requests from each client.
Clients []countPair
// UpstreamsResponses is the number of responses from each upstream.
UpstreamsResponses []countPair
// UpstreamsTimeSum is the sum of processing time in microseconds of
// responses from each upstream.
UpstreamsTimeSum []countPair
// NTotal is the total number of requests.
NTotal uint64
// TimeAvg is the average of processing times in microseconds of all the
// requests in the unit.
TimeAvg uint32
}
// newUnitID is the default UnitIDGenFunc that generates the unique id hourly.
func newUnitID() (id uint32) {
const secsInHour = int64(time.Hour / time.Second)
return uint32(time.Now().Unix() / secsInHour)
}
func finishTxn(tx *bbolt.Tx, commit bool) (err error) {
if commit {
err = errors.Annotate(tx.Commit(), "committing: %w")
} else {
err = errors.Annotate(tx.Rollback(), "rolling back: %w")
}
return err
}
// bucketNameLen is the length of a bucket, a 64-bit unsigned integer.
//
// TODO(a.garipov): Find out why a 64-bit integer is used when IDs seem to
// always be 32 bits.
const bucketNameLen = 8
// idToUnitName converts a numerical ID into a database unit name.
func idToUnitName(id uint32) (name []byte) {
n := [bucketNameLen]byte{}
binary.BigEndian.PutUint64(n[:], uint64(id))
return n[:]
}
// unitNameToID converts a database unit name into a numerical ID. ok is false
// if name is not a valid database unit name.
func unitNameToID(name []byte) (id uint32, ok bool) {
if len(name) < bucketNameLen {
return 0, false
}
return uint32(binary.BigEndian.Uint64(name)), true
}
// compareCount used to sort countPair by Count in descending order.
func (a countPair) compareCount(b countPair) (res int) {
switch x, y := a.Count, b.Count; {
case x > y:
return -1
case x < y:
return +1
default:
return 0
}
}
func convertMapToSlice(m map[string]uint64, max int) (s []countPair) {
s = make([]countPair, 0, len(m))
for k, v := range m {
s = append(s, countPair{Name: k, Count: v})
}
slices.SortFunc(s, countPair.compareCount)
if max > len(s) {
max = len(s)
}
return s[:max]
}
func convertSliceToMap(a []countPair) (m map[string]uint64) {
m = map[string]uint64{}
for _, it := range a {
m[it.Name] = it.Count
}
return m
}
// serialize converts u to the *unitDB. It's safe for concurrent use. u must
// not be nil.
func (u *unit) serialize() (udb *unitDB) {
var timeAvg uint32 = 0
if u.nTotal != 0 {
timeAvg = uint32(u.timeSum / u.nTotal)
}
return &unitDB{
NTotal: u.nTotal,
NResult: append([]uint64{}, u.nResult...),
Domains: convertMapToSlice(u.domains, maxDomains),
BlockedDomains: convertMapToSlice(u.blockedDomains, maxDomains),
Clients: convertMapToSlice(u.clients, maxClients),
UpstreamsResponses: convertMapToSlice(u.upstreamsResponses, maxUpstreams),
UpstreamsTimeSum: convertMapToSlice(u.upstreamsTimeSum, maxUpstreams),
TimeAvg: timeAvg,
}
}
func loadUnitFromDB(tx *bbolt.Tx, id uint32) (udb *unitDB) {
bkt := tx.Bucket(idToUnitName(id))
if bkt == nil {
return nil
}
log.Tracef("Loading unit %d", id)
var buf bytes.Buffer
buf.Write(bkt.Get([]byte{0}))
udb = &unitDB{}
err := gob.NewDecoder(&buf).Decode(udb)
if err != nil {
log.Error("gob Decode: %s", err)
return nil
}
return udb
}
// deserialize assigns the appropriate values from udb to u. u must not be nil.
// It's safe for concurrent use.
func (u *unit) deserialize(udb *unitDB) {
if udb == nil {
return
}
u.nTotal = udb.NTotal
u.nResult = make([]uint64, resultLast)
copy(u.nResult, udb.NResult)
u.domains = convertSliceToMap(udb.Domains)
u.blockedDomains = convertSliceToMap(udb.BlockedDomains)
u.clients = convertSliceToMap(udb.Clients)
u.upstreamsResponses = convertSliceToMap(udb.UpstreamsResponses)
u.upstreamsTimeSum = convertSliceToMap(udb.UpstreamsTimeSum)
u.timeSum = uint64(udb.TimeAvg) * udb.NTotal
}
// add adds new data to u. It's safe for concurrent use.
func (u *unit) add(e *Entry) {
u.nResult[e.Result]++
if e.Result == RNotFiltered {
u.domains[e.Domain]++
} else {
u.blockedDomains[e.Domain]++
}
u.clients[e.Client]++
pt := uint64(e.ProcessingTime.Microseconds())
u.timeSum += pt
u.nTotal++
if e.Upstream != "" {
u.upstreamsResponses[e.Upstream]++
ut := uint64(e.UpstreamTime.Microseconds())
u.upstreamsTimeSum[e.Upstream] += ut
}
}
// flushUnitToDB puts udb to the database at id.
func (udb *unitDB) flushUnitToDB(tx *bbolt.Tx, id uint32) (err error) {
log.Debug("stats: flushing unit with id %d and total of %d", id, udb.NTotal)
bkt, err := tx.CreateBucketIfNotExists(idToUnitName(id))
if err != nil {
return fmt.Errorf("creating bucket: %w", err)
}
buf := &bytes.Buffer{}
err = gob.NewEncoder(buf).Encode(udb)
if err != nil {
return fmt.Errorf("encoding unit: %w", err)
}
err = bkt.Put([]byte{0}, buf.Bytes())
if err != nil {
return fmt.Errorf("putting unit to database: %w", err)
}
return nil
}
func convertTopSlice(a []countPair) (m []map[string]uint64) {
m = make([]map[string]uint64, 0, len(a))
for _, it := range a {
m = append(m, map[string]uint64{it.Name: it.Count})
}
return m
}
// pairsGetter is a signature for topsCollector argument.
type pairsGetter func(u *unitDB) (pairs []countPair)
// topsCollector collects statistics about highest values from the given *unitDB
// slice using pg to retrieve data.
func topsCollector(units []*unitDB, max int, ignored *aghnet.IgnoreEngine, pg pairsGetter) []map[string]uint64 {
m := map[string]uint64{}
for _, u := range units {
for _, cp := range pg(u) {
if !ignored.Has(cp.Name) {
m[cp.Name] += cp.Count
}
}
}
a2 := convertMapToSlice(m, max)
return convertTopSlice(a2)
}
// getData returns the statistics data using the following algorithm:
//
// 1. Prepare a slice of N units, where N is the value of "limit" configuration
// setting. Load data for the most recent units from the file. If a unit
// with required ID doesn't exist, just add an empty unit. Get data for the
// current unit.
//
// 2. Process data from the units and prepare an output map object, including
// per time unit counters (DNS queries per time-unit, blocked queries per
// time unit, etc.). If the time unit is hour, just add values from each
// unit to the slice; otherwise, the time unit is day, so aggregate per-hour
// data into days.
//
// To get the top counters (queries per domain, queries per blocked domain,
// etc.), first sum up data for all units into a single map. Then, get the
// pairs with the highest numbers.
//
// The total counters (DNS queries, blocked, etc.) are just the sum of data
// for all units.
func (s *StatsCtx) getData(limit uint32) (resp *StatsResp, ok bool) {
if limit == 0 {
return &StatsResp{
TimeUnits: "days",
TopBlocked: []topAddrs{},
TopClients: []topAddrs{},
TopQueried: []topAddrs{},
TopUpstreamsResponses: []topAddrs{},
TopUpstreamsAvgTime: []topAddrsFloat{},
BlockedFiltering: []uint64{},
DNSQueries: []uint64{},
ReplacedParental: []uint64{},
ReplacedSafebrowsing: []uint64{},
}, true
}
units, curID := s.loadUnits(limit)
if units == nil {
return &StatsResp{}, false
}
return s.dataFromUnits(units, curID), true
}
// dataFromUnits collects and returns the statistics data.
func (s *StatsCtx) dataFromUnits(units []*unitDB, curID uint32) (resp *StatsResp) {
topUpstreamsResponses, topUpstreamsAvgTime := topUpstreamsPairs(units)
resp = &StatsResp{
TopQueried: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.Domains }),
TopBlocked: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.BlockedDomains }),
TopUpstreamsResponses: topUpstreamsResponses,
TopUpstreamsAvgTime: topUpstreamsAvgTime,
TopClients: topsCollector(units, maxClients, nil, topClientPairs(s)),
}
s.fillCollectedStats(resp, units, curID)
// Total counters:
sum := unitDB{
NResult: make([]uint64, resultLast),
}
var timeN uint32
for _, u := range units {
sum.NTotal += u.NTotal
sum.TimeAvg += u.TimeAvg
if u.TimeAvg != 0 {
timeN++
}
sum.NResult[RFiltered] += u.NResult[RFiltered]
sum.NResult[RSafeBrowsing] += u.NResult[RSafeBrowsing]
sum.NResult[RSafeSearch] += u.NResult[RSafeSearch]
sum.NResult[RParental] += u.NResult[RParental]
}
resp.NumDNSQueries = sum.NTotal
resp.NumBlockedFiltering = sum.NResult[RFiltered]
resp.NumReplacedSafebrowsing = sum.NResult[RSafeBrowsing]
resp.NumReplacedSafesearch = sum.NResult[RSafeSearch]
resp.NumReplacedParental = sum.NResult[RParental]
if timeN != 0 {
resp.AvgProcessingTime = microsecondsToSeconds(float64(sum.TimeAvg / timeN))
}
return resp
}
// fillCollectedStats fills data with collected statistics.
func (s *StatsCtx) fillCollectedStats(data *StatsResp, units []*unitDB, curID uint32) {
size := len(units)
data.TimeUnits = timeUnitsHours
daysCount := size / 24
if daysCount > 7 {
size = daysCount
data.TimeUnits = timeUnitsDays
}
data.DNSQueries = make([]uint64, size)
data.BlockedFiltering = make([]uint64, size)
data.ReplacedSafebrowsing = make([]uint64, size)
data.ReplacedParental = make([]uint64, size)
if data.TimeUnits == timeUnitsDays {
s.fillCollectedStatsDaily(data, units, curID, size)
return
}
for i, u := range units {
data.DNSQueries[i] += u.NTotal
data.BlockedFiltering[i] += u.NResult[RFiltered]
data.ReplacedSafebrowsing[i] += u.NResult[RSafeBrowsing]
data.ReplacedParental[i] += u.NResult[RParental]
}
}
// fillCollectedStatsDaily fills data with collected daily statistics. units
// must contain data for the count of days.
//
// TODO(s.chzhen): Improve collection of statistics for frontend. Dashboard
// cards should contain statistics for the whole interval without rounding to
// days.
func (s *StatsCtx) fillCollectedStatsDaily(
data *StatsResp,
units []*unitDB,
curHour uint32,
days int,
) {
// Per time unit counters: 720 hours may span 31 days, so we skip data for
// the first hours in this case. align_ceil(24)
hours := countHours(curHour, days)
units = units[len(units)-hours:]
for i := 0; i < len(units); i++ {
day := i / 24
u := units[i]
data.DNSQueries[day] += u.NTotal
data.BlockedFiltering[day] += u.NResult[RFiltered]
data.ReplacedSafebrowsing[day] += u.NResult[RSafeBrowsing]
data.ReplacedParental[day] += u.NResult[RParental]
}
}
// countHours returns the number of hours in the last days.
func countHours(curHour uint32, days int) (n int) {
hoursInCurDay := int(curHour % 24)
if hoursInCurDay == 0 {
hoursInCurDay = 24
}
hoursInRestDays := (days - 1) * 24
return hoursInRestDays + hoursInCurDay
}
func topClientPairs(s *StatsCtx) (pg pairsGetter) {
return func(u *unitDB) (clients []countPair) {
for _, c := range u.Clients {
if c.Name != "" && !s.shouldCountClient([]string{c.Name}) {
continue
}
clients = append(clients, c)
}
return clients
}
}
// topUpstreamsPairs returns sorted lists of number of total responses and the
// average of processing time for each upstream.
func topUpstreamsPairs(
units []*unitDB,
) (topUpstreamsResponses []topAddrs, topUpstreamsAvgTime []topAddrsFloat) {
upstreamsResponses := topAddrs{}
upstreamsTimeSum := topAddrsFloat{}
for _, u := range units {
for _, cp := range u.UpstreamsResponses {
upstreamsResponses[cp.Name] += cp.Count
}
for _, cp := range u.UpstreamsTimeSum {
upstreamsTimeSum[cp.Name] += float64(cp.Count)
}
}
upstreamsAvgTime := topAddrsFloat{}
for u, n := range upstreamsResponses {
total := upstreamsTimeSum[u]
if total != 0 {
upstreamsAvgTime[u] = microsecondsToSeconds(total / float64(n))
}
}
upstreamsPairs := convertMapToSlice(upstreamsResponses, maxUpstreams)
topUpstreamsResponses = convertTopSlice(upstreamsPairs)
return topUpstreamsResponses, prepareTopUpstreamsAvgTime(upstreamsAvgTime)
}
// microsecondsToSeconds converts microseconds to seconds.
//
// NOTE: Frontend expects time duration in seconds as floating-point number
// with double precision.
func microsecondsToSeconds(n float64) (r float64) {
const micro = 1e-6
return n * micro
}
// prepareTopUpstreamsAvgTime returns sorted list of average processing times
// of the DNS requests from each upstream.
func prepareTopUpstreamsAvgTime(
upstreamsAvgTime topAddrsFloat,
) (topUpstreamsAvgTime []topAddrsFloat) {
keys := maps.Keys(upstreamsAvgTime)
slices.SortFunc(keys, func(a, b string) (res int) {
switch x, y := upstreamsAvgTime[a], upstreamsAvgTime[b]; {
case x > y:
return -1
case x < y:
return +1
default:
return 0
}
})
topUpstreamsAvgTime = make([]topAddrsFloat, 0, len(upstreamsAvgTime))
for _, k := range keys {
topUpstreamsAvgTime = append(topUpstreamsAvgTime, topAddrsFloat{k: upstreamsAvgTime[k]})
}
return topUpstreamsAvgTime
}