Merge branch 'nats' into add-nats-support

Neil Alexander 2021-12-17 14:42:07 +00:00
commit 1c3635ae93
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
13 changed files with 188 additions and 174 deletions

View file

@ -19,7 +19,6 @@ not contain the Go toolchain etc.
There are two sample `docker-compose` files:
- `docker-compose.monolith.yml` which runs a monolith Dendrite deployment
- `docker-compose.polylith.yml` which runs a polylith Dendrite deployment
@ -28,7 +27,7 @@ There are three sample `docker-compose` files:
The `docker-compose` files refer to the `/etc/dendrite` volume as where the
runtime config should come from. The mounted folder must contain:
- `dendrite.yaml` configuration file (from the [Docker config folder](https://github.com/matrix-org/dendrite/tree/master/build/docker/config)
sample in the `build/docker/config` folder of this repository.)
- `matrix_key.pem` server key, as generated using `cmd/generate-keys`
- `server.crt` certificate file - `server.crt` certificate file
@ -50,15 +49,9 @@ The key files will now exist in your current working directory, and can be mount
## Starting Dendrite as a monolith deployment
Create your config based on the [`dendrite.yaml`](https://github.com/matrix-org/dendrite/tree/master/build/docker/config) configuration file in the `build/docker/config` folder of this repository.
Then start the deployment:
```
docker-compose -f docker-compose.monolith.yml up
@ -66,15 +59,9 @@ docker-compose -f docker-compose.monolith.yml up
## Starting Dendrite as a polylith deployment
Create your config based on the [`dendrite-config.yaml`](https://github.com/matrix-org/dendrite/tree/master/build/docker/config) configuration file in the `build/docker/config` folder of this repository.
Then start the deployment:
```
docker-compose -f docker-compose.polylith.yml up

View file

@ -62,29 +62,28 @@ global:
- matrix.org
- vector.im
# Configuration for NATS JetStream
jetstream:
# A list of NATS Server addresses to connect to. If none are specified, an
# internal NATS server will be started automatically when running Dendrite
# in monolith mode. It is required to specify the address of at least one
# NATS Server node if running in polylith mode.
addresses:
- jetstream:4222
# Keep all NATS streams in memory, rather than persisting it to the storage
# path below. This option is present primarily for integration testing and
# should not be used on a real world Dendrite deployment.
in_memory: false
# Persistent directory to store JetStream streams in. This directory
# should be preserved across Dendrite restarts.
storage_path: ./
# The prefix to use for stream names for this homeserver - really only
# useful if running more than one Dendrite on the same NATS deployment.
topic_prefix: Dendrite
# Configuration for Prometheus metric collection.
metrics:
# Whether or not Prometheus metrics are enabled.
@ -266,6 +265,19 @@ media_api:
height: 480
method: scale
# Configuration for experimental MSC's
mscs:
# A list of enabled MSC's
# Currently valid values are:
# - msc2836 (Threading, see https://github.com/matrix-org/matrix-doc/pull/2836)
# - msc2946 (Spaces Summary, see https://github.com/matrix-org/matrix-doc/pull/2946)
mscs: []
database:
connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_mscs?sslmode=disable
max_open_conns: 5
max_idle_conns: 2
conn_max_lifetime: -1
# Configuration for the Room Server.
room_server:
internal_api:

View file

@ -1,42 +0,0 @@
version: "3.4"
services:
# PostgreSQL is needed for both polylith and monolith modes.
postgres:
hostname: postgres
image: postgres:14
restart: always
volumes:
- ./postgres/create_db.sh:/docker-entrypoint-initdb.d/20-create_db.sh
# To persist your PostgreSQL databases outside of the Docker image, to
# prevent data loss, you will need to add something like this:
# - ./path/to/persistent/storage:/var/lib/postgresql/data
environment:
POSTGRES_PASSWORD: itsasecret
POSTGRES_USER: dendrite
networks:
- internal
# Zookeeper is only needed for polylith mode!
zookeeper:
hostname: zookeeper
image: zookeeper
networks:
- internal
# Kafka is only needed for polylith mode!
kafka:
container_name: dendrite_kafka
hostname: kafka
image: wurstmeister/kafka
environment:
KAFKA_ADVERTISED_HOST_NAME: "kafka"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
depends_on:
- zookeeper
networks:
- internal
networks:
internal:
attachable: true

View file

@ -1,5 +1,25 @@
version: "3.4" version: "3.4"
services: services:
postgres:
hostname: postgres
image: postgres:14
restart: always
volumes:
- ./postgres/create_db.sh:/docker-entrypoint-initdb.d/20-create_db.sh
# To persist your PostgreSQL databases outside of the Docker image,
# to prevent data loss, modify the following ./path_to path:
- ./path_to/postgresql:/var/lib/postgresql/data
environment:
POSTGRES_PASSWORD: itsasecret
POSTGRES_USER: dendrite
healthcheck:
test: ["CMD-SHELL", "pg_isready -U dendrite"]
interval: 5s
timeout: 5s
retries: 5
networks:
- internal
monolith:
hostname: monolith
image: matrixdotorg/dendrite-monolith:latest
@ -13,8 +33,11 @@ services:
volumes:
- ./config:/etc/dendrite
- ./media:/var/dendrite/media
depends_on:
- postgres
networks:
- internal
restart: unless-stopped
networks:
internal:

View file

@ -1,13 +1,51 @@
version: "3.4" version: "3.4"
services: services:
postgres:
hostname: postgres
image: postgres:14
restart: always
volumes:
- ./postgres/create_db.sh:/docker-entrypoint-initdb.d/20-create_db.sh
# To persist your PostgreSQL databases outside of the Docker image,
# to prevent data loss, modify the following ./path_to path:
- ./path_to/postgresql:/var/lib/postgresql/data
environment:
POSTGRES_PASSWORD: itsasecret
POSTGRES_USER: dendrite
healthcheck:
test: ["CMD-SHELL", "pg_isready -U dendrite"]
interval: 5s
timeout: 5s
retries: 5
networks:
- internal
jetstream:
hostname: jetstream
image: nats:latest
command: |
--jetstream
--store_dir /var/lib/nats
--cluster_name Dendrite
volumes:
# To persist your NATS JetStream streams outside of the Docker image,
# to prevent data loss, modify the following ./path_to path:
- ./path_to/nats:/var/lib/nats
networks:
- internal
client_api:
hostname: client_api
image: matrixdotorg/dendrite-polylith:latest
command: clientapi
volumes:
- ./config:/etc/dendrite
depends_on:
- jetstream
- postgres
networks:
- internal
restart: unless-stopped
media_api:
hostname: media_api
@ -18,6 +56,7 @@ services:
- ./media:/var/dendrite/media
networks:
- internal
restart: unless-stopped
sync_api:
hostname: sync_api
@ -25,8 +64,12 @@ services:
command: syncapi
volumes:
- ./config:/etc/dendrite
depends_on:
- jetstream
- postgres
networks:
- internal
restart: unless-stopped
room_server:
hostname: room_server
@ -34,8 +77,12 @@ services:
command: roomserver
volumes:
- ./config:/etc/dendrite
depends_on:
- jetstream
- postgres
networks:
- internal
restart: unless-stopped
edu_server:
hostname: edu_server
@ -43,8 +90,11 @@ services:
command: eduserver
volumes:
- ./config:/etc/dendrite
depends_on:
- jetstream
networks:
- internal
restart: unless-stopped
federation_api:
hostname: federation_api
@ -52,8 +102,12 @@ services:
command: federationapi
volumes:
- ./config:/etc/dendrite
depends_on:
- jetstream
- postgres
networks:
- internal
restart: unless-stopped
key_server:
hostname: key_server
@ -61,8 +115,12 @@ services:
command: keyserver
volumes:
- ./config:/etc/dendrite
depends_on:
- jetstream
- postgres
networks:
- internal
restart: unless-stopped
user_api:
hostname: user_api
@ -70,8 +128,12 @@ services:
command: userapi
volumes:
- ./config:/etc/dendrite
depends_on:
- jetstream
- postgres
networks:
- internal
restart: unless-stopped
appservice_api:
hostname: appservice_api
@ -82,8 +144,11 @@ services:
networks:
- internal
depends_on:
- jetstream
- postgres
- room_server
- user_api
restart: unless-stopped
networks:
internal:

View file

@ -1,5 +1,5 @@
#!/bin/sh
for db in userapi_accounts userapi_devices mediaapi syncapi roomserver keyserver federationapi appservice mscs; do
createdb -U dendrite -O dendrite dendrite_$db
done

View file

@ -70,15 +70,20 @@ global:
# Configuration for NATS JetStream
jetstream:
# A list of NATS Server addresses to connect to. If none are specified, an
# internal NATS server will be started automatically when running Dendrite
# in monolith mode. It is required to specify the address of at least one
# NATS Server node if running in polylith mode.
addresses:
# - localhost:4222
# Keep all NATS streams in memory, rather than persisting it to the storage
# path below. This option is present primarily for integration testing and
# should not be used on a real world Dendrite deployment.
in_memory: false
# Persistent directory to store JetStream streams in. This directory
# should be preserved across Dendrite restarts.
storage_path: ./
# The prefix to use for stream names for this homeserver - really only

View file

@ -2,21 +2,23 @@
Dendrite can be run in one of two configurations:
* **Monolith mode**: All components run in the same process. In this mode,
it is possible to run an in-process [NATS Server](https://github.com/nats-io/nats-server)
instead of running a standalone deployment. This will usually be the preferred model for
low-to-mid volume deployments, providing the best balance between performance and resource usage.
* **Polylith mode**: A cluster of individual components running in their own processes, dealing
with different aspects of the Matrix protocol (see [WIRING.md](WIRING-Current.md)). Components
communicate with each other using internal HTTP APIs and [NATS Server](https://github.com/nats-io/nats-server).
This will almost certainly be the preferred model for very large deployments but scalability
comes with a cost. API calls are expensive and therefore a polylith deployment may end up using
disproportionately more resources for a smaller number of users compared to a monolith deployment.
In almost all cases, it is **recommended to run in monolith mode with PostgreSQL databases**.
Regardless of whether you are running in polylith or monolith mode, each Dendrite component that
requires storage has its own database connections. Both Postgres and SQLite are supported and can
be mixed-and-matched across components as needed in the configuration file.
Be advised that Dendrite is still in development and it's not recommended for
use in production environments just yet!
@ -26,13 +28,11 @@ use in production environments just yet!
Dendrite requires:
* Go 1.15 or higher
* PostgreSQL 12 or higher (if using PostgreSQL databases, not needed for SQLite)
If you want to run a polylith deployment, you also need:
* A standalone [NATS Server](https://github.com/nats-io/nats-server) deployment with JetStream enabled
## Building Dendrite
@ -49,40 +49,18 @@ Then build it:
./build.sh
```
## Install NATS Server
Follow the [NATS Server installation instructions](https://docs.nats.io/running-a-nats-service/introduction/installation) and then [start your NATS deployment](https://docs.nats.io/running-a-nats-service/introduction/running).
JetStream must be enabled, either by passing the `-js` flag to `nats-server`,
or by specifying the `store_dir` option in the `jetstream` configuration.
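For example, a standalone NATS Server with JetStream enabled can be started like this (a minimal sketch; the storage directory is only an illustration):
```bash
# Enable JetStream and give it a directory for durable stream storage.
nats-server -js -sd /var/lib/nats
```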
## Configuration
### PostgreSQL database setup
Assuming that PostgreSQL 12 (or later) is installed:
* Create role, choosing a new password when prompted:
@ -109,7 +87,7 @@ On macOS, omit `sudo -u postgres` from the below commands.
* If you want to run each Dendrite component with its own database:
```bash
for i in mediaapi syncapi roomserver federationapi appservice keyserver userapi_accounts userapi_devices; do
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
@ -163,7 +141,11 @@ Create config file, based on `dendrite-config.yaml`. Call it `dendrite.yaml`. Th
* `postgres://dendrite:password@localhost/dendrite_userapi_account?sslmode=disable` to connect to PostgreSQL without SSL/TLS
* For SQLite on disk: `file:component.db` or `file:///path/to/component.db`, e.g. `file:userapi_account.db`
* Postgres and SQLite can be mixed and matched on different components as desired.
* Either one of the following in the `jetstream` configuration section (see the example after this list):
* The `addresses` option — a list of one or more addresses of an external standalone
NATS Server deployment
* The `storage_path` — where on the filesystem the built-in NATS server should
store durable queues, if using the built-in NATS server
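For instance, a polylith deployment pointing at a standalone NATS Server might use a `jetstream` section along these lines (a sketch based on the sample config in this commit; the address is illustrative):
```yaml
jetstream:
  # Addresses of the external NATS Server deployment (required in polylith mode).
  addresses:
    - localhost:4222
  # Only change this if several Dendrite homeservers share one NATS deployment.
  topic_prefix: Dendrite
```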
There are other options which may be useful so review them all. In particular,
if you are trying to federate from your Dendrite instance into public rooms
@ -177,11 +159,6 @@ using SQLite, all components **MUST** use their own database file.
## Starting a monolith server
The monolith server can be started as shown below. By default it listens for
HTTP connections on port 8008, so you can configure your Matrix client to use
`http://servername:8008` as the server:
@ -197,6 +174,9 @@ for HTTPS connections on port 8448:
./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key
```
If the `jetstream` section of the configuration contains no `addresses` but does
contain a `store_dir`, Dendrite will start an internal NATS server automatically
when running in monolith mode, so a standalone NATS deployment is not required.
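Conversely, a monolith relying on the built-in NATS server can leave `addresses` empty and set a storage path, as in this sketch (the path is illustrative; the sample config above uses the `storage_path` key for this):
```yaml
jetstream:
  addresses: []
  in_memory: false
  # Durable storage for the built-in NATS server; preserve this across restarts.
  storage_path: /var/lib/dendrite/jetstream
```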
## Starting a polylith deployment
The following contains scripts which will run all the required processes in order to point a Matrix client at Dendrite.
@ -263,15 +243,6 @@ This is what implements the room DAG. Clients do not talk to this.
./bin/dendrite-polylith-multi --config=dendrite.yaml roomserver
```
#### Federation sender
This sends events from our users to other servers. This is only required if
you want to support federation.
```bash
./bin/dendrite-polylith-multi --config=dendrite.yaml federationsender
```
#### Appservice server
This sends events from the network to [application
@ -291,14 +262,6 @@ This manages end-to-end encryption keys for users.
./bin/dendrite-polylith-multi --config=dendrite.yaml keyserver
```
#### Signing key server
This manages signing keys for servers.
```bash
./bin/dendrite-polylith-multi --config=dendrite.yaml signingkeyserver
```
#### EDU server
This manages processing EDUs such as typing, send-to-device events and presence. Clients do not talk to

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !windows
// +build !windows
package internal

View file

@ -66,7 +66,7 @@ func (r *Inputer) Start() error {
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
inbox.(*phony.Inbox).Act(nil, func() {
_ = msg.InProgress()
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err)
_ = msg.Respond([]byte(err.Error()))
} else {
@ -113,7 +113,7 @@ func (r *Inputer) InputRoomEvents(
inputRoomEvent := e
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
inbox.(*phony.Inbox).Act(nil, func() {
err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
if err != nil {
sentry.CaptureException(err)
} else {

View file

@ -62,7 +62,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
func (r *Inputer) processRoomEvent(
ctx context.Context,
input *api.InputRoomEvent,
) (err error) {
// Measure how long it takes to process this event.
started := time.Now()
defer func() {
@ -88,11 +88,11 @@ func (r *Inputer) processRoomEvent(
case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
return nil
}
default:
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
return nil
}
}
}
@ -124,14 +124,14 @@ func (r *Inputer) processRoomEvent(
// Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil {
return fmt.Errorf("r.DB.StoreEvent: %w", err)
}
// if storing this event results in it being redacted then do so.
if !isRejected && redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, event)
if rerr != nil {
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
}
event = r
}
@ -146,15 +146,15 @@ func (r *Inputer) processRoomEvent(
"room": event.RoomID(), "room": event.RoomID(),
"sender": event.Sender(), "sender": event.Sender(),
}).Debug("Stored outlier") }).Debug("Stored outlier")
return event.EventID(), nil return nil
} }
roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID()) roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID())
if err != nil { if err != nil {
return "", fmt.Errorf("r.DB.RoomInfo: %w", err) return fmt.Errorf("r.DB.RoomInfo: %w", err)
} }
if roomInfo == nil { if roomInfo == nil {
return "", fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID()) return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
} }
if stateAtEvent.BeforeStateSnapshotNID == 0 { if stateAtEvent.BeforeStateSnapshotNID == 0 {
@ -162,7 +162,7 @@ func (r *Inputer) processRoomEvent(
// Lets calculate one.
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
if err != nil && input.Kind != api.KindOld {
return fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
@ -175,7 +175,7 @@ func (r *Inputer) processRoomEvent(
"soft_fail": softfail, "soft_fail": softfail,
"sender": event.Sender(), "sender": event.Sender(),
}).Debug("Stored rejected event") }).Debug("Stored rejected event")
return event.EventID(), rejectionErr return rejectionErr
} }
switch input.Kind { switch input.Kind {
@ -189,7 +189,7 @@ func (r *Inputer) processRoomEvent(
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
); err != nil {
return fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
@ -201,7 +201,7 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
}
}
@ -220,12 +220,12 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
}
}
// Update the extremities of the event graph for the room
return nil
}
func (r *Inputer) calculateAndSetState(

View file

@ -100,9 +100,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
return
}
}
err = s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
@ -112,7 +112,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
case api.OutputTypeRetirePeek:
s.onRetirePeek(context.TODO(), *output.RetirePeek)
case api.OutputTypeRedactedEvent:
err = s.onRedactEvent(context.TODO(), *output.RedactedEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",

View file

@ -127,7 +127,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
wg.Done()
@ -190,7 +190,7 @@ func TestNewInviteEventForUser(t *testing.T) {
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %s", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
wg.Done()
@ -246,7 +246,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
poll := func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestMultipleRequestWakeup error: %s", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
wg.Done()
@ -284,7 +284,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
leaveWG.Done()
@ -301,7 +301,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
mustEqualPositions(t, pos, syncPositionAfter2)
aliceWG.Done()