mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 23:48:27 +00:00
Fetching missing state from the roomserver.
Whenever the syncserver receives an event from the room server that adds state that isn't in the syncserver's local database it should fetch those state events from the roomserver.
This commit is contained in:
parent
def49400bc
commit
08825defef
5 changed files with 183 additions and 46 deletions
|
@ -20,6 +20,8 @@ import (
|
||||||
|
|
||||||
// Sync contains the config information necessary to spin up a sync-server process.
|
// Sync contains the config information necessary to spin up a sync-server process.
|
||||||
type Sync struct {
|
type Sync struct {
|
||||||
|
// Where the room server is listening for queries.
|
||||||
|
RoomserverURL string `yaml:"roomserver_url"`
|
||||||
// The topic for events which are written by the room server output log.
|
// The topic for events which are written by the room server output log.
|
||||||
RoomserverOutputTopic string `yaml:"roomserver_topic"`
|
RoomserverOutputTopic string `yaml:"roomserver_topic"`
|
||||||
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server.
|
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server.
|
||||||
|
|
|
@ -16,6 +16,7 @@ package consumers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
@ -33,6 +34,7 @@ type OutputRoomEvent struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
notifier *sync.Notifier
|
notifier *sync.Notifier
|
||||||
|
query api.RoomserverQueryAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||||
|
@ -51,6 +53,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
|
query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil),
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
@ -84,7 +87,19 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
"room_id": ev.RoomID(),
|
"room_id": ev.RoomID(),
|
||||||
}).Info("received event from roomserver")
|
}).Info("received event from roomserver")
|
||||||
|
|
||||||
syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs)
|
addsStateEvents, err := s.addsStateEvents(output.AddsStateEventIDs, ev)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"event": string(ev.JSON()),
|
||||||
|
log.ErrorKey: err,
|
||||||
|
"add": output.AddsStateEventIDs,
|
||||||
|
"del": output.RemovesStateEventIDs,
|
||||||
|
}).Panicf("roomserver output log: state event lookup failure")
|
||||||
|
}
|
||||||
|
|
||||||
|
syncStreamPos, err := s.db.WriteEvent(
|
||||||
|
&ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs,
|
||||||
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
|
@ -100,3 +115,64 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *OutputRoomEvent) addsStateEvents(
|
||||||
|
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||||
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
|
if len(addsStateEventIDs) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
||||||
|
return []gomatrixserverlib.Event{event}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := s.db.Events(addsStateEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
missing := missingEventsFrom(result, addsStateEventIDs)
|
||||||
|
|
||||||
|
for _, eventID := range missing {
|
||||||
|
if eventID == event.EventID() {
|
||||||
|
result = append(result, event)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
missing = missingEventsFrom(result, addsStateEventIDs)
|
||||||
|
|
||||||
|
if len(missing) == 0 {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
||||||
|
var eventResp api.QueryEventsByIDResponse
|
||||||
|
if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, eventResp.Events...)
|
||||||
|
missing = missingEventsFrom(result, addsStateEventIDs)
|
||||||
|
|
||||||
|
if len(missing) != 0 {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"missing %d state events IDs at event %q", len(missing), event.EventID(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
|
||||||
|
have := map[string]bool{}
|
||||||
|
for _, event := range events {
|
||||||
|
have[event.EventID()] = true
|
||||||
|
}
|
||||||
|
var missing []string
|
||||||
|
for _, eventID := range required {
|
||||||
|
if !have[eventID] {
|
||||||
|
missing = append(missing, eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return missing
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -61,12 +62,16 @@ const selectCurrentStateSQL = "" +
|
||||||
const selectJoinedUsersSQL = "" +
|
const selectJoinedUsersSQL = "" +
|
||||||
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||||
|
|
||||||
|
const selectEventsWithEventIDsSQL = "" +
|
||||||
|
"SELECT event_json FROM current_room_state WHERE event_id = ANY($1)"
|
||||||
|
|
||||||
type currentRoomStateStatements struct {
|
type currentRoomStateStatements struct {
|
||||||
upsertRoomStateStmt *sql.Stmt
|
upsertRoomStateStmt *sql.Stmt
|
||||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||||
selectCurrentStateStmt *sql.Stmt
|
selectCurrentStateStmt *sql.Stmt
|
||||||
selectJoinedUsersStmt *sql.Stmt
|
selectJoinedUsersStmt *sql.Stmt
|
||||||
|
selectEventsWithEventIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
@ -89,6 +94,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,6 +149,31 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
|
return rowsToEvents(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error {
|
||||||
|
_, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error {
|
||||||
|
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
|
||||||
|
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) {
|
||||||
|
rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
return rowsToEvents(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
||||||
var result []gomatrixserverlib.Event
|
var result []gomatrixserverlib.Event
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var eventBytes []byte
|
var eventBytes []byte
|
||||||
|
@ -156,15 +189,3 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error {
|
|
||||||
_, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error {
|
|
||||||
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
|
|
||||||
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership,
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
|
@ -16,7 +16,6 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
@ -193,7 +192,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
events, err := rowsToEvents(rows)
|
events, err := rowsToStreamEvents(rows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -205,23 +204,19 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
|
||||||
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
||||||
// from the database.
|
// from the database.
|
||||||
func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
|
func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
|
||||||
rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs))
|
stmt := s.selectEventsStmt
|
||||||
|
if txn != nil {
|
||||||
|
stmt = txn.Stmt(stmt)
|
||||||
|
}
|
||||||
|
rows, err := stmt.Query(pq.StringArray(eventIDs))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
result, err := rowsToEvents(rows)
|
return rowsToStreamEvents(rows)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(result) != len(eventIDs) {
|
|
||||||
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(result), len(eventIDs))
|
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) {
|
func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
|
||||||
var result []streamEvent
|
var result []streamEvent
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -17,6 +17,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/clientapi/events"
|
"github.com/matrix-org/dendrite/clientapi/events"
|
||||||
|
@ -75,10 +76,24 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error
|
||||||
return d.roomstate.selectJoinedUsers()
|
return d.roomstate.selectJoinedUsers()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Events lookups a list of event by their event ID.
|
||||||
|
// Returns a list of events matching the requested IDs found in the database.
|
||||||
|
// If an event is not found in the database then it will be omitted from the list.
|
||||||
|
// Returns an error if there was a problem talking with the database
|
||||||
|
func (d *SyncServerDatabase) Events(eventIDs []string) ([]gomatrixserverlib.Event, error) {
|
||||||
|
streamEvents, err := d.events.selectEvents(nil, eventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return streamEventsToEvents(streamEvents), nil
|
||||||
|
}
|
||||||
|
|
||||||
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
||||||
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
||||||
// Returns an error if there was a problem inserting this event.
|
// Returns an error if there was a problem inserting this event.
|
||||||
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) {
|
func (d *SyncServerDatabase) WriteEvent(
|
||||||
|
ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string,
|
||||||
|
) (streamPos types.StreamPosition, returnErr error) {
|
||||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
var err error
|
var err error
|
||||||
pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
|
pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
|
||||||
|
@ -87,26 +102,12 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve
|
||||||
}
|
}
|
||||||
streamPos = types.StreamPosition(pos)
|
streamPos = types.StreamPosition(pos)
|
||||||
|
|
||||||
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 {
|
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
|
||||||
// Nothing to do, the event may have just been a message event.
|
// Nothing to do, the event may have just been a message event.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the current room state based on the added/removed state event IDs.
|
return d.updateRoomState(txn, removeStateEventIDs, addStateEvents)
|
||||||
// In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event.
|
|
||||||
// However, conflict resolution may result in there being different events being added, or even some removed.
|
|
||||||
if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() {
|
|
||||||
// common case
|
|
||||||
return d.updateRoomState(txn, nil, []gomatrixserverlib.Event{*ev})
|
|
||||||
}
|
|
||||||
|
|
||||||
// uncommon case: we need to fetch the full event for each event ID mentioned, then update room state
|
|
||||||
added, err := d.events.selectEvents(txn, addStateEventIDs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return d.updateRoomState(txn, removeStateEventIDs, streamEventsToEvents(added))
|
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -310,19 +311,61 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma
|
||||||
for _, missingEvIDs := range missingEvents {
|
for _, missingEvIDs := range missingEvents {
|
||||||
allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
|
allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
|
||||||
}
|
}
|
||||||
evs, err := d.events.selectEvents(txn, allMissingEventIDs)
|
evs, err := d.fetchMissingStateEvents(txn, allMissingEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// we know we got them all otherwise an error would've been returned, so just loop the events
|
|
||||||
for _, ev := range evs {
|
for _, ev := range evs {
|
||||||
roomID := ev.RoomID()
|
roomID := ev.RoomID()
|
||||||
stateBetween[roomID] = append(stateBetween[roomID], ev)
|
stateBetween[roomID] = append(stateBetween[roomID])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return stateBetween, nil
|
return stateBetween, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
|
||||||
|
// Fetch from the events table first so we pick up the stream ID for the
|
||||||
|
// event.
|
||||||
|
events, err := d.events.selectEvents(txn, eventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
have := map[string]bool{}
|
||||||
|
for _, event := range events {
|
||||||
|
have[event.EventID()] = true
|
||||||
|
}
|
||||||
|
var missing []string
|
||||||
|
for _, eventID := range eventIDs {
|
||||||
|
if !have[eventID] {
|
||||||
|
missing = append(missing, eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(missing) == 0 {
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If they are missing from the events table then they should be state
|
||||||
|
// events that we received from outside the main event stream.
|
||||||
|
// These should be in the room state table.
|
||||||
|
stateEvents, err := d.roomstate.selectEventsWithEventIDs(txn, missing)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(stateEvents) != len(missing) {
|
||||||
|
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
|
||||||
|
}
|
||||||
|
for _, e := range stateEvents {
|
||||||
|
// Set the stream position to 0 since these events occured outside the
|
||||||
|
// stream so probably happened before it.
|
||||||
|
// TOOD: What happens if we receive a state event from outside the
|
||||||
|
// timeline associated with an event in the middle of the timeline?
|
||||||
|
events = append(events, streamEvent{e, 0})
|
||||||
|
}
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
|
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
// - Get membership list changes for this user in this sync response
|
// - Get membership list changes for this user in this sync response
|
||||||
|
|
Loading…
Reference in a new issue