2019-08-26 11:54:38 +03:00
package querylog
2018-12-05 14:03:41 +03:00
import (
2019-10-22 15:51:48 +03:00
"bufio"
2018-12-05 14:03:41 +03:00
"bytes"
"compress/gzip"
2019-10-24 20:00:58 +03:00
"encoding/base64"
2018-12-05 14:03:41 +03:00
"encoding/json"
2019-09-16 17:07:18 +03:00
"io"
2018-12-05 14:03:41 +03:00
"os"
2019-10-24 20:00:58 +03:00
"strconv"
2019-10-22 15:51:48 +03:00
"strings"
2018-12-05 14:03:41 +03:00
"time"
2019-10-24 20:00:58 +03:00
"github.com/AdguardTeam/AdGuardHome/dnsfilter"
2019-02-25 16:44:22 +03:00
"github.com/AdguardTeam/golibs/log"
2019-10-24 20:00:58 +03:00
"github.com/miekg/dns"
2018-12-05 14:03:41 +03:00
)
// enableGzip controls whether the query log file is gzip-compressed on disk.
// Kept disabled: the on-disk format is plain JSON lines.
const enableGzip = false

// maxEntrySize is the assumed upper bound (in bytes) of one serialized log
// entry; the seek arithmetic in the Reader relies on this limit.
const maxEntrySize = 1000
2018-12-05 14:03:41 +03:00
2019-02-11 14:22:36 +03:00
// flushLogBuffer flushes the current buffer to file and resets the current buffer
2019-05-15 13:11:36 +03:00
func ( l * queryLog ) flushLogBuffer ( fullFlush bool ) error {
l . fileFlushLock . Lock ( )
defer l . fileFlushLock . Unlock ( )
2019-02-10 20:47:43 +03:00
// flush remainder to file
2019-09-27 18:58:57 +03:00
l . bufferLock . Lock ( )
needFlush := len ( l . buffer ) >= logBufferCap
2019-05-15 13:11:36 +03:00
if ! needFlush && ! fullFlush {
2019-09-27 18:58:57 +03:00
l . bufferLock . Unlock ( )
2019-05-15 13:11:36 +03:00
return nil
}
2019-09-27 18:58:57 +03:00
flushBuffer := l . buffer
l . buffer = nil
2019-05-15 13:11:36 +03:00
l . flushPending = false
2019-09-27 18:58:57 +03:00
l . bufferLock . Unlock ( )
2019-02-10 20:47:43 +03:00
err := l . flushToFile ( flushBuffer )
if err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "Saving querylog to file failed: %s" , err )
2019-02-10 20:47:43 +03:00
return err
}
return nil
}
// flushToFile saves the specified log entries to the query log file
func ( l * queryLog ) flushToFile ( buffer [ ] * logEntry ) error {
2018-12-05 14:03:41 +03:00
if len ( buffer ) == 0 {
2019-05-08 10:43:47 +03:00
log . Debug ( "querylog: there's nothing to write to a file" )
2018-12-05 14:03:41 +03:00
return nil
}
start := time . Now ( )
var b bytes . Buffer
e := json . NewEncoder ( & b )
for _ , entry := range buffer {
err := e . Encode ( entry )
if err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "Failed to marshal entry: %s" , err )
2018-12-05 14:03:41 +03:00
return err
}
}
elapsed := time . Since ( start )
2019-02-25 16:44:22 +03:00
log . Debug ( "%d elements serialized via json in %v: %d kB, %v/entry, %v/entry" , len ( buffer ) , elapsed , b . Len ( ) / 1024 , float64 ( b . Len ( ) ) / float64 ( len ( buffer ) ) , elapsed / time . Duration ( len ( buffer ) ) )
2018-12-05 14:03:41 +03:00
2019-10-24 14:19:13 +03:00
var err error
2018-12-05 14:03:41 +03:00
var zb bytes . Buffer
2019-02-10 20:47:43 +03:00
filename := l . logFile
2018-12-05 14:03:41 +03:00
// gzip enabled?
if enableGzip {
filename += ".gz"
zw := gzip . NewWriter ( & zb )
2019-02-10 20:47:43 +03:00
zw . Name = l . logFile
2018-12-05 14:03:41 +03:00
zw . ModTime = time . Now ( )
_ , err = zw . Write ( b . Bytes ( ) )
if err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "Couldn't compress to gzip: %s" , err )
2018-12-05 14:03:41 +03:00
zw . Close ( )
return err
}
if err = zw . Close ( ) ; err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "Couldn't close gzip writer: %s" , err )
2018-12-05 14:03:41 +03:00
return err
}
} else {
zb = b
}
2019-09-27 18:58:57 +03:00
l . fileWriteLock . Lock ( )
defer l . fileWriteLock . Unlock ( )
2018-12-05 14:03:41 +03:00
f , err := os . OpenFile ( filename , os . O_WRONLY | os . O_CREATE | os . O_APPEND , 0644 )
if err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "failed to create file \"%s\": %s" , filename , err )
2018-12-05 14:03:41 +03:00
return err
}
defer f . Close ( )
n , err := f . Write ( zb . Bytes ( ) )
if err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "Couldn't write to file: %s" , err )
2018-12-05 14:03:41 +03:00
return err
}
2019-02-25 16:44:22 +03:00
log . Debug ( "ok \"%s\": %v bytes written" , filename , n )
2018-12-05 14:03:41 +03:00
return nil
}
2019-09-27 18:58:57 +03:00
func ( l * queryLog ) rotate ( ) error {
2019-02-10 20:47:43 +03:00
from := l . logFile
to := l . logFile + ".1"
2018-12-05 14:03:41 +03:00
if enableGzip {
2019-02-10 20:47:43 +03:00
from = l . logFile + ".gz"
to = l . logFile + ".gz.1"
2018-12-05 14:03:41 +03:00
}
if _ , err := os . Stat ( from ) ; os . IsNotExist ( err ) {
// do nothing, file doesn't exist
return nil
}
err := os . Rename ( from , to )
if err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "Failed to rename querylog: %s" , err )
2018-12-05 14:03:41 +03:00
return err
}
2019-02-25 16:44:22 +03:00
log . Debug ( "Rotated from %s to %s successfully" , from , to )
2018-12-05 14:03:41 +03:00
return nil
}
2019-09-27 18:58:57 +03:00
func ( l * queryLog ) periodicRotate ( ) {
for range time . Tick ( time . Duration ( l . conf . Interval ) * 24 * time . Hour ) {
err := l . rotate ( )
2018-12-05 14:03:41 +03:00
if err != nil {
2019-02-25 16:44:22 +03:00
log . Error ( "Failed to rotate querylog: %s" , err )
2018-12-05 14:03:41 +03:00
// do nothing, continue rotating
}
}
}
2019-08-26 11:54:38 +03:00
// Reader is the DB reader context
type Reader struct {
	ql     *queryLog      // parent query log object
	search *getDataParams // search criteria applied to each entry

	f      *os.File      // currently open log file
	reader *bufio.Reader // reads file line by line

	now       time.Time // time the reader was opened
	validFrom int64     // UNIX time (ns); older entries are skipped
	olderThan int64     // UNIX time (ns); reading stops at this boundary
	oldest    time.Time // time of the first entry seen in the current read

	files []string // log file names to process, in order
	ifile int      // index of the current file in 'files'

	limit        uint64 // requested number of entries
	count        uint64 // counter for returned elements
	latest       bool   // return the latest entries
	filePrepared bool   // initial seek in the current file is done

	seeking       bool       // we're searching for an entry with exact time stamp
	fseeker       fileSeeker // file seeker object
	fpos          uint64     // current file offset
	nSeekRequests uint32     // number of Seek() requests made (finding a new line doesn't count)

	timecnt uint64 // total nanoseconds spent decoding entries
}
2019-09-16 17:07:18 +03:00
// fileSeeker holds the state of a binary search for a target timestamp
// over byte offsets within a log file.
type fileSeeker struct {
	target  uint64 // target value
	pos     uint64 // current offset, may be adjusted by user for increased accuracy
	lastpos uint64 // the last offset returned
	lo      uint64 // low boundary offset
	hi      uint64 // high boundary offset
}
// OpenReader - return reader object
2019-08-26 11:54:38 +03:00
func ( l * queryLog ) OpenReader ( ) * Reader {
r := Reader { }
r . ql = l
r . now = time . Now ( )
2019-09-27 18:58:57 +03:00
r . validFrom = r . now . Unix ( ) - int64 ( l . conf . Interval * 24 * 60 * 60 )
2019-09-16 17:07:18 +03:00
r . validFrom *= 1000000000
r . files = [ ] string {
r . ql . logFile ,
r . ql . logFile + ".1" ,
}
2019-08-26 11:54:38 +03:00
return & r
}
2019-09-16 17:07:18 +03:00
// Close - close the reader
2019-08-26 11:54:38 +03:00
func ( r * Reader ) Close ( ) {
elapsed := time . Since ( r . now )
var perunit time . Duration
if r . count > 0 {
perunit = elapsed / time . Duration ( r . count )
}
2019-10-24 20:00:58 +03:00
log . Debug ( "querylog: read %d entries in %v, %v/entry, seek-reqs:%d time:%dus (%d%%)" ,
r . count , elapsed , perunit , r . nSeekRequests , r . timecnt / 1000 , r . timecnt * 100 / uint64 ( elapsed . Nanoseconds ( ) ) )
2019-08-26 11:54:38 +03:00
if r . f != nil {
r . f . Close ( )
}
}
2019-09-16 17:07:18 +03:00
// BeginRead - start reading
// olderThan: stop returning entries when an entry with this time is reached
// count: minimum number of entries to return
2019-10-24 20:00:58 +03:00
func ( r * Reader ) BeginRead ( olderThan time . Time , count uint64 , search * getDataParams ) {
2019-09-16 17:07:18 +03:00
r . olderThan = olderThan . UnixNano ( )
r . latest = olderThan . IsZero ( )
2019-10-24 20:00:58 +03:00
r . oldest = time . Time { }
r . search = search
2019-09-16 17:07:18 +03:00
r . limit = count
if r . latest {
r . olderThan = r . now . UnixNano ( )
}
r . filePrepared = false
2019-10-24 20:00:58 +03:00
r . seeking = false
2019-09-16 17:07:18 +03:00
}
// BeginReadPrev - start reading the previous data chunk
// count: minimum number of entries to return
func (r *Reader) BeginReadPrev(count uint64) {
	// The previous chunk ends where the oldest entry of the last chunk began.
	r.olderThan = r.oldest.UnixNano()
	r.oldest = time.Time{}
	r.latest = false
	r.limit = count
	r.count = 0

	// Step back far enough for 'limit' entries of maximum size.
	off := r.fpos - maxEntrySize*(r.limit+1)
	if int64(off) < maxEntrySize {
		// Unsigned underflow (or nearly at file start): read from the beginning.
		off = 0
	}
	r.fpos = off

	log.Debug("QueryLog: seek: %x", off)
	_, err := r.f.Seek(int64(off), io.SeekStart)
	if err != nil {
		log.Error("file.Seek: %s: %s", r.files[r.ifile], err)
		return
	}
	r.nSeekRequests++

	// Align to the start of the next full line so decoding begins at
	// an entry boundary.
	r.seekToNewLine()
	r.fseeker.pos = r.fpos
	r.filePrepared = true
	r.seeking = false
}
// Perform binary seek
// cur: timestamp found at the current position (0 means "no data yet")
// Return 0: success; 1: seek required; -1: error
func (fs *fileSeeker) seekBinary(cur uint64) int32 {
	log.Debug("QueryLog: seek: tgt=%x cur=%x, %x: [%x..%x]", fs.target, cur, fs.pos, fs.lo, fs.hi)

	off := uint64(0)
	if fs.pos >= fs.lo && fs.pos < fs.hi {
		// Normal binary-search step: narrow whichever boundary 'cur' rules out.
		if cur == fs.target {
			return 0
		} else if cur < fs.target {
			fs.lo = fs.pos + 1
		} else {
			fs.hi = fs.pos
		}
		off = fs.lo + (fs.hi-fs.lo)/2
	} else {
		// we didn't find another entry from the last file offset: now return the boundary beginning
		off = fs.lo
	}

	if off == fs.lastpos {
		// No progress since the last request: the target can't be found.
		return -1
	}

	fs.lastpos = off
	fs.pos = off
	return 1
}
// Seek to a new line
func ( r * Reader ) seekToNewLine ( ) bool {
2019-10-22 15:51:48 +03:00
r . reader = bufio . NewReader ( r . f )
b , err := r . reader . ReadBytes ( '\n' )
2019-09-16 17:07:18 +03:00
if err != nil {
2019-10-22 15:51:48 +03:00
r . reader = nil
2019-09-16 17:07:18 +03:00
log . Error ( "QueryLog: file.Read: %s: %s" , r . files [ r . ifile ] , err )
return false
}
2019-10-22 15:51:48 +03:00
off := len ( b )
2019-09-16 17:07:18 +03:00
r . fpos += uint64 ( off )
log . Debug ( "QueryLog: seek: %x (+%d)" , r . fpos , off )
return true
}
// Open a file
// Opens the current file from r.files; a missing file is not logged.
func (r *Reader) openFile() bool {
	name := r.files[r.ifile]

	var err error
	r.f, err = os.Open(name)
	if err == nil {
		return true
	}
	if !os.IsNotExist(err) {
		log.Error("QueryLog: Failed to open file \"%s\": %s", name, err)
	}
	return false
}
// Seek to the needed position
// In "latest" mode it seeks near the end of file; otherwise it starts a
// binary search for the entry with time r.olderThan. Returns false on error.
func (r *Reader) prepareRead() bool {
	fn := r.files[r.ifile]

	fi, err := r.f.Stat()
	if err != nil {
		log.Error("QueryLog: file.Stat: %s: %s", fn, err)
		return false
	}
	fsize := uint64(fi.Size())

	off := uint64(0)
	if r.latest {
		// read data from the end of file
		// Reserve room for 'limit' entries of maximum size.
		off = fsize - maxEntrySize*(r.limit+1)
		if int64(off) < maxEntrySize {
			// Unsigned underflow or a small file: read from the beginning.
			off = 0
		}
		r.fpos = off
		log.Debug("QueryLog: seek: %x", off)
		_, err = r.f.Seek(int64(off), io.SeekStart)
		if err != nil {
			log.Error("QueryLog: file.Seek: %s: %s", fn, err)
			return false
		}
	} else {
		// start searching in file: we'll read the first chunk of data from the middle of file
		r.seeking = true
		r.fseeker = fileSeeker{}
		r.fseeker.target = uint64(r.olderThan)
		r.fseeker.hi = fsize
		rc := r.fseeker.seekBinary(0)
		r.fpos = r.fseeker.pos
		if rc == 1 {
			// The seeker chose a new offset to probe.
			_, err = r.f.Seek(int64(r.fpos), io.SeekStart)
			if err != nil {
				log.Error("QueryLog: file.Seek: %s: %s", fn, err)
				return false
			}
		}
	}

	r.nSeekRequests++

	// Align to the start of the next full line.
	if !r.seekToNewLine() {
		return false
	}
	r.fseeker.pos = r.fpos
	return true
}
2019-10-24 20:00:58 +03:00
// Get bool value from "key":bool
// Returns (value, true) when the key is present with a true/false value,
// and (false, false) when the key is missing or the value is malformed.
func readJSONBool(s, name string) (bool, bool) {
	needle := "\"" + name + "\":"
	i := strings.Index(s, needle)
	if i == -1 {
		return false, false
	}

	rest := s[i+len(needle):]
	switch {
	case strings.HasPrefix(rest, "true"):
		return true, true
	case strings.HasPrefix(rest, "false"):
		return false, true
	}
	return false, false
}
// Get value from "key":"value"
// Returns "" when the key is missing or the string value is unterminated.
// Note: does not handle escaped quotes inside the value.
func readJSONValue(s, name string) string {
	needle := "\"" + name + "\":\""
	i := strings.Index(s, needle)
	if i == -1 {
		return ""
	}

	rest := s[i+len(needle):]
	end := strings.IndexByte(rest, '"')
	if end == -1 {
		return ""
	}
	return rest[:end]
}
// applySearch checks whether the raw JSON line matches the search criteria
// in r.search, returning false as soon as any criterion fails.
func (r *Reader) applySearch(str string) bool {
	// Filtered-only mode: the entry must carry "IsFiltered":true.
	if r.search.ResponseStatus == responseStatusFiltered {
		boolVal, ok := readJSONBool(str, "IsFiltered")
		if !ok || !boolVal {
			return false
		}
	}

	// Domain: exact match in strict mode, substring match otherwise.
	if len(r.search.Domain) != 0 {
		val := readJSONValue(str, "QH")
		if len(val) == 0 {
			return false
		}
		if r.search.StrictMatchDomain {
			if val != r.search.Domain {
				return false
			}
		} else if !strings.Contains(val, r.search.Domain) {
			return false
		}
	}

	// Question type must match exactly.
	if len(r.search.QuestionType) != 0 {
		val := readJSONValue(str, "QT")
		if len(val) == 0 {
			return false
		}
		if val != r.search.QuestionType {
			return false
		}
	}

	// Client IP: exact match in strict mode, substring match otherwise.
	if len(r.search.Client) != 0 {
		val := readJSONValue(str, "IP")
		if len(val) == 0 {
			log.Debug("QueryLog: failed to decode")
			return false
		}
		if r.search.StrictMatchClient {
			if val != r.search.Client {
				return false
			}
		} else if !strings.Contains(val, r.search.Client) {
			return false
		}
	}

	return true
}
// Value types produced by readJSON.
const (
	jsonTErr  = iota // parse error or no more pairs
	jsonTObj         // nested object
	jsonTStr         // string
	jsonTNum         // number
	jsonTBool        // boolean
)

// Parse JSON key-value pair
// e.g.: "key":VALUE where VALUE is "string", true|false (boolean), or 123.456 (number)
// Note the limitations:
// . doesn't support whitespace
// . doesn't support "null"
// . doesn't validate boolean or number
// . no proper handling of {} braces
// . no handling of [] brackets
// Return (key, value, type); *ps is advanced past the consumed pair.
func readJSON(ps *string) (string, string, int32) {
	s := *ps
	var k, v string
	t := int32(jsonTErr)

	// Locate the quoted key.
	q1 := strings.IndexByte(s, '"')
	if q1 == -1 {
		return k, v, t
	}
	q2 := strings.IndexByte(s[q1+1:], '"')
	if q2 == -1 {
		return k, v, t
	}
	k = s[q1+1 : q1+1+q2]
	s = s[q1+q2+2:]

	// A ':' plus at least one value character must follow the key.
	if len(s) < 2 || s[0] != ':' {
		return k, v, t
	}

	switch {
	case s[1] == '"':
		// String value: scan for the closing quote.
		end := strings.IndexByte(s[2:], '"')
		if end == -1 {
			return k, v, t
		}
		v = s[2 : 2+end]
		t = jsonTStr
		s = s[end+3:]

	case s[1] == '{':
		// Object value: descend into it; the value itself stays empty.
		t = jsonTObj
		s = s[2:]

	default:
		// Bare token (boolean or number): runs until ',' or '}'.
		sep := strings.IndexAny(s[1:], ",}")
		if sep == -1 {
			return k, v, t
		}
		v = s[1 : 1+sep]
		if s[1] == 't' || s[1] == 'f' {
			t = jsonTBool
		} else if s[1] == '.' || (s[1] >= '0' && s[1] <= '9') {
			t = jsonTNum
		}
		s = s[sep+2:]
	}

	*ps = s
	return k, v, t
}
// nolint (gocyclo)
// decode fills 'ent' from one serialized query log line, using the
// lightweight readJSON scanner instead of encoding/json.
// Parsing stops at the first malformed pair or conversion error.
func decode(ent *logEntry, str string) {
	var b bool
	var i int
	var err error
	for {
		k, v, t := readJSON(&str)
		if t == jsonTErr {
			// No more key-value pairs (or malformed input).
			break
		}
		switch k {
		case "IP":
			ent.IP = v
		case "T":
			ent.Time, err = time.Parse(time.RFC3339, v)
		case "QH":
			ent.QHost = v
		case "QT":
			ent.QType = v
		case "QC":
			ent.QClass = v
		case "Answer":
			// The raw DNS answer is stored base64-encoded.
			ent.Answer, err = base64.StdEncoding.DecodeString(v)
		case "IsFiltered":
			b, err = strconv.ParseBool(v)
			ent.Result.IsFiltered = b
		case "Rule":
			ent.Result.Rule = v
		case "FilterID":
			i, err = strconv.Atoi(v)
			ent.Result.FilterID = int64(i)
		case "Reason":
			i, err = strconv.Atoi(v)
			ent.Result.Reason = dnsfilter.Reason(i)
		case "Upstream":
			ent.Upstream = v
		case "Elapsed":
			// Stored as an integer nanosecond count — TODO confirm units.
			i, err = strconv.Atoi(v)
			ent.Elapsed = time.Duration(i)
		// pre-v0.99.3 compatibility:
		case "Question":
			// The whole DNS question was stored as a base64-packed message;
			// unpack it to recover host, type and class.
			// Note: 'break' here leaves only the switch; the err check
			// below then terminates the loop.
			var qstr []byte
			qstr, err = base64.StdEncoding.DecodeString(v)
			if err != nil {
				break
			}
			q := new(dns.Msg)
			err = q.Unpack(qstr)
			if err != nil {
				break
			}
			ent.QHost = q.Question[0].Name
			if len(ent.QHost) == 0 {
				break
			}
			// Strip the trailing dot from the FQDN.
			ent.QHost = ent.QHost[:len(ent.QHost)-1]
			ent.QType = dns.TypeToString[q.Question[0].Qtype]
			ent.QClass = dns.ClassToString[q.Question[0].Qclass]
		case "Time":
			ent.Time, err = time.Parse(time.RFC3339, v)
		}
		if err != nil {
			log.Debug("decode err: %s", err)
			break
		}
	}
}
2019-09-16 17:07:18 +03:00
// Next - return the next entry or nil if reading is finished
// While r.seeking is set, iterations drive the binary search toward the
// entry with time r.olderThan; afterwards, lines are scanned sequentially,
// filtered by time window and search criteria, and decoded.
func (r *Reader) Next() *logEntry { // nolint
	for {
		// open file if needed
		if r.f == nil {
			if r.ifile == len(r.files) {
				// All files have been processed.
				return nil
			}
			if !r.openFile() {
				r.ifile++
				continue
			}
		}

		// Perform the initial seek in the newly opened file.
		if !r.filePrepared {
			if !r.prepareRead() {
				return nil
			}
			r.filePrepared = true
		}

		b, err := r.reader.ReadBytes('\n')
		if err != nil {
			// EOF or read error: stop reading.
			return nil
		}
		str := string(b)

		// Extract the timestamp ("T", or pre-v0.99.3 "Time") without
		// decoding the whole entry.
		val := readJSONValue(str, "T")
		if len(val) == 0 {
			val = readJSONValue(str, "Time")
		}
		if len(val) == 0 {
			log.Debug("QueryLog: failed to decode")
			continue
		}
		tm, err := time.Parse(time.RFC3339, val)
		if err != nil {
			log.Debug("QueryLog: failed to decode")
			continue
		}
		t := tm.UnixNano()

		if r.seeking {
			// Binary-search phase: feed this entry's timestamp to the
			// seeker to choose the next offset to probe.
			r.reader = nil

			rr := r.fseeker.seekBinary(uint64(t))
			r.fpos = r.fseeker.pos
			if rr < 0 {
				log.Error("QueryLog: File seek error: can't find the target entry: %s", r.files[r.ifile])
				return nil
			} else if rr == 0 {
				// We found the target entry.
				// We'll start reading the previous chunk of data.
				r.seeking = false

				off := r.fpos - (maxEntrySize * (r.limit + 1))
				if int64(off) < maxEntrySize {
					off = 0
				}
				r.fpos = off
			}

			_, err = r.f.Seek(int64(r.fpos), io.SeekStart)
			if err != nil {
				log.Error("QueryLog: file.Seek: %s: %s", r.files[r.ifile], err)
				return nil
			}
			r.nSeekRequests++

			// Align to the next full line at the new offset.
			if !r.seekToNewLine() {
				return nil
			}
			r.fseeker.pos = r.fpos
			continue
		}

		// Remember the time of the first (oldest) entry seen.
		if r.oldest.IsZero() {
			r.oldest = tm
		}

		// Skip entries older than the retention window.
		if t < r.validFrom {
			continue
		}
		// Stop once the requested upper time boundary is reached.
		if t >= r.olderThan {
			return nil
		}

		r.count++

		if !r.applySearch(str) {
			continue
		}

		// Fully decode the matching entry, accounting the time spent.
		st := time.Now()
		var ent logEntry
		decode(&ent, str)
		r.timecnt += uint64(time.Now().Sub(st).Nanoseconds())

		return &ent
	}
}
2019-08-26 11:54:38 +03:00
2019-10-24 20:00:58 +03:00
// Total returns the total number of processed items
// (incremented before search filtering, so it also counts entries the
// search criteria rejected).
func (r *Reader) Total() uint64 {
	return r.count
}
// Oldest returns the time of the oldest processed entry
// (the zero time if nothing has been read yet).
func (r *Reader) Oldest() time.Time {
	return r.oldest
}