mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Consolidation of roomserver APIs (#994)
* Consolidation of roomserver APIs * Comment out alias tests for now, they are broken * Wire AS API into roomserver again * Roomserver didn't take asAPI param before so return to that * Prevent roomserver asking AS API for alias info * Rename some files * Remove alias_test, incoherent tests and unwanted appservice integration * Remove FS API inject on syncapi component
This commit is contained in:
parent
ebbfc12592
commit
e15f6676ac
72 changed files with 894 additions and 1170 deletions
|
@ -33,11 +33,11 @@ import (
|
|||
|
||||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||
type OutputRoomEventConsumer struct {
|
||||
cfg *config.Dendrite
|
||||
roomServerConsumer *common.ContinualConsumer
|
||||
db storage.Database
|
||||
queues *queue.OutgoingQueues
|
||||
query api.RoomserverQueryAPI
|
||||
cfg *config.Dendrite
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
rsConsumer *common.ContinualConsumer
|
||||
db storage.Database
|
||||
queues *queue.OutgoingQueues
|
||||
}
|
||||
|
||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||
|
@ -46,7 +46,7 @@ func NewOutputRoomEventConsumer(
|
|||
kafkaConsumer sarama.Consumer,
|
||||
queues *queue.OutgoingQueues,
|
||||
store storage.Database,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
) *OutputRoomEventConsumer {
|
||||
consumer := common.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
|
@ -54,11 +54,11 @@ func NewOutputRoomEventConsumer(
|
|||
PartitionStore: store,
|
||||
}
|
||||
s := &OutputRoomEventConsumer{
|
||||
cfg: cfg,
|
||||
roomServerConsumer: &consumer,
|
||||
db: store,
|
||||
queues: queues,
|
||||
query: queryAPI,
|
||||
cfg: cfg,
|
||||
rsConsumer: &consumer,
|
||||
db: store,
|
||||
queues: queues,
|
||||
rsAPI: rsAPI,
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
|
@ -67,7 +67,7 @@ func NewOutputRoomEventConsumer(
|
|||
|
||||
// Start consuming from room servers
|
||||
func (s *OutputRoomEventConsumer) Start() error {
|
||||
return s.roomServerConsumer.Start()
|
||||
return s.rsConsumer.Start()
|
||||
}
|
||||
|
||||
// onMessage is called when the federation server receives a new event from the room server output log.
|
||||
|
@ -369,7 +369,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
|
|||
// from the roomserver using the query API.
|
||||
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
||||
var eventResp api.QueryEventsByIDResponse
|
||||
if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
|
||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -34,8 +34,7 @@ import (
|
|||
func SetupFederationSenderComponent(
|
||||
base *basecomponent.BaseDendrite,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
rsQueryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
rsInputAPI roomserverAPI.RoomserverInputAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
keyRing *gomatrixserverlib.KeyRing,
|
||||
) api.FederationSenderInternalAPI {
|
||||
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
|
||||
|
@ -43,13 +42,13 @@ func SetupFederationSenderComponent(
|
|||
logrus.WithError(err).Panic("failed to connect to federation sender db")
|
||||
}
|
||||
|
||||
roomserverProducer := producers.NewRoomserverProducer(rsInputAPI, base.Cfg.Matrix.ServerName)
|
||||
roomserverProducer := producers.NewRoomserverProducer(rsAPI, base.Cfg.Matrix.ServerName)
|
||||
|
||||
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation, roomserverProducer)
|
||||
|
||||
rsConsumer := consumers.NewOutputRoomEventConsumer(
|
||||
base.Cfg, base.KafkaConsumer, queues,
|
||||
federationSenderDB, rsQueryAPI,
|
||||
federationSenderDB, rsAPI,
|
||||
)
|
||||
if err = rsConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panic("failed to start room server consumer")
|
||||
|
|
|
@ -23,16 +23,16 @@ import (
|
|||
|
||||
// RoomserverProducer produces events for the roomserver to consume.
|
||||
type RoomserverProducer struct {
|
||||
InputAPI api.RoomserverInputAPI
|
||||
InputAPI api.RoomserverInternalAPI
|
||||
serverName gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
// NewRoomserverProducer creates a new RoomserverProducer
|
||||
func NewRoomserverProducer(
|
||||
inputAPI api.RoomserverInputAPI, serverName gomatrixserverlib.ServerName,
|
||||
rsAPI api.RoomserverInternalAPI, serverName gomatrixserverlib.ServerName,
|
||||
) *RoomserverProducer {
|
||||
return &RoomserverProducer{
|
||||
InputAPI: inputAPI,
|
||||
InputAPI: rsAPI,
|
||||
serverName: serverName,
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue