mirror of
https://codeberg.org/superseriousbusiness/gotosocial.git
synced 2024-12-18 07:01:49 +03:00
3d3e99ae52
* update to use go-storage/ instead of go-store/v2/storage/
* pull in latest version from codeberg
* remove test output 😇
* add code comments
* set the exclusive bit when creating new files in disk config
* bump to actual release version
* bump to v0.1.1 (tis a simple no-logic change)
* update readme
* only use a temporary read seeker when decoding video if required (should only be S3 now)
* use fastcopy library to use memory pooled buffers when calling TempFileSeeker()
* update to use seek call in serveFileRange()
887 lines
20 KiB
Text
887 lines
20 KiB
Text
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
|
|
"codeberg.org/gruf/go-byteutil"
|
|
"codeberg.org/gruf/go-errors/v2"
|
|
"codeberg.org/gruf/go-fastcopy"
|
|
"codeberg.org/gruf/go-hashenc"
|
|
"codeberg.org/gruf/go-iotools"
|
|
"codeberg.org/gruf/go-pools"
|
|
"codeberg.org/gruf/go-store/v2/util"
|
|
)
|
|
|
|
// Sub-directory prefixes, joined onto a BlockStorage root path in OpenBlock.
var (
	// nodePathPrefix is the directory (under the store root) holding node files.
	nodePathPrefix = "node/"

	// blockPathPrefix is the directory (under the store root) holding block files.
	blockPathPrefix = "block/"
)
|
|
|
|
// DefaultBlockConfig is the default BlockStorage configuration.
|
|
var DefaultBlockConfig = &BlockConfig{
|
|
BlockSize: 1024 * 16,
|
|
WriteBufSize: 4096,
|
|
Overwrite: false,
|
|
Compression: NoCompression(),
|
|
}
|
|
|
|
// BlockConfig defines options to be used when opening a BlockStorage.
|
|
type BlockConfig struct {
|
|
// BlockSize is the chunking size to use when splitting and storing blocks of data.
|
|
BlockSize int
|
|
|
|
// ReadBufSize is the buffer size to use when reading node files.
|
|
ReadBufSize int
|
|
|
|
// WriteBufSize is the buffer size to use when writing file streams.
|
|
WriteBufSize int
|
|
|
|
// Overwrite allows overwriting values of stored keys in the storage.
|
|
Overwrite bool
|
|
|
|
// Compression is the Compressor to use when reading / writing files,
|
|
// default is no compression.
|
|
Compression Compressor
|
|
}
|
|
|
|
// getBlockConfig returns a valid BlockConfig for supplied ptr.
|
|
func getBlockConfig(cfg *BlockConfig) BlockConfig {
|
|
// If nil, use default
|
|
if cfg == nil {
|
|
cfg = DefaultBlockConfig
|
|
}
|
|
|
|
// Assume nil compress == none
|
|
if cfg.Compression == nil {
|
|
cfg.Compression = NoCompression()
|
|
}
|
|
|
|
// Assume 0 chunk size == use default
|
|
if cfg.BlockSize <= 0 {
|
|
cfg.BlockSize = DefaultBlockConfig.BlockSize
|
|
}
|
|
|
|
// Assume 0 buf size == use default
|
|
if cfg.WriteBufSize <= 0 {
|
|
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
|
|
}
|
|
|
|
// Return owned config copy
|
|
return BlockConfig{
|
|
BlockSize: cfg.BlockSize,
|
|
WriteBufSize: cfg.WriteBufSize,
|
|
Overwrite: cfg.Overwrite,
|
|
Compression: cfg.Compression,
|
|
}
|
|
}
|
|
|
|
// BlockStorage is a Storage implementation that stores input data as chunks on
|
|
// a filesystem. Each value is chunked into blocks of configured size and these
|
|
// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
|
|
// "node" file is finally created containing an array of hashes contained within
|
|
// this value.
|
|
type BlockStorage struct {
|
|
path string // path is the root path of this store
|
|
blockPath string // blockPath is the joined root path + block path prefix
|
|
nodePath string // nodePath is the joined root path + node path prefix
|
|
config BlockConfig // cfg is the supplied configuration for this store
|
|
hashPool sync.Pool // hashPool is this store's hashEncoder pool
|
|
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
|
|
cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
|
|
lock *Lock // lock is the opened lockfile for this storage instance
|
|
|
|
// NOTE:
|
|
// BlockStorage does not need to lock each of the underlying block files
|
|
// as the filename itself directly relates to the contents. If there happens
|
|
// to be an overwrite, it will just be of the same data since the filename is
|
|
// the hash of the data.
|
|
}
|
|
|
|
// OpenBlock opens a BlockStorage instance for given folder path and configuration.
|
|
func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
|
|
// Acquire path builder
|
|
pb := util.GetPathBuilder()
|
|
defer util.PutPathBuilder(pb)
|
|
|
|
// Clean provided path, ensure ends in '/' (should
|
|
// be dir, this helps with file path trimming later)
|
|
path = pb.Clean(path) + "/"
|
|
|
|
// Get checked config
|
|
config := getBlockConfig(cfg)
|
|
|
|
// Attempt to open path
|
|
file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
|
|
if err != nil {
|
|
// If not a not-exist error, return
|
|
if !os.IsNotExist(err) {
|
|
return nil, err
|
|
}
|
|
|
|
// Attempt to make store path dirs
|
|
err = os.MkdirAll(path, defaultDirPerms)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Reopen dir now it's been created
|
|
file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
defer file.Close()
|
|
|
|
// Double check this is a dir (NOT a file!)
|
|
stat, err := file.Stat()
|
|
if err != nil {
|
|
return nil, err
|
|
} else if !stat.IsDir() {
|
|
return nil, new_error("path is file")
|
|
}
|
|
|
|
// Open and acquire storage lock for path
|
|
lock, err := OpenLock(pb.Join(path, LockFile))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Figure out the largest size for bufpool slices
|
|
bufSz := encodedHashLen
|
|
if bufSz < config.BlockSize {
|
|
bufSz = config.BlockSize
|
|
}
|
|
if bufSz < config.WriteBufSize {
|
|
bufSz = config.WriteBufSize
|
|
}
|
|
|
|
// Prepare BlockStorage
|
|
st := &BlockStorage{
|
|
path: path,
|
|
blockPath: pb.Join(path, blockPathPrefix),
|
|
nodePath: pb.Join(path, nodePathPrefix),
|
|
config: config,
|
|
hashPool: sync.Pool{
|
|
New: func() interface{} {
|
|
return newHashEncoder()
|
|
},
|
|
},
|
|
bufpool: pools.NewBufferPool(bufSz),
|
|
lock: lock,
|
|
}
|
|
|
|
// Set copypool buffer size
|
|
st.cppool.Buffer(config.ReadBufSize)
|
|
|
|
return st, nil
|
|
}
|
|
|
|
// Clean implements storage.Clean().
|
|
func (st *BlockStorage) Clean(ctx context.Context) error {
|
|
// Check if open
|
|
if st.lock.Closed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
// Check context still valid
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Acquire path builder
|
|
pb := util.GetPathBuilder()
|
|
defer util.PutPathBuilder(pb)
|
|
|
|
nodes := map[string]*node{}
|
|
|
|
// Walk nodes dir for entries
|
|
err := walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error {
|
|
// Only deal with regular files
|
|
if !fsentry.Type().IsRegular() {
|
|
return nil
|
|
}
|
|
|
|
// Get joined node path name
|
|
npath = pb.Join(npath, fsentry.Name())
|
|
|
|
// Attempt to open RO file
|
|
file, err := open(npath, defaultFileROFlags)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
// Alloc new Node + acquire hash buffer for writes
|
|
hbuf := st.bufpool.Get()
|
|
defer st.bufpool.Put(hbuf)
|
|
hbuf.Guarantee(encodedHashLen)
|
|
node := node{}
|
|
|
|
// Write file contents to node
|
|
_, err = io.CopyBuffer(
|
|
&nodeWriter{
|
|
node: &node,
|
|
buf: hbuf,
|
|
},
|
|
file,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Append to nodes slice
|
|
nodes[fsentry.Name()] = &node
|
|
return nil
|
|
})
|
|
|
|
// Handle errors (though nodePath may not have been created yet)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
|
|
// Walk blocks dir for entries
|
|
err = walkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) error {
|
|
// Only deal with regular files
|
|
if !fsentry.Type().IsRegular() {
|
|
return nil
|
|
}
|
|
|
|
inUse := false
|
|
for key, node := range nodes {
|
|
if node.removeHash(fsentry.Name()) {
|
|
if len(node.hashes) < 1 {
|
|
// This node contained hash, and after removal is now empty.
|
|
// Remove this node from our tracked nodes slice
|
|
delete(nodes, key)
|
|
}
|
|
inUse = true
|
|
}
|
|
}
|
|
|
|
// Block hash is used by node
|
|
if inUse {
|
|
return nil
|
|
}
|
|
|
|
// Get joined block path name
|
|
bpath = pb.Join(bpath, fsentry.Name())
|
|
|
|
// Remove this unused block path
|
|
return os.Remove(bpath)
|
|
})
|
|
|
|
// Handle errors (though blockPath may not have been created yet)
|
|
if err != nil && !os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
|
|
// If there are nodes left at this point, they are corrupt
|
|
// (i.e. they're referencing block hashes that don't exist)
|
|
if len(nodes) > 0 {
|
|
nodeKeys := []string{}
|
|
for key := range nodes {
|
|
nodeKeys = append(nodeKeys, key)
|
|
}
|
|
return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReadBytes implements Storage.ReadBytes().
|
|
func (st *BlockStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
|
|
// Get stream reader for key
|
|
rc, err := st.ReadStream(ctx, key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rc.Close()
|
|
|
|
// Read all bytes and return
|
|
return io.ReadAll(rc)
|
|
}
|
|
|
|
// ReadStream implements Storage.ReadStream().
|
|
func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
|
|
// Get node file path for key
|
|
npath, err := st.nodePathForKey(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check if open
|
|
if st.lock.Closed() {
|
|
return nil, ErrClosed
|
|
}
|
|
|
|
// Check context still valid
|
|
if err := ctx.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Attempt to open RO file
|
|
file, err := open(npath, defaultFileROFlags)
|
|
if err != nil {
|
|
return nil, errSwapNotFound(err)
|
|
}
|
|
defer file.Close()
|
|
|
|
// Acquire hash buffer for writes
|
|
hbuf := st.bufpool.Get()
|
|
defer st.bufpool.Put(hbuf)
|
|
|
|
var node node
|
|
|
|
// Write file contents to node
|
|
_, err = st.cppool.Copy(
|
|
&nodeWriter{
|
|
node: &node,
|
|
buf: hbuf,
|
|
},
|
|
file,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Prepare block reader and return
|
|
return iotools.NopReadCloser(&blockReader{
|
|
storage: st,
|
|
node: &node,
|
|
}), nil
|
|
}
|
|
|
|
// readBlock reads the block with hash (key) from the filesystem.
|
|
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
|
|
// Get block file path for key
|
|
bpath := st.blockPathForKey(key)
|
|
|
|
// Attempt to open RO file
|
|
file, err := open(bpath, defaultFileROFlags)
|
|
if err != nil {
|
|
return nil, wrap(new_error("corrupted node"), err)
|
|
}
|
|
defer file.Close()
|
|
|
|
// Wrap the file in a compressor
|
|
cFile, err := st.config.Compression.Reader(file)
|
|
if err != nil {
|
|
return nil, wrap(new_error("corrupted node"), err)
|
|
}
|
|
defer cFile.Close()
|
|
|
|
// Read the entire file
|
|
return io.ReadAll(cFile)
|
|
}
|
|
|
|
// WriteBytes implements Storage.WriteBytes().
|
|
func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
|
|
n, err := st.WriteStream(ctx, key, bytes.NewReader(value))
|
|
return int(n), err
|
|
}
|
|
|
|
// WriteStream implements Storage.WriteStream().
|
|
func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
|
|
// Get node file path for key
|
|
npath, err := st.nodePathForKey(key)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Check if open
|
|
if st.lock.Closed() {
|
|
return 0, ErrClosed
|
|
}
|
|
|
|
// Check context still valid
|
|
if err := ctx.Err(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Check if this exists
|
|
ok, err := stat(key)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Check if we allow overwrites
|
|
if ok && !st.config.Overwrite {
|
|
return 0, ErrAlreadyExists
|
|
}
|
|
|
|
// Ensure nodes dir (and any leading up to) exists
|
|
err = os.MkdirAll(st.nodePath, defaultDirPerms)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Ensure blocks dir (and any leading up to) exists
|
|
err = os.MkdirAll(st.blockPath, defaultDirPerms)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var node node
|
|
var total atomic.Int64
|
|
|
|
// Acquire HashEncoder
|
|
hc := st.hashPool.Get().(*hashEncoder)
|
|
defer st.hashPool.Put(hc)
|
|
|
|
// Create new waitgroup and OnceError for
|
|
// goroutine error tracking and propagating
|
|
wg := sync.WaitGroup{}
|
|
onceErr := errors.OnceError{}
|
|
|
|
loop:
|
|
for !onceErr.IsSet() {
|
|
// Fetch new buffer for this loop
|
|
buf := st.bufpool.Get()
|
|
buf.Grow(st.config.BlockSize)
|
|
|
|
// Read next chunk
|
|
n, err := io.ReadFull(r, buf.B)
|
|
switch err {
|
|
case nil, io.ErrUnexpectedEOF:
|
|
// do nothing
|
|
case io.EOF:
|
|
st.bufpool.Put(buf)
|
|
break loop
|
|
default:
|
|
st.bufpool.Put(buf)
|
|
return 0, err
|
|
}
|
|
|
|
// Hash the encoded data
|
|
sum := hc.EncodeSum(buf.B)
|
|
|
|
// Append to the node's hashes
|
|
node.hashes = append(node.hashes, sum)
|
|
|
|
// If already on disk, skip
|
|
has, err := st.statBlock(sum)
|
|
if err != nil {
|
|
st.bufpool.Put(buf)
|
|
return 0, err
|
|
} else if has {
|
|
st.bufpool.Put(buf)
|
|
continue loop
|
|
}
|
|
|
|
// Check if reached EOF
|
|
atEOF := (n < buf.Len())
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
// Perform writes in goroutine
|
|
|
|
defer func() {
|
|
// Defer release +
|
|
// signal we're done
|
|
st.bufpool.Put(buf)
|
|
wg.Done()
|
|
}()
|
|
|
|
// Write block to store at hash
|
|
n, err := st.writeBlock(sum, buf.B[:n])
|
|
if err != nil {
|
|
onceErr.Store(err)
|
|
return
|
|
}
|
|
|
|
// Increment total.
|
|
total.Add(int64(n))
|
|
}()
|
|
|
|
// Break at end
|
|
if atEOF {
|
|
break loop
|
|
}
|
|
}
|
|
|
|
// Wait, check errors
|
|
wg.Wait()
|
|
if onceErr.IsSet() {
|
|
return 0, onceErr.Load()
|
|
}
|
|
|
|
// If no hashes created, return
|
|
if len(node.hashes) < 1 {
|
|
return 0, new_error("no hashes written")
|
|
}
|
|
|
|
// Prepare to swap error if need-be
|
|
errSwap := errSwapNoop
|
|
|
|
// Build file RW flags
|
|
// NOTE: we performed an initial check for
|
|
// this before writing blocks, but if
|
|
// the utilizer of this storage didn't
|
|
// correctly mutex protect this key then
|
|
// someone may have beaten us to the
|
|
// punch at writing the node file.
|
|
flags := defaultFileRWFlags
|
|
if !st.config.Overwrite {
|
|
flags |= syscall.O_EXCL
|
|
|
|
// Catch + replace err exist
|
|
errSwap = errSwapExist
|
|
}
|
|
|
|
// Attempt to open RW file
|
|
file, err := open(npath, flags)
|
|
if err != nil {
|
|
return 0, errSwap(err)
|
|
}
|
|
defer file.Close()
|
|
|
|
// Acquire write buffer
|
|
buf := st.bufpool.Get()
|
|
defer st.bufpool.Put(buf)
|
|
buf.Grow(st.config.WriteBufSize)
|
|
|
|
// Finally, write data to file
|
|
_, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B)
|
|
return total.Load(), err
|
|
}
|
|
|
|
// writeBlock writes the block with hash and supplied value to the filesystem.
|
|
func (st *BlockStorage) writeBlock(hash string, value []byte) (int, error) {
|
|
// Get block file path for key
|
|
bpath := st.blockPathForKey(hash)
|
|
|
|
// Attempt to open RW file
|
|
file, err := open(bpath, defaultFileRWFlags)
|
|
if err != nil {
|
|
if err == syscall.EEXIST {
|
|
err = nil /* race issue describe in struct NOTE */
|
|
}
|
|
return 0, err
|
|
}
|
|
defer file.Close()
|
|
|
|
// Wrap the file in a compressor
|
|
cFile, err := st.config.Compression.Writer(file)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer cFile.Close()
|
|
|
|
// Write value to file
|
|
return cFile.Write(value)
|
|
}
|
|
|
|
// statBlock checks for existence of supplied block hash.
|
|
func (st *BlockStorage) statBlock(hash string) (bool, error) {
|
|
return stat(st.blockPathForKey(hash))
|
|
}
|
|
|
|
// Stat implements Storage.Stat()
|
|
func (st *BlockStorage) Stat(ctx context.Context, key string) (bool, error) {
|
|
// Get node file path for key
|
|
kpath, err := st.nodePathForKey(key)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Check if open
|
|
if st.lock.Closed() {
|
|
return false, ErrClosed
|
|
}
|
|
|
|
// Check context still valid
|
|
if err := ctx.Err(); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Check for file on disk
|
|
return stat(kpath)
|
|
}
|
|
|
|
// Remove implements Storage.Remove().
|
|
func (st *BlockStorage) Remove(ctx context.Context, key string) error {
|
|
// Get node file path for key
|
|
kpath, err := st.nodePathForKey(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check if open
|
|
if st.lock.Closed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
// Check context still valid
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remove at path (we know this is file)
|
|
if err := unlink(kpath); err != nil {
|
|
return errSwapNotFound(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close implements Storage.Close().
|
|
func (st *BlockStorage) Close() error {
|
|
return st.lock.Close()
|
|
}
|
|
|
|
// WalkKeys implements Storage.WalkKeys().
|
|
func (st *BlockStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
|
|
// Check if open
|
|
if st.lock.Closed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
// Check context still valid
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Acquire path builder
|
|
pb := util.GetPathBuilder()
|
|
defer util.PutPathBuilder(pb)
|
|
|
|
// Walk dir for entries
|
|
return walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error {
|
|
if !fsentry.Type().IsRegular() {
|
|
// Only deal with regular files
|
|
return nil
|
|
}
|
|
|
|
// Perform provided walk function
|
|
return opts.WalkFn(ctx, Entry{
|
|
Key: fsentry.Name(),
|
|
Size: -1,
|
|
})
|
|
})
|
|
}
|
|
|
|
// nodePathForKey calculates the node file path for supplied key.
|
|
func (st *BlockStorage) nodePathForKey(key string) (string, error) {
|
|
// Path separators are illegal, as directory paths
|
|
if strings.Contains(key, "/") || key == "." || key == ".." {
|
|
return "", ErrInvalidKey
|
|
}
|
|
|
|
// Acquire path builder
|
|
pb := util.GetPathBuilder()
|
|
defer util.PutPathBuilder(pb)
|
|
|
|
// Return joined + cleaned node-path
|
|
return pb.Join(st.nodePath, key), nil
|
|
}
|
|
|
|
// blockPathForKey calculates the block file path for supplied hash.
|
|
func (st *BlockStorage) blockPathForKey(hash string) string {
|
|
pb := util.GetPathBuilder()
|
|
defer util.PutPathBuilder(pb)
|
|
return pb.Join(st.blockPath, hash)
|
|
}
|
|
|
|
// hashSeparator is the byte written after each block hash in a node file.
const hashSeparator byte = '\n'
|
|
|
|
// node represents the contents of a node file in storage:
// the ordered list of block hashes that make up one value.
type node struct {
	hashes []string
}

// removeHash removes every occurrence of the supplied block hash from the
// node's hash list, preserving the order of the remaining hashes. It returns
// true if at least one occurrence was removed.
//
// A value may legitimately contain the same block more than once (repeated
// content), in which case the hash is listed several times; all of those
// entries are satisfied by the one block file, so they are dropped together.
func (n *node) removeHash(hash string) bool {
	// Filter in place, reusing the backing array
	kept := n.hashes[:0]
	for _, h := range n.hashes {
		if h != hash {
			kept = append(kept, h)
		}
	}

	removed := len(kept) != len(n.hashes)
	n.hashes = kept
	return removed
}
|
|
|
|
// nodeReader is an io.Reader implementation for the node file representation,
|
|
// which is useful when calculated node file is being written to the store.
|
|
type nodeReader struct {
|
|
node node
|
|
idx int
|
|
last int
|
|
}
|
|
|
|
func (r *nodeReader) Read(b []byte) (int, error) {
|
|
n := 0
|
|
|
|
// '-1' means we missed writing
|
|
// hash separator on last iteration
|
|
if r.last == -1 {
|
|
b[n] = hashSeparator
|
|
n++
|
|
r.last = 0
|
|
}
|
|
|
|
for r.idx < len(r.node.hashes) {
|
|
hash := r.node.hashes[r.idx]
|
|
|
|
// Copy into buffer + update read count
|
|
m := copy(b[n:], hash[r.last:])
|
|
n += m
|
|
|
|
// If incomplete copy, return here
|
|
if m < len(hash)-r.last {
|
|
r.last = m
|
|
return n, nil
|
|
}
|
|
|
|
// Check we can write last separator
|
|
if n == len(b) {
|
|
r.last = -1
|
|
return n, nil
|
|
}
|
|
|
|
// Write separator, iter, reset
|
|
b[n] = hashSeparator
|
|
n++
|
|
r.idx++
|
|
r.last = 0
|
|
}
|
|
|
|
// We reached end of hashes
|
|
return n, io.EOF
|
|
}
|
|
|
|
// nodeWriter is an io.Writer implementation for the node file representation,
|
|
// which is useful when calculated node file is being read from the store.
|
|
type nodeWriter struct {
|
|
node *node
|
|
buf *byteutil.Buffer
|
|
}
|
|
|
|
func (w *nodeWriter) Write(b []byte) (int, error) {
|
|
n := 0
|
|
|
|
for {
|
|
// Find next hash separator position
|
|
idx := bytes.IndexByte(b[n:], hashSeparator)
|
|
if idx == -1 {
|
|
// Check we shouldn't be expecting it
|
|
if w.buf.Len() > encodedHashLen {
|
|
return n, new_error("invalid node")
|
|
}
|
|
|
|
// Write all contents to buffer
|
|
w.buf.Write(b[n:])
|
|
return len(b), nil
|
|
}
|
|
|
|
// Found hash separator, write
|
|
// current buf contents to Node hashes
|
|
w.buf.Write(b[n : n+idx])
|
|
n += idx + 1
|
|
if w.buf.Len() != encodedHashLen {
|
|
return n, new_error("invalid node")
|
|
}
|
|
|
|
// Append to hashes & reset
|
|
w.node.hashes = append(w.node.hashes, w.buf.String())
|
|
w.buf.Reset()
|
|
}
|
|
}
|
|
|
|
// blockReader is an io.Reader implementation for the combined, linked block
|
|
// data contained with a node file. Basically, this allows reading value data
|
|
// from the store for a given node file.
|
|
type blockReader struct {
|
|
storage *BlockStorage
|
|
node *node
|
|
buf []byte
|
|
prev int
|
|
}
|
|
|
|
func (r *blockReader) Read(b []byte) (int, error) {
|
|
n := 0
|
|
|
|
// Data left in buf, copy as much as we
|
|
// can into supplied read buffer
|
|
if r.prev < len(r.buf)-1 {
|
|
n += copy(b, r.buf[r.prev:])
|
|
r.prev += n
|
|
if n >= len(b) {
|
|
return n, nil
|
|
}
|
|
}
|
|
|
|
for {
|
|
// Check we have any hashes left
|
|
if len(r.node.hashes) < 1 {
|
|
return n, io.EOF
|
|
}
|
|
|
|
// Get next key from slice
|
|
key := r.node.hashes[0]
|
|
r.node.hashes = r.node.hashes[1:]
|
|
|
|
// Attempt to fetch next batch of data
|
|
var err error
|
|
r.buf, err = r.storage.readBlock(key)
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
r.prev = 0
|
|
|
|
// Copy as much as can from new buffer
|
|
m := copy(b[n:], r.buf)
|
|
r.prev += m
|
|
n += m
|
|
|
|
// If we hit end of supplied buf, return
|
|
if n >= len(b) {
|
|
return n, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
var (
|
|
// base64Encoding is our base64 encoding object.
|
|
base64Encoding = hashenc.Base64()
|
|
|
|
// encodedHashLen is the once-calculated encoded hash-sum length
|
|
encodedHashLen = base64Encoding.EncodedLen(
|
|
sha256.New().Size(),
|
|
)
|
|
)
|
|
|
|
// hashEncoder is a HashEncoder with built-in encode buffer.
|
|
type hashEncoder struct {
|
|
henc hashenc.HashEncoder
|
|
ebuf []byte
|
|
}
|
|
|
|
// newHashEncoder returns a new hashEncoder instance.
|
|
func newHashEncoder() *hashEncoder {
|
|
return &hashEncoder{
|
|
henc: hashenc.New(sha256.New(), base64Encoding),
|
|
ebuf: make([]byte, encodedHashLen),
|
|
}
|
|
}
|
|
|
|
// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum().
|
|
func (henc *hashEncoder) EncodeSum(src []byte) string {
|
|
henc.henc.EncodeSum(henc.ebuf, src)
|
|
return string(henc.ebuf)
|
|
}
|