From a2773922d2fe40e6d95d73f532640702709ab526 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Wed, 3 Mar 2021 16:27:44 +0000 Subject: [PATCH] Send events to appservice based on room membership (#1680) * Check membership of room * Use QueryStateAfterEventsResponse * Fix complexity * Changes that I made a long time ago * Rename to appserviceJoinedAtEvent * Check membership in GetMemberships * Update QueryMembershipsForRoom * Tweaks in client API * Update appserviceJoinedAtEvent * Comments * Try QueryMembershipForUser instead * Undo some changes to client API that shouldn't be needed * More /event tweaks * Refactor /event bit * Go back to QueryMembershipsForRoom because appservices are hard * Fix bugs in onMessage * Add comments Co-authored-by: Neil Alexander --- appservice/consumers/roomserver.go | 42 +++++++++++++++++++-- appservice/workers/transaction_scheduler.go | 2 +- clientapi/routing/getevent.go | 16 +++++++- roomserver/api/query.go | 4 +- roomserver/internal/query/query.go | 21 +++++++++++ userapi/api/api.go | 3 ++ userapi/internal/api.go | 3 +- 7 files changed, 83 insertions(+), 8 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 5cbffa35..2ad7f68f 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -85,9 +85,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) return nil } @@ -114,6 +111,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( // Queue this event to be sent off to the application service if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { log.WithError(err).Warn("failed to insert incoming event into appservices database") + return err } else { // Tell our worker to send out new messages by updating remaining message // count and waking them up with a broadcast @@ -126,8 +124,43 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( return nil } +// appserviceJoinedAtEvent returns a boolean depending on whether a given +// appservice has membership at the time a given event was created. +func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { + // TODO: This is only checking the current room state, not the state at + // the event in question. Pretty sure this is what Synapse does too, but + // until we have a lighter way of checking the state before the event that + // doesn't involve state res, then this is probably OK. + membershipReq := &api.QueryMembershipsForRoomRequest{ + RoomID: event.RoomID(), + JoinedOnly: true, + } + membershipRes := &api.QueryMembershipsForRoomResponse{} + + // XXX: This could potentially race if the state for the event is not known yet + // e.g. the event came over federation but we do not have the full state persisted. + if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil { + for _, ev := range membershipRes.JoinEvents { + var membership gomatrixserverlib.MemberContent + if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil { + continue + } + if appservice.IsInterestedInUserID(*ev.StateKey) { + return true + } + } + } else { + log.WithFields(log.Fields{ + "room_id": event.RoomID(), + }).WithError(err).Errorf("Unable to get membership for room") + } + return false +} + // appserviceIsInterestedInEvent returns a boolean depending on whether a given // event falls within one of a given application service's namespaces. +// +// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682 func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { // No reason to queue events if they'll never be sent to the application // service @@ -162,5 +195,6 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont }).WithError(err).Errorf("Unable to get aliases for room") } - return false + // Check if any of the members in the room match the appservice + return s.appserviceJoinedAtEvent(ctx, event, appservice) } diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go index 6528fc1b..45748c21 100644 --- a/appservice/workers/transaction_scheduler.go +++ b/appservice/workers/transaction_scheduler.go @@ -62,7 +62,7 @@ func SetupTransactionWorkers( func worker(db storage.Database, ws types.ApplicationServiceWorkerState) { log.WithFields(log.Fields{ "appservice": ws.AppService.ID, - }).Info("starting application service") + }).Info("Starting application service") ctx := context.Background() // Create a HTTP client for sending requests to app services diff --git a/clientapi/routing/getevent.go b/clientapi/routing/getevent.go index 29340cc0..36f3ee9e 100644 --- a/clientapi/routing/getevent.go +++ b/clientapi/routing/getevent.go @@ -103,8 +103,22 @@ func GetEvent( } } + var appService *config.ApplicationService + if device.AppserviceID != "" { + for _, as := range cfg.Derived.ApplicationServices { + if as.ID == device.AppserviceID { + appService = &as + break + } + } + } + for _, stateEvent := range stateResp.StateEvents { - if !stateEvent.StateKeyEquals(device.UserID) { + if appService != nil { + if !appService.IsInterestedInUserID(*stateEvent.StateKey()) { + continue + } + } else if !stateEvent.StateKeyEquals(device.UserID) { continue } membership, err := stateEvent.Membership() diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 43bbfd16..af35f7e7 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -151,7 +151,9 @@ type QueryMembershipsForRoomRequest struct { JoinedOnly bool `json:"joined_only"` // ID of the room to fetch memberships from RoomID string `json:"room_id"` - // ID of the user sending the request + // Optional - ID of the user sending the request, for checking if the + // user is allowed to see the memberships. If not specified then all + // room memberships will be returned. Sender string `json:"sender"` } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 29dc7840..f69f67f7 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -242,6 +242,27 @@ func (r *Queryer) QueryMembershipsForRoom( return err } + // If no sender is specified then we will just return the entire + // set of memberships for the room, regardless of whether a specific + // user is allowed to see them or not. + if request.Sender == "" { + var events []types.Event + var eventNIDs []types.EventNID + eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, request.JoinedOnly, false) + if err != nil { + return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err) + } + events, err = r.DB.Events(ctx, eventNIDs) + if err != nil { + return fmt.Errorf("r.DB.Events: %w", err) + } + for _, event := range events { + clientEvent := gomatrixserverlib.ToClientEvent(event.Event, gomatrixserverlib.FormatAll) + response.JoinEvents = append(response.JoinEvents, clientEvent) + } + return nil + } + membershipEventNID, stillInRoom, isRoomforgotten, err := r.DB.GetMembership(ctx, info.RoomNID, request.Sender) if err != nil { return err diff --git a/userapi/api/api.go b/userapi/api/api.go index 809ba047..45e4e834 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -241,6 +241,9 @@ type Device struct { LastSeenTS int64 LastSeenIP string UserAgent string + // If the device is for an appservice user, + // this is the appservice ID. + AppserviceID string } // Account represents a Matrix account on this home server. diff --git a/userapi/internal/api.go b/userapi/internal/api.go index e104714b..0d01afa1 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -381,7 +381,8 @@ func (a *UserInternalAPI) queryAppServiceToken(ctx context.Context, token, appSe // Use AS dummy device ID ID: types.AppServiceDeviceID, // AS dummy device has AS's token. - AccessToken: token, + AccessToken: token, + AppserviceID: appService.ID, } localpart, err := userutil.ParseUsernameParam(appServiceUserID, &a.ServerName)