Retrieval of local messages

Rough outline with debug logging and no comment
This commit is contained in:
Brendan Abolivier 2018-11-08 14:52:42 +00:00
parent 4cb223f8dd
commit 4194ebf381
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
4 changed files with 250 additions and 26 deletions

View file

@ -0,0 +1,170 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
// "encoding/json"
"net/http"
"strconv"
// "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
type messageResp struct {
Start string `json:"start"`
End string `json:"end"`
Chunk []gomatrixserverlib.ClientEvent `json:"chunk"`
}
const defaultMessagesLimit = 10
func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse {
var from, to int
var err error
// Extract parameters from the request's URL.
// Pagination tokens.
from, err = strconv.Atoi(req.URL.Query().Get("from"))
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("from could not be parsed into an integer: " + err.Error()),
}
}
fromPos := types.StreamPosition(from)
// Direction to return events from.
dir := req.URL.Query().Get("dir")
if dir != "b" && dir != "f" {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"),
}
}
backwardOrdering := (dir == "b")
toStr := req.URL.Query().Get("to")
var toPos types.StreamPosition
if len(toStr) > 0 {
to, err = strconv.Atoi(toStr)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("to could not be parsed into an integer: " + err.Error()),
}
}
toPos = types.StreamPosition(to)
} else {
if backwardOrdering {
toPos = types.StreamPosition(0)
} else {
toPos, err = db.SyncStreamPosition(req.Context())
if err != nil {
return jsonerror.InternalServerError()
}
}
}
// Maximum number of events to return; defaults to 10.
limit := defaultMessagesLimit
if len(req.URL.Query().Get("limit")) > 0 {
limit, err = strconv.Atoi(req.URL.Query().Get("limit"))
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()),
}
}
}
// TODO: Implement filtering (#587)
// Check the room ID's format.
if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Bad room ID: " + err.Error()),
}
}
streamEvents, err := db.GetEventsInRange(
req.Context(), fromPos, toPos, roomID, limit, backwardOrdering,
)
if err != nil {
return jsonerror.InternalServerError()
}
// Check if we don't have enough events, i.e. len(sev) < limit and the events
isSetLargeEnough := true
if len(streamEvents) < limit {
if backwardOrdering {
if len(toStr) > 0 {
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
isSetLargeEnough = (toPos-1 == streamEvents[0].StreamPosition)
}
} else {
// We need all events from < streamPos < to
isSetLargeEnough = (fromPos-1 == streamEvents[0].StreamPosition)
}
}
// Check if earliest event is a backward extremity, i.e. if one of its
// previous events is missing from the db.
prevIDs := streamEvents[0].PrevEventIDs()
prevs, err := db.Events(req.Context(), prevIDs)
var eventInDB, isBackwardExtremity bool
var id string
for _, id = range prevIDs {
eventInDB = false
for _, ev := range prevs {
if ev.EventID() == id {
eventInDB = true
}
}
if !eventInDB {
isBackwardExtremity = true
break
}
}
if isBackwardExtremity && !isSetLargeEnough {
log.WithFields(log.Fields{
"limit": limit,
"nb_events": len(streamEvents),
"from": fromPos.String(),
"to": toPos.String(),
"isBackwardExtremity": isBackwardExtremity,
"isSetLargeEnough": isSetLargeEnough,
}).Info("Backfilling!")
println("Backfilling!")
}
events := storage.StreamEventsToEvents(nil, streamEvents)
clientEvents := gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll)
return util.JSONResponse{
Code: http.StatusOK,
JSON: messageResp{
Chunk: clientEvents,
Start: streamEvents[0].StreamPosition.String(),
End: streamEvents[len(streamEvents)-1].StreamPosition.String(),
},
}
}

View file

@ -54,4 +54,9 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServer
vars := mux.Vars(req)
return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"])
})).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/messages", common.MakeAuthAPI("room_messages", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"])
})).Methods(http.MethodGet, http.MethodOptions)
}

View file

@ -68,6 +68,11 @@ const selectRecentEventsSQL = "" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
const selectEarlyEventsSQL = "" +
"SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4"
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@ -83,6 +88,7 @@ type outputRoomEventsStatements struct {
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
}
@ -103,6 +109,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
return
}
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
return
}
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return
}
@ -171,7 +180,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
eventIDToEvent[ev.EventID()] = StreamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
StreamPosition: types.StreamPosition(streamPos),
}
}
@ -224,6 +233,7 @@ func (s *outputRoomEventsStatements) insertEvent(
func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
chronologicalOrder bool,
) ([]StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
@ -235,12 +245,33 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
if err != nil {
return nil, err
}
// The events need to be returned from oldest to latest, which isn't
// necessary the way the SQL query returns them, so a sort is necessary to
// ensure the events are in the right order in the slice.
sort.SliceStable(events, func(i int, j int) bool {
return events[i].streamPosition < events[j].streamPosition
})
if chronologicalOrder {
// The events need to be returned from oldest to latest, which isn't
// necessary the way the SQL query returns them, so a sort is necessary to
// ensure the events are in the right order in the slice.
sort.SliceStable(events, func(i int, j int) bool {
return events[i].StreamPosition < events[j].StreamPosition
})
}
return events, nil
}
// selectEarlyEvents returns the earliest events in the given room, starting
// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) selectEarlyEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
) ([]StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEarlyEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
}
defer rows.Close() // nolint: errcheck
events, err := rowsToStreamEvents(rows)
if err != nil {
return nil, err
}
return events, nil
}
@ -286,8 +317,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) {
result = append(result, StreamEvent{
Event: ev,
streamPosition: types.StreamPosition(streamPos),
transactionID: transactionID,
StreamPosition: types.StreamPosition(streamPos),
TransactionID: transactionID,
})
}
return result, nil

