Use SenderID Type (#3105)

This commit is contained in:
devonh 2023-06-07 17:14:35 +00:00 committed by GitHub
parent 7a1fd7f512
commit 8ea1a11105
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
60 changed files with 502 additions and 275 deletions

View file

@ -44,8 +44,8 @@ type DatabaseTransaction interface {
MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter, rsAPI api.SyncRoomserverAPI) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter, rsAPI api.SyncRoomserverAPI) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
@ -90,7 +90,7 @@ type DatabaseTransaction interface {
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent
StreamEventsToEvents(ctx context.Context, device *userapi.Device, in []types.StreamEvent, rsAPI api.SyncRoomserverAPI) []*rstypes.HeaderedEvent
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the
// relevant events within the given ranges for the supplied user ID and device ID.
SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error)

View file

@ -99,7 +99,41 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*rstypes.He
// We don't include a device here as we only include transaction IDs in
// incremental syncs.
return d.StreamEventsToEvents(nil, streamEvents), nil
return d.StreamEventsToEvents(ctx, nil, streamEvents, nil), nil
}
func (d *Database) StreamEventsToEvents(ctx context.Context, device *userapi.Device, in []types.StreamEvent, rsAPI api.SyncRoomserverAPI) []*rstypes.HeaderedEvent {
out := make([]*rstypes.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
userID, err := spec.NewUserID(device.UserID, true)
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Failed to add transaction ID to event")
continue
}
deviceSenderID, err := rsAPI.QuerySenderIDForUser(ctx, in[i].RoomID(), *userID)
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Failed to add transaction ID to event")
continue
}
if deviceSenderID == in[i].SenderID() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField(
"transaction_id", in[i].TransactionID.TransactionID,
)
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Failed to add transaction ID to event")
}
}
}
}
return out
}
// AddInviteEvent stores a new invite event for a user.
@ -190,45 +224,6 @@ func (d *Database) UpsertAccountData(
return
}
func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent {
out := make([]*rstypes.HeaderedEvent, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
userID, err := spec.NewUserID(device.UserID, true)
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Failed to add transaction ID to event")
continue
}
deviceSenderID, err := d.getSenderIDForUser(in[i].RoomID(), *userID)
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Failed to add transaction ID to event")
continue
}
if deviceSenderID == in[i].SenderID() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField(
"transaction_id", in[i].TransactionID.TransactionID,
)
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Failed to add transaction ID to event")
}
}
}
}
return out
}
func (d *Database) getSenderIDForUser(roomID string, userID spec.UserID) (string, error) { // nolint
// TODO: Repalce with actual logic for pseudoIDs
return userID.String(), nil
}
// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.

View file

@ -10,6 +10,7 @@ import (
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
@ -186,7 +187,7 @@ func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]
// We don't include a device here as we only include transaction IDs in
// incremental syncs.
return d.StreamEventsToEvents(nil, streamEvents), nil
return d.StreamEventsToEvents(ctx, nil, streamEvents, nil), nil
}
func (d *DatabaseTransaction) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
@ -325,7 +326,7 @@ func (d *DatabaseTransaction) GetBackwardTopologyPos(
func (d *DatabaseTransaction) GetStateDeltas(
ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *synctypes.StateFilter,
stateFilter *synctypes.StateFilter, rsAPI api.SyncRoomserverAPI,
) (deltas []types.StateDelta, joinedRoomsIDs []string, err error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
@ -417,7 +418,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
if !peek.Deleted {
deltas = append(deltas, types.StateDelta{
Membership: spec.Peek,
StateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
StateEvents: d.StreamEventsToEvents(ctx, device, state[peek.RoomID], rsAPI),
RoomID: peek.RoomID,
})
}
@ -462,7 +463,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
deltas = append(deltas, types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateFiltered[roomID]),
StateEvents: d.StreamEventsToEvents(ctx, device, stateFiltered[roomID], rsAPI),
RoomID: roomID,
})
break
@ -474,7 +475,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, types.StateDelta{
Membership: spec.Join,
StateEvents: d.StreamEventsToEvents(device, stateFiltered[joinedRoomID]),
StateEvents: d.StreamEventsToEvents(ctx, device, stateFiltered[joinedRoomID], rsAPI),
RoomID: joinedRoomID,
NewlyJoined: newlyJoinedRooms[joinedRoomID],
})
@ -490,7 +491,7 @@ func (d *DatabaseTransaction) GetStateDeltas(
func (d *DatabaseTransaction) GetStateDeltasForFullStateSync(
ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *synctypes.StateFilter,
stateFilter *synctypes.StateFilter, rsAPI api.SyncRoomserverAPI,
) ([]types.StateDelta, []string, error) {
// Look up all memberships for the user. We only care about rooms that a
// user has ever interacted with — joined to, kicked/banned from, left.
@ -531,7 +532,7 @@ func (d *DatabaseTransaction) GetStateDeltasForFullStateSync(
}
deltas[peek.RoomID] = types.StateDelta{
Membership: spec.Peek,
StateEvents: d.StreamEventsToEvents(device, s),
StateEvents: d.StreamEventsToEvents(ctx, device, s, rsAPI),
RoomID: peek.RoomID,
}
}
@ -560,7 +561,7 @@ func (d *DatabaseTransaction) GetStateDeltasForFullStateSync(
deltas[roomID] = types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
StateEvents: d.StreamEventsToEvents(ctx, device, stateStreamEvents, rsAPI),
RoomID: roomID,
}
}
@ -581,7 +582,7 @@ func (d *DatabaseTransaction) GetStateDeltasForFullStateSync(
}
deltas[joinedRoomID] = types.StateDelta{
Membership: spec.Join,
StateEvents: d.StreamEventsToEvents(device, s),
StateEvents: d.StreamEventsToEvents(ctx, device, s, rsAPI),
RoomID: joinedRoomID,
}
}

View file

@ -214,7 +214,7 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
if err != nil {
t.Fatalf("GetEventsInTopologicalRange returned an error: %s", err)
}
gots := snapshot.StreamEventsToEvents(nil, paginatedEvents)
gots := snapshot.StreamEventsToEvents(context.Background(), nil, paginatedEvents, nil)
test.AssertEventsEqual(t, gots, test.Reversed(events[len(events)-5:]))
})
})