diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 852bfd76..24ea121d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -205,7 +205,7 @@ func (s *currentRoomStateStatements) upsertRoomState( func (s *currentRoomStateStatements) selectEventsWithEventIDs( ctx context.Context, txn *sql.Tx, eventIDs []string, -) ([]streamEvent, error) { +) ([]StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) if err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 035db988..686fc39e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -114,7 +114,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) selectStateInRange( ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, -) (map[string]map[string]bool, map[string]streamEvent, error) { +) (map[string]map[string]bool, map[string]StreamEvent, error) { stmt := common.TxStmt(txn, s.selectStateInRangeStmt) rows, err := stmt.QueryContext(ctx, oldPos, newPos) @@ -126,7 +126,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( // - For each room ID, build up an array of event IDs which represents cumulative adds/removes // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID // if they aren't in the event ID cache. We don't handle state deletion yet. - eventIDToEvent := make(map[string]streamEvent) + eventIDToEvent := make(map[string]StreamEvent) // RoomID => A set (map[string]bool) of state event IDs which are between the two positions stateNeeded := make(map[string]map[string]bool) @@ -169,7 +169,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = streamEvent{ + eventIDToEvent[ev.EventID()] = StreamEvent{ Event: ev, streamPosition: types.StreamPosition(streamPos), } @@ -224,7 +224,7 @@ func (s *outputRoomEventsStatements) insertEvent( func (s *outputRoomEventsStatements) selectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, -) ([]streamEvent, error) { +) ([]StreamEvent, error) { stmt := common.TxStmt(txn, s.selectRecentEventsStmt) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) if err != nil { @@ -248,7 +248,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( // from the database. func (s *outputRoomEventsStatements) selectEvents( ctx context.Context, txn *sql.Tx, eventIDs []string, -) ([]streamEvent, error) { +) ([]StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEventsStmt) rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) if err != nil { @@ -258,8 +258,8 @@ func (s *outputRoomEventsStatements) selectEvents( return rowsToStreamEvents(rows) } -func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { - var result []streamEvent +func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) { + var result []StreamEvent for rows.Next() { var ( streamPos int64 @@ -284,7 +284,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { } } - result = append(result, streamEvent{ + result = append(result, StreamEvent{ Event: ev, streamPosition: types.StreamPosition(streamPos), transactionID: transactionID, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 84417a34..4476da81 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -39,8 +39,9 @@ type stateDelta struct { membershipPos types.StreamPosition } -// Same as gomatrixserverlib.Event but also has the stream position for this event. -type streamEvent struct { +// StreamEvent is the same as gomatrixserverlib.Event but also has the stream +// position for this event. +type StreamEvent struct { gomatrixserverlib.Event streamPosition types.StreamPosition transactionID *api.TransactionID @@ -296,7 +297,7 @@ func (d *SyncServerDatabase) CompleteSync( } // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []streamEvent + var recentStreamEvents []StreamEvent recentStreamEvents, err = d.events.selectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, ) @@ -463,9 +464,9 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( func (d *SyncServerDatabase) fetchStateEvents( ctx context.Context, txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, - eventIDToEvent map[string]streamEvent, -) (map[string][]streamEvent, error) { - stateBetween := make(map[string][]streamEvent) + eventIDToEvent map[string]StreamEvent, +) (map[string][]StreamEvent, error) { + stateBetween := make(map[string][]StreamEvent) missingEvents := make(map[string][]string) for roomID, ids := range roomIDToEventIDSet { events := stateBetween[roomID] @@ -507,7 +508,7 @@ func (d *SyncServerDatabase) fetchStateEvents( func (d *SyncServerDatabase) fetchMissingStateEvents( ctx context.Context, txn *sql.Tx, eventIDs []string, -) ([]streamEvent, error) { +) ([]StreamEvent, error) { // Fetch from the events table first so we pick up the stream ID for the // event. events, err := d.events.selectEvents(ctx, txn, eventIDs) @@ -583,9 +584,9 @@ func (d *SyncServerDatabase) getStateDeltas( if err != nil { return nil, err } - s := make([]streamEvent, len(allState)) + s := make([]StreamEvent, len(allState)) for i := 0; i < len(s); i++ { - s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} + s[i] = StreamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} } state[roomID] = s continue // we'll add this room in when we do joined rooms @@ -618,10 +619,10 @@ func (d *SyncServerDatabase) getStateDeltas( return deltas, nil } -// streamEventsToEvents converts streamEvent to Event. If device is non-nil and +// streamEventsToEvents converts StreamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. -func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event { +func streamEventsToEvents(device *authtypes.Device, in []StreamEvent) []gomatrixserverlib.Event { out := make([]gomatrixserverlib.Event, len(in)) for i := 0; i < len(in); i++ { out[i] = in[i].Event