View file

@ -43,8 +43,8 @@ type stateDelta struct {
// position for this event.
type StreamEvent struct {
gomatrixserverlib.Event
streamPosition types.StreamPosition
transactionID *api.TransactionID
StreamPosition types.StreamPosition
TransactionID *api.TransactionID
}
// SyncServerDatabase represents a sync server database
@ -100,7 +100,7 @@ func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]g
// We don't include a device here as we only include transaction IDs in
// incremental syncs.
return streamEventsToEvents(nil, streamEvents), nil
return StreamEventsToEvents(nil, streamEvents), nil
}
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
@ -187,6 +187,24 @@ func (d *SyncServerDatabase) GetStateEventsForRoom(
return
}
// GetEventsInRange retrieves all of the events on a given ordering using the
// given extremities and limit.
func (d *SyncServerDatabase) GetEventsInRange(
ctx context.Context,
from, to types.StreamPosition,
roomID string, limit int,
backwardOrdering bool,
) (events []StreamEvent, err error) {
if backwardOrdering {
// We need all events matching to < streamPos < from
return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false)
}
// We need all events from < streamPos < to
return d.events.selectEarlyEvents(ctx, nil, roomID, from, to, limit)
}
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
return d.syncStreamPositionTx(ctx, nil)
@ -299,7 +317,8 @@ func (d *SyncServerDatabase) CompleteSync(
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []StreamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
ctx, txn, roomID, types.StreamPosition(0), pos,
numRecentEventsPerRoom, true,
)
if err != nil {
return nil, err
@ -307,8 +326,7 @@ func (d *SyncServerDatabase) CompleteSync(
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs
recentEvents := streamEventsToEvents(nil, recentStreamEvents)
recentEvents := StreamEventsToEvents(nil, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
@ -424,12 +442,12 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
endPos = delta.membershipPos
}
recentStreamEvents, err := d.events.selectRecentEvents(
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, true,
)
if err != nil {
return err
}
recentEvents := streamEventsToEvents(device, recentStreamEvents)
recentEvents := StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
// Don't bother appending empty room entries
@ -586,7 +604,7 @@ func (d *SyncServerDatabase) getStateDeltas(
}
s := make([]StreamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = StreamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)}
s[i] = StreamEvent{Event: allState[i], StreamPosition: types.StreamPosition(0)}
}
state[roomID] = s
continue // we'll add this room in when we do joined rooms
@ -594,8 +612,8 @@ func (d *SyncServerDatabase) getStateDeltas(
deltas = append(deltas, stateDelta{
membership: membership,
membershipPos: ev.streamPosition,
stateEvents: streamEventsToEvents(device, stateStreamEvents),
membershipPos: ev.StreamPosition,
stateEvents: StreamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
})
break
@ -611,7 +629,7 @@ func (d *SyncServerDatabase) getStateDeltas(
for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, stateDelta{
membership: "join",
stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
stateEvents: StreamEventsToEvents(device, state[joinedRoomID]),
roomID: joinedRoomID,
})
}
@ -619,17 +637,17 @@ func (d *SyncServerDatabase) getStateDeltas(
return deltas, nil
}
// streamEventsToEvents converts StreamEvent to Event. If device is non-nil and
// StreamEventsToEvents converts StreamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
func streamEventsToEvents(device *authtypes.Device, in []StreamEvent) []gomatrixserverlib.Event {
func StreamEventsToEvents(device *authtypes.Device, in []StreamEvent) []gomatrixserverlib.Event {
out := make([]gomatrixserverlib.Event, len(in))
for i := 0; i < len(in); i++ {
out[i] = in[i].Event
if device != nil && in[i].transactionID != nil {
if device.UserID == in[i].Sender() && device.ID == in[i].transactionID.DeviceID {
if device != nil && in[i].TransactionID != nil {
if device.UserID == in[i].Sender() && device.ID == in[i].TransactionID.DeviceID {
err := out[i].SetUnsignedField(
"transaction_id", in[i].transactionID.TransactionID,
"transaction_id", in[i].TransactionID.TransactionID,
)
if err != nil {
logrus.WithFields(logrus.Fields{