Fix lint issues

This commit is contained in:
Neil Alexander 2021-12-17 14:40:54 +00:00
parent a4bf763409
commit 901adbf066
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
5 changed files with 25 additions and 24 deletions

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//go:build !windows
// +build !windows // +build !windows
package internal package internal

View file

@ -66,7 +66,7 @@ func (r *Inputer) Start() error {
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
inbox.(*phony.Inbox).Act(nil, func() { inbox.(*phony.Inbox).Act(nil, func() {
_ = msg.InProgress() _ = msg.InProgress()
if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
_ = msg.Respond([]byte(err.Error())) _ = msg.Respond([]byte(err.Error()))
} else { } else {
@ -113,7 +113,7 @@ func (r *Inputer) InputRoomEvents(
inputRoomEvent := e inputRoomEvent := e
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
inbox.(*phony.Inbox).Act(nil, func() { inbox.(*phony.Inbox).Act(nil, func() {
_, err := r.processRoomEvent(context.TODO(), &inputRoomEvent) err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
} else { } else {

View file

@ -62,7 +62,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
func (r *Inputer) processRoomEvent( func (r *Inputer) processRoomEvent(
ctx context.Context, ctx context.Context,
input *api.InputRoomEvent, input *api.InputRoomEvent,
) (eventID string, err error) { ) (err error) {
// Measure how long it takes to process this event. // Measure how long it takes to process this event.
started := time.Now() started := time.Now()
defer func() { defer func() {
@ -88,11 +88,11 @@ func (r *Inputer) processRoomEvent(
case gomatrixserverlib.EventIDFormatV1: case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
return event.EventID(), nil return nil
} }
default: default:
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
return event.EventID(), nil return nil
} }
} }
} }
@ -124,14 +124,14 @@ func (r *Inputer) processRoomEvent(
// Store the event. // Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) _, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil { if err != nil {
return "", fmt.Errorf("r.DB.StoreEvent: %w", err) return fmt.Errorf("r.DB.StoreEvent: %w", err)
} }
// if storing this event results in it being redacted then do so. // if storing this event results in it being redacted then do so.
if !isRejected && redactedEventID == event.EventID() { if !isRejected && redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, event) r, rerr := eventutil.RedactEvent(redactionEvent, event)
if rerr != nil { if rerr != nil {
return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr) return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
} }
event = r event = r
} }
@ -146,15 +146,15 @@ func (r *Inputer) processRoomEvent(
"room": event.RoomID(), "room": event.RoomID(),
"sender": event.Sender(), "sender": event.Sender(),
}).Debug("Stored outlier") }).Debug("Stored outlier")
return event.EventID(), nil return nil
} }
roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID()) roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID())
if err != nil { if err != nil {
return "", fmt.Errorf("r.DB.RoomInfo: %w", err) return fmt.Errorf("r.DB.RoomInfo: %w", err)
} }
if roomInfo == nil { if roomInfo == nil {
return "", fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID()) return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
} }
if stateAtEvent.BeforeStateSnapshotNID == 0 { if stateAtEvent.BeforeStateSnapshotNID == 0 {
@ -162,7 +162,7 @@ func (r *Inputer) processRoomEvent(
// Lets calculate one. // Lets calculate one.
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected) err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
if err != nil && input.Kind != api.KindOld { if err != nil && input.Kind != api.KindOld {
return "", fmt.Errorf("r.calculateAndSetState: %w", err) return fmt.Errorf("r.calculateAndSetState: %w", err)
} }
} }
@ -175,7 +175,7 @@ func (r *Inputer) processRoomEvent(
"soft_fail": softfail, "soft_fail": softfail,
"sender": event.Sender(), "sender": event.Sender(),
}).Debug("Stored rejected event") }).Debug("Stored rejected event")
return event.EventID(), rejectionErr return rejectionErr
} }
switch input.Kind { switch input.Kind {
@ -189,7 +189,7 @@ func (r *Inputer) processRoomEvent(
input.TransactionID, // transaction ID input.TransactionID, // transaction ID
input.HasState, // rewrites state? input.HasState, // rewrites state?
); err != nil { ); err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err) return fmt.Errorf("r.updateLatestEvents: %w", err)
} }
case api.KindOld: case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
@ -201,7 +201,7 @@ func (r *Inputer) processRoomEvent(
}, },
}) })
if err != nil { if err != nil {
return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err) return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
} }
} }
@ -220,12 +220,12 @@ func (r *Inputer) processRoomEvent(
}, },
}) })
if err != nil { if err != nil {
return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
} }
} }
// Update the extremities of the event graph for the room // Update the extremities of the event graph for the room
return event.EventID(), nil return nil
} }
func (r *Inputer) calculateAndSetState( func (r *Inputer) calculateAndSetState(

View file

@ -100,9 +100,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
return return
} }
} }
s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) err = s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeOldRoomEvent: case api.OutputTypeOldRoomEvent:
s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) err = s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent: case api.OutputTypeNewInviteEvent:
s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent: case api.OutputTypeRetireInviteEvent:
@ -112,7 +112,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
case api.OutputTypeRetirePeek: case api.OutputTypeRetirePeek:
s.onRetirePeek(context.TODO(), *output.RetirePeek) s.onRetirePeek(context.TODO(), *output.RetirePeek)
case api.OutputTypeRedactedEvent: case api.OutputTypeRedactedEvent:
s.onRedactEvent(context.TODO(), *output.RedactedEvent) err = s.onRedactEvent(context.TODO(), *output.RedactedEvent)
default: default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",

View file

@ -127,7 +127,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
go func() { go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil { if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %w", err) t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
} }
mustEqualPositions(t, pos, syncPositionAfter) mustEqualPositions(t, pos, syncPositionAfter)
wg.Done() wg.Done()
@ -190,7 +190,7 @@ func TestNewInviteEventForUser(t *testing.T) {
go func() { go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil { if err != nil {
t.Errorf("TestNewInviteEventForUser error: %w", err) t.Errorf("TestNewInviteEventForUser error: %s", err)
} }
mustEqualPositions(t, pos, syncPositionAfter) mustEqualPositions(t, pos, syncPositionAfter)
wg.Done() wg.Done()
@ -246,7 +246,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
poll := func() { poll := func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil { if err != nil {
t.Errorf("TestMultipleRequestWakeup error: %w", err) t.Errorf("TestMultipleRequestWakeup error: %s", err)
} }
mustEqualPositions(t, pos, syncPositionAfter) mustEqualPositions(t, pos, syncPositionAfter)
wg.Done() wg.Done()
@ -284,7 +284,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
go func() { go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil { if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
} }
mustEqualPositions(t, pos, syncPositionAfter) mustEqualPositions(t, pos, syncPositionAfter)
leaveWG.Done() leaveWG.Done()
@ -301,7 +301,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
go func() { go func() {
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter)) pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter))
if err != nil { if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
} }
mustEqualPositions(t, pos, syncPositionAfter2) mustEqualPositions(t, pos, syncPositionAfter2)
aliceWG.Done() aliceWG.Done()