More things

This commit is contained in:
Erik Johnston 2018-01-03 14:53:09 +00:00
parent 1dcc896893
commit 546afcc519
7 changed files with 186 additions and 68 deletions

View file

@ -16,6 +16,7 @@ package input
import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/common"
@ -23,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
sarama "gopkg.in/Shopify/sarama.v1"
)
// A RoomEventDatabase has the storage APIs needed to store a room event.
@ -33,6 +35,7 @@ type RoomEventDatabase interface {
ctx context.Context,
event gomatrixserverlib.Event,
authEventNIDs []types.EventNID,
sendAsServer string, transactionID *api.TransactionID,
) (types.RoomNID, types.StateAtEvent, error)
// Look up the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database
@ -61,12 +64,34 @@ type RoomEventDatabase interface {
MembershipUpdater(
ctx context.Context, roomID, targerUserID string,
) (types.MembershipUpdater, error)
// UnsentEvents gets a list of events that have persisted but haven't yet been
// confirmed sent down the kaffka stream. Events should be sent in order.
UnsentEvents(ctx context.Context) ([]types.EventForSending, error)
}
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface {
// Write a list of events for a room
WriteOutputEvents(roomID string, updates []api.OutputEvent) error
type OutputRoomEventWriter struct {
Producer sarama.SyncProducer
// The kafkaesque topic to output new room events to.
// This is the name used in kafka to identify the stream to write events to.
OutputRoomEventTopic string
}
// WriteOutputEvents writes a list of events for a room
func (r *OutputRoomEventWriter) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
messages := make([]*sarama.ProducerMessage, len(updates))
for i := range updates {
value, err := json.Marshal(updates[i])
if err != nil {
return err
}
messages[i] = &sarama.ProducerMessage{
Topic: r.OutputRoomEventTopic,
Key: sarama.StringEncoder(roomID),
Value: sarama.ByteEncoder(value),
}
}
return r.Producer.SendMessages(messages)
}
// processRoomEvent can only be called once at a time
@ -77,7 +102,7 @@ type OutputRoomEventWriter interface {
func processRoomEvent(
ctx context.Context,
db RoomEventDatabase,
ow OutputRoomEventWriter,
eventSender EventSender,
input api.InputRoomEvent,
) error {
// Parse and validate the event JSON
@ -90,7 +115,7 @@ func processRoomEvent(
}
// Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, authEventNIDs)
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, authEventNIDs, input.SendAsServer, input.TransactionID)
if err != nil {
return err
}
@ -134,7 +159,7 @@ func processRoomEvent(
}
// Update the extremities of the event graph for the room
return updateLatestEvents(ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID)
return eventSender.Send(ctx, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID)
}
func processInviteEvent(

View file

@ -24,37 +24,16 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/util"
sarama "gopkg.in/Shopify/sarama.v1"
)
// RoomserverInputAPI implements api.RoomserverInputAPI
type RoomserverInputAPI struct {
DB RoomEventDatabase
Producer sarama.SyncProducer
// The kafkaesque topic to output new room events to.
// This is the name used in kafka to identify the stream to write events to.
OutputRoomEventTopic string
DB RoomEventDatabase
EventSender EventSender
// Protects calls to processRoomEvent
mutex sync.Mutex
}
// WriteOutputEvents implements OutputRoomEventWriter
func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
messages := make([]*sarama.ProducerMessage, len(updates))
for i := range updates {
value, err := json.Marshal(updates[i])
if err != nil {
return err
}
messages[i] = &sarama.ProducerMessage{
Topic: r.OutputRoomEventTopic,
Key: sarama.StringEncoder(roomID),
Value: sarama.ByteEncoder(value),
}
}
return r.Producer.SendMessages(messages)
}
// InputRoomEvents implements api.RoomserverInputAPI
func (r *RoomserverInputAPI) InputRoomEvents(
ctx context.Context,
@ -65,12 +44,12 @@ func (r *RoomserverInputAPI) InputRoomEvents(
r.mutex.Lock()
defer r.mutex.Unlock()
for i := range request.InputRoomEvents {
if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
if err := processRoomEvent(ctx, r.DB, r.EventSender, request.InputRoomEvents[i]); err != nil {
return err
}
}
for i := range request.InputInviteEvents {
if err := processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil {
if err := processInviteEvent(ctx, r.DB, r.EventSender.OutputWriter, request.InputInviteEvents[i]); err != nil {
return err
}
}

View file

@ -18,6 +18,8 @@ import (
"bytes"
"context"
"github.com/pkg/errors"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
@ -34,9 +36,9 @@ type EventSenderValue struct {
}
type EventSender struct {
db RoomEventDatabase
outputWriter OutputRoomEventWriter
linearizer common.Linearizer
DB RoomEventDatabase
OutputWriter OutputRoomEventWriter
Linearizer common.Linearizer
}
func (e *EventSender) Send(
@ -47,27 +49,41 @@ func (e *EventSender) Send(
sendAsServer string,
transactionID *api.TransactionID,
) (err error) {
e.linearizer.Await(event.RoomID(), func() {
err = updateLatestEvents(ctx, e.db, e.outputWriter, roomNID, stateAtEvent, event, sendAsServer, transactionID)
e.Linearizer.Await(event.RoomID(), func() {
err = updateLatestEvents(ctx, e.DB, e.OutputWriter, roomNID, stateAtEvent, event, sendAsServer, transactionID)
})
return
}
func (e *EventSender) SendMany(
ctx context.Context,
roomNID types.RoomNID,
roomID string,
events []EventSenderValue,
) (err error) {
e.linearizer.Await(roomID, func() {
for i := range events {
err = updateLatestEvents(ctx, e.db, e.outputWriter, roomNID, events[i].stateAtEvent, events[i].event, events[i].sendAsServer, events[i].transactionID)
if err != nil {
return
func (e *EventSender) Start(ctx context.Context) (err error) {
entries, err := e.DB.UnsentEvents(ctx)
if err != nil {
return errors.WithMessage(err, "failed to get unsent events from DB")
}
var eventToRoomMap = make(map[types.RoomNID][]types.EventForSending)
for i := range entries {
roomNID := entries[i].RoomNID
eventToRoomMap[roomNID] = append(eventToRoomMap[roomNID], entries[i])
}
for roomNID, roomEntries := range eventToRoomMap {
e.Linearizer.Await(roomEntries[0].Event.RoomID(), func() {
for i := range roomEntries {
err = updateLatestEvents(
ctx, e.DB, e.OutputWriter, roomNID,
roomEntries[i].StateAtEvent,
roomEntries[i].Event.Event,
roomEntries[i].SendAsServer,
roomEntries[i].TransactionID,
)
if err != nil {
return
}
}
}
})
})
}
return
}

View file

@ -15,8 +15,10 @@
package roomserver
import (
"context"
"net/http"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/common/basecomponent"
@ -39,12 +41,28 @@ func SetupRoomServerComponent(
logrus.WithError(err).Panicf("failed to connect to room server db")
}
inputAPI := input.RoomserverInputAPI{
DB: roomserverDB,
outputRoomEventWriter := input.OutputRoomEventWriter{
Producer: base.KafkaProducer,
OutputRoomEventTopic: string(base.Cfg.Kafka.Topics.OutputRoomEvent),
}
eventSender := input.EventSender{
DB: roomserverDB,
OutputWriter: outputRoomEventWriter,
Linearizer: common.NewLinearizer(),
}
ctx := context.Background()
err = eventSender.Start(ctx)
if err != nil {
logrus.WithError(err).Panicf("failed to handle unsent events")
}
inputAPI := input.RoomserverInputAPI{
DB: roomserverDB,
EventSender: eventSender,
}
inputAPI.SetupHTTP(http.DefaultServeMux)
queryAPI := query.RoomserverQueryAPI{DB: roomserverDB}

View file

@ -19,6 +19,8 @@ import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/types"
@ -42,7 +44,7 @@ CREATE TABLE IF NOT EXISTS roomserver_events (
-- This is 0 if the event is not a state event.
event_state_key_nid BIGINT NOT NULL,
-- Whether the event has been written to the output log.
sent_to_output BOOLEAN NOT NULL DEFAULT FALSE,
sent_to_output BOOLEAN NOT NULL DEFAULT FALSE,
-- Local numeric ID for the state at the event.
-- This is 0 if we don't know the state at the event.
-- If the state is not 0 then this event is part of the contiguous
@ -61,13 +63,20 @@ CREATE TABLE IF NOT EXISTS roomserver_events (
-- Needed for setting reference hashes when sending new events.
reference_sha256 BYTEA NOT NULL,
-- A list of numeric IDs for events that can authenticate this event.
auth_event_nids BIGINT[] NOT NULL
auth_event_nids BIGINT[] NOT NULL,
-- Whether to send this event over federation as the given server, empty
-- if it shouldn't be sent.
send_as_server TEXT NOT NULL,
-- Senders device, if local
device_id TEXT,
-- The transaction ID the client used to send the event, if a local event.
transaction_id TEXT
);
`
const insertEventSQL = "" +
"INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
"INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, send_as_server, device_id, transaction_id)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
" ON CONFLICT ON CONSTRAINT roomserver_event_id_unique" +
" DO NOTHING" +
" RETURNING event_nid, state_snapshot_nid"
@ -116,7 +125,7 @@ const selectMaxEventDepthSQL = "" +
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
const selectUnsentEventsSQL = "" +
"SELECT event_nid FROM roomserver_events WHERE NOT sent_to_output"
"SELECT event_nid, event_id, room_nid, send_as_server, device_id, transaction_id FROM roomserver_events WHERE NOT sent_to_output"
type eventStatements struct {
insertEventStmt *sql.Stmt
@ -168,12 +177,15 @@ func (s *eventStatements) insertEvent(
referenceSHA256 []byte,
authEventNIDs []types.EventNID,
depth int64,
sendAsServer string,
transactionID *api.TransactionID,
) (types.EventNID, types.StateSnapshotNID, error) {
var eventNID int64
var stateNID int64
err := s.insertEventStmt.QueryRowContext(
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth,
sendAsServer, transactionID.DeviceID, transactionID.TransactionID,
).Scan(&eventNID, &stateNID)
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
}
@ -414,22 +426,55 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {
return nids
}
func (s *eventStatements) getUnsentEventNids(ctx context.Context) ([]types.EventNID, error) {
func (s *eventStatements) getUnsentEventNids(ctx context.Context) (results []struct {
eventNID types.EventNID
eventID string
roomNID types.RoomNID
sendAsServer string
txnID *api.TransactionID
}, err error) {
rows, err := s.selectUnsentEventsStmt.QueryContext(ctx)
if err != nil {
return nil, err
}
defer rows.Close() // nolint: errcheck
var results []types.EventNID
for rows.Next() {
var eventNID int64
if err = rows.Scan(&eventNID); err != nil {
var (
eventNID int64
eventID string
roomNID int64
sendAsServer string
deviceID sql.NullString
transactionID sql.NullString
)
if err = rows.Scan(&eventNID, &eventID, &sendAsServer, &deviceID, &transactionID); err != nil {
return nil, err
}
results = append(results, types.EventNID(eventNID))
var tID *api.TransactionID
if deviceID.Valid && transactionID.Valid {
tID = &api.TransactionID{
DeviceID: deviceID.String,
TransactionID: transactionID.String,
}
}
results = append(results, struct {
eventNID types.EventNID
eventID string
roomNID types.RoomNID
sendAsServer string
txnID *api.TransactionID
}{
eventNID: types.EventNID(eventNID),
eventID: eventID,
roomNID: types.RoomNID(roomNID),
sendAsServer: sendAsServer,
txnID: tID,
})
}
return results, nil
return
}

View file

@ -21,6 +21,7 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -47,6 +48,7 @@ func Open(dataSourceName string) (*Database, error) {
// StoreEvent implements input.EventDatabase
func (d *Database) StoreEvent(
ctx context.Context, event gomatrixserverlib.Event, authEventNIDs []types.EventNID,
sendAsServer string, transactionID *api.TransactionID,
) (types.RoomNID, types.StateAtEvent, error) {
var (
roomNID types.RoomNID
@ -83,6 +85,8 @@ func (d *Database) StoreEvent(
event.EventReference().EventSHA256,
authEventNIDs,
event.Depth(),
sendAsServer,
transactionID,
); err != nil {
if err == sql.ErrNoRows {
// We've already inserted the event so select the numeric event ID
@ -667,22 +671,44 @@ func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]type
return d.Events(ctx, nids)
}
// UnsentEvents gets a list of events that have persisted but haven't yet been
// confirmed sent down the kaffka stream. Events should be sent in order.
func (d *Database) UnsentEvents(ctx context.Context) ([]types.Event, error) {
nids, err := d.statements.getUnsentEventNids(ctx)
// UnsentEvents implements input.RoomEventDatabase
func (d *Database) UnsentEvents(ctx context.Context) ([]types.EventForSending, error) {
entries, err := d.statements.getUnsentEventNids(ctx)
if err != nil {
return nil, err
}
sort.Slice(entries, func(i, j int) bool { return entries[i].eventNID < entries[j].eventNID })
nids := make([]types.EventNID, 0, len(entries))
ids := make([]string, 0, len(entries))
for i := range entries {
nids = append(nids, entries[i].eventNID)
ids = append(ids, entries[i].eventID)
}
events, err := d.Events(ctx, nids)
if err != nil {
return nil, err
}
sort.Slice(events, func(i, j int) bool { return events[i].EventNID < events[j].EventNID })
state, err := d.StateAtEventIDs(ctx, ids)
if err != nil {
return nil, err
}
return events, nil
results := make([]types.EventForSending, len(entries))
for i := range entries {
results[i] = types.EventForSending{
Event: events[i],
SendAsServer: entries[i].sendAsServer,
TransactionID: entries[i].txnID,
StateAtEvent: state[i],
RoomNID: entries[i].roomNID,
}
}
return results, nil
}
type transaction struct {

View file

@ -17,6 +17,7 @@ package types
import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
@ -201,3 +202,11 @@ type MembershipUpdater interface {
type MissingEventError string
func (e MissingEventError) Error() string { return string(e) }
type EventForSending struct {
Event Event
RoomNID RoomNID
SendAsServer string
TransactionID *api.TransactionID
StateAtEvent StateAtEvent
}