mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-01 13:52:46 +00:00
Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking"
This reverts commit3cebd8dbfb
, reversing changes made toed4b3a58a7
.
This commit is contained in:
parent
5c04c52571
commit
ff65f0ec7b
6 changed files with 17 additions and 269 deletions
|
@ -31,33 +31,16 @@ import (
|
|||
)
|
||||
|
||||
var tracingEnabled = os.Getenv("DENDRITE_TRACE_SQL") == "1"
|
||||
var dbToWriter map[string]Writer
|
||||
var CtxDBInstance = "db_instance"
|
||||
var instCount = 0
|
||||
|
||||
type traceInterceptor struct {
|
||||
sqlmw.NullInterceptor
|
||||
conn driver.Conn
|
||||
}
|
||||
|
||||
func (in *traceInterceptor) StmtQueryContext(ctx context.Context, stmt driver.StmtQueryContext, query string, args []driver.NamedValue) (driver.Rows, error) {
|
||||
startedAt := time.Now()
|
||||
rows, err := stmt.QueryContext(ctx, args)
|
||||
key := ctx.Value(CtxDBInstance)
|
||||
var safe string
|
||||
if key != nil {
|
||||
w := dbToWriter[key.(string)]
|
||||
if w == nil {
|
||||
safe = fmt.Sprintf("no writer for key %s", key)
|
||||
} else {
|
||||
safe = w.Safe()
|
||||
}
|
||||
}
|
||||
if safe != "" && !strings.HasPrefix(query, "SELECT ") {
|
||||
logrus.Infof("unsafe: %s -- %s", safe, query)
|
||||
}
|
||||
|
||||
logrus.WithField("duration", time.Since(startedAt)).WithField(logrus.ErrorKey, err).WithField("safe", safe).Debug("executed sql query ", query, " args: ", args)
|
||||
logrus.WithField("duration", time.Since(startedAt)).WithField(logrus.ErrorKey, err).Debug("executed sql query ", query, " args: ", args)
|
||||
|
||||
return rows, err
|
||||
}
|
||||
|
@ -65,21 +48,8 @@ func (in *traceInterceptor) StmtQueryContext(ctx context.Context, stmt driver.St
|
|||
func (in *traceInterceptor) StmtExecContext(ctx context.Context, stmt driver.StmtExecContext, query string, args []driver.NamedValue) (driver.Result, error) {
|
||||
startedAt := time.Now()
|
||||
result, err := stmt.ExecContext(ctx, args)
|
||||
key := ctx.Value(CtxDBInstance)
|
||||
var safe string
|
||||
if key != nil {
|
||||
w := dbToWriter[key.(string)]
|
||||
if w == nil {
|
||||
safe = fmt.Sprintf("no writer for key %s", key)
|
||||
} else {
|
||||
safe = w.Safe()
|
||||
}
|
||||
}
|
||||
if safe != "" && !strings.HasPrefix(query, "SELECT ") {
|
||||
logrus.Infof("unsafe: %s -- %s", safe, query)
|
||||
}
|
||||
|
||||
logrus.WithField("duration", time.Since(startedAt)).WithField(logrus.ErrorKey, err).WithField("safe", safe).Debug("executed sql query ", query, " args: ", args)
|
||||
logrus.WithField("duration", time.Since(startedAt)).WithField(logrus.ErrorKey, err).Debug("executed sql query ", query, " args: ", args)
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
@ -105,18 +75,6 @@ func (in *traceInterceptor) RowsNext(c context.Context, rows driver.Rows, dest [
|
|||
return err
|
||||
}
|
||||
|
||||
func OpenWithWriter(dbProperties *config.DatabaseOptions, w Writer) (*sql.DB, context.Context, error) {
|
||||
db, err := Open(dbProperties)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
instCount++
|
||||
ctxVal := fmt.Sprintf("%d", instCount)
|
||||
dbToWriter[ctxVal] = w
|
||||
ctx := context.WithValue(context.TODO(), CtxDBInstance, ctxVal)
|
||||
return db, ctx, nil
|
||||
}
|
||||
|
||||
// Open opens a database specified by its database driver name and a driver-specific data source name,
|
||||
// usually consisting of at least a database name and connection information. Includes tracing driver
|
||||
// if DENDRITE_TRACE_SQL=1
|
||||
|
@ -160,5 +118,4 @@ func Open(dbProperties *config.DatabaseOptions) (*sql.DB, error) {
|
|||
|
||||
func init() {
|
||||
registerDrivers()
|
||||
dbToWriter = make(map[string]Writer)
|
||||
}
|
||||
|
|
|
@ -43,6 +43,4 @@ type Writer interface {
|
|||
// Queue up one or more database write operations within the
|
||||
// provided function to be executed when it is safe to do so.
|
||||
Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error
|
||||
|
||||
Safe() string
|
||||
}
|
||||
|
|
|
@ -26,7 +26,3 @@ func (w *DummyWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) err
|
|||
return f(txn)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *DummyWriter) Safe() string {
|
||||
return "DummyWriter"
|
||||
}
|
||||
|
|
|
@ -3,10 +3,6 @@ package sqlutil
|
|||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
@ -16,9 +12,8 @@ import (
|
|||
// contend on database locks in, e.g. SQLite. Only one task will run
|
||||
// at a time on a given ExclusiveWriter.
|
||||
type ExclusiveWriter struct {
|
||||
running atomic.Bool
|
||||
todo chan transactionWriterTask
|
||||
writerID int
|
||||
running atomic.Bool
|
||||
todo chan transactionWriterTask
|
||||
}
|
||||
|
||||
func NewExclusiveWriter() Writer {
|
||||
|
@ -35,15 +30,6 @@ type transactionWriterTask struct {
|
|||
wait chan error
|
||||
}
|
||||
|
||||
func (w *ExclusiveWriter) Safe() string {
|
||||
a := goid()
|
||||
b := w.writerID
|
||||
if a == b {
|
||||
return ""
|
||||
}
|
||||
return fmt.Sprintf("%v != %v", a, b)
|
||||
}
|
||||
|
||||
// Do queues a task to be run by a TransactionWriter. The function
|
||||
// provided will be ran within a transaction as supplied by the
|
||||
// txn parameter if one is supplied, and if not, will take out a
|
||||
|
@ -74,7 +60,6 @@ func (w *ExclusiveWriter) run() {
|
|||
if !w.running.CAS(false, true) {
|
||||
return
|
||||
}
|
||||
w.writerID = goid()
|
||||
defer w.running.Store(false)
|
||||
for task := range w.todo {
|
||||
if task.db != nil && task.txn != nil {
|
||||
|
@ -89,14 +74,3 @@ func (w *ExclusiveWriter) run() {
|
|||
close(task.wait)
|
||||
}
|
||||
}
|
||||
|
||||
func goid() int {
|
||||
var buf [64]byte
|
||||
n := runtime.Stack(buf[:], false)
|
||||
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
|
||||
id, err := strconv.Atoi(idField)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("cannot get goroutine id: %v", err))
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue