Add peer-to-peer support into Dendrite via libp2p and fetch (#880)

* Use a fork of pq which supports userCurrent on wasm

* Use sqlite3_js driver when running in JS

* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only

* Update to latest go-sqlite-js version

* Replace prometheus with a stub. sigh

* Hard-code a config and don't use opentracing

* Latest go-sqlite3-js version

* Generate a key for now

* Listen for fetch traffic rather than HTTP

* Latest hacks for js

* libp2p support

* More libp2p

* Fork gjson to allow us to enforce auth checks as before

Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.

See https://github.com/tidwall/gjson/issues/157

When it's resolved, let's go back to mainline.

* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157

* Use latest gomatrixserverlib for sig checks

* Fix a bug which could cause exclude_from_sync to not be set

Caused when sending events over federation.

* Use query variadic to make lookups actually work!

* Latest gomatrixserverlib

* Add notes on getting p2p up and running

Partly so I don't forget myself!

* refactor: Move p2p specific stuff to cmd/dendritejs

This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.

Also, clean up main.go to read a bit better.

* Update ho-http-js-libp2p to return errors from RoundTrip

* Add an LRU cache around the key DB

We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.

Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)

* breaking: Add Tracing.Enabled to toggle whether we do opentracing

Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.

* Start adding conditional builds for wasm to handle lib/pq

The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).

* Remove lib/pq for wasm for syncapi

* Add conditional building to remaining storage APIs

* Update build script to set env vars correctly for dendritejs

* sqlite bug fixes

* Docs

* Add a no-op main for dendritejs when not building under wasm

* Use the real prometheus, even for WASM

Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:

```
    global.process = {
        pid: 1,
    };
    global.fs.stat = function(path, cb) {
        cb({
            code: "EINVAL",
        });
    }
```

* Linting
This commit is contained in:
Kegsay 2020-03-06 10:23:55 +00:00 committed by GitHub
parent 0cda3c52d0
commit a97b8eafd4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
59 changed files with 1260 additions and 347 deletions

View file

@ -216,7 +216,7 @@ func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
uri, err := url.Parse(string(cfg.Database.Naffka))
if err != nil || uri.Scheme == "file" {
db, err = sql.Open("sqlite3", string(cfg.Database.Naffka))
db, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka))
if err != nil {
logrus.WithError(err).Panic("Failed to open naffka database")
}

View file

