Pass client transaction IDs along the kafka streams (#362)

This commit is contained in:
Erik Johnston 2017-12-04 18:07:52 +00:00 committed by GitHub
parent 7236090989
commit 7d38e82f25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 53 additions and 25 deletions

View file

@ -36,14 +36,16 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. // SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
func (c *RoomserverProducer) SendEvents( func (c *RoomserverProducer) SendEvents(
ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName, ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName,
txnID *api.TransactionID,
) error { ) error {
ires := make([]api.InputRoomEvent, len(events)) ires := make([]api.InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
ires[i] = api.InputRoomEvent{ ires[i] = api.InputRoomEvent{
Kind: api.KindNew, Kind: api.KindNew,
Event: event, Event: event,
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer), SendAsServer: string(sendAsServer),
TransactionID: txnID,
} }
} }
return c.SendInputRoomEvents(ctx, ires) return c.SendInputRoomEvents(ctx, ires)

View file

@ -214,7 +214,7 @@ func createRoom(req *http.Request, device *authtypes.Device,
} }
// send events to the room server // send events to the room server
err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName) err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View file

@ -217,7 +217,7 @@ func (r joinRoomReq) joinRoomUsingServers(
var queryRes api.QueryLatestEventsAndStateResponse var queryRes api.QueryLatestEventsAndStateResponse
event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.queryAPI, &queryRes) event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.queryAPI, &queryRes)
if err == nil { if err == nil {
if err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName); err != nil { if err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(r.req, err) return httputil.LogThenError(r.req, err)
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -98,7 +98,7 @@ func SendMembership(
} }
if err := producer.SendEvents( if err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil,
); err != nil { ); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View file

@ -138,7 +138,7 @@ func SetAvatarURL(
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName); err != nil { if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -230,7 +230,7 @@ func SetDisplayName(
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName); err != nil { if err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View file

@ -41,7 +41,7 @@ type sendEventResponse struct {
func SendEvent( func SendEvent(
req *http.Request, req *http.Request,
device *authtypes.Device, device *authtypes.Device,
roomID, eventType string, _, stateKey *string, roomID, eventType string, txnID, stateKey *string,
cfg config.Dendrite, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
@ -90,9 +90,17 @@ func SendEvent(
} }
} }
var txnAndDeviceID *api.TransactionID
if txnID != nil {
txnAndDeviceID = &api.TransactionID{
TransactionID: *txnID,
DeviceID: device.ID,
}
}
// pass the new event to the roomserver // pass the new event to the roomserver
if err := producer.SendEvents( if err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndDeviceID,
); err != nil { ); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }

View file

@ -355,5 +355,5 @@ func emit3PIDInviteEvent(
return err return err
} }
return producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName) return producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil)
} }

View file

@ -387,7 +387,8 @@ func main() {
"adds_state_event_ids":["$1463671337126266wrSBX:matrix.org", "$1463671339126270PnVwC:matrix.org"], "adds_state_event_ids":["$1463671337126266wrSBX:matrix.org", "$1463671339126270PnVwC:matrix.org"],
"removes_state_event_ids":null, "removes_state_event_ids":null,
"last_sent_event_id":"", "last_sent_event_id":"",
"send_as_server":"" "send_as_server":"",
"transaction_id": null
}}`, }}`,
} }

View file

@ -169,7 +169,7 @@ func SendJoin(
// Send the events to the room server. // Send the events to the room server.
// We are responsible for notifying other servers that the user has joined // We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName // the room, so set SendAsServer to cfg.Matrix.ServerName
err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName) err = producer.SendEvents(ctx, []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil)
if err != nil { if err != nil {
return httputil.LogThenError(httpReq, err) return httputil.LogThenError(httpReq, err)
} }

View file

@ -170,7 +170,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
// TODO: Check that the event is allowed by its auth_events. // TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver // pass the event to the roomserver
return t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers) return t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil)
} }
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error { func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {

View file

@ -81,7 +81,7 @@ func CreateInvitesFrom3PIDInvites(
} }
// Send all the events // Send all the events
if err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName); err != nil { if err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -154,7 +154,7 @@ func ExchangeThirdPartyInvite(
// Send the event to the roomserver // Send the event to the roomserver
if err = producer.SendEvents( if err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil,
); err != nil { ); err != nil {
return httputil.LogThenError(httpReq, err) return httputil.LogThenError(httpReq, err)
} }

View file

@ -68,6 +68,16 @@ type InputRoomEvent struct {
// The server name to use to push this event to other servers. // The server name to use to push this event to other servers.
// Or empty if this event shouldn't be pushed to other servers. // Or empty if this event shouldn't be pushed to other servers.
SendAsServer string `json:"send_as_server"` SendAsServer string `json:"send_as_server"`
// The transaction ID of the send request if sent by a local user and one
// was specified
TransactionID *TransactionID `json:"transaction_id"`
}
// TransactionID contains the transaction ID sent by a client when sending an
// event, along with the ID of that device.
type TransactionID struct {
DeviceID string `json:"device_id"`
TransactionID string `json:"id"`
} }
// InputInviteEvent is a matrix invite event received over federation without // InputInviteEvent is a matrix invite event received over federation without

View file

@ -107,6 +107,9 @@ type OutputNewRoomEvent struct {
// We encode the server name that the event should be sent using here to // We encode the server name that the event should be sent using here to
// future proof the API for virtual hosting. // future proof the API for virtual hosting.
SendAsServer string `json:"send_as_server"` SendAsServer string `json:"send_as_server"`
// The transaction ID of the send request if sent by a local user and one
// was specified
TransactionID *TransactionID `json:"transaction_id"`
} }
// An OutputNewInviteEvent is written whenever an invite becomes active. // An OutputNewInviteEvent is written whenever an invite becomes active.

View file

@ -129,7 +129,7 @@ func processRoomEvent(
} }
// Update the extremities of the event graph for the room // Update the extremities of the event graph for the room
return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer) return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID)
} }
func processInviteEvent( func processInviteEvent(

View file

@ -50,6 +50,7 @@ func updateLatestEvents(
stateAtEvent types.StateAtEvent, stateAtEvent types.StateAtEvent,
event gomatrixserverlib.Event, event gomatrixserverlib.Event,
sendAsServer string, sendAsServer string,
transactionID *api.TransactionID,
) (err error) { ) (err error) {
updater, err := db.GetLatestEventsForUpdate(ctx, roomNID) updater, err := db.GetLatestEventsForUpdate(ctx, roomNID)
if err != nil { if err != nil {
@ -61,6 +62,7 @@ func updateLatestEvents(
u := latestEventsUpdater{ u := latestEventsUpdater{
ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID, ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID,
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
transactionID: transactionID,
} }
if err = u.doUpdateLatestEvents(); err != nil { if err = u.doUpdateLatestEvents(); err != nil {
return err return err
@ -75,13 +77,14 @@ func updateLatestEvents(
// The state could be passed using function arguments, but it becomes impractical // The state could be passed using function arguments, but it becomes impractical
// when there are so many variables to pass around. // when there are so many variables to pass around.
type latestEventsUpdater struct { type latestEventsUpdater struct {
ctx context.Context ctx context.Context
db RoomEventDatabase db RoomEventDatabase
updater types.RoomRecentEventsUpdater updater types.RoomRecentEventsUpdater
ow OutputRoomEventWriter ow OutputRoomEventWriter
roomNID types.RoomNID roomNID types.RoomNID
stateAtEvent types.StateAtEvent stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event event gomatrixserverlib.Event
transactionID *api.TransactionID
// Which server to send this event as. // Which server to send this event as.
sendAsServer string sendAsServer string
// The eventID of the event that was processed before this one. // The eventID of the event that was processed before this one.
@ -241,6 +244,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
Event: u.event, Event: u.event,
LastSentEventID: u.lastEventIDSent, LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs, LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
} }
var stateEventNIDs []types.EventNID var stateEventNIDs []types.EventNID