mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-30 21:12:45 +00:00
Remove gmsl.HeaderedEvent (#3068)
Replaced with types.HeaderedEvent _for now_. In reality we want to move them all to gmsl.Event and only use HeaderedEvent when we _need_ to bundle the version/event ID with the event (seriailsation boundaries, and even then only when we don't have the room version). Requires https://github.com/matrix-org/gomatrixserverlib/pull/373
This commit is contained in:
parent
2475cf4b61
commit
b189edf4f4
108 changed files with 660 additions and 514 deletions
|
@ -21,7 +21,6 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -31,6 +30,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/fulltext"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
|
@ -318,7 +318,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
|
|||
pduPos, err := s.db.WriteEvent(
|
||||
ctx,
|
||||
ev,
|
||||
[]*gomatrixserverlib.HeaderedEvent{},
|
||||
[]*rstypes.HeaderedEvent{},
|
||||
[]string{}, // adds no state
|
||||
[]string{}, // removes no state
|
||||
nil, // no transaction
|
||||
|
@ -362,7 +362,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
|
||||
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *rstypes.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
|
||||
if ev.Type() != spec.MRoomMember {
|
||||
return sp, nil
|
||||
}
|
||||
|
@ -496,7 +496,7 @@ func (s *OutputRoomEventConsumer) onPurgeRoom(
|
|||
}
|
||||
}
|
||||
|
||||
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) (*rstypes.HeaderedEvent, error) {
|
||||
if event.StateKey() == nil {
|
||||
return event, nil
|
||||
}
|
||||
|
@ -531,7 +531,7 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
|
|||
return event, err
|
||||
}
|
||||
|
||||
func (s *OutputRoomEventConsumer) writeFTS(ev *gomatrixserverlib.HeaderedEvent, pduPosition types.StreamPosition) error {
|
||||
func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPosition types.StreamPosition) error {
|
||||
if !s.cfg.Fulltext.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
)
|
||||
|
||||
|
@ -98,16 +99,16 @@ func (ev eventVisibility) allowed() (allowed bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// ApplyHistoryVisibilityFilter applies the room history visibility filter on gomatrixserverlib.HeaderedEvents.
|
||||
// ApplyHistoryVisibilityFilter applies the room history visibility filter on types.HeaderedEvents.
|
||||
// Returns the filtered events and an error, if any.
|
||||
func ApplyHistoryVisibilityFilter(
|
||||
ctx context.Context,
|
||||
syncDB storage.DatabaseTransaction,
|
||||
rsAPI api.SyncRoomserverAPI,
|
||||
events []*gomatrixserverlib.HeaderedEvent,
|
||||
events []*types.HeaderedEvent,
|
||||
alwaysIncludeEventIDs map[string]struct{},
|
||||
userID, endpoint string,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) ([]*types.HeaderedEvent, error) {
|
||||
if len(events) == 0 {
|
||||
return events, nil
|
||||
}
|
||||
|
@ -120,7 +121,7 @@ func ApplyHistoryVisibilityFilter(
|
|||
}
|
||||
|
||||
// Get the mapping from eventID -> eventVisibility
|
||||
eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
|
||||
eventsFiltered := make([]*types.HeaderedEvent, 0, len(events))
|
||||
visibilities := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID())
|
||||
for _, ev := range events {
|
||||
evVis := visibilities[ev.EventID()]
|
||||
|
@ -170,7 +171,7 @@ func ApplyHistoryVisibilityFilter(
|
|||
func visibilityForEvents(
|
||||
ctx context.Context,
|
||||
rsAPI api.SyncRoomserverAPI,
|
||||
events []*gomatrixserverlib.HeaderedEvent,
|
||||
events []*types.HeaderedEvent,
|
||||
userID, roomID string,
|
||||
) map[string]eventVisibility {
|
||||
eventIDs := make([]string, len(events))
|
||||
|
|
|
@ -20,9 +20,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
@ -87,7 +87,7 @@ func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) {
|
|||
// Typically a consumer supplies a posUpdate with the latest sync position for the
|
||||
// event type it handles, leaving other fields as 0.
|
||||
func (n *Notifier) OnNewEvent(
|
||||
ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
|
||||
ev *rstypes.HeaderedEvent, roomID string, userIDs []string,
|
||||
posUpdate types.StreamingToken,
|
||||
) {
|
||||
// update the current position then notify relevant /sync streams.
|
||||
|
|
|
@ -22,16 +22,16 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
var (
|
||||
randomMessageEvent gomatrixserverlib.HeaderedEvent
|
||||
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
|
||||
bobLeaveEvent gomatrixserverlib.HeaderedEvent
|
||||
randomMessageEvent rstypes.HeaderedEvent
|
||||
aliceInviteBobEvent rstypes.HeaderedEvent
|
||||
bobLeaveEvent rstypes.HeaderedEvent
|
||||
syncPositionVeryOld = types.StreamingToken{PDUPosition: 5}
|
||||
syncPositionBefore = types.StreamingToken{PDUPosition: 11}
|
||||
syncPositionAfter = types.StreamingToken{PDUPosition: 12}
|
||||
|
|
|
@ -27,12 +27,12 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
roomserver "github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/internal"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -122,7 +122,7 @@ func Context(
|
|||
|
||||
// verify the user is allowed to see the context for this room/event
|
||||
startTime := time.Now()
|
||||
filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
|
||||
filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, snapshot, rsAPI, []*rstypes.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("unable to apply history visibility filter")
|
||||
return jsonerror.InternalServerError()
|
||||
|
@ -212,9 +212,9 @@ func Context(
|
|||
// and an error, if any.
|
||||
func applyHistoryVisibilityOnContextEvents(
|
||||
ctx context.Context, snapshot storage.DatabaseTransaction, rsAPI roomserver.SyncRoomserverAPI,
|
||||
eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent,
|
||||
eventsBefore, eventsAfter []*rstypes.HeaderedEvent,
|
||||
userID string,
|
||||
) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||
) (filteredBefore, filteredAfter []*rstypes.HeaderedEvent, err error) {
|
||||
eventIDsBefore := make(map[string]struct{}, len(eventsBefore))
|
||||
eventIDsAfter := make(map[string]struct{}, len(eventsAfter))
|
||||
|
||||
|
@ -245,7 +245,7 @@ func applyHistoryVisibilityOnContextEvents(
|
|||
return filteredBefore, filteredAfter, nil
|
||||
}
|
||||
|
||||
func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
|
||||
func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, startEvents, endEvents []*rstypes.HeaderedEvent) (start, end types.TopologyToken, err error) {
|
||||
if len(startEvents) > 0 {
|
||||
start, err = snapshot.EventPositionInTopology(ctx, startEvents[0].EventID())
|
||||
if err != nil {
|
||||
|
@ -265,7 +265,7 @@ func applyLazyLoadMembers(
|
|||
roomID string,
|
||||
events []synctypes.ClientEvent,
|
||||
lazyLoadCache caching.LazyLoadCache,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) ([]*rstypes.HeaderedEvent, error) {
|
||||
eventSenders := make(map[string]struct{})
|
||||
// get members who actually send an event
|
||||
for _, e := range events {
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/internal"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
|
@ -303,7 +304,7 @@ func (r *messagesReq) retrieveEvents() (
|
|||
return
|
||||
}
|
||||
|
||||
var events []*gomatrixserverlib.HeaderedEvent
|
||||
var events []*rstypes.HeaderedEvent
|
||||
util.GetLogger(r.ctx).WithFields(logrus.Fields{
|
||||
"start": r.from,
|
||||
"end": r.to,
|
||||
|
@ -342,8 +343,8 @@ func (r *messagesReq) retrieveEvents() (
|
|||
// Sort the events to ensure we send them in the right order.
|
||||
if r.backwardOrdering {
|
||||
// This reverses the array from old->new to new->old
|
||||
reversed := func(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
|
||||
out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
|
||||
reversed := func(in []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
|
||||
out := make([]*rstypes.HeaderedEvent, len(in))
|
||||
for i := 0; i < len(in); i++ {
|
||||
out[i] = in[len(in)-i-1]
|
||||
}
|
||||
|
@ -367,7 +368,7 @@ func (r *messagesReq) retrieveEvents() (
|
|||
return synctypes.HeaderedToClientEvents(filteredEvents, synctypes.FormatAll), start, end, err
|
||||
}
|
||||
|
||||
func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
|
||||
func (r *messagesReq) getStartEnd(events []*rstypes.HeaderedEvent) (start, end types.TopologyToken, err error) {
|
||||
if r.backwardOrdering {
|
||||
start = *r.from
|
||||
if events[len(events)-1].Type() == spec.MRoomCreate {
|
||||
|
@ -406,7 +407,7 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
|
|||
// Returns an error if there was an issue talking with the database or
|
||||
// backfilling.
|
||||
func (r *messagesReq) handleEmptyEventsSlice() (
|
||||
events []*gomatrixserverlib.HeaderedEvent, err error,
|
||||
events []*rstypes.HeaderedEvent, err error,
|
||||
) {
|
||||
backwardExtremities, err := r.snapshot.BackwardExtremitiesForRoom(r.ctx, r.roomID)
|
||||
|
||||
|
@ -420,7 +421,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
|
|||
} else {
|
||||
// If not, it means the slice was empty because we reached the room's
|
||||
// creation, so return an empty slice.
|
||||
events = []*gomatrixserverlib.HeaderedEvent{}
|
||||
events = []*rstypes.HeaderedEvent{}
|
||||
}
|
||||
|
||||
return
|
||||
|
@ -432,7 +433,7 @@ func (r *messagesReq) handleEmptyEventsSlice() (
|
|||
// through backfilling if needed.
|
||||
// Returns an error if there was an issue while backfilling.
|
||||
func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent) (
|
||||
events []*gomatrixserverlib.HeaderedEvent, err error,
|
||||
events []*rstypes.HeaderedEvent, err error,
|
||||
) {
|
||||
// Check if we have enough events.
|
||||
isSetLargeEnough := len(streamEvents) >= r.filter.Limit
|
||||
|
@ -460,7 +461,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
|||
// Backfill is needed if we've reached a backward extremity and need more
|
||||
// events. It's only needed if the direction is backward.
|
||||
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
|
||||
var pdus []*gomatrixserverlib.HeaderedEvent
|
||||
var pdus []*rstypes.HeaderedEvent
|
||||
// Only ask the remote server for enough events to reach the limit.
|
||||
pdus, err = r.backfill(r.roomID, backwardExtremities, r.filter.Limit-len(streamEvents))
|
||||
if err != nil {
|
||||
|
@ -478,7 +479,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
|||
return
|
||||
}
|
||||
|
||||
type eventsByDepth []*gomatrixserverlib.HeaderedEvent
|
||||
type eventsByDepth []*rstypes.HeaderedEvent
|
||||
|
||||
func (e eventsByDepth) Len() int {
|
||||
return len(e)
|
||||
|
@ -499,7 +500,7 @@ func (e eventsByDepth) Less(i, j int) bool {
|
|||
// event, or if there is no remote homeserver to contact.
|
||||
// Returns an error if there was an issue with retrieving the list of servers in
|
||||
// the room or sending the request.
|
||||
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]*rstypes.HeaderedEvent, error) {
|
||||
var res api.PerformBackfillResponse
|
||||
err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{
|
||||
RoomID: roomID,
|
||||
|
@ -532,7 +533,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
|
|||
_, err = r.db.WriteEvent(
|
||||
context.Background(),
|
||||
res.Events[i],
|
||||
[]*gomatrixserverlib.HeaderedEvent{},
|
||||
[]*rstypes.HeaderedEvent{},
|
||||
[]string{},
|
||||
[]string{},
|
||||
nil, true,
|
||||
|
|
|
@ -18,13 +18,13 @@ import (
|
|||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/internal"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -96,7 +96,7 @@ func Relations(
|
|||
return util.ErrorResponse(err)
|
||||
}
|
||||
|
||||
headeredEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
|
||||
headeredEvents := make([]*rstypes.HeaderedEvent, 0, len(events))
|
||||
for _, event := range events {
|
||||
headeredEvents = append(headeredEvents, event.HeaderedEvent)
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/blevesearch/bleve/v2/search"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -32,6 +31,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/internal/fulltext"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
|
@ -263,10 +263,10 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
|
|||
func contextEvents(
|
||||
ctx context.Context,
|
||||
snapshot storage.DatabaseTransaction,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
event *types.HeaderedEvent,
|
||||
roomFilter *synctypes.RoomEventFilter,
|
||||
searchReq SearchRequest,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, []*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) ([]*types.HeaderedEvent, []*types.HeaderedEvent, error) {
|
||||
id, _, err := snapshot.SelectContextEvent(ctx, event.RoomID(), event.EventID())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to query context event")
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal/fulltext"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -215,7 +216,7 @@ func TestSearch(t *testing.T) {
|
|||
// store the events in the database
|
||||
var sp types.StreamPosition
|
||||
for _, x := range room.Events() {
|
||||
var stateEvents []*gomatrixserverlib.HeaderedEvent
|
||||
var stateEvents []*rstypes.HeaderedEvent
|
||||
var stateEventIDs []string
|
||||
if x.Type() == spec.MRoomMember {
|
||||
stateEvents = append(stateEvents, x)
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -42,16 +43,16 @@ type DatabaseTransaction interface {
|
|||
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
|
||||
MaxStreamPositionForRelations(ctx context.Context) (types.StreamPosition, error)
|
||||
|
||||
CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||
CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error)
|
||||
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
|
||||
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *synctypes.StateFilter) ([]types.StateDelta, []string, error)
|
||||
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
|
||||
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
|
||||
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
|
||||
RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
|
||||
GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
|
||||
GetBackwardTopologyPos(ctx context.Context, events []*rstypes.HeaderedEvent) (types.TopologyToken, error)
|
||||
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
||||
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
|
||||
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error)
|
||||
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
||||
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
|
||||
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
||||
|
@ -65,15 +66,15 @@ type DatabaseTransaction interface {
|
|||
// If an event is not found in the database then it will be omitted from the list.
|
||||
// Returns an error if there was a problem talking with the database.
|
||||
// Does not include any transaction IDs in the returned events.
|
||||
Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||
Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error)
|
||||
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
|
||||
// If no event could be found, returns nil
|
||||
// If there was an issue during the retrieval, returns an error
|
||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error)
|
||||
// GetStateEventsForRoom fetches the state events for a given room.
|
||||
// Returns an empty slice if no state events could be found for this room.
|
||||
// Returns an error if there was an issue with the retrieval.
|
||||
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error)
|
||||
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter) (stateEvents []*rstypes.HeaderedEvent, err error)
|
||||
// GetAccountDataInRange returns all account data for a given user inserted or
|
||||
// updated between two given positions
|
||||
// Returns a map following the format data[roomID] = []dataTypes
|
||||
|
@ -89,15 +90,15 @@ type DatabaseTransaction interface {
|
|||
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
|
||||
// matches the streamevent.transactionID device then the transaction ID gets
|
||||
// added to the unsigned section of the output event.
|
||||
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent
|
||||
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent
|
||||
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the
|
||||
// relevant events within the given ranges for the supplied user ID and device ID.
|
||||
SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error)
|
||||
// GetRoomReceipts gets all receipts for a given roomID
|
||||
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error)
|
||||
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error)
|
||||
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error)
|
||||
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error)
|
||||
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
|
||||
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
|
||||
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
|
||||
|
@ -123,11 +124,11 @@ type Database interface {
|
|||
// If an event is not found in the database then it will be omitted from the list.
|
||||
// Returns an error if there was a problem talking with the database.
|
||||
// Does not include any transaction IDs in the returned events.
|
||||
Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||
Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error)
|
||||
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
||||
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
|
||||
// Returns an error if there was a problem inserting this event.
|
||||
WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
WriteEvent(ctx context.Context, ev *rstypes.HeaderedEvent, addStateEvents []*rstypes.HeaderedEvent,
|
||||
addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (types.StreamPosition, error)
|
||||
|
@ -146,7 +147,7 @@ type Database interface {
|
|||
// AddInviteEvent stores a new invite event for a user.
|
||||
// If the invite was successfully stored this returns the stream ID it was stored at.
|
||||
// Returns an error if there was a problem communicating with the database.
|
||||
AddInviteEvent(ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
|
||||
AddInviteEvent(ctx context.Context, inviteEvent *rstypes.HeaderedEvent) (types.StreamPosition, error)
|
||||
// RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite.
|
||||
// Returns an error if there was a problem communicating with the database.
|
||||
RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
|
||||
|
@ -173,12 +174,12 @@ type Database interface {
|
|||
// goes wrong.
|
||||
PutFilter(ctx context.Context, localpart string, filter *synctypes.Filter) (string, error)
|
||||
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
|
||||
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
|
||||
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error
|
||||
// StoreReceipt stores new receipt events
|
||||
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp spec.Timestamp) (pos types.StreamPosition, err error)
|
||||
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
|
||||
ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error)
|
||||
UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
|
||||
ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error)
|
||||
UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error
|
||||
RedactRelations(ctx context.Context, roomID, redactedEventID string) error
|
||||
SelectMemberships(
|
||||
ctx context.Context,
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -274,7 +275,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
|
|||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
stateFilter *synctypes.StateFilter,
|
||||
excludeEventIDs []string,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) ([]*rstypes.HeaderedEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
|
||||
senders, notSenders := getSendersStateFilterFilter(stateFilter)
|
||||
// We're going to query members later, so remove them from this request
|
||||
|
@ -320,7 +321,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
|
|||
|
||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
|
||||
event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
|
||||
) error {
|
||||
// Parse content as JSON and search for an "url" key
|
||||
containsURL := false
|
||||
|
@ -378,8 +379,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -394,8 +395,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
|
|||
return events, nil
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
result := []*gomatrixserverlib.HeaderedEvent{}
|
||||
func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
|
||||
result := []*rstypes.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var eventID string
|
||||
var eventBytes []byte
|
||||
|
@ -403,8 +404,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, &ev)
|
||||
|
@ -414,7 +415,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
|||
|
||||
func (s *currentRoomStateStatements) SelectStateEvent(
|
||||
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) (*rstypes.HeaderedEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
|
||||
var res []byte
|
||||
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
||||
|
@ -424,7 +425,7 @@ func (s *currentRoomStateStatements) SelectStateEvent(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err = json.Unmarshal(res, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
@ -79,7 +80,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom
|
|||
defer rows.Close() // nolint: errcheck
|
||||
var eventBytes []byte
|
||||
var roomID string
|
||||
var event gomatrixserverlib.HeaderedEvent
|
||||
var event types.HeaderedEvent
|
||||
var hisVis gomatrixserverlib.HistoryVisibility
|
||||
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
|
||||
for rows.Next() {
|
||||
|
|
|
@ -22,9 +22,9 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const inviteEventsSchema = `
|
||||
|
@ -89,7 +89,7 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
|
|||
}
|
||||
|
||||
func (s *inviteEventsStatements) InsertInviteEvent(
|
||||
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
|
||||
ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
var headeredJSON []byte
|
||||
headeredJSON, err = json.Marshal(inviteEvent)
|
||||
|
@ -119,7 +119,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
|
|||
// active invites for the target user ID in the supplied range.
|
||||
func (s *inviteEventsStatements) SelectInviteEventsInRange(
|
||||
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
|
||||
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
|
||||
) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
|
||||
var lastPos types.StreamPosition
|
||||
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
|
||||
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
|
||||
|
@ -127,8 +127,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
|
|||
return nil, nil, lastPos, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
|
||||
result := map[string]*gomatrixserverlib.HeaderedEvent{}
|
||||
retired := map[string]*gomatrixserverlib.HeaderedEvent{}
|
||||
result := map[string]*rstypes.HeaderedEvent{}
|
||||
retired := map[string]*rstypes.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var (
|
||||
id types.StreamPosition
|
||||
|
@ -151,7 +151,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
|
|||
continue
|
||||
}
|
||||
|
||||
var event *gomatrixserverlib.HeaderedEvent
|
||||
var event *rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventJSON, &event); err != nil {
|
||||
return nil, nil, lastPos, err
|
||||
}
|
||||
|
|
|
@ -19,9 +19,8 @@ import (
|
|||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
@ -100,7 +99,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
|||
}
|
||||
|
||||
func (s *membershipsStatements) UpsertMembership(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||
ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent,
|
||||
streamPos, topologicalPos types.StreamPosition,
|
||||
) error {
|
||||
membership, err := event.Membership()
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -264,7 +265,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
|||
}.Prepare(db)
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error {
|
||||
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error {
|
||||
headeredJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -329,8 +330,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
}
|
||||
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
needSet := stateNeeded[ev.RoomID()]
|
||||
|
@ -375,7 +376,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
|||
// of the inserted event.
|
||||
func (s *outputRoomEventsStatements) InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||
event *rstypes.HeaderedEvent, addState, removeState []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
var txnID *string
|
||||
|
@ -465,8 +466,8 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -577,7 +578,7 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
|
||||
func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt rstypes.HeaderedEvent, err error) {
|
||||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||
|
||||
var eventAsString string
|
||||
|
@ -595,7 +596,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
|
|||
|
||||
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
|
||||
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||
) (evts []*rstypes.HeaderedEvent, err error) {
|
||||
senders, notSenders := getSendersRoomEventFilter(filter)
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
|
||||
ctx, roomID, id, filter.Limit,
|
||||
|
@ -612,7 +613,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
evt *rstypes.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||
|
@ -630,7 +631,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
|
||||
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
|
||||
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||
) (lastID int, evts []*rstypes.HeaderedEvent, err error) {
|
||||
senders, notSenders := getSendersRoomEventFilter(filter)
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
|
||||
ctx, roomID, id, filter.Limit,
|
||||
|
@ -647,7 +648,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
evt *rstypes.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||
|
@ -680,8 +681,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -709,7 +710,7 @@ func (s *outputRoomEventsStatements) PurgeEvents(
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) {
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -718,14 +719,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
|
|||
|
||||
var eventID string
|
||||
var id int64
|
||||
result := make(map[int64]gomatrixserverlib.HeaderedEvent)
|
||||
result := make(map[int64]rstypes.HeaderedEvent)
|
||||
for rows.Next() {
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
var ev rstypes.HeaderedEvent
|
||||
var eventBytes []byte
|
||||
if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
if err = json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result[id] = ev
|
||||
|
|
|
@ -18,10 +18,9 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
@ -105,7 +104,7 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
|||
// InsertEventInTopology inserts the given event in the room's topology, based
|
||||
// on the event's depth.
|
||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||
ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition,
|
||||
) (topoPos types.StreamPosition, err error) {
|
||||
err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
|
||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
|
||||
|
@ -90,7 +91,7 @@ func (d *Database) NewDatabaseTransaction(ctx context.Context) (*DatabaseTransac
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) {
|
||||
streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs, nil, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -105,7 +106,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse
|
|||
// If the invite was successfully stored this returns the stream ID it was stored at.
|
||||
// Returns an error if there was a problem communicating with the database.
|
||||
func (d *Database) AddInviteEvent(
|
||||
ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent,
|
||||
ctx context.Context, inviteEvent *rstypes.HeaderedEvent,
|
||||
) (sp types.StreamPosition, err error) {
|
||||
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
sp, err = d.Invites.InsertInviteEvent(ctx, txn, inviteEvent)
|
||||
|
@ -189,8 +190,8 @@ func (d *Database) UpsertAccountData(
|
|||
return
|
||||
}
|
||||
|
||||
func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent {
|
||||
out := make([]*gomatrixserverlib.HeaderedEvent, len(in))
|
||||
func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*rstypes.HeaderedEvent {
|
||||
out := make([]*rstypes.HeaderedEvent, len(in))
|
||||
for i := 0; i < len(in); i++ {
|
||||
out[i] = in[i].HeaderedEvent
|
||||
if device != nil && in[i].TransactionID != nil {
|
||||
|
@ -213,7 +214,7 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea
|
|||
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
|
||||
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
|
||||
// This function should always be called within a sqlutil.Writer for safety in SQLite.
|
||||
func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
|
||||
func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *rstypes.HeaderedEvent) error {
|
||||
if err := d.BackwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -246,8 +247,8 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e
|
|||
|
||||
func (d *Database) WriteEvent(
|
||||
ctx context.Context,
|
||||
ev *gomatrixserverlib.HeaderedEvent,
|
||||
addStateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
ev *rstypes.HeaderedEvent,
|
||||
addStateEvents []*rstypes.HeaderedEvent,
|
||||
addStateEventIDs, removeStateEventIDs []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
|
@ -288,7 +289,7 @@ func (d *Database) WriteEvent(
|
|||
func (d *Database) updateRoomState(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
removedEventIDs []string,
|
||||
addedEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
addedEvents []*rstypes.HeaderedEvent,
|
||||
pduPosition types.StreamPosition,
|
||||
topoPosition types.StreamPosition,
|
||||
) error {
|
||||
|
@ -342,7 +343,7 @@ func (d *Database) PutFilter(
|
|||
return filterID, err
|
||||
}
|
||||
|
||||
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
|
||||
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *rstypes.HeaderedEvent) error {
|
||||
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -351,13 +352,13 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
|
|||
logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
|
||||
return nil
|
||||
}
|
||||
eventToRedact := redactedEvents[0].Unwrap()
|
||||
redactionEvent := redactedBecause.Unwrap()
|
||||
eventToRedact := redactedEvents[0].Event
|
||||
redactionEvent := redactedBecause.Event
|
||||
if err = eventutil.RedactEvent(redactionEvent, eventToRedact); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newEvent := eventToRedact.Headered(redactedBecause.RoomVersion)
|
||||
newEvent := &rstypes.HeaderedEvent{Event: eventToRedact}
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.OutputEvents.UpdateEventJSON(ctx, txn, newEvent)
|
||||
})
|
||||
|
@ -521,14 +522,14 @@ func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userI
|
|||
return
|
||||
}
|
||||
|
||||
func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, rstypes.HeaderedEvent, error) {
|
||||
return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
|
||||
}
|
||||
|
||||
func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error) {
|
||||
return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
|
||||
}
|
||||
func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error) {
|
||||
return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||
}
|
||||
|
||||
|
@ -560,7 +561,7 @@ func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID s
|
|||
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
|
||||
}
|
||||
|
||||
func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]rstypes.HeaderedEvent, error) {
|
||||
return d.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{
|
||||
spec.MRoomName,
|
||||
spec.MRoomTopic,
|
||||
|
@ -568,7 +569,7 @@ func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64
|
|||
})
|
||||
}
|
||||
|
||||
func (d *Database) UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
|
||||
func (d *Database) UpdateRelations(ctx context.Context, event *rstypes.HeaderedEvent) error {
|
||||
// No need to unmarshal if the event is a redaction
|
||||
if event.Type() == spec.MRoomRedaction {
|
||||
return nil
|
||||
|
|
|
@ -6,11 +6,11 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
|
@ -84,7 +84,7 @@ func (d *DatabaseTransaction) MaxStreamPositionForNotificationData(ctx context.C
|
|||
return types.StreamPosition(id), nil
|
||||
}
|
||||
|
||||
func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (d *DatabaseTransaction) CurrentState(ctx context.Context, roomID string, stateFilterPart *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error) {
|
||||
return d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilterPart, excludeEventIDs)
|
||||
}
|
||||
|
||||
|
@ -161,7 +161,7 @@ func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID st
|
|||
return d.Topology.SelectPositionInTopology(ctx, d.txn, eventID)
|
||||
}
|
||||
|
||||
func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
|
||||
func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
|
||||
return d.Invites.SelectInviteEventsInRange(ctx, d.txn, targetUserID, r)
|
||||
}
|
||||
|
||||
|
@ -178,7 +178,7 @@ func (d *DatabaseTransaction) RoomReceiptsAfter(ctx context.Context, roomIDs []s
|
|||
// If an event is not found in the database then it will be omitted from the list.
|
||||
// Returns an error if there was a problem talking with the database.
|
||||
// Does not include any transaction IDs in the returned events.
|
||||
func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (d *DatabaseTransaction) Events(ctx context.Context, eventIDs []string) ([]*rstypes.HeaderedEvent, error) {
|
||||
streamEvents, err := d.OutputEvents.SelectEvents(ctx, d.txn, eventIDs, nil, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -207,13 +207,13 @@ func (d *DatabaseTransaction) SharedUsers(ctx context.Context, userID string, ot
|
|||
|
||||
func (d *DatabaseTransaction) GetStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) (*rstypes.HeaderedEvent, error) {
|
||||
return d.CurrentRoomState.SelectStateEvent(ctx, d.txn, roomID, evType, stateKey)
|
||||
}
|
||||
|
||||
func (d *DatabaseTransaction) GetStateEventsForRoom(
|
||||
ctx context.Context, roomID string, stateFilter *synctypes.StateFilter,
|
||||
) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||
) (stateEvents []*rstypes.HeaderedEvent, err error) {
|
||||
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, d.txn, roomID, stateFilter, nil)
|
||||
return
|
||||
}
|
||||
|
@ -302,7 +302,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition(
|
|||
// oldest event in the room's topology.
|
||||
func (d *DatabaseTransaction) GetBackwardTopologyPos(
|
||||
ctx context.Context,
|
||||
events []*gomatrixserverlib.HeaderedEvent,
|
||||
events []*rstypes.HeaderedEvent,
|
||||
) (types.TopologyToken, error) {
|
||||
zeroToken := types.TopologyToken{}
|
||||
if len(events) == 0 {
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -268,7 +269,7 @@ func (s *currentRoomStateStatements) SelectCurrentState(
|
|||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
stateFilter *synctypes.StateFilter,
|
||||
excludeEventIDs []string,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) ([]*rstypes.HeaderedEvent, error) {
|
||||
// We're going to query members later, so remove them from this request
|
||||
if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
|
||||
notTypes := &[]string{spec.MRoomMember}
|
||||
|
@ -319,7 +320,7 @@ func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
|
|||
|
||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition,
|
||||
event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
|
||||
) error {
|
||||
// Parse content as JSON and search for an "url" key
|
||||
containsURL := false
|
||||
|
@ -405,8 +406,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -421,8 +422,8 @@ func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, er
|
|||
return events, nil
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
result := []*gomatrixserverlib.HeaderedEvent{}
|
||||
func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
|
||||
result := []*rstypes.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var eventID string
|
||||
var eventBytes []byte
|
||||
|
@ -430,8 +431,8 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, &ev)
|
||||
|
@ -441,7 +442,7 @@ func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
|||
|
||||
func (s *currentRoomStateStatements) SelectStateEvent(
|
||||
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
) (*rstypes.HeaderedEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
|
||||
var res []byte
|
||||
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
||||
|
@ -451,7 +452,7 @@ func (s *currentRoomStateStatements) SelectStateEvent(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err = json.Unmarshal(res, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
@ -91,7 +92,7 @@ func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gom
|
|||
defer rows.Close() // nolint: errcheck
|
||||
var eventBytes []byte
|
||||
var roomID string
|
||||
var event gomatrixserverlib.HeaderedEvent
|
||||
var event types.HeaderedEvent
|
||||
var hisVis gomatrixserverlib.HistoryVisibility
|
||||
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
|
||||
for rows.Next() {
|
||||
|
|
|
@ -22,9 +22,9 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const inviteEventsSchema = `
|
||||
|
@ -89,7 +89,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Inv
|
|||
}
|
||||
|
||||
func (s *inviteEventsStatements) InsertInviteEvent(
|
||||
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
|
||||
ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn)
|
||||
if err != nil {
|
||||
|
@ -130,7 +130,7 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
|
|||
// active invites for the target user ID in the supplied range.
|
||||
func (s *inviteEventsStatements) SelectInviteEventsInRange(
|
||||
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
|
||||
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
|
||||
) (map[string]*rstypes.HeaderedEvent, map[string]*rstypes.HeaderedEvent, types.StreamPosition, error) {
|
||||
var lastPos types.StreamPosition
|
||||
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
|
||||
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
|
||||
|
@ -138,8 +138,8 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
|
|||
return nil, nil, lastPos, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
|
||||
result := map[string]*gomatrixserverlib.HeaderedEvent{}
|
||||
retired := map[string]*gomatrixserverlib.HeaderedEvent{}
|
||||
result := map[string]*rstypes.HeaderedEvent{}
|
||||
retired := map[string]*rstypes.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var (
|
||||
id types.StreamPosition
|
||||
|
@ -162,7 +162,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
|
|||
continue
|
||||
}
|
||||
|
||||
var event *gomatrixserverlib.HeaderedEvent
|
||||
var event *rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventJSON, &event); err != nil {
|
||||
return nil, nil, lastPos, err
|
||||
}
|
||||
|
|
|
@ -19,9 +19,8 @@ import (
|
|||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
@ -103,7 +102,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
|||
}
|
||||
|
||||
func (s *membershipsStatements) UpsertMembership(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||
ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent,
|
||||
streamPos, topologicalPos types.StreamPosition,
|
||||
) error {
|
||||
membership, err := event.Membership()
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -166,7 +167,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
|
|||
}.Prepare(db)
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error {
|
||||
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error {
|
||||
headeredJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -249,8 +250,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
}
|
||||
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
needSet := stateNeeded[ev.RoomID()]
|
||||
|
@ -296,7 +297,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
|||
// of the inserted event.
|
||||
func (s *outputRoomEventsStatements) InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||
event *rstypes.HeaderedEvent, addState, removeState []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (types.StreamPosition, error) {
|
||||
var txnID *string
|
||||
|
@ -498,8 +499,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
var ev rstypes.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -523,7 +524,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
}
|
||||
func (s *outputRoomEventsStatements) SelectContextEvent(
|
||||
ctx context.Context, txn *sql.Tx, roomID, eventID string,
|
||||
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
|
||||
) (id int, evt rstypes.HeaderedEvent, err error) {
|
||||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||
var eventAsString string
|
||||
var historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
|
@ -540,7 +541,7 @@ func (s *outputRoomEventsStatements) SelectContextEvent(
|
|||
|
||||
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
||||
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
|
||||
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||
) (evts []*rstypes.HeaderedEvent, err error) {
|
||||
stmt, params, err := prepareWithFilters(
|
||||
s.db, txn, selectContextBeforeEventSQL,
|
||||
[]interface{}{
|
||||
|
@ -564,7 +565,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
evt *rstypes.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||
|
@ -582,7 +583,7 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
|
||||
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
||||
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter,
|
||||
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
|
||||
) (lastID int, evts []*rstypes.HeaderedEvent, err error) {
|
||||
stmt, params, err := prepareWithFilters(
|
||||
s.db, txn, selectContextAfterEventSQL,
|
||||
[]interface{}{
|
||||
|
@ -606,7 +607,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
evt *rstypes.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||
|
@ -642,7 +643,7 @@ func (s *outputRoomEventsStatements) PurgeEvents(
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]rstypes.HeaderedEvent, error) {
|
||||
params := make([]interface{}, len(types)+1)
|
||||
params[0] = afterID
|
||||
for i := range types {
|
||||
|
@ -664,14 +665,14 @@ func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, l
|
|||
|
||||
var eventID string
|
||||
var id int64
|
||||
result := make(map[int64]gomatrixserverlib.HeaderedEvent)
|
||||
result := make(map[int64]rstypes.HeaderedEvent)
|
||||
for rows.Next() {
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
var ev rstypes.HeaderedEvent
|
||||
var eventBytes []byte
|
||||
if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
if err = json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result[id] = ev
|
||||
|
|
|
@ -18,9 +18,8 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
@ -104,7 +103,7 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
|||
// insertEventInTopology inserts the given event in the room's topology, based
|
||||
// on the event's depth.
|
||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||
ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition,
|
||||
) (types.StreamPosition, error) {
|
||||
_, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
|
||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -34,9 +35,9 @@ func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, fun
|
|||
return db, close
|
||||
}
|
||||
|
||||
func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
|
||||
func MustWriteEvents(t *testing.T, db storage.Database, events []*rstypes.HeaderedEvent) (positions []types.StreamPosition) {
|
||||
for _, ev := range events {
|
||||
var addStateEvents []*gomatrixserverlib.HeaderedEvent
|
||||
var addStateEvents []*rstypes.HeaderedEvent
|
||||
var addStateEventIDs []string
|
||||
var removeStateEventIDs []string
|
||||
if ev.StateKey() != nil {
|
||||
|
@ -106,7 +107,7 @@ func TestRecentEventsPDU(t *testing.T) {
|
|||
To types.StreamPosition
|
||||
Limit int
|
||||
ReverseOrder bool
|
||||
WantEvents []*gomatrixserverlib.HeaderedEvent
|
||||
WantEvents []*rstypes.HeaderedEvent
|
||||
WantLimited bool
|
||||
}{
|
||||
// The purpose of this test is to make sure that incremental syncs are including up to the latest events.
|
||||
|
@ -316,7 +317,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
|
|||
t.Parallel()
|
||||
db := MustCreateDatabase(t)
|
||||
|
||||
var events []*gomatrixserverlib.HeaderedEvent
|
||||
var events []*types.HeaderedEvent
|
||||
events = append(events, MustCreateEvent(t, testRoomID, nil, &gomatrixserverlib.EventBuilder{
|
||||
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
|
||||
Type: "m.room.create",
|
||||
|
@ -324,7 +325,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
|
|||
Sender: testUserIDA,
|
||||
Depth: int64(len(events) + 1),
|
||||
}))
|
||||
events = append(events, MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
|
||||
events = append(events, MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
|
||||
Content: []byte(`{"membership":"join"}`),
|
||||
Type: "m.room.member",
|
||||
StateKey: &testUserIDA,
|
||||
|
@ -332,7 +333,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
|
|||
Depth: int64(len(events) + 1),
|
||||
}))
|
||||
// fork the dag into three, same prev_events and depth
|
||||
parent := []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}
|
||||
parent := []*types.HeaderedEvent{events[len(events)-1]}
|
||||
depth := int64(len(events) + 1)
|
||||
for i := 0; i < 3; i++ {
|
||||
events = append(events, MustCreateEvent(t, testRoomID, parent, &gomatrixserverlib.EventBuilder{
|
||||
|
@ -365,7 +366,7 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
|
|||
Name string
|
||||
From types.TopologyToken
|
||||
Limit int
|
||||
Wants []*gomatrixserverlib.HeaderedEvent
|
||||
Wants []*types.HeaderedEvent
|
||||
}{
|
||||
{
|
||||
Name: "Pagination over the whole fork",
|
||||
|
@ -406,7 +407,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
|
|||
t.Parallel()
|
||||
db := MustCreateDatabase(t)
|
||||
|
||||
makeEvents := func(roomID string) (events []*gomatrixserverlib.HeaderedEvent) {
|
||||
makeEvents := func(roomID string) (events []*types.HeaderedEvent) {
|
||||
events = append(events, MustCreateEvent(t, roomID, nil, &gomatrixserverlib.EventBuilder{
|
||||
Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)),
|
||||
Type: "m.room.create",
|
||||
|
@ -414,7 +415,7 @@ func TestGetEventsInTopologicalRangeMultiRoom(t *testing.T) {
|
|||
Sender: testUserIDA,
|
||||
Depth: int64(len(events) + 1),
|
||||
}))
|
||||
events = append(events, MustCreateEvent(t, roomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
|
||||
events = append(events, MustCreateEvent(t, roomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
|
||||
Content: []byte(`{"membership":"join"}`),
|
||||
Type: "m.room.member",
|
||||
StateKey: &testUserIDA,
|
||||
|
@ -460,14 +461,14 @@ func TestGetEventsInRangeWithEventsInsertedLikeBackfill(t *testing.T) {
|
|||
|
||||
// "federation" join
|
||||
userC := fmt.Sprintf("@radiance:%s", testOrigin)
|
||||
joinEvent := MustCreateEvent(t, testRoomID, []*gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
|
||||
joinEvent := MustCreateEvent(t, testRoomID, []*types.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{
|
||||
Content: []byte(`{"membership":"join"}`),
|
||||
Type: "m.room.member",
|
||||
StateKey: &userC,
|
||||
Sender: userC,
|
||||
Depth: int64(len(events) + 1),
|
||||
})
|
||||
MustWriteEvents(t, db, []*gomatrixserverlib.HeaderedEvent{joinEvent})
|
||||
MustWriteEvents(t, db, []*types.HeaderedEvent{joinEvent})
|
||||
|
||||
// Sync will return this for the prev_batch
|
||||
from := topologyTokenBefore(t, db, joinEvent.EventID())
|
||||
|
@ -638,7 +639,7 @@ func TestInviteBehaviour(t *testing.T) {
|
|||
StateKey: &testUserIDA,
|
||||
Sender: "@inviteUser2:somewhere",
|
||||
})
|
||||
for _, ev := range []*gomatrixserverlib.HeaderedEvent{inviteEvent1, inviteEvent2} {
|
||||
for _, ev := range []*types.HeaderedEvent{inviteEvent1, inviteEvent2} {
|
||||
_, err := db.AddInviteEvent(ctx, ev)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to AddInviteEvent: %s", err)
|
||||
|
@ -695,7 +696,7 @@ func assertInvitedToRooms(t *testing.T, res *types.Response, roomIDs []string) {
|
|||
}
|
||||
}
|
||||
|
||||
func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*gomatrixserverlib.HeaderedEvent) {
|
||||
func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []*types.HeaderedEvent) {
|
||||
t.Helper()
|
||||
if len(gots) != len(wants) {
|
||||
t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants))
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
@ -35,11 +36,11 @@ type AccountData interface {
|
|||
}
|
||||
|
||||
type Invites interface {
|
||||
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
|
||||
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *rstypes.HeaderedEvent) (streamPos types.StreamPosition, err error)
|
||||
DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error)
|
||||
// SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value
|
||||
// for the room.
|
||||
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, maxID types.StreamPosition, err error)
|
||||
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*rstypes.HeaderedEvent, retired map[string]*rstypes.HeaderedEvent, maxID types.StreamPosition, err error)
|
||||
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||
PurgeInvites(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||
}
|
||||
|
@ -59,7 +60,7 @@ type Events interface {
|
|||
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||
InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
event *rstypes.HeaderedEvent,
|
||||
addState, removeState []string,
|
||||
transactionID *api.TransactionID,
|
||||
excludeFromSync bool,
|
||||
|
@ -70,16 +71,16 @@ type Events interface {
|
|||
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
||||
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
|
||||
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *synctypes.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
|
||||
UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error
|
||||
UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent) error
|
||||
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
|
||||
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
|
||||
|
||||
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, rstypes.HeaderedEvent, error)
|
||||
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) ([]*rstypes.HeaderedEvent, error)
|
||||
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *synctypes.RoomEventFilter) (int, []*rstypes.HeaderedEvent, error)
|
||||
|
||||
PurgeEvents(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||
ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error)
|
||||
ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) (map[int64]rstypes.HeaderedEvent, error)
|
||||
}
|
||||
|
||||
// Topology keeps track of the depths and stream positions for all events.
|
||||
|
@ -87,7 +88,7 @@ type Events interface {
|
|||
type Topology interface {
|
||||
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
||||
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
|
||||
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
||||
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
||||
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
||||
|
@ -101,13 +102,13 @@ type Topology interface {
|
|||
}
|
||||
|
||||
type CurrentRoomState interface {
|
||||
SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectStateEvent(ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string) (*rstypes.HeaderedEvent, error)
|
||||
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
||||
UpsertRoomState(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
|
||||
UpsertRoomState(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
|
||||
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
|
||||
DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||
// SelectCurrentState returns all the current state events for the given room.
|
||||
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *synctypes.StateFilter, excludeEventIDs []string) ([]*rstypes.HeaderedEvent, error)
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
|
||||
// SelectRoomIDsWithAnyMembership returns a map of all memberships for the given user.
|
||||
|
@ -191,7 +192,7 @@ type Receipts interface {
|
|||
}
|
||||
|
||||
type Memberships interface {
|
||||
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
||||
UpsertMembership(ctx context.Context, txn *sql.Tx, event *rstypes.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
||||
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
|
||||
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
||||
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||
|
|
|
@ -6,10 +6,10 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
|
||||
|
@ -47,7 +47,7 @@ func TestMembershipsTable(t *testing.T) {
|
|||
room := test.NewRoom(t, alice)
|
||||
|
||||
// Create users
|
||||
var userEvents []*gomatrixserverlib.HeaderedEvent
|
||||
var userEvents []*rstypes.HeaderedEvent
|
||||
users := []string{alice.ID}
|
||||
for _, x := range room.CurrentState() {
|
||||
if x.StateKeyEquals(alice.ID) {
|
||||
|
@ -114,7 +114,7 @@ func testMembershipCount(t *testing.T, ctx context.Context, table tables.Members
|
|||
})
|
||||
}
|
||||
|
||||
func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *gomatrixserverlib.HeaderedEvent, user *test.User, room *test.Room) {
|
||||
func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, membershipEvent *rstypes.HeaderedEvent, user *test.User, room *test.Room) {
|
||||
t.Run("upserting works as expected", func(t *testing.T) {
|
||||
if err := table.UpsertMembership(ctx, nil, membershipEvent, 1, 1); err != nil {
|
||||
t.Fatalf("failed to upsert membership: %s", err)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/internal"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -273,10 +274,21 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
|||
recentStreamEvents := dbEvents[delta.RoomID].Events
|
||||
limited := dbEvents[delta.RoomID].Limited
|
||||
|
||||
recentEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
|
||||
snapshot.StreamEventsToEvents(device, recentStreamEvents),
|
||||
hisVisMap := map[string]gomatrixserverlib.HistoryVisibility{}
|
||||
for _, re := range recentStreamEvents {
|
||||
hisVisMap[re.EventID()] = re.Visibility
|
||||
}
|
||||
recEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
|
||||
toEvents(snapshot.StreamEventsToEvents(device, recentStreamEvents)),
|
||||
gomatrixserverlib.TopologicalOrderByPrevEvents,
|
||||
)
|
||||
recentEvents := make([]*rstypes.HeaderedEvent, len(recEvents))
|
||||
for i := range recEvents {
|
||||
recentEvents[i] = &rstypes.HeaderedEvent{
|
||||
Event: recEvents[i],
|
||||
Visibility: hisVisMap[recEvents[i].EventID()],
|
||||
}
|
||||
}
|
||||
|
||||
// If we didn't return any events at all then don't bother doing anything else.
|
||||
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
|
||||
|
@ -341,10 +353,21 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
|||
// Now that we've filtered the timeline, work out which state events are still
|
||||
// left. Anything that appears in the filtered timeline will be removed from the
|
||||
// "state" section and kept in "timeline".
|
||||
delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(
|
||||
removeDuplicates(delta.StateEvents, events),
|
||||
hisVisMap = map[string]gomatrixserverlib.HistoryVisibility{}
|
||||
for _, re := range delta.StateEvents {
|
||||
hisVisMap[re.EventID()] = re.Visibility
|
||||
}
|
||||
sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
|
||||
toEvents(removeDuplicates(delta.StateEvents, events)),
|
||||
gomatrixserverlib.TopologicalOrderByAuthEvents,
|
||||
)
|
||||
delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents))
|
||||
for i := range sEvents {
|
||||
delta.StateEvents[i] = &rstypes.HeaderedEvent{
|
||||
Event: sEvents[i],
|
||||
Visibility: hisVisMap[sEvents[i].EventID()],
|
||||
}
|
||||
}
|
||||
|
||||
if len(delta.StateEvents) > 0 {
|
||||
if last := delta.StateEvents[len(delta.StateEvents)-1]; last != nil {
|
||||
|
@ -407,8 +430,8 @@ func applyHistoryVisibilityFilter(
|
|||
snapshot storage.DatabaseTransaction,
|
||||
rsAPI roomserverAPI.SyncRoomserverAPI,
|
||||
roomID, userID string,
|
||||
recentEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
recentEvents []*rstypes.HeaderedEvent,
|
||||
) ([]*rstypes.HeaderedEvent, error) {
|
||||
// We need to make sure we always include the latest state events, if they are in the timeline.
|
||||
alwaysIncludeIDs := make(map[string]struct{})
|
||||
var stateTypes []string
|
||||
|
@ -555,8 +578,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
|
|||
ctx context.Context, snapshot storage.DatabaseTransaction, roomID string,
|
||||
incremental, limited bool, stateFilter *synctypes.StateFilter,
|
||||
device *userapi.Device,
|
||||
timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
timelineEvents, stateEvents []*rstypes.HeaderedEvent,
|
||||
) ([]*rstypes.HeaderedEvent, error) {
|
||||
if len(timelineEvents) == 0 {
|
||||
return stateEvents, nil
|
||||
}
|
||||
|
@ -573,7 +596,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
|
|||
}
|
||||
}
|
||||
// Preallocate with the same amount, even if it will end up with fewer values
|
||||
newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
|
||||
newStateEvents := make([]*rstypes.HeaderedEvent, 0, len(stateEvents))
|
||||
// Remove existing membership events we don't care about, e.g. users not in the timeline.events
|
||||
for _, event := range stateEvents {
|
||||
if event.Type() == spec.MRoomMember && event.StateKey() != nil {
|
||||
|
@ -633,7 +656,7 @@ func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapsho
|
|||
return nil
|
||||
}
|
||||
|
||||
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
|
||||
func removeDuplicates(stateEvents, recentEvents []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
|
||||
for _, recentEv := range recentEvents {
|
||||
if recentEv.StateKey() == nil {
|
||||
continue // not a state event
|
||||
|
@ -654,3 +677,11 @@ func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEve
|
|||
}
|
||||
return stateEvents
|
||||
}
|
||||
|
||||
func toEvents(events []*rstypes.HeaderedEvent) []*gomatrixserverlib.Event {
|
||||
result := make([]*gomatrixserverlib.Event, len(events))
|
||||
for i := range events {
|
||||
result[i] = events[i].Event
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/nats-io/nats.go"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/routing"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
|
@ -490,7 +491,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
|
|||
afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility)
|
||||
msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody})
|
||||
|
||||
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
|
||||
eventsToSend = append([]*rstypes.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
|
||||
|
||||
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
|
||||
t.Fatalf("failed to send events: %v", err)
|
||||
|
@ -523,7 +524,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
|
|||
}
|
||||
}
|
||||
|
||||
func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *gomatrixserverlib.HeaderedEvent, chunk []synctypes.ClientEvent) {
|
||||
func verifyEventVisible(t *testing.T, wantVisible bool, wantVisibleEvent *rstypes.HeaderedEvent, chunk []synctypes.ClientEvent) {
|
||||
t.Helper()
|
||||
if wantVisible {
|
||||
for _, ev := range chunk {
|
||||
|
@ -1228,7 +1229,7 @@ func syncUntil(t *testing.T,
|
|||
}
|
||||
}
|
||||
|
||||
func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
|
||||
func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*rstypes.HeaderedEvent) []*nats.Msg {
|
||||
result := make([]*nats.Msg, len(input))
|
||||
for i, ev := range input {
|
||||
var addsStateIDs []string
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
package synctypes
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
)
|
||||
|
@ -56,7 +57,7 @@ func ToClientEvents(serverEvs []*gomatrixserverlib.Event, format ClientEventForm
|
|||
}
|
||||
|
||||
// HeaderedToClientEvents converts headered server events to client events.
|
||||
func HeaderedToClientEvents(serverEvs []*gomatrixserverlib.HeaderedEvent, format ClientEventFormat) []ClientEvent {
|
||||
func HeaderedToClientEvents(serverEvs []*types.HeaderedEvent, format ClientEventFormat) []ClientEvent {
|
||||
evs := make([]ClientEvent, 0, len(serverEvs))
|
||||
for _, se := range serverEvs {
|
||||
if se == nil {
|
||||
|
@ -86,6 +87,6 @@ func ToClientEvent(se *gomatrixserverlib.Event, format ClientEventFormat) Client
|
|||
}
|
||||
|
||||
// HeaderedToClientEvent converts a single headered server event to a client event.
|
||||
func HeaderedToClientEvent(se *gomatrixserverlib.HeaderedEvent, format ClientEventFormat) ClientEvent {
|
||||
func HeaderedToClientEvent(se *types.HeaderedEvent, format ClientEventFormat) ClientEvent {
|
||||
return ToClientEvent(se.Event, format)
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
)
|
||||
|
||||
|
@ -38,7 +39,7 @@ var (
|
|||
|
||||
type StateDelta struct {
|
||||
RoomID string
|
||||
StateEvents []*gomatrixserverlib.HeaderedEvent
|
||||
StateEvents []*types.HeaderedEvent
|
||||
NewlyJoined bool
|
||||
Membership string
|
||||
// The PDU stream position of the latest membership event for this user, if applicable.
|
||||
|
@ -59,7 +60,7 @@ func NewStreamPositionFromString(s string) (StreamPosition, error) {
|
|||
|
||||
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
|
||||
type StreamEvent struct {
|
||||
*gomatrixserverlib.HeaderedEvent
|
||||
*types.HeaderedEvent
|
||||
StreamPosition StreamPosition
|
||||
TransactionID *api.TransactionID
|
||||
ExcludeFromSync bool
|
||||
|
@ -538,7 +539,7 @@ type InviteResponse struct {
|
|||
}
|
||||
|
||||
// NewInviteResponse creates an empty response with initialised arrays.
|
||||
func NewInviteResponse(event *gomatrixserverlib.HeaderedEvent) *InviteResponse {
|
||||
func NewInviteResponse(event *types.HeaderedEvent) *InviteResponse {
|
||||
res := InviteResponse{}
|
||||
res.InviteState.Events = []json.RawMessage{}
|
||||
|
||||
|
@ -551,7 +552,7 @@ func NewInviteResponse(event *gomatrixserverlib.HeaderedEvent) *InviteResponse {
|
|||
|
||||
// Then we'll see if we can create a partial of the invite event itself.
|
||||
// This is needed for clients to work out *who* sent the invite.
|
||||
inviteEvent := synctypes.ToClientEvent(event.Unwrap(), synctypes.FormatSync)
|
||||
inviteEvent := synctypes.ToClientEvent(event.Event, synctypes.FormatSync)
|
||||
inviteEvent.Unsigned = nil
|
||||
if ev, err := json.Marshal(inviteEvent); err == nil {
|
||||
res.InviteState.Events = append(res.InviteState.Events, ev)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
@ -55,7 +56,7 @@ func TestNewInviteResponse(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
res := NewInviteResponse(ev.Headered(gomatrixserverlib.RoomVersionV5))
|
||||
res := NewInviteResponse(&types.HeaderedEvent{Event: ev})
|
||||
j, err := json.Marshal(res)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue