diff --git a/internal/log_unix.go b/internal/log_unix.go index 25ad0420..621ca5fa 100644 --- a/internal/log_unix.go +++ b/internal/log_unix.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !windows // +build !windows package internal diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index f561e8b5..857dbe1c 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -66,7 +66,7 @@ func (r *Inputer) Start() error { inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox.(*phony.Inbox).Act(nil, func() { _ = msg.InProgress() - if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { + if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) _ = msg.Respond([]byte(err.Error())) } else { @@ -113,7 +113,7 @@ func (r *Inputer) InputRoomEvents( inputRoomEvent := e inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) inbox.(*phony.Inbox).Act(nil, func() { - _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent) + err := r.processRoomEvent(context.TODO(), &inputRoomEvent) if err != nil { sentry.CaptureException(err) } else { diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index fc712f47..791f7f30 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -62,7 +62,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec( func (r *Inputer) processRoomEvent( ctx context.Context, input *api.InputRoomEvent, -) (eventID string, err error) { +) (err error) { // Measure how long it takes to process this event. started := time.Now() defer func() { @@ -88,11 +88,11 @@ func (r *Inputer) processRoomEvent( case gomatrixserverlib.EventIDFormatV1: if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") - return event.EventID(), nil + return nil } default: util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") - return event.EventID(), nil + return nil } } } @@ -124,14 +124,14 @@ func (r *Inputer) processRoomEvent( // Store the event. _, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) if err != nil { - return "", fmt.Errorf("r.DB.StoreEvent: %w", err) + return fmt.Errorf("r.DB.StoreEvent: %w", err) } // if storing this event results in it being redacted then do so. if !isRejected && redactedEventID == event.EventID() { r, rerr := eventutil.RedactEvent(redactionEvent, event) if rerr != nil { - return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr) + return fmt.Errorf("eventutil.RedactEvent: %w", rerr) } event = r } @@ -146,15 +146,15 @@ func (r *Inputer) processRoomEvent( "room": event.RoomID(), "sender": event.Sender(), }).Debug("Stored outlier") - return event.EventID(), nil + return nil } roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID()) if err != nil { - return "", fmt.Errorf("r.DB.RoomInfo: %w", err) + return fmt.Errorf("r.DB.RoomInfo: %w", err) } if roomInfo == nil { - return "", fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID()) + return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID()) } if stateAtEvent.BeforeStateSnapshotNID == 0 { @@ -162,7 +162,7 @@ func (r *Inputer) processRoomEvent( // Lets calculate one. err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected) if err != nil && input.Kind != api.KindOld { - return "", fmt.Errorf("r.calculateAndSetState: %w", err) + return fmt.Errorf("r.calculateAndSetState: %w", err) } } @@ -175,7 +175,7 @@ func (r *Inputer) processRoomEvent( "soft_fail": softfail, "sender": event.Sender(), }).Debug("Stored rejected event") - return event.EventID(), rejectionErr + return rejectionErr } switch input.Kind { @@ -189,7 +189,7 @@ func (r *Inputer) processRoomEvent( input.TransactionID, // transaction ID input.HasState, // rewrites state? ); err != nil { - return "", fmt.Errorf("r.updateLatestEvents: %w", err) + return fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ @@ -201,7 +201,7 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err) + return fmt.Errorf("r.WriteOutputEvents (old): %w", err) } } @@ -220,12 +220,12 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) + return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) } } // Update the extremities of the event graph for the room - return event.EventID(), nil + return nil } func (r *Inputer) calculateAndSetState( diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index e03d874b..e85a181d 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -100,9 +100,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { return } } - s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + err = s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) case api.OutputTypeOldRoomEvent: - s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) + err = s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: @@ -112,7 +112,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { case api.OutputTypeRetirePeek: s.onRetirePeek(context.TODO(), *output.RetirePeek) case api.OutputTypeRedactedEvent: - s.onRedactEvent(context.TODO(), *output.RedactedEvent) + err = s.onRedactEvent(context.TODO(), *output.RedactedEvent) default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go index 1401fc67..c6d3df7e 100644 --- a/syncapi/notifier/notifier_test.go +++ b/syncapi/notifier/notifier_test.go @@ -127,7 +127,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestNewEventAndJoinedToRoom error: %w", err) + t.Errorf("TestNewEventAndJoinedToRoom error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) wg.Done() @@ -190,7 +190,7 @@ func TestNewInviteEventForUser(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestNewInviteEventForUser error: %w", err) + t.Errorf("TestNewInviteEventForUser error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) wg.Done() @@ -246,7 +246,7 @@ func TestMultipleRequestWakeup(t *testing.T) { poll := func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestMultipleRequestWakeup error: %w", err) + t.Errorf("TestMultipleRequestWakeup error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) wg.Done() @@ -284,7 +284,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) leaveWG.Done() @@ -301,7 +301,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter)) if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter2) aliceWG.Done()