mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-04-10 13:53:40 +00:00
Implement /messages (with backfill when needed)
This commit is contained in:
parent
77dc37ce6f
commit
c3d25bd7cc
8 changed files with 286 additions and 47 deletions
|
@ -70,7 +70,7 @@ func main() {
|
||||||
federationsender.SetupFederationSenderComponent(base, federation, query)
|
federationsender.SetupFederationSenderComponent(base, federation, query)
|
||||||
mediaapi.SetupMediaAPIComponent(base, deviceDB)
|
mediaapi.SetupMediaAPIComponent(base, deviceDB)
|
||||||
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
|
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
|
||||||
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)
|
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
|
||||||
|
|
||||||
httpHandler := common.WrapHandlerInCORS(base.APIMux)
|
httpHandler := common.WrapHandlerInCORS(base.APIMux)
|
||||||
|
|
||||||
|
|
|
@ -26,10 +26,11 @@ func main() {
|
||||||
|
|
||||||
deviceDB := base.CreateDeviceDB()
|
deviceDB := base.CreateDeviceDB()
|
||||||
accountDB := base.CreateAccountsDB()
|
accountDB := base.CreateAccountsDB()
|
||||||
|
federation := base.CreateFederationClient()
|
||||||
|
|
||||||
_, _, query := base.CreateHTTPRoomserverAPIs()
|
_, _, query := base.CreateHTTPRoomserverAPIs()
|
||||||
|
|
||||||
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)
|
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
|
||||||
|
|
||||||
base.SetupAndServeHTTP(string(base.Cfg.Listen.SyncAPI))
|
base.SetupAndServeHTTP(string(base.Cfg.Listen.SyncAPI))
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,6 +230,20 @@ type QueryBackfillResponse struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent
|
||||||
|
type QueryServersInRoomAtEventRequest struct {
|
||||||
|
// ID of the room to retrieve member servers for.
|
||||||
|
RoomID string `json:"room_id"`
|
||||||
|
// ID of the event for which to retrieve member servers.
|
||||||
|
EventID string `json:"event_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryServersInRoomAtEventResponse is a response to QueryServersInRoomAtEvent
|
||||||
|
type QueryServersInRoomAtEventResponse struct {
|
||||||
|
// Servers present in the room for these events.
|
||||||
|
Servers []gomatrixserverlib.ServerName `json:"servers"`
|
||||||
|
}
|
||||||
|
|
||||||
// RoomserverQueryAPI is used to query information from the room server.
|
// RoomserverQueryAPI is used to query information from the room server.
|
||||||
type RoomserverQueryAPI interface {
|
type RoomserverQueryAPI interface {
|
||||||
// Query the latest events and state for a room from the room server.
|
// Query the latest events and state for a room from the room server.
|
||||||
|
@ -303,6 +317,12 @@ type RoomserverQueryAPI interface {
|
||||||
request *QueryBackfillRequest,
|
request *QueryBackfillRequest,
|
||||||
response *QueryBackfillResponse,
|
response *QueryBackfillResponse,
|
||||||
) error
|
) error
|
||||||
|
|
||||||
|
QueryServersInRoomAtEvent(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryServersInRoomAtEventRequest,
|
||||||
|
response *QueryServersInRoomAtEventResponse,
|
||||||
|
) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
|
// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
|
||||||
|
@ -333,7 +353,10 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents"
|
||||||
const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain"
|
const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain"
|
||||||
|
|
||||||
// RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API
|
// RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API
|
||||||
const RoomserverQueryBackfillPath = "/api/roomserver/QueryBackfill"
|
const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill"
|
||||||
|
|
||||||
|
// RoomserverQueryServersInRoomAtEvent is the HTTP path for the QueryServersInRoomAtEvent API
|
||||||
|
const RoomserverQueryServersInRoomAtEvent = "/api/roomserver/queryServersInRoomAtEvents"
|
||||||
|
|
||||||
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
|
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
|
||||||
// If httpClient is nil then it uses the http.DefaultClient
|
// If httpClient is nil then it uses the http.DefaultClient
|
||||||
|
@ -475,6 +498,19 @@ func (h *httpRoomserverQueryAPI) QueryBackfill(
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath
|
apiURL := h.roomserverURL + RoomserverQueryBackfillPath
|
||||||
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryServersInRoomAtEvent implements RoomServerQueryAPI
|
||||||
|
func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryServersInRoomAtEventRequest,
|
||||||
|
response *QueryServersInRoomAtEventResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEvent
|
||||||
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
|
@ -643,6 +643,41 @@ func getAuthChain(
|
||||||
return authEvents, nil
|
return authEvents, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI
|
||||||
|
func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.QueryServersInRoomAtEventRequest,
|
||||||
|
response *api.QueryServersInRoomAtEventResponse,
|
||||||
|
) error {
|
||||||
|
// getMembershipsBeforeEventNID requires a NID, so retrieving the NID for
|
||||||
|
// the event is necessary.
|
||||||
|
NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve all "m.room.member" state events of "join" membership, which
|
||||||
|
// contains the list of users in the room before the event, therefore all
|
||||||
|
// the servers in it at that moment.
|
||||||
|
events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the server names in a temporary map to avoid duplicates.
|
||||||
|
servers := make(map[gomatrixserverlib.ServerName]bool)
|
||||||
|
for _, event := range events {
|
||||||
|
servers[event.Origin()] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate the response.
|
||||||
|
for server := range servers {
|
||||||
|
response.Servers = append(response.Servers, server)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux.
|
// SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux.
|
||||||
// nolint: gocyclo
|
// nolint: gocyclo
|
||||||
func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
|
func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
|
||||||
|
@ -786,4 +821,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
servMux.Handle(
|
||||||
|
api.RoomserverQueryServersInRoomAtEvent,
|
||||||
|
common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request api.QueryServersInRoomAtEventRequest
|
||||||
|
var response api.QueryServersInRoomAtEventResponse
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,12 +15,14 @@
|
||||||
package routing
|
package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
// "encoding/json"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
// "github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -36,9 +38,18 @@ type messageResp struct {
|
||||||
|
|
||||||
const defaultMessagesLimit = 10
|
const defaultMessagesLimit = 10
|
||||||
|
|
||||||
func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse {
|
// OnIncomingMessagesRequest implements the /messages endpoint from the
|
||||||
|
// client-server API.
|
||||||
|
// See: https://matrix.org/docs/spec/client_server/r0.4.0.html#get-matrix-client-r0-rooms-roomid-messages
|
||||||
|
func OnIncomingMessagesRequest(
|
||||||
|
req *http.Request, db *storage.SyncServerDatabase, roomID string,
|
||||||
|
federation *gomatrixserverlib.FederationClient,
|
||||||
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
cfg *config.Dendrite,
|
||||||
|
) util.JSONResponse {
|
||||||
var from, to int
|
var from, to int
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// Extract parameters from the request's URL.
|
// Extract parameters from the request's URL.
|
||||||
// Pagination tokens.
|
// Pagination tokens.
|
||||||
from, err = strconv.Atoi(req.URL.Query().Get("from"))
|
from, err = strconv.Atoi(req.URL.Query().Get("from"))
|
||||||
|
@ -58,8 +69,12 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase
|
||||||
JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"),
|
JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// A boolean is easier to handle in this case, especially since dir is sure
|
||||||
|
// to have one of the two accepted values (so dir == "f" <=> !backwardOrdering).
|
||||||
backwardOrdering := (dir == "b")
|
backwardOrdering := (dir == "b")
|
||||||
|
|
||||||
|
// Pagination tokens. To is optional, and its default value depends on the
|
||||||
|
// direction ("b" or "f").
|
||||||
toStr := req.URL.Query().Get("to")
|
toStr := req.URL.Query().Get("to")
|
||||||
var toPos types.StreamPosition
|
var toPos types.StreamPosition
|
||||||
if len(toStr) > 0 {
|
if len(toStr) > 0 {
|
||||||
|
@ -72,13 +87,12 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase
|
||||||
}
|
}
|
||||||
toPos = types.StreamPosition(to)
|
toPos = types.StreamPosition(to)
|
||||||
} else {
|
} else {
|
||||||
if backwardOrdering {
|
// If "to" isn't provided, it defaults to either the earliest stream
|
||||||
toPos = types.StreamPosition(0)
|
// position (if we're going backward) or to the latest one (if we're
|
||||||
} else {
|
// going forward).
|
||||||
toPos, err = db.SyncStreamPosition(req.Context())
|
toPos, err = setToDefault(req.Context(), backwardOrdering, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return jsonerror.InternalServerError()
|
return httputil.LogThenError(req, err)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,14 +118,68 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamEvents, err := db.GetEventsInRange(
|
clientEvents, start, end, err := retrieveEvents(
|
||||||
req.Context(), fromPos, toPos, roomID, limit, backwardOrdering,
|
req.Context(), db, roomID, fromPos, toPos, toStr, limit, backwardOrdering,
|
||||||
|
federation, queryAPI, cfg,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return jsonerror.InternalServerError()
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we don't have enough events, i.e. len(sev) < limit and the events
|
// Respond with the events.
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
JSON: messageResp{
|
||||||
|
Chunk: clientEvents,
|
||||||
|
Start: start,
|
||||||
|
End: end,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// setToDefault returns the default value for the "to" query parameter of a
|
||||||
|
// request to /messages if not provided. It defaults to either the earliest
|
||||||
|
// stream position (if we're going backward) or to the latest one (if we're
|
||||||
|
// going forward).
|
||||||
|
// Returns an error if there was an issue with retrieving the latest position
|
||||||
|
// from the database
|
||||||
|
func setToDefault(
|
||||||
|
ctx context.Context, backwardOrdering bool, db *storage.SyncServerDatabase,
|
||||||
|
) (toPos types.StreamPosition, err error) {
|
||||||
|
if backwardOrdering {
|
||||||
|
toPos = types.StreamPosition(1)
|
||||||
|
} else {
|
||||||
|
toPos, err = db.SyncStreamPosition(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// retrieveEvents retrieve events from the local database for a request on
|
||||||
|
// /messages. If there's not enough events to retrieve, it asks another
|
||||||
|
// homeserver in the room for older events.
|
||||||
|
// Returns an error if there was an issue talking to the database or with the
|
||||||
|
// remote homeserver.
|
||||||
|
func retrieveEvents(
|
||||||
|
ctx context.Context, db *storage.SyncServerDatabase, roomID string,
|
||||||
|
fromPos, toPos types.StreamPosition, toStr string, limit int,
|
||||||
|
backwardOrdering bool,
|
||||||
|
federation *gomatrixserverlib.FederationClient,
|
||||||
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
cfg *config.Dendrite,
|
||||||
|
) (clientEvents []gomatrixserverlib.ClientEvent, start string, end string, err error) {
|
||||||
|
// Retrieve the events from the local database.
|
||||||
|
streamEvents, err := db.GetEventsInRange(
|
||||||
|
ctx, fromPos, toPos, roomID, limit, backwardOrdering,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have enough events.
|
||||||
isSetLargeEnough := true
|
isSetLargeEnough := true
|
||||||
if len(streamEvents) < limit {
|
if len(streamEvents) < limit {
|
||||||
if backwardOrdering {
|
if backwardOrdering {
|
||||||
|
@ -121,50 +189,121 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase
|
||||||
isSetLargeEnough = (toPos-1 == streamEvents[0].StreamPosition)
|
isSetLargeEnough = (toPos-1 == streamEvents[0].StreamPosition)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// We need all events from < streamPos < to
|
|
||||||
isSetLargeEnough = (fromPos-1 == streamEvents[0].StreamPosition)
|
isSetLargeEnough = (fromPos-1 == streamEvents[0].StreamPosition)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if earliest event is a backward extremity, i.e. if one of its
|
// Check if earliest event is a backward extremity, i.e. if one of its
|
||||||
// previous events is missing from the db.
|
// previous events is missing from the database.
|
||||||
|
// Get the earliest retrieved event's parents.
|
||||||
prevIDs := streamEvents[0].PrevEventIDs()
|
prevIDs := streamEvents[0].PrevEventIDs()
|
||||||
prevs, err := db.Events(req.Context(), prevIDs)
|
prevs, err := db.Events(ctx, prevIDs)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Check if we have all of the events we requested. If not, it means we've
|
||||||
|
// reached a backard extremity.
|
||||||
var eventInDB, isBackwardExtremity bool
|
var eventInDB, isBackwardExtremity bool
|
||||||
var id string
|
var id string
|
||||||
|
// Iterate over the IDs we used in the request.
|
||||||
for _, id = range prevIDs {
|
for _, id = range prevIDs {
|
||||||
eventInDB = false
|
eventInDB = false
|
||||||
|
// Iterate over the events we got in response.
|
||||||
for _, ev := range prevs {
|
for _, ev := range prevs {
|
||||||
if ev.EventID() == id {
|
if ev.EventID() == id {
|
||||||
eventInDB = true
|
eventInDB = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// One occurrence of one the event's parents not being present in the
|
||||||
|
// database is enough to say that the event is a backward extremity.
|
||||||
if !eventInDB {
|
if !eventInDB {
|
||||||
isBackwardExtremity = true
|
isBackwardExtremity = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if isBackwardExtremity && !isSetLargeEnough {
|
var events []gomatrixserverlib.Event
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"limit": limit,
|
// Backfill is needed if we've reached a backward extremity and need more
|
||||||
"nb_events": len(streamEvents),
|
// events. It's only needed if the direction is backard.
|
||||||
"from": fromPos.String(),
|
if isBackwardExtremity && !isSetLargeEnough && backwardOrdering {
|
||||||
"to": toPos.String(),
|
var pdus []gomatrixserverlib.Event
|
||||||
"isBackwardExtremity": isBackwardExtremity,
|
// Only ask the remote server for enough events to reach the limit.
|
||||||
"isSetLargeEnough": isSetLargeEnough,
|
pdus, err = backfill(
|
||||||
}).Info("Backfilling!")
|
ctx, roomID, streamEvents[0].EventID(), limit-len(streamEvents),
|
||||||
println("Backfilling!")
|
queryAPI, cfg, federation,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
events = append(events, pdus...)
|
||||||
}
|
}
|
||||||
|
|
||||||
events := storage.StreamEventsToEvents(nil, streamEvents)
|
// Append the events we retrieved locally, then convert them into client
|
||||||
clientEvents := gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll)
|
// events.
|
||||||
|
events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...)
|
||||||
|
clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll)
|
||||||
|
start = streamEvents[0].StreamPosition.String()
|
||||||
|
end = streamEvents[len(streamEvents)-1].StreamPosition.String()
|
||||||
|
|
||||||
return util.JSONResponse{
|
return
|
||||||
Code: http.StatusOK,
|
}
|
||||||
JSON: messageResp{
|
|
||||||
Chunk: clientEvents,
|
// backfill performs a backfill request over the federation on another
|
||||||
Start: streamEvents[0].StreamPosition.String(),
|
// homeserver in the room.
|
||||||
End: streamEvents[len(streamEvents)-1].StreamPosition.String(),
|
// See: https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid
|
||||||
},
|
// Returns with an empty string if the remote homeserver didn't return with any
|
||||||
}
|
// event, or if there is no remote homeserver to contact.
|
||||||
|
// Returns an error if there was an issue with retrieving the list of servers in
|
||||||
|
// the room or sending the request.
|
||||||
|
func backfill(
|
||||||
|
ctx context.Context, roomID, fromEventID string, limit int,
|
||||||
|
queryAPI api.RoomserverQueryAPI, cfg *config.Dendrite,
|
||||||
|
federation *gomatrixserverlib.FederationClient,
|
||||||
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
|
// Query the list of servers in the room when the earlier event we know
|
||||||
|
// of was sent.
|
||||||
|
var serversResponse api.QueryServersInRoomAtEventResponse
|
||||||
|
serversRequest := api.QueryServersInRoomAtEventRequest{
|
||||||
|
RoomID: roomID,
|
||||||
|
EventID: fromEventID,
|
||||||
|
}
|
||||||
|
if err := queryAPI.QueryServersInRoomAtEvent(ctx, &serversRequest, &serversResponse); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the first server from the response, except if that server is us.
|
||||||
|
// In that case, use the second one if the roomserver responded with
|
||||||
|
// enough servers. If not, use an empty string to prevent the backfill
|
||||||
|
// from happening as there's no server to direct the request towards.
|
||||||
|
// TODO: Be smarter at selecting the server to direct the request
|
||||||
|
// towards.
|
||||||
|
srvToBackfillFrom := serversResponse.Servers[0]
|
||||||
|
if srvToBackfillFrom == cfg.Matrix.ServerName {
|
||||||
|
if len(serversResponse.Servers) > 1 {
|
||||||
|
srvToBackfillFrom = serversResponse.Servers[1]
|
||||||
|
} else {
|
||||||
|
srvToBackfillFrom = gomatrixserverlib.ServerName("")
|
||||||
|
log.Warn("Not enough servers to backfill from")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pdus := make([]gomatrixserverlib.Event, 0)
|
||||||
|
|
||||||
|
// If the roomserver responded with at least one server that isn't us,
|
||||||
|
// send it a request for backfill.
|
||||||
|
if len(srvToBackfillFrom) > 0 {
|
||||||
|
txn, err := federation.Backfill(
|
||||||
|
ctx, srvToBackfillFrom, roomID, limit, []string{fromEventID},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Store the events in the database.
|
||||||
|
pdus = txn.PDUs
|
||||||
|
}
|
||||||
|
|
||||||
|
return pdus, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,15 +22,24 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const pathPrefixR0 = "/_matrix/client/r0"
|
const pathPrefixR0 = "/_matrix/client/r0"
|
||||||
|
|
||||||
// Setup configures the given mux with sync-server listeners
|
// Setup configures the given mux with sync-server listeners
|
||||||
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) {
|
func Setup(
|
||||||
|
apiMux *mux.Router, srp *sync.RequestPool,
|
||||||
|
syncDB *storage.SyncServerDatabase, deviceDB *devices.Database,
|
||||||
|
federation *gomatrixserverlib.FederationClient,
|
||||||
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
cfg *config.Dendrite,
|
||||||
|
) {
|
||||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||||
|
|
||||||
authData := auth.Data{nil, deviceDB, nil}
|
authData := auth.Data{nil, deviceDB, nil}
|
||||||
|
@ -57,6 +66,6 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServer
|
||||||
|
|
||||||
r0mux.Handle("/rooms/{roomID}/messages", common.MakeAuthAPI("room_messages", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
r0mux.Handle("/rooms/{roomID}/messages", common.MakeAuthAPI("room_messages", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"])
|
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], federation, queryAPI, cfg)
|
||||||
})).Methods(http.MethodGet, http.MethodOptions)
|
})).Methods(http.MethodGet, http.MethodOptions)
|
||||||
}
|
}
|
||||||
|
|
|
@ -329,7 +329,7 @@ func (d *SyncServerDatabase) CompleteSync(
|
||||||
recentEvents := StreamEventsToEvents(nil, recentStreamEvents)
|
recentEvents := StreamEventsToEvents(nil, recentStreamEvents)
|
||||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
|
if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 {
|
||||||
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
|
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
|
||||||
} else {
|
} else {
|
||||||
jr.Timeline.PrevBatch = types.StreamPosition(1).String()
|
jr.Timeline.PrevBatch = types.StreamPosition(1).String()
|
||||||
|
@ -463,7 +463,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
switch delta.membership {
|
switch delta.membership {
|
||||||
case "join":
|
case "join":
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
|
if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 {
|
||||||
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
|
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
|
||||||
} else {
|
} else {
|
||||||
jr.Timeline.PrevBatch = types.StreamPosition(1).String()
|
jr.Timeline.PrevBatch = types.StreamPosition(1).String()
|
||||||
|
@ -478,7 +478,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
||||||
// no longer in the room.
|
// no longer in the room.
|
||||||
lr := types.NewLeaveResponse()
|
lr := types.NewLeaveResponse()
|
||||||
if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 {
|
if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 {
|
||||||
lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
|
lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
|
||||||
} else {
|
} else {
|
||||||
lr.Timeline.PrevBatch = types.StreamPosition(1).String()
|
lr.Timeline.PrevBatch = types.StreamPosition(1).String()
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
|
@ -29,6 +30,8 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI
|
// SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI
|
||||||
|
@ -38,6 +41,8 @@ func SetupSyncAPIComponent(
|
||||||
deviceDB *devices.Database,
|
deviceDB *devices.Database,
|
||||||
accountsDB *accounts.Database,
|
accountsDB *accounts.Database,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
federation *gomatrixserverlib.FederationClient,
|
||||||
|
cfg *config.Dendrite,
|
||||||
) {
|
) {
|
||||||
syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI))
|
syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -71,5 +76,5 @@ func SetupSyncAPIComponent(
|
||||||
logrus.WithError(err).Panicf("failed to start client data consumer")
|
logrus.WithError(err).Panicf("failed to start client data consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
|
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB, federation, queryAPI, cfg)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue