mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-11-22 21:35:35 +03:00
c88547ce71
Continues on from #19202. Following the addition of pprof labels we can now more easily understand the relationship between a goroutine and the requests that spawn them. This PR takes advantage of the labels and adds a few others, then provides a mechanism for the monitoring page to query the pprof goroutine profile. The binary profile that results from this profile is immediately piped in to the google library for parsing this and then stack traces are formed for the goroutines. If the goroutine is within a context or has been created from a goroutine within a process context it will acquire the process description labels for that process. The goroutines are mapped with there associate pids and any that do not have an associated pid are placed in a group at the bottom as unbound. In this way we should be able to more easily examine goroutines that have been stuck. A manager command `gitea manager processes` is also provided that can export the processes (with or without stacktraces) to the command line. Signed-off-by: Andrew Thornton <art27@cantab.net>
215 lines
6.1 KiB
Go
215 lines
6.1 KiB
Go
// Copyright 2020 The Gitea Authors. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package nosql
|
|
|
|
import (
|
|
"fmt"
|
|
"path"
|
|
"runtime/pprof"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"code.gitea.io/gitea/modules/log"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
)
|
|
|
|
// CloseLevelDB closes a levelDB
|
|
func (m *Manager) CloseLevelDB(connection string) error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
db, ok := m.LevelDBConnections[connection]
|
|
if !ok {
|
|
// Try the full URI
|
|
uri := ToLevelDBURI(connection)
|
|
db, ok = m.LevelDBConnections[uri.String()]
|
|
|
|
if !ok {
|
|
// Try the datadir directly
|
|
dataDir := path.Join(uri.Host, uri.Path)
|
|
|
|
db, ok = m.LevelDBConnections[dataDir]
|
|
}
|
|
}
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
db.count--
|
|
if db.count > 0 {
|
|
return nil
|
|
}
|
|
|
|
for _, name := range db.name {
|
|
delete(m.LevelDBConnections, name)
|
|
}
|
|
return db.db.Close()
|
|
}
|
|
|
|
// GetLevelDB gets a levelDB for a particular connection
|
|
func (m *Manager) GetLevelDB(connection string) (db *leveldb.DB, err error) {
|
|
// Because we want associate any goroutines created by this call to the main nosqldb context we need to
|
|
// wrap this in a goroutine labelled with the nosqldb context
|
|
done := make(chan struct{})
|
|
var recovered interface{}
|
|
go func() {
|
|
defer func() {
|
|
recovered = recover()
|
|
if recovered != nil {
|
|
log.Critical("PANIC during GetLevelDB: %v\nStacktrace: %s", recovered, log.Stack(2))
|
|
}
|
|
close(done)
|
|
}()
|
|
pprof.SetGoroutineLabels(m.ctx)
|
|
|
|
db, err = m.getLevelDB(connection)
|
|
}()
|
|
<-done
|
|
if recovered != nil {
|
|
panic(recovered)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (m *Manager) getLevelDB(connection string) (*leveldb.DB, error) {
|
|
// Convert the provided connection description to the common format
|
|
uri := ToLevelDBURI(connection)
|
|
|
|
// Get the datadir
|
|
dataDir := path.Join(uri.Host, uri.Path)
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
db, ok := m.LevelDBConnections[connection]
|
|
if ok {
|
|
db.count++
|
|
|
|
return db.db, nil
|
|
}
|
|
|
|
db, ok = m.LevelDBConnections[uri.String()]
|
|
if ok {
|
|
db.count++
|
|
|
|
return db.db, nil
|
|
}
|
|
|
|
// if there is already a connection to this leveldb reuse that
|
|
// NOTE: if there differing options then only the first leveldb connection will be used
|
|
db, ok = m.LevelDBConnections[dataDir]
|
|
if ok {
|
|
db.count++
|
|
log.Warn("Duplicate connnection to level db: %s with different connection strings. Initial connection: %s. This connection: %s", dataDir, db.name[0], connection)
|
|
db.name = append(db.name, connection)
|
|
m.LevelDBConnections[connection] = db
|
|
return db.db, nil
|
|
}
|
|
db = &levelDBHolder{
|
|
name: []string{connection, uri.String(), dataDir},
|
|
}
|
|
|
|
opts := &opt.Options{}
|
|
for k, v := range uri.Query() {
|
|
switch replacer.Replace(strings.ToLower(k)) {
|
|
case "blockcachecapacity":
|
|
opts.BlockCacheCapacity, _ = strconv.Atoi(v[0])
|
|
case "blockcacheevictremoved":
|
|
opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0])
|
|
case "blockrestartinterval":
|
|
opts.BlockRestartInterval, _ = strconv.Atoi(v[0])
|
|
case "blocksize":
|
|
opts.BlockSize, _ = strconv.Atoi(v[0])
|
|
case "compactionexpandlimitfactor":
|
|
opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0])
|
|
case "compactiongpoverlapsfactor":
|
|
opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0])
|
|
case "compactionl0trigger":
|
|
opts.CompactionL0Trigger, _ = strconv.Atoi(v[0])
|
|
case "compactionsourcelimitfactor":
|
|
opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0])
|
|
case "compactiontablesize":
|
|
opts.CompactionTableSize, _ = strconv.Atoi(v[0])
|
|
case "compactiontablesizemultiplier":
|
|
opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
|
|
case "compactiontablesizemultiplierperlevel":
|
|
for _, val := range v {
|
|
f, _ := strconv.ParseFloat(val, 64)
|
|
opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f)
|
|
}
|
|
case "compactiontotalsize":
|
|
opts.CompactionTotalSize, _ = strconv.Atoi(v[0])
|
|
case "compactiontotalsizemultiplier":
|
|
opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
|
|
case "compactiontotalsizemultiplierperlevel":
|
|
for _, val := range v {
|
|
f, _ := strconv.ParseFloat(val, 64)
|
|
opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f)
|
|
}
|
|
case "compression":
|
|
val, _ := strconv.Atoi(v[0])
|
|
opts.Compression = opt.Compression(val)
|
|
case "disablebufferpool":
|
|
opts.DisableBufferPool, _ = strconv.ParseBool(v[0])
|
|
case "disableblockcache":
|
|
opts.DisableBlockCache, _ = strconv.ParseBool(v[0])
|
|
case "disablecompactionbackoff":
|
|
opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0])
|
|
case "disablelargebatchtransaction":
|
|
opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0])
|
|
case "errorifexist":
|
|
opts.ErrorIfExist, _ = strconv.ParseBool(v[0])
|
|
case "errorifmissing":
|
|
opts.ErrorIfMissing, _ = strconv.ParseBool(v[0])
|
|
case "iteratorsamplingrate":
|
|
opts.IteratorSamplingRate, _ = strconv.Atoi(v[0])
|
|
case "nosync":
|
|
opts.NoSync, _ = strconv.ParseBool(v[0])
|
|
case "nowritemerge":
|
|
opts.NoWriteMerge, _ = strconv.ParseBool(v[0])
|
|
case "openfilescachecapacity":
|
|
opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0])
|
|
case "readonly":
|
|
opts.ReadOnly, _ = strconv.ParseBool(v[0])
|
|
case "strict":
|
|
val, _ := strconv.Atoi(v[0])
|
|
opts.Strict = opt.Strict(val)
|
|
case "writebuffer":
|
|
opts.WriteBuffer, _ = strconv.Atoi(v[0])
|
|
case "writel0pausetrigger":
|
|
opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0])
|
|
case "writel0slowdowntrigger":
|
|
opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0])
|
|
case "clientname":
|
|
db.name = append(db.name, v[0])
|
|
}
|
|
}
|
|
|
|
var err error
|
|
db.db, err = leveldb.OpenFile(dataDir, opts)
|
|
if err != nil {
|
|
if !errors.IsCorrupted(err) {
|
|
if strings.Contains(err.Error(), "resource temporarily unavailable") {
|
|
err = fmt.Errorf("unable to lock level db at %s: %w", dataDir, err)
|
|
return nil, err
|
|
}
|
|
|
|
err = fmt.Errorf("unable to open level db at %s: %w", dataDir, err)
|
|
return nil, err
|
|
}
|
|
db.db, err = leveldb.RecoverFile(dataDir, opts)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, name := range db.name {
|
|
m.LevelDBConnections[name] = db
|
|
}
|
|
db.count++
|
|
return db.db, nil
|
|
}
|