mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Minor refactoring (#106)
- `s/Server/OutputRoomEvent/` in `consumers` to accurately reflect what is being consumed. - `s/set/userIDSet/` in `notifier.go` for clarity. - Removed lying comments.
This commit is contained in:
parent
c8c5a40ebd
commit
917c433fd2
3 changed files with 22 additions and 23 deletions
|
@ -81,12 +81,12 @@ func main() {
|
||||||
if err := n.Load(db); err != nil {
|
if err := n.Load(db); err != nil {
|
||||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||||
}
|
}
|
||||||
server, err := consumers.NewServer(cfg, n, db)
|
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create sync server: %s", err)
|
log.Panicf("startup: failed to create room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
if err = server.Start(); err != nil {
|
if err = consumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start sync server")
|
log.Panicf("startup: failed to start room server consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Starting sync server on ", *bindAddr)
|
log.Info("Starting sync server on ", *bindAddr)
|
||||||
|
|
|
@ -28,15 +28,15 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server contains all the logic for running a sync server
|
// OutputRoomEvent consumes events that originated in the room server.
|
||||||
type Server struct {
|
type OutputRoomEvent struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
notifier *sync.Notifier
|
notifier *sync.Notifier
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new sync server. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||||
func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) {
|
func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
|
||||||
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -47,7 +47,7 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &Server{
|
s := &OutputRoomEvent{
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
|
@ -58,14 +58,14 @@ func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerData
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *Server) Start() error {
|
func (s *OutputRoomEvent) Start() error {
|
||||||
return s.roomServerConsumer.Start()
|
return s.roomServerConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||||
// sync stream position may race and be incorrectly calculated.
|
// sync stream position may race and be incorrectly calculated.
|
||||||
func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputRoomEvent
|
var output api.OutputRoomEvent
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
|
|
@ -32,11 +32,10 @@ import (
|
||||||
// in missed events.
|
// in missed events.
|
||||||
type Notifier struct {
|
type Notifier struct {
|
||||||
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
||||||
roomIDToJoinedUsers map[string]set
|
roomIDToJoinedUsers map[string]userIDSet
|
||||||
// Protects currPos and userStreams.
|
// Protects currPos and userStreams.
|
||||||
streamLock *sync.Mutex
|
streamLock *sync.Mutex
|
||||||
// The latest sync stream position: guarded by 'currPosMutex' which is RW to allow
|
// The latest sync stream position
|
||||||
// for concurrent reads on /sync requests
|
|
||||||
currPos types.StreamPosition
|
currPos types.StreamPosition
|
||||||
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
|
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
|
||||||
userStreams map[string]*UserStream
|
userStreams map[string]*UserStream
|
||||||
|
@ -44,11 +43,11 @@ type Notifier struct {
|
||||||
|
|
||||||
// NewNotifier creates a new notifier set to the given stream position.
|
// NewNotifier creates a new notifier set to the given stream position.
|
||||||
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
||||||
// the joined users within each of them by calling Notifier.LoadFromDatabase().
|
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
|
||||||
func NewNotifier(pos types.StreamPosition) *Notifier {
|
func NewNotifier(pos types.StreamPosition) *Notifier {
|
||||||
return &Notifier{
|
return &Notifier{
|
||||||
currPos: pos,
|
currPos: pos,
|
||||||
roomIDToJoinedUsers: make(map[string]set),
|
roomIDToJoinedUsers: make(map[string]userIDSet),
|
||||||
userStreams: make(map[string]*UserStream),
|
userStreams: make(map[string]*UserStream),
|
||||||
streamLock: &sync.Mutex{},
|
streamLock: &sync.Mutex{},
|
||||||
}
|
}
|
||||||
|
@ -142,7 +141,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
|
||||||
// This is just the bulk form of addJoinedUser
|
// This is just the bulk form of addJoinedUser
|
||||||
for roomID, userIDs := range roomIDToUserIDs {
|
for roomID, userIDs := range roomIDToUserIDs {
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
||||||
n.roomIDToJoinedUsers[roomID] = make(set)
|
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
||||||
}
|
}
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
n.roomIDToJoinedUsers[roomID].add(userID)
|
n.roomIDToJoinedUsers[roomID].add(userID)
|
||||||
|
@ -174,7 +173,7 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
||||||
func (n *Notifier) addJoinedUser(roomID, userID string) {
|
func (n *Notifier) addJoinedUser(roomID, userID string) {
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
||||||
n.roomIDToJoinedUsers[roomID] = make(set)
|
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
||||||
}
|
}
|
||||||
n.roomIDToJoinedUsers[roomID].add(userID)
|
n.roomIDToJoinedUsers[roomID].add(userID)
|
||||||
}
|
}
|
||||||
|
@ -182,7 +181,7 @@ func (n *Notifier) addJoinedUser(roomID, userID string) {
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
||||||
func (n *Notifier) removeJoinedUser(roomID, userID string) {
|
func (n *Notifier) removeJoinedUser(roomID, userID string) {
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
||||||
n.roomIDToJoinedUsers[roomID] = make(set)
|
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
||||||
}
|
}
|
||||||
n.roomIDToJoinedUsers[roomID].remove(userID)
|
n.roomIDToJoinedUsers[roomID].remove(userID)
|
||||||
}
|
}
|
||||||
|
@ -196,17 +195,17 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// A string set, mainly existing for improving clarity of structs in this file.
|
// A string set, mainly existing for improving clarity of structs in this file.
|
||||||
type set map[string]bool
|
type userIDSet map[string]bool
|
||||||
|
|
||||||
func (s set) add(str string) {
|
func (s userIDSet) add(str string) {
|
||||||
s[str] = true
|
s[str] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s set) remove(str string) {
|
func (s userIDSet) remove(str string) {
|
||||||
delete(s, str)
|
delete(s, str)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s set) values() (vals []string) {
|
func (s userIDSet) values() (vals []string) {
|
||||||
for str := range s {
|
for str := range s {
|
||||||
vals = append(vals, str)
|
vals = append(vals, str)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue