Export streamEvents

This makes it easier to handle stream positions at other places in the sync api component.
This commit is contained in:
Brendan Abolivier 2018-11-08 12:20:44 +00:00
parent 83c3c7e1db
commit 4cb223f8dd
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
3 changed files with 21 additions and 20 deletions

View file

@ -205,7 +205,7 @@ func (s *currentRoomStateStatements) upsertRoomState(
func (s *currentRoomStateStatements) selectEventsWithEventIDs( func (s *currentRoomStateStatements) selectEventsWithEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]streamEvent, error) { ) ([]StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt)
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
if err != nil { if err != nil {

View file

@ -114,7 +114,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
// two positions, only the most recent state is returned. // two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange( func (s *outputRoomEventsStatements) selectStateInRange(
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
) (map[string]map[string]bool, map[string]streamEvent, error) { ) (map[string]map[string]bool, map[string]StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt) stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
rows, err := stmt.QueryContext(ctx, oldPos, newPos) rows, err := stmt.QueryContext(ctx, oldPos, newPos)
@ -126,7 +126,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes // - For each room ID, build up an array of event IDs which represents cumulative adds/removes
// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
// if they aren't in the event ID cache. We don't handle state deletion yet. // if they aren't in the event ID cache. We don't handle state deletion yet.
eventIDToEvent := make(map[string]streamEvent) eventIDToEvent := make(map[string]StreamEvent)
// RoomID => A set (map[string]bool) of state event IDs which are between the two positions // RoomID => A set (map[string]bool) of state event IDs which are between the two positions
stateNeeded := make(map[string]map[string]bool) stateNeeded := make(map[string]map[string]bool)
@ -169,7 +169,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
} }
stateNeeded[ev.RoomID()] = needSet stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = streamEvent{ eventIDToEvent[ev.EventID()] = StreamEvent{
Event: ev, Event: ev,
streamPosition: types.StreamPosition(streamPos), streamPosition: types.StreamPosition(streamPos),
} }
@ -224,7 +224,7 @@ func (s *outputRoomEventsStatements) insertEvent(
func (s *outputRoomEventsStatements) selectRecentEvents( func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int, roomID string, fromPos, toPos types.StreamPosition, limit int,
) ([]streamEvent, error) { ) ([]StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt) stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
if err != nil { if err != nil {
@ -248,7 +248,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
// from the database. // from the database.
func (s *outputRoomEventsStatements) selectEvents( func (s *outputRoomEventsStatements) selectEvents(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]streamEvent, error) { ) ([]StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEventsStmt) stmt := common.TxStmt(txn, s.selectEventsStmt)
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
if err != nil { if err != nil {
@ -258,8 +258,8 @@ func (s *outputRoomEventsStatements) selectEvents(
return rowsToStreamEvents(rows) return rowsToStreamEvents(rows)
} }
func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) {
var result []streamEvent var result []StreamEvent
for rows.Next() { for rows.Next() {
var ( var (
streamPos int64 streamPos int64
@ -284,7 +284,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
} }
} }
result = append(result, streamEvent{ result = append(result, StreamEvent{
Event: ev, Event: ev,
streamPosition: types.StreamPosition(streamPos), streamPosition: types.StreamPosition(streamPos),
transactionID: transactionID, transactionID: transactionID,

View file

@ -39,8 +39,9 @@ type stateDelta struct {
membershipPos types.StreamPosition membershipPos types.StreamPosition
} }
// Same as gomatrixserverlib.Event but also has the stream position for this event. // StreamEvent is the same as gomatrixserverlib.Event but also has the stream
type streamEvent struct { // position for this event.
type StreamEvent struct {
gomatrixserverlib.Event gomatrixserverlib.Event
streamPosition types.StreamPosition streamPosition types.StreamPosition
transactionID *api.TransactionID transactionID *api.TransactionID
@ -296,7 +297,7 @@ func (d *SyncServerDatabase) CompleteSync(
} }
// TODO: When filters are added, we may need to call this multiple times to get enough events. // TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []streamEvent var recentStreamEvents []StreamEvent
recentStreamEvents, err = d.events.selectRecentEvents( recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
) )
@ -463,9 +464,9 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
func (d *SyncServerDatabase) fetchStateEvents( func (d *SyncServerDatabase) fetchStateEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomIDToEventIDSet map[string]map[string]bool, roomIDToEventIDSet map[string]map[string]bool,
eventIDToEvent map[string]streamEvent, eventIDToEvent map[string]StreamEvent,
) (map[string][]streamEvent, error) { ) (map[string][]StreamEvent, error) {
stateBetween := make(map[string][]streamEvent) stateBetween := make(map[string][]StreamEvent)
missingEvents := make(map[string][]string) missingEvents := make(map[string][]string)
for roomID, ids := range roomIDToEventIDSet { for roomID, ids := range roomIDToEventIDSet {
events := stateBetween[roomID] events := stateBetween[roomID]
@ -507,7 +508,7 @@ func (d *SyncServerDatabase) fetchStateEvents(
func (d *SyncServerDatabase) fetchMissingStateEvents( func (d *SyncServerDatabase) fetchMissingStateEvents(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]streamEvent, error) { ) ([]StreamEvent, error) {
// Fetch from the events table first so we pick up the stream ID for the // Fetch from the events table first so we pick up the stream ID for the
// event. // event.
events, err := d.events.selectEvents(ctx, txn, eventIDs) events, err := d.events.selectEvents(ctx, txn, eventIDs)
@ -583,9 +584,9 @@ func (d *SyncServerDatabase) getStateDeltas(
if err != nil { if err != nil {
return nil, err return nil, err
} }
s := make([]streamEvent, len(allState)) s := make([]StreamEvent, len(allState))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} s[i] = StreamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)}
} }
state[roomID] = s state[roomID] = s
continue // we'll add this room in when we do joined rooms continue // we'll add this room in when we do joined rooms
@ -618,10 +619,10 @@ func (d *SyncServerDatabase) getStateDeltas(
return deltas, nil return deltas, nil
} }
// streamEventsToEvents converts streamEvent to Event. If device is non-nil and // streamEventsToEvents converts StreamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets // matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event. // added to the unsigned section of the output event.
func streamEventsToEvents(device *authtypes.Device, in []streamEvent) []gomatrixserverlib.Event { func streamEventsToEvents(device *authtypes.Device, in []StreamEvent) []gomatrixserverlib.Event {
out := make([]gomatrixserverlib.Event, len(in)) out := make([]gomatrixserverlib.Event, len(in))
for i := 0; i < len(in); i++ { for i := 0; i < len(in); i++ {
out[i] = in[i].Event out[i] = in[i].Event