Fulltext implementation using Bleve (#2675)

Based on #2480

This actually indexes events based on their event type. They are removed
from the index if we receive a `m.room.redaction` event on the
`OutputRoomEvent` stream.
An admin endpoint is added to reindex all existing events.


Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
Till 2022-09-27 18:06:49 +02:00 committed by GitHub
parent 6c67552bf9
commit 87be32ca26
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 680 additions and 53 deletions

View file

@ -17,14 +17,18 @@ package consumers
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@ -35,14 +39,18 @@ import (
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
ctx context.Context
jetstream nats.JetStreamContext
nats *nats.Conn
durable string
topic string
topicReIndex string
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
fts *fulltext.Search
cfg *config.SyncAPI
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@ -50,24 +58,93 @@ func NewOutputClientDataConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
js nats.JetStreamContext,
nats *nats.Conn,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
fts *fulltext.Search,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
db: store,
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
topicReIndex: cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex),
durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
nats: nats,
db: store,
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
fts: fts,
cfg: cfg,
}
}
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
_, err := s.nats.Subscribe(s.topicReIndex, func(msg *nats.Msg) {
if err := msg.Ack(); err != nil {
return
}
if !s.cfg.Fulltext.Enabled {
logrus.Warn("Fulltext indexing is disabled")
return
}
ctx := context.Background()
logrus.Debugf("Starting to index events")
var offset int
start := time.Now()
count := 0
var id int64 = 0
for {
evs, err := s.db.ReIndex(ctx, 1000, id)
if err != nil {
logrus.WithError(err).Errorf("unable to get events to index")
return
}
if len(evs) == 0 {
break
}
logrus.Debugf("Indexing %d events", len(evs))
elements := make([]fulltext.IndexElement, 0, len(evs))
for streamPos, ev := range evs {
id = streamPos
e := fulltext.IndexElement{
EventID: ev.EventID(),
RoomID: ev.RoomID(),
StreamPosition: streamPos,
}
e.SetContentType(ev.Type())
switch ev.Type() {
case "m.room.message":
e.Content = gjson.GetBytes(ev.Content(), "body").String()
case gomatrixserverlib.MRoomName:
e.Content = gjson.GetBytes(ev.Content(), "name").String()
case gomatrixserverlib.MRoomTopic:
e.Content = gjson.GetBytes(ev.Content(), "topic").String()
default:
continue
}
if strings.TrimSpace(e.Content) == "" {
continue
}
elements = append(elements, e)
}
if err = s.fts.Index(elements...); err != nil {
logrus.WithError(err).Error("unable to index events")
continue
}
offset += len(evs)
count += len(elements)
}
logrus.Debugf("Indexed %d events in %v", count, time.Since(start))
})
if err != nil {
return err
}
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, 1,
s.onMessage, nats.DeliverAll(), nats.ManualAck(),

View file

@ -24,7 +24,9 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -46,6 +48,7 @@ type OutputRoomEventConsumer struct {
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
fts *fulltext.Search
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -58,6 +61,7 @@ func NewOutputRoomEventConsumer(
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.SyncRoomserverAPI,
fts *fulltext.Search,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@ -70,6 +74,7 @@ func NewOutputRoomEventConsumer(
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
fts: fts,
}
}
@ -251,6 +256,12 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write new event failure")
return nil
}
if err = s.writeFTS(ev, pduPos); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"type": ev.Type(),
}).WithError(err).Warn("failed to index fulltext element")
}
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
@ -295,6 +306,13 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return nil
}
if err = s.writeFTS(ev, pduPos); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"type": ev.Type(),
}).WithError(err).Warn("failed to index fulltext element")
}
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
@ -451,3 +469,39 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.Head
event.Event, err = event.SetUnsigned(prev)
return event, err
}
func (s *OutputRoomEventConsumer) writeFTS(ev *gomatrixserverlib.HeaderedEvent, pduPosition types.StreamPosition) error {
if !s.cfg.Fulltext.Enabled {
return nil
}
e := fulltext.IndexElement{
EventID: ev.EventID(),
RoomID: ev.RoomID(),
StreamPosition: int64(pduPosition),
}
e.SetContentType(ev.Type())
switch ev.Type() {
case "m.room.message":
e.Content = gjson.GetBytes(ev.Content(), "body").String()
case gomatrixserverlib.MRoomName:
e.Content = gjson.GetBytes(ev.Content(), "name").String()
case gomatrixserverlib.MRoomTopic:
e.Content = gjson.GetBytes(ev.Content(), "topic").String()
case gomatrixserverlib.MRoomRedaction:
log.Tracef("Redacting event: %s", ev.Redacts())
if err := s.fts.Delete(ev.Redacts()); err != nil {
return fmt.Errorf("failed to delete entry from fulltext index: %w", err)
}
return nil
default:
return nil
}
if e.Content != "" {
log.Tracef("Indexing element: %+v", e)
if err := s.fts.Index(e); err != nil {
return err
}
}
return nil
}