AdGuardHome/internal/client/addrproc.go
Dimitry Kolyshev df40da7c64 Pull request: AG-28961-upd-golibs
Squashed commit of the following:

commit b153bbc7100dd9184ca689f1755f068b63e3046b
Merge: d16da0cf6 4508ae860
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed Jan 17 13:56:34 2024 +0200

    Merge remote-tracking branch 'origin/master' into AG-28961-upd-golibs

commit d16da0cf61d050afd04f00ffc36bca550548edd9
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed Jan 17 09:52:03 2024 +0200

    all: imp code

commit 46aeca7221586ce0cdc91838764bbacdbdfa8620
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Wed Jan 17 09:50:10 2024 +0200

    all: imp code

commit 32bc83c0a909467655a258e2e879731a90dc96e6
Merge: ee51c6046 6dbeb5b97
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Tue Jan 16 15:42:32 2024 +0200

    Merge remote-tracking branch 'origin/master' into AG-28961-upd-golibs

    # Conflicts:
    #	go.mod
    #	go.sum

commit ee51c6046632f89fbe5aa8f6d857c239f060aba5
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Tue Jan 16 10:56:38 2024 +0200

    all: upd libs

commit 02c1dbd9b568cb9f6ec52a0e9835d0d39e3cd377
Merge: 1daba8342 58b47adaf
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Tue Jan 16 10:53:54 2024 +0200

    Merge remote-tracking branch 'origin/master' into AG-28961-upd-golibs

commit 1daba8342b72163c8a26380e083c4e497d6bb772
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon Jan 15 11:15:05 2024 +0200

    all: upd dnsproxy

commit b1670e8a81c04f400245e1316857578b549e58f1
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Mon Jan 15 10:46:27 2024 +0200

    dnsforward: imp code

commit 7b65a50fca37ad71b68a8bda504839a78b6f7319
Author: Dimitry Kolyshev <dkolyshev@adguard.com>
Date:   Fri Jan 12 14:14:34 2024 +0200

    all: upd golibs
2024-01-17 15:06:16 +03:00

301 lines
8.5 KiB
Go

package client
import (
"context"
"net/netip"
"sync"
"time"
"github.com/AdguardTeam/AdGuardHome/internal/aghnet"
"github.com/AdguardTeam/AdGuardHome/internal/rdns"
"github.com/AdguardTeam/AdGuardHome/internal/whois"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/netutil"
)
// ErrClosed is returned from [AddressProcessor.Close] implementations, such as
// [DefaultAddrProc.Close], when the address processor has already been closed.
const ErrClosed errors.Error = "use of closed address processor"
// AddressProcessor is the interface for types that can process the IP
// addresses of clients.
type AddressProcessor interface {
	// Process submits ip for processing.
	Process(ip netip.Addr)

	// Close stops the processor. The error returned, if any, depends on the
	// implementation.
	Close() (err error)
}

// EmptyAddrProc is a no-op [AddressProcessor]: it ignores every address it is
// given and has nothing to close.
type EmptyAddrProc struct{}

// type check
var _ AddressProcessor = EmptyAddrProc{}

// Process implements the [AddressProcessor] interface for EmptyAddrProc. The
// address is discarded.
func (EmptyAddrProc) Process(_ netip.Addr) {}