@ -224,6 +224,8 @@ type Dendrite struct {
// The config for tracing the dendrite servers.
Tracing struct {
// Set to true to enable tracer hooks. If false, no tracing is set up.
Enabled bool `yaml:"enabled"`
// The config for the jaeger opentracing reporter.
Jaeger jaegerconfig.Configuration `yaml:"jaeger"`
} `yaml:"tracing"`
@ -365,7 +367,7 @@ func loadConfig(
return nil, err
}
config.setDefaults()
config.SetDefaults()
if err = config.check(monolithic); err != nil {
return nil, err
@ -398,7 +400,7 @@ func loadConfig(
config.Media.AbsBasePath = Path(absPath(basePath, config.Media.BasePath))
// Generate data from config options
err = config.derive()
err = config.Derive()
if err != nil {
return nil, err
}
@ -406,9 +408,9 @@ func loadConfig(
return &config, nil
}
// derive generates data that is derived from various values provided in
// Derive generates data that is derived from various values provided in
// the config file.
func (config *Dendrite) derive() error {
func (config *Dendrite) Derive() error {
// Determine registrations flows based off config values
config.Derived.Registration.Params = make(map[string]interface{})
@ -433,8 +435,8 @@ func (config *Dendrite) derive() error {
return nil
}
// setDefaults sets default config values if they are not explicitly set.
func (config *Dendrite) setDefaults() {
// SetDefaults sets default config values if they are not explicitly set.
func (config *Dendrite) SetDefaults() {
if config.Matrix.KeyValidityPeriod == 0 {
config.Matrix.KeyValidityPeriod = 24 * time.Hour
}
@ -703,6 +705,9 @@ func (config *Dendrite) FederationSenderURL() string {
// SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
if !config.Tracing.Enabled {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
}
return config.Tracing.Jaeger.InitGlobalTracer(
serviceName,
jaegerconfig.Logger(logrusLogger{logrus.StandardLogger()}),

13
common/keydb/interface.go Normal file
View file

@ -0,0 +1,13 @@
package keydb
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
FetcherName() string
FetchKeys(ctx context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error)
StoreKeys(ctx context.Context, keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error
}

View file

@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package keydb
import (
"context"
"net/url"
"golang.org/x/crypto/ed25519"
@ -25,12 +26,6 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
FetcherName() string
FetchKeys(ctx context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error)
StoreKeys(ctx context.Context, keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error
}
// NewDatabase opens a database connection.
func NewDatabase(
dataSourceName string,

View file

@ -0,0 +1,46 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package keydb
import (
"fmt"
"net/url"
"golang.org/x/crypto/ed25519"
"github.com/matrix-org/dendrite/common/keydb/sqlite3"
"github.com/matrix-org/gomatrixserverlib"
)
// NewDatabase opens a database connection.
func NewDatabase(
dataSourceName string,
serverName gomatrixserverlib.ServerName,
serverKey ed25519.PublicKey,
serverKeyID gomatrixserverlib.KeyID,
) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, err
}
switch uri.Scheme {
case "postgres":
return nil, fmt.Errorf("Cannot use postgres implementation")
case "file":
return sqlite3.NewDatabase(dataSourceName, serverName, serverKey, serverKeyID)
default:
return nil, fmt.Errorf("Cannot use postgres implementation")
}
}

View file

@ -22,6 +22,7 @@ import (
"golang.org/x/crypto/ed25519"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib"
_ "github.com/mattn/go-sqlite3"
@ -43,7 +44,7 @@ func NewDatabase(
serverKey ed25519.PublicKey,
serverKeyID gomatrixserverlib.KeyID,
) (*Database, error) {
db, err := sql.Open("sqlite3", dataSourceName)
db, err := sql.Open(common.SQLiteDriverName(), dataSourceName)
if err != nil {
return nil, err
}

View file

@ -18,9 +18,12 @@ package sqlite3
import (
"context"
"database/sql"
"strings"
"github.com/lib/pq"
lru "github.com/hashicorp/golang-lru"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
const serverKeysSchema = `
@ -60,11 +63,19 @@ const upsertServerKeysSQL = "" +
" DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6"
type serverKeyStatements struct {
db *sql.DB
bulkSelectServerKeysStmt *sql.Stmt
upsertServerKeysStmt *sql.Stmt
cache *lru.Cache // nameAndKeyID => gomatrixserverlib.PublicKeyLookupResult
}
func (s *serverKeyStatements) prepare(db *sql.DB) (err error) {
s.db = db
s.cache, err = lru.New(64)
if err != nil {
return
}
_, err = db.Exec(serverKeysSchema)
if err != nil {
return
@ -86,8 +97,30 @@ func (s *serverKeyStatements) bulkSelectServerKeys(
for request := range requests {
nameAndKeyIDs = append(nameAndKeyIDs, nameAndKeyID(request))
}
stmt := s.bulkSelectServerKeysStmt
rows, err := stmt.QueryContext(ctx, pq.StringArray(nameAndKeyIDs))
// If we can satisfy all of the requests from the cache, do so. TODO: Allow partial matches with merges.
cacheResults := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{}
for request := range requests {
r, ok := s.cache.Get(nameAndKeyID(request))
if !ok {
break
}
cacheResult := r.(gomatrixserverlib.PublicKeyLookupResult)
cacheResults[request] = cacheResult
}
if len(cacheResults) == len(requests) {
util.GetLogger(ctx).Infof("KeyDB cache hit for %d keys", len(cacheResults))
return cacheResults, nil
}
query := strings.Replace(bulkSelectServerKeysSQL, "($1)", common.QueryVariadic(len(nameAndKeyIDs)), 1)
iKeyIDs := make([]interface{}, len(nameAndKeyIDs))
for i, v := range nameAndKeyIDs {
iKeyIDs[i] = v
}
rows, err := s.db.QueryContext(ctx, query, iKeyIDs...)
if err != nil {
return nil, err
}
@ -125,6 +158,7 @@ func (s *serverKeyStatements) upsertServerKeys(
request gomatrixserverlib.PublicKeyLookupRequest,
key gomatrixserverlib.PublicKeyLookupResult,
) error {
s.cache.Add(nameAndKeyID(request), key)
_, err := s.upsertServerKeysStmt.ExecContext(
ctx,
string(request.ServerName),

25
common/postgres.go Normal file
View file

@ -0,0 +1,25 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package common
import "github.com/lib/pq"
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
func IsUniqueConstraintViolationErr(err error) bool {
pqErr, ok := err.(*pq.Error)
return ok && pqErr.Code == "23505"
}

22
common/postgres_wasm.go Normal file
View file

@ -0,0 +1,22 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build wasm
package common
// IsUniqueConstraintViolationErr no-ops for this architecture
func IsUniqueConstraintViolationErr(err error) bool {
return false
}

View file

@ -17,8 +17,7 @@ package common
import (
"database/sql"
"fmt"
"github.com/lib/pq"
"runtime"
)
// A Transaction is something that can be committed or rolledback.
@ -77,12 +76,6 @@ func TxStmt(transaction *sql.Tx, statement *sql.Stmt) *sql.Stmt {
return statement
}
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
func IsUniqueConstraintViolationErr(err error) bool {
pqErr, ok := err.(*pq.Error)
return ok && pqErr.Code == "23505"
}
// Hack of the century
func QueryVariadic(count int) string {
return QueryVariadicOffset(count, 0)
@ -99,3 +92,10 @@ func QueryVariadicOffset(count, offset int) string {
str += ")"
return str
}
func SQLiteDriverName() string {
if runtime.GOOS == "js" {
return "sqlite3_js"
}
return "sqlite3"
}