Pass pointers to events — reloaded (#1583)

* Pass events as pointers

* Fix lint errors

* Update gomatrixserverlib

* Update gomatrixserverlib

* Update to matrix-org/gomatrixserverlib#240
This commit is contained in:
Neil Alexander 2020-11-16 15:44:53 +00:00 committed by GitHub
parent d8b526b603
commit 20a01bceb2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
67 changed files with 310 additions and 323 deletions

View file

@ -118,7 +118,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEventConsumer) onRedactEvent(
ctx context.Context, msg api.OutputRedactedEvent,
) error {
err := s.db.RedactEvent(ctx, msg.RedactedEventID, &msg.RedactedBecause)
err := s.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause)
if err != nil {
log.WithError(err).Error("RedactEvent error'd")
return err
@ -156,7 +156,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
pduPos, err := s.db.WriteEvent(
ctx,
&ev,
ev,
addsStateEvents,
msg.AddsStateEventIDs,
msg.RemovesStateEventIDs,
@ -174,12 +174,12 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return nil
}
if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
}
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
return nil
}
@ -197,8 +197,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
// from confusing clients into thinking they've joined/left rooms.
pduPos, err := s.db.WriteEvent(
ctx,
&ev,
[]gomatrixserverlib.HeaderedEvent{},
ev,
[]*gomatrixserverlib.HeaderedEvent{},
[]string{}, // adds no state
[]string{}, // removes no state
nil, // no transaction
@ -213,12 +213,12 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return nil
}
if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
}
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifier.OnNewEvent(ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
return nil
}
@ -267,7 +267,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.notifier.OnNewEvent(&msg.Event, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifier.OnNewEvent(msg.Event, "", nil, types.NewStreamToken(pduPos, 0, nil))
return nil
}
@ -309,7 +309,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
return nil
}
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
if event.StateKey() == nil {
return event, nil
}

View file

