Refactor by implementing the messagesReq structure

This commit is contained in:
Brendan Abolivier 2018-11-13 18:07:27 +00:00
parent 38475d1489
commit 4a3c9555b1
No known key found for this signature in database
GPG key ID: 8EF1500759F70623

View file

@ -31,6 +31,20 @@ import (
log "github.com/sirupsen/logrus"
)
type messagesReq struct {
ctx context.Context
db *storage.SyncServerDatabase
queryAPI api.RoomserverQueryAPI
federation *gomatrixserverlib.FederationClient
cfg *config.Dendrite
roomID string
from *types.PaginationToken
to *types.PaginationToken
wasToProvided bool
limit int
backwardOrdering bool
}
type messageResp struct {
Start string `json:"start"`
End string `json:"end"`
@ -75,7 +89,7 @@ func OnIncomingMessagesRequest(
// Pagination tokens. To is optional, and its default value depends on the
// direction ("b" or "f").
var to *types.PaginationToken
var toDefault bool
wasToProvided := true
if s := req.URL.Query().Get("to"); len(s) > 0 {
to, err = types.NewPaginationTokenFromString(s)
if err != nil {
@ -92,7 +106,7 @@ func OnIncomingMessagesRequest(
if err != nil {
return httputil.LogThenError(req, err)
}
toDefault = true
wasToProvided = false
}
// Maximum number of events to return; defaults to 10.
@ -117,10 +131,21 @@ func OnIncomingMessagesRequest(
}
}
clientEvents, start, end, err := retrieveEvents(
req.Context(), db, roomID, from, to, toDefault, limit, backwardOrdering,
federation, queryAPI, cfg,
)
mReq := messagesReq{
ctx: req.Context(),
db: db,
queryAPI: queryAPI,
federation: federation,
cfg: cfg,
roomID: roomID,
from: from,
to: to,
wasToProvided: wasToProvided,
limit: limit,
backwardOrdering: backwardOrdering,
}
clientEvents, start, end, err := mReq.retrieveEvents()
if err != nil {
return httputil.LogThenError(req, err)
}
@ -136,132 +161,43 @@ func OnIncomingMessagesRequest(
}
}
// setToDefault returns the default value for the "to" query parameter of a
// request to /messages if not provided. It defaults to either the earliest
// topological position (if we're going backward) or to the latest one (if we're
// going forward).
// Returns an error if there was an issue with retrieving the latest position
// from the database
func setToDefault(
ctx context.Context, db *storage.SyncServerDatabase, backwardOrdering bool,
roomID string,
) (to *types.PaginationToken, err error) {
if backwardOrdering {
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1)
} else {
var pos types.StreamPosition
pos, err = db.MaxTopologicalPosition(ctx, roomID)
if err != nil {
return
}
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos)
}
return
}
// retrieveEvents retrieve events from the local database for a request on
// /messages. If there's not enough events to retrieve, it asks another
// homeserver in the room for older events.
// Returns an error if there was an issue talking to the database or with the
// remote homeserver.
func retrieveEvents(
ctx context.Context, db *storage.SyncServerDatabase, roomID string,
from, to *types.PaginationToken, toDefault bool, limit int,
backwardOrdering bool,
federation *gomatrixserverlib.FederationClient,
queryAPI api.RoomserverQueryAPI,
cfg *config.Dendrite,
) (clientEvents []gomatrixserverlib.ClientEvent, start, end *types.PaginationToken, err error) {
func (r *messagesReq) retrieveEvents() (
clientEvents []gomatrixserverlib.ClientEvent, start,
end *types.PaginationToken, err error,
) {
// Retrieve the events from the local database.
streamEvents, err := db.GetEventsInRange(
ctx, from, to, roomID, limit, backwardOrdering,
streamEvents, err := r.db.GetEventsInRange(
r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering,
)
if err != nil {
return
}
var backwardExtremity bool
var events []gomatrixserverlib.Event
// There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending
// on the ordering), or we've reached a backward extremity.
if len(streamEvents) == 0 {
var evs []storage.StreamEvent
var laterPosition types.StreamPosition
if backwardOrdering {
laterPosition = from.Position + 1
} else {
laterPosition = to.Position + 1
}
evs, err = db.EventsAtTopologicalPosition(ctx, roomID, laterPosition)
if err != nil {
if events, err = r.handleEmptyEventsSlice(); err != nil {
return
}
backwardExtremity, err = containsBackwardExtremity(ctx, db, evs, backwardOrdering)
if err != nil {
return
}
if backwardExtremity {
events, err = backfill(ctx, db, roomID, evs[0].EventID(), limit, queryAPI, cfg, federation)
if err != nil {
return
}
} else {
return []gomatrixserverlib.ClientEvent{}, from, to, nil
} else if len(events) == 0 {
return []gomatrixserverlib.ClientEvent{}, r.from, r.to, nil
}
} else {
// Check if we have enough events.
isSetLargeEnough := true
if len(streamEvents) < limit {
if backwardOrdering {
if !toDefault {
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition)
}
} else {
isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition)
}
}
// Check if the slice contains a backward extremity.
backwardExtremity, err = containsBackwardExtremity(
ctx, db, streamEvents, backwardOrdering,
)
if err != nil {
if events, err = r.handleNonEmptyEventsSlice(streamEvents); err != nil {
return
}
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if backwardExtremity && !isSetLargeEnough && backwardOrdering {
var pdus []gomatrixserverlib.Event
// Only ask the remote server for enough events to reach the limit.
pdus, err = backfill(
ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents),
queryAPI, cfg, federation,
)
if err != nil {
return
}
// Append the PDUs to the list to send back to the client.
events = append(events, pdus...)
}
// Append the events ve previously retrieved locally.
events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...)
}
// Sort the events to ensure we send them in the right order. We currently
// do that based on the event's timestamp.
if backwardOrdering {
if r.backwardOrdering {
sort.SliceStable(events, func(i int, j int) bool {
// Backward ordering is antichronological (latest event to oldest
// one).
@ -291,13 +227,99 @@ func retrieveEvents(
return
}
// sortEvents is a function to give to sort.SliceStable, and compares the
// timestamp of two Matrix events.
// Returns true if the first event happened before the second one, false
// otherwise.
func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool {
t := e1.OriginServerTS().Time()
return e2.OriginServerTS().Time().After(t)
// handleEmptyEventsSlice handles the case where the initial request to the
// database returned an empty slice of events. It does so by checking whether
// the set is empty because we've reached a backward extremity, and if that is
// the case, by retrieving as much events as requested by backfilling from
// another homeserver.
// Returns an error if there was an issue talking with the database or
// backfilling.
func (r *messagesReq) handleEmptyEventsSlice() (
events []gomatrixserverlib.Event, err error,
) {
var evs []storage.StreamEvent
// Determine what could be the oldest position of interest in the room's
// topology for this.
var laterPosition types.StreamPosition
if r.backwardOrdering {
laterPosition = r.from.Position + 1
} else {
laterPosition = r.to.Position + 1
}
// Retrieve events at that position.
evs, err = r.db.EventsAtTopologicalPosition(r.ctx, r.roomID, laterPosition)
if err != nil {
return
}
// Check if one of these events is a backward extremity.
backwardExtremity, err := r.containsBackwardExtremity(evs)
if err != nil {
return
}
// If so, retrieve as much events as requested through backfilling.
if backwardExtremity {
events, err = r.backfill(evs[0].EventID(), r.limit)
if err != nil {
return
}
} else {
// If not, it means the slice was empty because we reached the limit of
// the room's topology, so return an empty slice.
events = []gomatrixserverlib.Event{}
}
return
}
// handleNonEmptyEventsSlice handles the case where the initial request to the
// database returned a non-empty slice of events. It does so by checking whether
// events are missing from the expected result, and retrieve missing events
// through backfilling if needed.
// Returns an error if there was an issue while backfilling.
func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []storage.StreamEvent) (
events []gomatrixserverlib.Event, err error,
) {
// Check if we have enough events.
isSetLargeEnough := true
if len(streamEvents) < r.limit {
if r.backwardOrdering {
if r.wasToProvided {
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
isSetLargeEnough = (r.to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition)
}
} else {
isSetLargeEnough = (r.from.Position-1 == streamEvents[0].StreamPosition)
}
}
// Check if the slice contains a backward extremity.
backwardExtremity, err := r.containsBackwardExtremity(streamEvents)
if err != nil {
return
}
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if backwardExtremity && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.Event
// Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(streamEvents[0].EventID(), r.limit-len(streamEvents))
if err != nil {
return
}
// Append the PDUs to the list to send back to the client.
events = append(events, pdus...)
}
// Append the events ve previously retrieved locally.
events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...)
return
}
// containsBackwardExtremity checks if a slice of StreamEvent contains a
@ -306,20 +328,17 @@ func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool {
// considers the event itself a backward extremity if at least one of the parent
// events doesn't exist in the database.
// Returns an error if there was an issue with talking to the database.
func containsBackwardExtremity(
ctx context.Context, db *storage.SyncServerDatabase,
events []storage.StreamEvent, backwardOrdering bool,
) (bool, error) {
func (r *messagesReq) containsBackwardExtremity(events []storage.StreamEvent) (bool, error) {
// Select the earliest retrieved event.
var ev *storage.StreamEvent
if backwardOrdering {
if r.backwardOrdering {
ev = &(events[len(events)-1])
} else {
ev = &(events[0])
}
// Get the earliest retrieved event's parents.
prevIDs := ev.PrevEventIDs()
prevs, err := db.Events(ctx, prevIDs)
prevs, err := r.db.Events(r.ctx, prevIDs)
if err != nil {
return false, nil
}
@ -355,19 +374,15 @@ func containsBackwardExtremity(
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func backfill(
ctx context.Context, db *storage.SyncServerDatabase, roomID,
fromEventID string, limit int, queryAPI api.RoomserverQueryAPI,
cfg *config.Dendrite, federation *gomatrixserverlib.FederationClient,
) ([]gomatrixserverlib.Event, error) {
func (r *messagesReq) backfill(fromEventID string, limit int) ([]gomatrixserverlib.Event, error) {
// Query the list of servers in the room when the earlier event we know
// of was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
serversRequest := api.QueryServersInRoomAtEventRequest{
RoomID: roomID,
RoomID: r.roomID,
EventID: fromEventID,
}
if err := queryAPI.QueryServersInRoomAtEvent(ctx, &serversRequest, &serversResponse); err != nil {
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
return nil, err
}
@ -378,7 +393,7 @@ func backfill(
// TODO: Be smarter at selecting the server to direct the request
// towards.
srvToBackfillFrom := serversResponse.Servers[0]
if srvToBackfillFrom == cfg.Matrix.ServerName {
if srvToBackfillFrom == r.cfg.Matrix.ServerName {
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
@ -392,8 +407,8 @@ func backfill(
// If the roomserver responded with at least one server that isn't us,
// send it a request for backfill.
if len(srvToBackfillFrom) > 0 {
txn, err := federation.Backfill(
ctx, srvToBackfillFrom, roomID, limit, []string{fromEventID},
txn, err := r.federation.Backfill(
r.ctx, srvToBackfillFrom, r.roomID, limit, []string{fromEventID},
)
if err != nil {
return nil, err
@ -404,8 +419,8 @@ func backfill(
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
for _, pdu := range pdus {
if _, err = db.WriteEvent(
ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{},
if _, err = r.db.WriteEvent(
r.ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{},
nil, true,
); err != nil {
return nil, err
@ -415,3 +430,37 @@ func backfill(
return pdus, nil
}
// setToDefault returns the default value for the "to" query parameter of a
// request to /messages if not provided. It defaults to either the earliest
// topological position (if we're going backward) or to the latest one (if we're
// going forward).
// Returns an error if there was an issue with retrieving the latest position
// from the database
func setToDefault(
ctx context.Context, db *storage.SyncServerDatabase, backwardOrdering bool,
roomID string,
) (to *types.PaginationToken, err error) {
if backwardOrdering {
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1)
} else {
var pos types.StreamPosition
pos, err = db.MaxTopologicalPosition(ctx, roomID)
if err != nil {
return
}
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos)
}
return
}
// sortEvents is a function to give to sort.SliceStable, and compares the
// timestamp of two Matrix events.
// Returns true if the first event happened before the second one, false
// otherwise.
func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool {
t := e1.OriginServerTS().Time()
return e2.OriginServerTS().Time().After(t)
}