diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index b1c39012..02a5956c 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -42,7 +42,7 @@ func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProduce } // SendEvents writes the given events to the roomserver input log. The events are written with KindNew. -func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event) error { +func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error { eventIDs := make([]string, len(events)) ires := make([]api.InputRoomEvent, len(events)) for i, event := range events { @@ -50,6 +50,7 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event) error Kind: api.KindNew, Event: event.JSON(), AuthEventIDs: authEventIDs(event), + SendAsServer: string(sendAsServer), } eventIDs[i] = event.EventID() } diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index 29095190..e05c1742 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -188,7 +188,7 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite } // send events to the room server - if err := producer.SendEvents(builtEvents); err != nil { + if err := producer.SendEvents(builtEvents, cfg.Matrix.ServerName); err != nil { return httputil.LogThenError(req, err) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/sendevent.go b/src/github.com/matrix-org/dendrite/clientapi/writers/sendevent.go index 22efa35d..3a4b50c7 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/sendevent.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/sendevent.go @@ -93,6 +93,7 @@ func SendEvent( refs = append(refs, e.EventReference()) } builder.AuthEvents = refs + builder.Depth = queryRes.Depth eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName) e, err := builder.Build( eventID, time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, @@ -115,7 +116,7 @@ func SendEvent( } // pass the new event to the roomserver - if err := producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { + if err := producer.SendEvents([]gomatrixserverlib.Event{e}, cfg.Matrix.ServerName); err != nil { return httputil.LogThenError(req, err) } diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go index b303a06b..27c07b27 100644 --- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go @@ -345,7 +345,8 @@ func main() { "LatestEventIDs":["$1463671339126270PnVwC:matrix.org"], "AddsStateEventIDs":["$1463671337126266wrSBX:matrix.org", "$1463671339126270PnVwC:matrix.org"], "RemovesStateEventIDs":null, - "LastSentEventID":"" + "LastSentEventID":"", + "SendAsServer":"" }`, } diff --git a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go index cdaabb81..a6013cea 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go +++ b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go @@ -159,7 +159,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { // TODO: Check that the event is allowed by its auth_events. // pass the event to the roomserver - if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { + if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index ba9ce5d6..4d576aef 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -40,6 +40,10 @@ const ( KindBackfill = 4 ) +// DoNotSendToOtherServers tells us not to send the event to other matrix +// servers. +const DoNotSendToOtherServers = "" + // InputRoomEvent is a matrix room event to add to the room server database. // TODO: Implement UnmarshalJSON/MarshalJSON in a way that does something sensible with the event JSON. type InputRoomEvent struct { @@ -62,6 +66,9 @@ type InputRoomEvent struct { // These are only used if HasState is true. // The list can be empty, for example when storing the first event in a room. StateEventIDs []string + // The server name to use to push this event to other servers. + // Or empty if this event shouldn't be pushed to other servers. + SendAsServer string } // UnmarshalJSON implements json.Unmarshaller @@ -76,6 +83,7 @@ func (ire *InputRoomEvent) UnmarshalJSON(data []byte) error { AuthEventIDs []string StateEventIDs []string HasState bool + SendAsServer string } if err := json.Unmarshal(data, &content); err != nil { return err @@ -84,6 +92,7 @@ func (ire *InputRoomEvent) UnmarshalJSON(data []byte) error { ire.AuthEventIDs = content.AuthEventIDs ire.StateEventIDs = content.StateEventIDs ire.HasState = content.HasState + ire.SendAsServer = content.SendAsServer if content.Event != nil { ire.Event = []byte(*content.Event) } @@ -103,12 +112,14 @@ func (ire InputRoomEvent) MarshalJSON() ([]byte, error) { AuthEventIDs []string StateEventIDs []string HasState bool + SendAsServer string }{ Kind: ire.Kind, AuthEventIDs: ire.AuthEventIDs, StateEventIDs: ire.StateEventIDs, Event: &event, HasState: ire.HasState, + SendAsServer: ire.SendAsServer, } return json.Marshal(&content) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 0b2aee64..8055ce1b 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -66,6 +66,22 @@ type OutputRoomEvent struct { // The state event IDs that are part of the current state, but not part // of the state at the event. StateBeforeRemovesEventIDs []string + // The server name to use to push this event to other servers. + // Or empty if this event shouldn't be pushed to other servers. + // + // This is used by the federation sender component. We need to tell it what + // event it needs to send because it can't tell on its own. Normally if an + // event was created on this server then we are responsible for sending it. + // However there are a couple of exceptions. The first is that when the + // server joins a remote room through another matrix server, it is the job + // of the other matrix server to send the event over federation. The second + // is the reverse of the first, that is when a remote server joins a room + // that we are in over federation using our server it is our responsibility + // to send the join event to other matrix servers. + // + // We encode the server name that the event should be sent using here to + // future proof the API for virtual hosting. + SendAsServer string } // UnmarshalJSON implements json.Unmarshaller @@ -82,6 +98,7 @@ func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error { LastSentEventID string StateBeforeAddsEventIDs []string StateBeforeRemovesEventIDs []string + SendAsServer string } if err := json.Unmarshal(data, &content); err != nil { return err @@ -95,6 +112,7 @@ func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error { ore.LastSentEventID = content.LastSentEventID ore.StateBeforeAddsEventIDs = content.StateBeforeAddsEventIDs ore.StateBeforeRemovesEventIDs = content.StateBeforeRemovesEventIDs + ore.SendAsServer = content.SendAsServer return nil } @@ -113,6 +131,7 @@ func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) { LastSentEventID string StateBeforeAddsEventIDs []string StateBeforeRemovesEventIDs []string + SendAsServer string }{ Event: &event, LatestEventIDs: ore.LatestEventIDs, @@ -121,6 +140,7 @@ func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) { LastSentEventID: ore.LastSentEventID, StateBeforeAddsEventIDs: ore.StateBeforeAddsEventIDs, StateBeforeRemovesEventIDs: ore.StateBeforeRemovesEventIDs, + SendAsServer: ore.SendAsServer, } return json.Marshal(&content) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index 6f26212d..d51e8b2c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -32,6 +32,8 @@ type QueryLatestEventsAndStateRequest struct { } // QueryLatestEventsAndStateResponse is a response to QueryLatestEventsAndState +// This is used when sending events to set the prev_events, auth_events and depth. +// It is also used to tell whether the event is allowed by the event auth rules. type QueryLatestEventsAndStateResponse struct { // Copy of the request for debugging. QueryLatestEventsAndStateRequest @@ -39,10 +41,17 @@ type QueryLatestEventsAndStateResponse struct { // If the room doesn't exist this will be false and LatestEvents will be empty. RoomExists bool // The latest events in the room. + // These are used to set the prev_events when sending an event. LatestEvents []gomatrixserverlib.EventReference // The state events requested. // This list will be in an arbitrary order. + // These are used to set the auth_events when sending an event. + // These are used to check whether the event is allowed. StateEvents []gomatrixserverlib.Event + // The depth of the latest events. + // This is one greater than the maximum depth of the latest events. + // This is used to set the depth when sending an event. + Depth int64 } // QueryStateAfterEventsRequest is a request to QueryStateAfterEvents diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index b379b3ae..8031a795 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -102,7 +102,7 @@ func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api. } // Update the extremities of the event graph for the room - if err := updateLatestEvents(db, ow, roomNID, stateAtEvent, event); err != nil { + if err := updateLatestEvents(db, ow, roomNID, stateAtEvent, event, input.SendAsServer); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index ce7ac0bf..d8de51b8 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -40,7 +40,12 @@ import ( // 7 <----- latest // func updateLatestEvents( - db RoomEventDatabase, ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, + db RoomEventDatabase, + ow OutputRoomEventWriter, + roomNID types.RoomNID, + stateAtEvent types.StateAtEvent, + event gomatrixserverlib.Event, + sendAsServer string, ) (err error) { updater, err := db.GetLatestEventsForUpdate(roomNID) if err != nil { @@ -60,12 +65,18 @@ func updateLatestEvents( } }() - err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event) + err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event, sendAsServer) return } func doUpdateLatestEvents( - db RoomEventDatabase, updater types.RoomRecentEventsUpdater, ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, + db RoomEventDatabase, + updater types.RoomRecentEventsUpdater, + ow OutputRoomEventWriter, + roomNID types.RoomNID, + stateAtEvent types.StateAtEvent, + event gomatrixserverlib.Event, + sendAsServer string, ) error { var err error var prevEvents []gomatrixserverlib.EventReference @@ -128,7 +139,7 @@ func doUpdateLatestEvents( // necessary bookkeeping we'll keep the event sending synchronous for now. if err = writeEvent( db, ow, lastEventIDSent, event, newLatest, removed, added, - stateBeforeEventRemoves, stateBeforeEventAdds, + stateBeforeEventRemoves, stateBeforeEventAdds, sendAsServer, ); err != nil { return err } @@ -182,6 +193,7 @@ func writeEvent( event gomatrixserverlib.Event, latest []types.StateAtEventAndReference, removed, added []types.StateEntry, stateBeforeEventRemoves, stateBeforeEventAdds []types.StateEntry, + sendAsServer string, ) error { latestEventIDs := make([]string, len(latest)) @@ -225,6 +237,7 @@ func writeEvent( for _, entry := range stateBeforeEventAdds { ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) } + ore.SendAsServer = sendAsServer return ow.WriteOutputRoomEvent(ore) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 1b1820f0..31ebc022 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -33,8 +33,9 @@ type RoomserverQueryAPIDatabase interface { // Returns an error if there was a problem talking to the database. RoomNID(roomID string) (types.RoomNID, error) // Lookup event references for the latest events in the room and the current state snapshot. + // Returns the latest events, the current state and the maximum depth of the latest events plus 1. // Returns an error if there was a problem talking to the database. - LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, error) + LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) // Lookup the numeric IDs for a list of events. // Returns an error if there was a problem talking to the database. EventNIDs(eventIDs []string) (map[string]types.EventNID, error) @@ -60,7 +61,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( } response.RoomExists = true var currentStateSnapshotNID types.StateSnapshotNID - response.LatestEvents, currentStateSnapshotNID, err = r.DB.LatestEventIDs(roomNID) + response.LatestEvents, currentStateSnapshotNID, response.Depth, err = r.DB.LatestEventIDs(roomNID) if err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index a2813ae9..b4f1bfa3 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -46,7 +46,9 @@ CREATE TABLE IF NOT EXISTS events ( -- part of the event graph -- Since many different events can have the same state we store the -- state into a separate state table and refer to it by numeric ID. - state_snapshot_nid bigint NOT NULL DEFAULT 0, + state_snapshot_nid BIGINT NOT NULL DEFAULT 0, + -- Depth of the event in the event graph. + depth BIGINT NOT NULL, -- The textual event id. -- Used to lookup the numeric ID when processing requests. -- Needed for state resolution. @@ -61,8 +63,8 @@ CREATE TABLE IF NOT EXISTS events ( ` const insertEventSQL = "" + - "INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids)" + - " VALUES ($1, $2, $3, $4, $5, $6)" + + "INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT ON CONSTRAINT event_id_unique" + " DO NOTHING" + " RETURNING event_nid, state_snapshot_nid" @@ -107,6 +109,9 @@ const bulkSelectEventIDSQL = "" + const bulkSelectEventNIDSQL = "" + "SELECT event_id, event_nid FROM events WHERE event_id = ANY($1)" +const selectMaxEventDepthSQL = "" + + "SELECT COALESCE(MAX(depth) + 1, 0) FROM events WHERE event_nid = ANY($1)" + type eventStatements struct { insertEventStmt *sql.Stmt selectEventStmt *sql.Stmt @@ -120,6 +125,7 @@ type eventStatements struct { bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt + selectMaxEventDepthStmt *sql.Stmt } func (s *eventStatements) prepare(db *sql.DB) (err error) { @@ -141,6 +147,7 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { {&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, + {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, }.prepare(db) } @@ -149,12 +156,13 @@ func (s *eventStatements) insertEvent( eventID string, referenceSHA256 []byte, authEventNIDs []types.EventNID, + depth int64, ) (types.EventNID, types.StateSnapshotNID, error) { var eventNID int64 var stateNID int64 err := s.insertEventStmt.QueryRow( int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), eventID, referenceSHA256, - eventNIDsAsArray(authEventNIDs), + eventNIDsAsArray(authEventNIDs), depth, ).Scan(&eventNID, &stateNID) return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err } @@ -357,6 +365,15 @@ func (s *eventStatements) bulkSelectEventNID(eventIDs []string) (map[string]type return results, nil } +func (s *eventStatements) selectMaxEventDepth(eventNIDs []types.EventNID) (int64, error) { + var result int64 + err := s.selectMaxEventDepthStmt.QueryRow(eventNIDsAsArray(eventNIDs)).Scan(&result) + if err != nil { + return 0, err + } + return result, nil +} + func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { nids := make([]int64, len(eventNIDs)) for i := range eventNIDs { diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index b9b5eb1c..1dfc89d4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -87,6 +87,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ event.EventID(), event.EventReference().EventSHA256, authEventNIDs, + event.Depth(), ); err != nil { if err == sql.ErrNoRows { // We've already inserted the event so select the numeric event ID @@ -349,16 +350,20 @@ func (d *Database) RoomNID(roomID string) (types.RoomNID, error) { } // LatestEventIDs implements query.RoomserverQueryAPIDB -func (d *Database) LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, error) { +func (d *Database) LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) { eventNIDs, currentStateSnapshotNID, err := d.statements.selectLatestEventNIDs(roomNID) if err != nil { - return nil, 0, err + return nil, 0, 0, err } references, err := d.statements.bulkSelectEventReference(eventNIDs) if err != nil { - return nil, 0, err + return nil, 0, 0, err } - return references, currentStateSnapshotNID, nil + depth, err := d.statements.selectMaxEventDepth(eventNIDs) + if err != nil { + return nil, 0, 0, err + } + return references, currentStateSnapshotNID, depth, nil } // StateEntriesForTuples implements state.RoomStateDatabase