@ -235,7 +235,7 @@ func (r *messagesReq) retrieveEvents() (
return
}
var events []gomatrixserverlib.HeaderedEvent
var events []*gomatrixserverlib.HeaderedEvent
util.GetLogger(r.ctx).WithField("start", start).WithField("end", end).Infof("Fetched %d events locally", len(streamEvents))
// There can be two reasons for streamEvents to be empty: either we've
@ -259,8 +259,8 @@ func (r *messagesReq) retrieveEvents() (
// Sort the events to ensure we send them in the right order.
if r.backwardOrdering {
// This reverses the array from old->new to new->old
reversed := func(in []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
reversed := func(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[len(in)-i-1]
}
@ -287,7 +287,7 @@ func (r *messagesReq) retrieveEvents() (
}
// nolint:gocyclo
func (r *messagesReq) filterHistoryVisible(events []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
// which is equiv to history_visibility: joined
@ -302,8 +302,8 @@ func (r *messagesReq) filterHistoryVisible(events []gomatrixserverlib.HeaderedEv
}
}
var result []gomatrixserverlib.HeaderedEvent
var eventsToCheck []gomatrixserverlib.HeaderedEvent
var result []*gomatrixserverlib.HeaderedEvent
var eventsToCheck []*gomatrixserverlib.HeaderedEvent
if joinEventIndex != -1 {
if r.backwardOrdering {
result = events[:joinEventIndex+1]
@ -313,7 +313,7 @@ func (r *messagesReq) filterHistoryVisible(events []gomatrixserverlib.HeaderedEv
eventsToCheck = append(eventsToCheck, result[len(result)-1])
}
} else {
eventsToCheck = []gomatrixserverlib.HeaderedEvent{events[0], events[len(events)-1]}
eventsToCheck = []*gomatrixserverlib.HeaderedEvent{events[0], events[len(events)-1]}
result = events
}
// make sure the user was in the room for both the earliest and latest events, we need this because
@ -337,9 +337,9 @@ func (r *messagesReq) filterHistoryVisible(events []gomatrixserverlib.HeaderedEv
for i := range queryRes.StateEvents {
switch queryRes.StateEvents[i].Type() {
case gomatrixserverlib.MRoomMember:
membershipEvent = &queryRes.StateEvents[i]
membershipEvent = queryRes.StateEvents[i]
case gomatrixserverlib.MRoomHistoryVisibility:
hisVisEvent = &queryRes.StateEvents[i]
hisVisEvent = queryRes.StateEvents[i]
}
}
if hisVisEvent == nil {
@ -365,12 +365,12 @@ func (r *messagesReq) filterHistoryVisible(events []gomatrixserverlib.HeaderedEv
}
if !wasJoined {
util.GetLogger(r.ctx).WithField("num_events", len(events)).Warnf("%s was not joined to room during these events, omitting them", r.device.UserID)
return []gomatrixserverlib.HeaderedEvent{}
return []*gomatrixserverlib.HeaderedEvent{}
}
return result
}
func (r *messagesReq) getStartEnd(events []gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
start, err = r.db.EventPositionInTopology(
r.ctx, events[0].EventID(),
)
@ -410,7 +410,7 @@ func (r *messagesReq) getStartEnd(events []gomatrixserverlib.HeaderedEvent) (sta
// Returns an error if there was an issue talking with the database or
// backfilling.
func (r *messagesReq) handleEmptyEventsSlice() (
events []gomatrixserverlib.HeaderedEvent, err error,
events []*gomatrixserverlib.HeaderedEvent, err error,
) {
backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
@ -424,7 +424,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
} else {
// If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice.
events = []gomatrixserverlib.HeaderedEvent{}
events = []*gomatrixserverlib.HeaderedEvent{}
}
return
@ -436,7 +436,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
// through backfilling if needed.
// Returns an error if there was an issue while backfilling.
func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent) (
events []gomatrixserverlib.HeaderedEvent, err error,
events []*gomatrixserverlib.HeaderedEvent, err error,
) {
// Check if we have enough events.
isSetLargeEnough := len(streamEvents) >= r.limit
@ -464,7 +464,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.HeaderedEvent
var pdus []*gomatrixserverlib.HeaderedEvent
// Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(r.roomID, backwardExtremities, r.limit-len(streamEvents))
if err != nil {
@ -482,7 +482,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
return
}
type eventsByDepth []gomatrixserverlib.HeaderedEvent
type eventsByDepth []*gomatrixserverlib.HeaderedEvent
func (e eventsByDepth) Len() int {
return len(e)
@ -503,7 +503,7 @@ func (e eventsByDepth) Less(i, j int) bool {
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error) {
var res api.PerformBackfillResponse
err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{
RoomID: roomID,
@ -531,8 +531,8 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
for i := range res.Events {
_, err = r.db.WriteEvent(
context.Background(),
&res.Events[i],
[]gomatrixserverlib.HeaderedEvent{},
res.Events[i],
[]*gomatrixserverlib.HeaderedEvent{},
[]string{},
[]string{},
nil, true,

View file

@ -39,11 +39,11 @@ type Database interface {
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event.
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []gomatrixserverlib.HeaderedEvent,
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error)
// PurgeRoomState completely purges room state from the sync API. This is done when
// receiving an output event that completely resets the state.
@ -55,7 +55,7 @@ type Database interface {
// GetStateEventsForRoom fetches the state events for a given room.
// Returns an empty slice if no state events could be found for this room.
// Returns an error if there was an issue with the retrieval.
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.HeaderedEvent, err error)
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error)
// SyncPosition returns the latest positions for syncing.
SyncPosition(ctx context.Context) (types.StreamingToken, error)
// IncrementalSync returns all the data needed in order to create an incremental
@ -84,7 +84,7 @@ type Database interface {
// AddInviteEvent stores a new invite event for a user.
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
AddInviteEvent(ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
// RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite.
// Returns an error if there was a problem communicating with the database.
RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
@ -116,7 +116,7 @@ type Database interface {
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent
// AddSendToDevice increases the EDU position in the cache and returns the stream position.
AddSendToDevice() types.StreamPosition
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns three lists:

View file

@ -195,7 +195,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.HeaderedEvent, error) {
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
rows, err := stmt.QueryContext(ctx, roomID,
pq.StringArray(stateFilter.Senders),
@ -231,7 +231,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@ -275,8 +275,8 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
return rowsToStreamEvents(rows)
}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
result := []gomatrixserverlib.HeaderedEvent{}
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
result := []*gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil {
@ -287,7 +287,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, ev)
result = append(result, &ev)
}
return result, rows.Err()
}

View file

@ -91,7 +91,7 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
}
func (s *inviteEventsStatements) InsertInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent,
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
var headeredJSON []byte
headeredJSON, err = json.Marshal(inviteEvent)
@ -121,15 +121,15 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
) (map[string]gomatrixserverlib.HeaderedEvent, map[string]gomatrixserverlib.HeaderedEvent, error) {
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
return nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.HeaderedEvent{}
retired := map[string]gomatrixserverlib.HeaderedEvent{}
result := map[string]*gomatrixserverlib.HeaderedEvent{}
retired := map[string]*gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
roomID string
@ -148,7 +148,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
continue
}
var event gomatrixserverlib.HeaderedEvent
var event *gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, err
}

View file

@ -247,7 +247,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{
HeaderedEvent: ev,
HeaderedEvent: &ev,
StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync,
}
@ -437,7 +437,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
result = append(result, types.StreamEvent{
HeaderedEvent: ev,
HeaderedEvent: &ev,
StreamPosition: streamPos,
TransactionID: transactionID,
ExcludeFromSync: excludeFromSync,

View file

@ -57,7 +57,7 @@ type Database struct {
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
func (d *Database) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs)
if err != nil {
return nil, err
@ -135,7 +135,7 @@ func (d *Database) GetStateEvent(
func (d *Database) GetStateEventsForRoom(
ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) {
) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) {
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilter)
return
}
@ -144,7 +144,7 @@ func (d *Database) GetStateEventsForRoom(
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *Database) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent,
) (sp types.StreamPosition, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
sp, err = d.Invites.InsertInviteEvent(ctx, txn, inviteEvent)
@ -223,8 +223,8 @@ func (d *Database) UpsertAccountData(
return
}
func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent {
out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
@ -295,7 +295,7 @@ func (d *Database) PurgeRoomState(
func (d *Database) WriteEvent(
ctx context.Context,
ev *gomatrixserverlib.HeaderedEvent,
addStateEvents []gomatrixserverlib.HeaderedEvent,
addStateEvents []*gomatrixserverlib.HeaderedEvent,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, excludeFromSync bool,
) (pduPosition types.StreamPosition, returnErr error) {
@ -332,7 +332,7 @@ func (d *Database) WriteEvent(
func (d *Database) updateRoomState(
ctx context.Context, txn *sql.Tx,
removedEventIDs []string,
addedEvents []gomatrixserverlib.HeaderedEvent,
addedEvents []*gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
@ -709,14 +709,14 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
}
eventToRedact := redactedEvents[0].Unwrap()
redactionEvent := redactedBecause.Unwrap()
ev, err := eventutil.RedactEvent(&redactionEvent, &eventToRedact)
ev, err := eventutil.RedactEvent(redactionEvent, eventToRedact)
if err != nil {
return err
}
newEvent := ev.Headered(redactedBecause.RoomVersion)
err = d.Writer.Do(nil, nil, func(txn *sql.Tx) error {
return d.OutputEvents.UpdateEventJSON(ctx, &newEvent)
return d.OutputEvents.UpdateEventJSON(ctx, newEvent)
})
return err
}
@ -809,7 +809,7 @@ func (d *Database) getJoinResponseForCompleteSync(
stateFilter *gomatrixserverlib.StateFilter,
numRecentEventsPerRoom int, device userapi.Device,
) (jr *types.JoinResponse, err error) {
var stateEvents []gomatrixserverlib.HeaderedEvent
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
if err != nil {
return
@ -1180,7 +1180,7 @@ func (d *Database) getStateDeltas(
// dupe join events will result in the entire room state coming down to the client again. This is added in
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
// the timeline.
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership == gomatrixserverlib.Join {
// send full room state down instead of a delta
var s []types.StreamEvent
@ -1264,7 +1264,7 @@ func (d *Database) getStateDeltasForFullStateSync(
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas[roomID] = stateDelta{
membership: membership,
@ -1426,7 +1426,7 @@ func (d *Database) CleanSendToDeviceUpdates(
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward.
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
@ -1463,7 +1463,7 @@ func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
type stateDelta struct {
roomID string
stateEvents []gomatrixserverlib.HeaderedEvent
stateEvents []*gomatrixserverlib.HeaderedEvent
membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.

View file

@ -184,7 +184,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
stateFilterPart *gomatrixserverlib.StateFilter,
) ([]gomatrixserverlib.HeaderedEvent, error) {
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
rows, err := stmt.QueryContext(ctx, roomID,
nil, // FIXME: pq.StringArray(stateFilterPart.Senders),
@ -220,7 +220,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
) error {
// Parse content as JSON and search for an "url" key
containsURL := false
@ -286,8 +286,8 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
return res, nil
}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
result := []gomatrixserverlib.HeaderedEvent{}
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
result := []*gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil {
@ -298,7 +298,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) {
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, ev)
result = append(result, &ev)
}
return result, nil
}

View file

@ -91,7 +91,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Inv
}
func (s *inviteEventsStatements) InsertInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent,
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
if err != nil {
@ -132,15 +132,15 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
) (map[string]gomatrixserverlib.HeaderedEvent, map[string]gomatrixserverlib.HeaderedEvent, error) {
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
return nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]gomatrixserverlib.HeaderedEvent{}
retired := map[string]gomatrixserverlib.HeaderedEvent{}
result := map[string]*gomatrixserverlib.HeaderedEvent{}
retired := map[string]*gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
roomID string
@ -159,7 +159,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
continue
}
var event gomatrixserverlib.HeaderedEvent
var event *gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, err
}

View file

@ -246,7 +246,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{
HeaderedEvent: ev,
HeaderedEvent: &ev,
StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync,
}
@ -452,7 +452,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
result = append(result, types.StreamEvent{
HeaderedEvent: ev,
HeaderedEvent: &ev,
StreamPosition: streamPos,
TransactionID: transactionID,
ExcludeFromSync: excludeFromSync,

View file

@ -37,7 +37,7 @@ var (
})
)
func MustCreateEvent(t *testing.T, roomID string, prevs []gomatrixserverlib.HeaderedEvent, b *gomatrixserverlib.EventBuilder) gomatrixserverlib.HeaderedEvent {
func MustCreateEvent(t *testing.T, roomID string, prevs []*gomatrixserverlib.HeaderedEvent, b *gomatrixserverlib.EventBuilder) *gomatrixserverlib.HeaderedEvent {
b.RoomID = roomID
if prevs != nil {
prevIDs := make([]string, len(prevs))
@ -70,8 +70,8 @@ func MustCreateDatabase(t *testing.T) storage.Database {
}
// Create a list of events which include a create event, join event and some messages.
func SimpleRoom(t *testing.T, roomID, userA, userB string) (msgs []gomatrixserverlib.HeaderedEvent, state []gomatrixserverlib.HeaderedEvent) {
var events []gomatrixserverlib.HeaderedEvent
func SimpleRoom(t *testing.T, roomID, userA, userB string) (msgs []*gomatrixserverlib.HeaderedEvent, state []*gomatrixserverlib.HeaderedEvent) {
var events []*gomatrixserverlib.HeaderedEvent
events = append(events, MustCreateEvent(t, roomID, nil, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, userA)),
Type: "m.room.create",
@ -80,7 +80,7 @@ func SimpleRoom(t *testing.T, roomID, userA, userB string) (msgs []gomatrixserve
Depth: int64(len(events) + 1),
}))
state = append(state, events[len(events)-1])
events = append(events, MustCreateEvent(t, roomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &userA,
@ -89,14 +89,14 @@ func SimpleRoom(t *testing.T, roomID, userA, userB string) (msgs []gomatrixserve
}))
state = append(state, events[len(events)-1])
for i := 0; i < 10; i++ {
events = append(events, MustCreateEvent(t, roomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"body":"Message A %d"}`, i+1)),
Type: "m.room.message",
Sender: userA,
Depth: int64(len(events) + 1),
}))
}
events = append(events, MustCreateEvent(t, roomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &userB,
@ -105,7 +105,7 @@ func SimpleRoom(t *testing.T, roomID, userA, userB string) (msgs []gomatrixserve
}))
state = append(state, events[len(events)-1])
for i := 0; i < 10; i++ {
events = append(events, MustCreateEvent(t, roomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"body":"Message B %d"}`, i+1)),
Type: "m.room.message",
Sender: userB,
@ -116,16 +116,16 @@ func SimpleRoom(t *testing.T, roomID, userA, userB string) (msgs []gomatrixserve
return events, state
}
func MustWriteEvents(t *testing.T, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
for _, ev := range events {
var addStateEvents []gomatrixserverlib.HeaderedEvent
var addStateEvents []*gomatrixserverlib.HeaderedEvent
var addStateEventIDs []string
var removeStateEventIDs []string
if ev.StateKey() != nil {
addStateEvents = append(addStateEvents, ev)
addStateEventIDs = append(addStateEventIDs, ev.EventID())
}
pos, err := db.WriteEvent(ctx, &ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false)
pos, err := db.WriteEvent(ctx, ev, addStateEvents, addStateEventIDs, removeStateEventIDs, nil, false)
if err != nil {
t.Fatalf("WriteEvent failed: %s", err)
}
@ -156,8 +156,8 @@ func TestSyncResponse(t *testing.T) {
testCases := []struct {
Name string
DoSync func() (*types.Response, error)
WantTimeline []gomatrixserverlib.HeaderedEvent
WantState []gomatrixserverlib.HeaderedEvent
WantTimeline []*gomatrixserverlib.HeaderedEvent
WantState []*gomatrixserverlib.HeaderedEvent
}{
// The purpose of this test is to make sure that incremental syncs are including up to the latest events.
// It's a basic sanity test that sync works. It creates a `since` token that is on the penultimate event.
@ -339,7 +339,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
t.Parallel()
db := MustCreateDatabase(t)
var events []gomatrixserverlib.HeaderedEvent
var events []*gomatrixserverlib.HeaderedEvent
events = append(events, MustCreateEvent(t, testRoomID, nil, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
Type: "m.room.create",
@ -347,7 +347,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Sender: testUserIDA,
Depth: int64(len(events) + 1),
}))
events = append(events, MustCreateEvent(t, testRoomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
events = append(events, MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &testUserIDA,
@ -355,7 +355,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Depth: int64(len(events) + 1),
}))
// fork the dag into three, same prev_events and depth
parent := []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}
parent := []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}
depth := int64(len(events) + 1)
for i := 0; i < 3; i++ {
events = append(events, MustCreateEvent(t, testRoomID, parent, &gomatrixserverlib.EventBuilder{
@ -388,7 +388,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
Name string
From types.TopologyToken
Limit int
Wants []gomatrixserverlib.HeaderedEvent
Wants []*gomatrixserverlib.HeaderedEvent
}{
{
Name: "Pagination over the whole fork",
@ -429,7 +429,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
t.Parallel()
db := MustCreateDatabase(t)
makeEvents := func(roomID string) (events []gomatrixserverlib.HeaderedEvent) {
makeEvents := func(roomID string) (events []*gomatrixserverlib.HeaderedEvent) {
events = append(events, MustCreateEvent(t, roomID, nil, &gomatrixserverlib.EventBuilder{
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
Type: "m.room.create",
@ -437,7 +437,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
Sender: testUserIDA,
Depth: int64(len(events) + 1),
}))
events = append(events, MustCreateEvent(t, roomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &testUserIDA,
@ -483,14 +483,14 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
// "federation" join
userC := fmt.Sprintf("@radiance:%s", testOrigin)
joinEvent := MustCreateEvent(t, testRoomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
joinEvent := MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
Content: []byte(`{"membership":"join"}`),
Type: "m.room.member",
StateKey: &userC,
Sender: userC,
Depth: int64(len(events) + 1),
})
MustWriteEvents(t, db, []gomatrixserverlib.HeaderedEvent{joinEvent})
MustWriteEvents(t, db, []*gomatrixserverlib.HeaderedEvent{joinEvent})
// Sync will return this for the prev_batch
from := topologyTokenBefore(t, db, joinEvent.EventID())
@ -627,7 +627,7 @@ func TestInviteBehaviour(t *testing.T) {
StateKey: &testUserIDA,
Sender: "@inviteUser2:somewhere",
})
for _, ev := range []gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} {
for _, ev := range []*gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} {
_, err := db.AddInviteEvent(ctx, ev)
if err != nil {
t.Fatalf("Failed to AddInviteEvent: %s", err)
@ -688,7 +688,7 @@ func assertInvitedToRooms(t *testing.T, res *types.Response, roomIDs []string) {
}
}
func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []gomatrixserverlib.HeaderedEvent) {
func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*gomatrixserverlib.HeaderedEvent) {
t.Helper()
if len(gots) != len(wants) {
t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants))
@ -738,8 +738,8 @@ func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *typ
return &tok
}
func reversed(in []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
func reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[len(in)-i-1]
}

View file

@ -32,11 +32,11 @@ type AccountData interface {
}
type Invites interface {
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error)
// SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value
// for the room.
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]gomatrixserverlib.HeaderedEvent, retired map[string]gomatrixserverlib.HeaderedEvent, err error)
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, err error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
@ -87,11 +87,11 @@ type Topology interface {
type CurrentRoomState interface {
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
UpsertRoomState(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
// SelectCurrentState returns all the current state events for the given room.
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]gomatrixserverlib.HeaderedEvent, error)
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.

View file

@ -59,7 +59,7 @@ func (p *LogPosition) IsAfter(lp *LogPosition) bool {
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct {
gomatrixserverlib.HeaderedEvent
*gomatrixserverlib.HeaderedEvent
StreamPosition StreamPosition
TransactionID *api.TransactionID
ExcludeFromSync bool
@ -471,7 +471,7 @@ type InviteResponse struct {
}
// NewInviteResponse creates an empty response with initialised arrays.
func NewInviteResponse(event gomatrixserverlib.HeaderedEvent) *InviteResponse {
func NewInviteResponse(event *gomatrixserverlib.HeaderedEvent) *InviteResponse {
res := InviteResponse{}
res.InviteState.Events = []json.RawMessage{}