// Close implements the [AddressProcessor] interface for EmptyAddrProc. It
// always returns nil.
func (EmptyAddrProc) Close() (_ error) {
	return nil
}
// DefaultAddrProcConfig holds the settings and dependencies consumed by
// [NewDefaultAddrProc].
type DefaultAddrProcConfig struct {
	// DialContext creates the TCP connections to WHOIS servers. It is only
	// used, and must not be nil, when [DefaultAddrProcConfig.UseWHOIS] is
	// true.
	DialContext aghnet.DialContextFunc

	// Exchanger performs the rDNS queries. It is only used, and must not be
	// nil, when [DefaultAddrProcConfig.UseRDNS] is true.
	Exchanger rdns.Exchanger

	// PrivateSubnets tells whether an incoming IP address is private. It
	// must not be nil.
	PrivateSubnets netutil.SubnetSet

	// AddressUpdater receives the information gathered about a client's IP
	// address. It must not be nil.
	AddressUpdater AddressUpdater

	// InitialAddresses are queued for processing by [NewDefaultAddrProc]
	// before it returns.
	InitialAddresses []netip.Addr

	// CatchPanics, if true, makes the address processor catch and log panics.
	//
	// TODO(a.garipov): Consider better ways to do this or apply this method to
	// other parts of the codebase.
	CatchPanics bool

	// UseRDNS, if true, turns on reverse DNS resolving of client IP
	// addresses.
	UseRDNS bool

	// UsePrivateRDNS, if true, also allows reverse DNS resolving of private
	// client IP addresses. See [DefaultAddrProcConfig.PrivateSubnets].
	UsePrivateRDNS bool

	// UseWHOIS, if true, turns on WHOIS lookups of client IP addresses.
	UseWHOIS bool
}
// AddressUpdater is the interface for storages of DNS clients that accept
// updated information about those clients.
//
// TODO(a.garipov): Consider using the actual client storage once it is moved
// into this package.
type AddressUpdater interface {
	// UpdateAddress records new information about ip. host is applied only
	// if it is not empty, and info only if it is not nil.
	UpdateAddress(ip netip.Addr, host string, info *whois.Info)
}
// DefaultAddrProc is an [AddressProcessor] that looks incoming client
// addresses up with rDNS and WHOIS, when those are configured, and passes the
// results to a client storage.
type DefaultAddrProc struct {
	// clientIPsMu guards isClosed and serializes sending to and closing of
	// clientIPs.
	clientIPsMu *sync.Mutex

	// clientIPs queues the addresses waiting to be processed.
	clientIPs chan netip.Addr

	// rdns performs the rDNS lookups of clients' IP addresses.
	rdns rdns.Interface

	// whois performs the WHOIS lookups of clients' IP addresses.
	whois whois.Interface

	// addrUpdater receives the information gathered about a client's IP
	// address.
	addrUpdater AddressUpdater

	// privateSubnets tells whether an incoming IP address is private.
	privateSubnets netutil.SubnetSet

	// isClosed becomes true once the address processor has been closed.
	isClosed bool

	// usePrivateRDNS, if true, allows reverse DNS resolving of private
	// client IP addresses.
	usePrivateRDNS bool
}
// Defaults shared by the rDNS and WHOIS processing.
const (
	// defaultQueueSize is the capacity of the queue of IP addresses awaiting
	// rDNS and WHOIS processing.
	defaultQueueSize = 255

	// defaultCacheSize is the upper bound on the number of entries cached
	// for rDNS and WHOIS processing. It must be greater than zero.
	defaultCacheSize = 10_000

	// defaultIPTTL is how long an IP address stays cached by rDNS and WHOIS.
	defaultIPTTL = time.Hour
)
// NewDefaultAddrProc creates a client address processor, starts its
// processing goroutine, and queues c.InitialAddresses. c must not be nil.
func NewDefaultAddrProc(c *DefaultAddrProcConfig) (p *DefaultAddrProc) {
	// Start from the no-op implementations and replace them only for the
	// features that are enabled.
	var rdnsProc rdns.Interface = &rdns.Empty{}
	if c.UseRDNS {
		rdnsProc = rdns.New(&rdns.Config{
			Exchanger: c.Exchanger,
			CacheSize: defaultCacheSize,
			CacheTTL:  defaultIPTTL,
		})
	}

	var whoisProc whois.Interface = &whois.Empty{}
	if c.UseWHOIS {
		whoisProc = newWHOIS(c.DialContext)
	}

	p = &DefaultAddrProc{
		clientIPsMu:    &sync.Mutex{},
		clientIPs:      make(chan netip.Addr, defaultQueueSize),
		rdns:           rdnsProc,
		whois:          whoisProc,
		addrUpdater:    c.AddressUpdater,
		privateSubnets: c.PrivateSubnets,
		usePrivateRDNS: c.UsePrivateRDNS,
	}

	// Start the consumer first so that the queue is already being drained
	// while the initial addresses are pushed.
	go p.process(c.CatchPanics)

	for i := range c.InitialAddresses {
		p.Process(c.InitialAddresses[i])
	}

	return p
}
// newWHOIS builds a whois.Interface that dials with dialFunc and otherwise
// uses the default settings below.
func newWHOIS(dialFunc aghnet.DialContextFunc) (w whois.Interface) {
	// TODO(s.chzhen): Consider making configurable.
	const (
		// defaultTimeout is the timeout for WHOIS requests.
		defaultTimeout = 5 * time.Second

		// defaultMaxConnReadSize is an upper limit in bytes for reading from a
		// net.Conn.
		defaultMaxConnReadSize = 64 * 1024

		// defaultMaxRedirects is the maximum redirects count.
		defaultMaxRedirects = 5

		// defaultMaxInfoLen is the maximum length of whois.Info fields.
		defaultMaxInfoLen = 250
	)

	conf := &whois.Config{
		DialContext:     dialFunc,
		ServerAddr:      whois.DefaultServer,
		Port:            whois.DefaultPort,
		Timeout:         defaultTimeout,
		CacheSize:       defaultCacheSize,
		MaxConnReadSize: defaultMaxConnReadSize,
		MaxRedirects:    defaultMaxRedirects,
		MaxInfoLen:      defaultMaxInfoLen,
		CacheTTL:        defaultIPTTL,
	}

	return whois.New(conf)
}
// type check: *DefaultAddrProc must implement [AddressProcessor].
var _ AddressProcessor = (*DefaultAddrProc)(nil)
// Process implements the [AddressProcessor] interface for *DefaultAddrProc.
// It queues ip without waiting for room in the queue: if the queue is full,
// ip is dropped and a debug message is logged. Once p is closed, Process does
// nothing.
func (p *DefaultAddrProc) Process(ip netip.Addr) {
	p.clientIPsMu.Lock()
	defer p.clientIPsMu.Unlock()

	if p.isClosed {
		// The channel has been closed, so sending to it would panic.
		return
	}

	select {
	case p.clientIPs <- ip:
		return
	default:
		// The queue is full; report it below.
	}

	log.Debug("clients: ip channel is full; len: %d", len(p.clientIPs))
}
// process drains clientIPs, running the rDNS and WHOIS lookups for every
// address and handing the results to addrUpdater. It is intended to be used
// as a goroutine and returns once clientIPs is closed. If catchPanics is
// true, panics are passed to log.OnPanic.
func (p *DefaultAddrProc) process(catchPanics bool) {
	if catchPanics {
		defer log.OnPanic("addrProcessor.process")
	}

	log.Info("clients: processing addresses")

	for {
		ip, ok := <-p.clientIPs
		if !ok {
			// The channel is closed and drained.
			break
		}

		host := p.processRDNS(ip)
		info := p.processWHOIS(ip)

		p.addrUpdater.UpdateAddress(ip, host, info)
	}

	log.Info("clients: finished processing addresses")
}
// processRDNS resolves a client's IP address using reverse DNS. host is empty
// if ip should not be resolved, if there were errors, or if the information
// hasn't changed.
func (p *DefaultAddrProc) processRDNS(ip netip.Addr) (host string) {
	begin := time.Now()
	log.Debug("clients: processing %s with rdns", ip)
	defer func() {
		log.Debug("clients: finished processing %s with rdns in %s", ip, time.Since(begin))
	}()

	if !p.shouldResolve(ip) {
		return ""
	}

	if resolved, changed := p.rdns.Process(ip); changed {
		return resolved
	}

	return ""
}
// shouldResolve reports whether ip may be resolved with rDNS. Loopback
// addresses are never resolved, and private addresses are resolved only when
// resolving of private addresses is enabled.
func (p *DefaultAddrProc) shouldResolve(ip netip.Addr) (ok bool) {
	switch {
	case ip.IsLoopback():
		return false
	case p.usePrivateRDNS:
		return true
	default:
		return !p.privateSubnets.Contains(ip)
	}
}
// processWHOIS looks a client's IP address up in the WHOIS databases. info is
// nil if there were errors or if the information hasn't changed.
func (p *DefaultAddrProc) processWHOIS(ip netip.Addr) (info *whois.Info) {
	begin := time.Now()
	log.Debug("clients: processing %s with whois", ip)
	defer func() {
		log.Debug("clients: finished processing %s with whois in %s", ip, time.Since(begin))
	}()

	// TODO(s.chzhen): Move the timeout logic from WHOIS configuration to the
	// context.
	if found, changed := p.whois.Process(context.Background(), ip); changed {
		return found
	}

	return nil
}
// Close implements the [AddressProcessor] interface for *DefaultAddrProc. It
// closes the queue of addresses, which makes the processing goroutine exit
// after the already queued addresses are handled; Close does not wait for
// that. Every call after the first one returns [ErrClosed].
func (p *DefaultAddrProc) Close() (err error) {
	p.clientIPsMu.Lock()
	defer p.clientIPsMu.Unlock()

	if p.isClosed {
		return ErrClosed
	}

	// The mutex keeps Process from sending to the channel while it is being
	// closed.
	close(p.clientIPs)
	p.isClosed = true

	return nil
}