Report which component failed to consume (#1375)

This commit is contained in:
Neil Alexander 2020-09-01 16:53:38 +01:00 committed by GitHub
parent 6d79f04354
commit 89c772fb78
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 13 additions and 1 deletions

View file

@ -33,6 +33,7 @@ type PartitionStorer interface {
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
// remember the offset it reached.
type ContinualConsumer struct {
ComponentName string
// The kafkaesque topic to consume events from.
// This is the name used in kafka to identify the stream to consume events from.
Topic string
@ -111,7 +112,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
msgErr := c.ProcessMessage(message)
// Advance our position in the stream so that we will start at the right position after a restart.
if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err))
panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err))
}
// Shutdown if we were told to do so.
if msgErr == ErrShutdown {