dendrite/syncapi/streams/stream_typing.go
S7evinK 49dc49b232
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer

* Move SendToDevice to producer

* Remove most parts of the EDU server

* Fix SendToDevice & copyrights

* Move structs, cleanup EDU Server traces

* Use HeadersOnly subscription

* Missing file

* Fix linter issues

* Move consumers to own files

* Rename durable consumer; Consumer cleanup

* Docs/config cleanup
2022-03-29 14:14:35 +02:00

60 lines
1.3 KiB
Go

package streams
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type TypingStreamProvider struct {
StreamProvider
EDUCache *caching.EDUCache
}
func (p *TypingStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *TypingStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
var err error
for roomID, membership := range req.Rooms {
if membership != gomatrixserverlib.Join {
continue
}
jr := *types.NewJoinResponse()
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
jr = existing
}
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(from),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
}
ev.Content, err = json.Marshal(map[string]interface{}{
"user_ids": users,
})
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
req.Response.Rooms.Join[roomID] = jr
}
}
return to
}