mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
MSC2836: Threading - part one (#1589)
* Add mscs/hooks package, begin work for msc2836 * Flesh out hooks and add SQL schema * Begin implementing core msc2836 logic * Add test harness * Linting * Implement visibility checks; stub out APIs for tests * Flesh out testing * Flesh out walkThread a bit * Persist the origin_server_ts as well * Edges table instead of relationships * Add nodes table for event metadata * LEFT JOIN to extract origin_server_ts for children * Add graph walking structs * Implement walking algorithm * Add more graph walking tests * Add auto_join for local rooms * Fix create table syntax on postgres * Add relationship_room_id|servers to the unsigned section of events * Persist the parent room_id/servers in edge metadata Other events cannot assert the true room_id/servers for the parent event, only make claims to them, hence why this is edge metadata. * guts to pass through room_id/servers * Refactor msc2836 to allow handling from federation * Add JoinedVia to PerformJoin responses * Fix tests; review comments
This commit is contained in:
parent
1cf9f20d2f
commit
6353b0b7e4
14 changed files with 1517 additions and 35 deletions
|
@ -22,6 +22,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/matrix-org/dendrite/internal/hooks"
|
||||
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
|
@ -61,7 +62,11 @@ func (w *inputWorker) start() {
|
|||
for {
|
||||
select {
|
||||
case task := <-w.input:
|
||||
hooks.Run(hooks.KindNewEventReceived, &task.event.Event)
|
||||
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
|
||||
if task.err == nil {
|
||||
hooks.Run(hooks.KindNewEventPersisted, &task.event.Event)
|
||||
}
|
||||
task.wg.Done()
|
||||
case <-time.After(time.Second * 5):
|
||||
return
|
||||
|
|
|
@ -47,7 +47,7 @@ func (r *Joiner) PerformJoin(
|
|||
req *api.PerformJoinRequest,
|
||||
res *api.PerformJoinResponse,
|
||||
) {
|
||||
roomID, err := r.performJoin(ctx, req)
|
||||
roomID, joinedVia, err := r.performJoin(ctx, req)
|
||||
if err != nil {
|
||||
perr, ok := err.(*api.PerformError)
|
||||
if ok {
|
||||
|
@ -59,21 +59,22 @@ func (r *Joiner) PerformJoin(
|
|||
}
|
||||
}
|
||||
res.RoomID = roomID
|
||||
res.JoinedVia = joinedVia
|
||||
}
|
||||
|
||||
func (r *Joiner) performJoin(
|
||||
ctx context.Context,
|
||||
req *api.PerformJoinRequest,
|
||||
) (string, error) {
|
||||
) (string, gomatrixserverlib.ServerName, error) {
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
return "", &api.PerformError{
|
||||
return "", "", &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("Supplied user ID %q in incorrect format", req.UserID),
|
||||
}
|
||||
}
|
||||
if domain != r.Cfg.Matrix.ServerName {
|
||||
return "", &api.PerformError{
|
||||
return "", "", &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("User %q does not belong to this homeserver", req.UserID),
|
||||
}
|
||||
|
@ -84,7 +85,7 @@ func (r *Joiner) performJoin(
|
|||
if strings.HasPrefix(req.RoomIDOrAlias, "#") {
|
||||
return r.performJoinRoomByAlias(ctx, req)
|
||||
}
|
||||
return "", &api.PerformError{
|
||||
return "", "", &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("Room ID or alias %q is invalid", req.RoomIDOrAlias),
|
||||
}
|
||||
|
@ -93,11 +94,11 @@ func (r *Joiner) performJoin(
|
|||
func (r *Joiner) performJoinRoomByAlias(
|
||||
ctx context.Context,
|
||||
req *api.PerformJoinRequest,
|
||||
) (string, error) {
|
||||
) (string, gomatrixserverlib.ServerName, error) {
|
||||
// Get the domain part of the room alias.
|
||||
_, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias)
|
||||
return "", "", fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias)
|
||||
}
|
||||
req.ServerNames = append(req.ServerNames, domain)
|
||||
|
||||
|
@ -115,7 +116,7 @@ func (r *Joiner) performJoinRoomByAlias(
|
|||
err = r.FSAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias)
|
||||
return "", fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err)
|
||||
return "", "", fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err)
|
||||
}
|
||||
roomID = dirRes.RoomID
|
||||
req.ServerNames = append(req.ServerNames, dirRes.ServerNames...)
|
||||
|
@ -123,13 +124,13 @@ func (r *Joiner) performJoinRoomByAlias(
|
|||
// Otherwise, look up if we know this room alias locally.
|
||||
roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err)
|
||||
return "", "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err)
|
||||
}
|
||||
}
|
||||
|
||||
// If the room ID is empty then we failed to look up the alias.
|
||||
if roomID == "" {
|
||||
return "", fmt.Errorf("Alias %q not found", req.RoomIDOrAlias)
|
||||
return "", "", fmt.Errorf("Alias %q not found", req.RoomIDOrAlias)
|
||||
}
|
||||
|
||||
// If we do, then pluck out the room ID and continue the join.
|
||||
|
@ -142,11 +143,11 @@ func (r *Joiner) performJoinRoomByAlias(
|
|||
func (r *Joiner) performJoinRoomByID(
|
||||
ctx context.Context,
|
||||
req *api.PerformJoinRequest,
|
||||
) (string, error) {
|
||||
) (string, gomatrixserverlib.ServerName, error) {
|
||||
// Get the domain part of the room ID.
|
||||
_, domain, err := gomatrixserverlib.SplitID('!', req.RoomIDOrAlias)
|
||||
if err != nil {
|
||||
return "", &api.PerformError{
|
||||
return "", "", &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("Room ID %q is invalid: %s", req.RoomIDOrAlias, err),
|
||||
}
|
||||
|
@ -169,7 +170,7 @@ func (r *Joiner) performJoinRoomByID(
|
|||
Redacts: "",
|
||||
}
|
||||
if err = eb.SetUnsigned(struct{}{}); err != nil {
|
||||
return "", fmt.Errorf("eb.SetUnsigned: %w", err)
|
||||
return "", "", fmt.Errorf("eb.SetUnsigned: %w", err)
|
||||
}
|
||||
|
||||
// It is possible for the request to include some "content" for the
|
||||
|
@ -180,7 +181,7 @@ func (r *Joiner) performJoinRoomByID(
|
|||
}
|
||||
req.Content["membership"] = gomatrixserverlib.Join
|
||||
if err = eb.SetContent(req.Content); err != nil {
|
||||
return "", fmt.Errorf("eb.SetContent: %w", err)
|
||||
return "", "", fmt.Errorf("eb.SetContent: %w", err)
|
||||
}
|
||||
|
||||
// Force a federated join if we aren't in the room and we've been
|
||||
|
@ -194,7 +195,7 @@ func (r *Joiner) performJoinRoomByID(
|
|||
if err == nil && isInvitePending {
|
||||
_, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
|
||||
if ierr != nil {
|
||||
return "", fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
|
||||
return "", "", fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
|
||||
}
|
||||
|
||||
// If we were invited by someone from another server then we can
|
||||
|
@ -206,8 +207,10 @@ func (r *Joiner) performJoinRoomByID(
|
|||
}
|
||||
|
||||
// If we should do a forced federated join then do that.
|
||||
var joinedVia gomatrixserverlib.ServerName
|
||||
if forceFederatedJoin {
|
||||
return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
|
||||
joinedVia, err = r.performFederatedJoinRoomByID(ctx, req)
|
||||
return req.RoomIDOrAlias, joinedVia, err
|
||||
}
|
||||
|
||||
// Try to construct an actual join event from the template.
|
||||
|
@ -249,7 +252,7 @@ func (r *Joiner) performJoinRoomByID(
|
|||
inputRes := api.InputRoomEventsResponse{}
|
||||
r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes)
|
||||
if err = inputRes.Err(); err != nil {
|
||||
return "", &api.PerformError{
|
||||
return "", "", &api.PerformError{
|
||||
Code: api.PerformErrorNotAllowed,
|
||||
Msg: fmt.Sprintf("InputRoomEvents auth failed: %s", err),
|
||||
}
|
||||
|
@ -265,7 +268,7 @@ func (r *Joiner) performJoinRoomByID(
|
|||
// Otherwise we'll try a federated join as normal, since it's quite
|
||||
// possible that the room still exists on other servers.
|
||||
if len(req.ServerNames) == 0 {
|
||||
return "", &api.PerformError{
|
||||
return "", "", &api.PerformError{
|
||||
Code: api.PerformErrorNoRoom,
|
||||
Msg: fmt.Sprintf("Room ID %q does not exist", req.RoomIDOrAlias),
|
||||
}
|
||||
|
@ -273,24 +276,25 @@ func (r *Joiner) performJoinRoomByID(
|
|||
}
|
||||
|
||||
// Perform a federated room join.
|
||||
return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
|
||||
joinedVia, err = r.performFederatedJoinRoomByID(ctx, req)
|
||||
return req.RoomIDOrAlias, joinedVia, err
|
||||
|
||||
default:
|
||||
// Something else went wrong.
|
||||
return "", fmt.Errorf("Error joining local room: %q", err)
|
||||
return "", "", fmt.Errorf("Error joining local room: %q", err)
|
||||
}
|
||||
|
||||
// By this point, if req.RoomIDOrAlias contained an alias, then
|
||||
// it will have been overwritten with a room ID by performJoinRoomByAlias.
|
||||
// We should now include this in the response so that the CS API can
|
||||
// return the right room ID.
|
||||
return req.RoomIDOrAlias, nil
|
||||
return req.RoomIDOrAlias, r.Cfg.Matrix.ServerName, nil
|
||||
}
|
||||
|
||||
func (r *Joiner) performFederatedJoinRoomByID(
|
||||
ctx context.Context,
|
||||
req *api.PerformJoinRequest,
|
||||
) error {
|
||||
) (gomatrixserverlib.ServerName, error) {
|
||||
// Try joining by all of the supplied server names.
|
||||
fedReq := fsAPI.PerformJoinRequest{
|
||||
RoomID: req.RoomIDOrAlias, // the room ID to try and join
|
||||
|
@ -301,13 +305,13 @@ func (r *Joiner) performFederatedJoinRoomByID(
|
|||
fedRes := fsAPI.PerformJoinResponse{}
|
||||
r.FSAPI.PerformJoin(ctx, &fedReq, &fedRes)
|
||||
if fedRes.LastError != nil {
|
||||
return &api.PerformError{
|
||||
return "", &api.PerformError{
|
||||
Code: api.PerformErrRemote,
|
||||
Msg: fedRes.LastError.Message,
|
||||
RemoteCode: fedRes.LastError.Code,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return fedRes.JoinedVia, nil
|
||||
}
|
||||
|
||||
func buildEvent(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue