Table of Contents

Class KafkaConsumerConfiguration

Namespace
Silverback.Messaging.Configuration.Kafka
Assembly
Silverback.Integration.Kafka.dll

Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.

public sealed record KafkaConsumerConfiguration : KafkaClientConfiguration<ConsumerConfig>, IValidatableSettings, IEquatable<KafkaClientConfiguration<ConsumerConfig>>, IEquatable<KafkaConsumerConfiguration>
Inheritance
KafkaClientConfiguration<ConsumerConfig>
KafkaConsumerConfiguration
Implements
Inherited Members

Constructors

KafkaConsumerConfiguration()

public KafkaConsumerConfiguration()

Properties

AutoCommitIntervalMs

Gets the frequency in milliseconds at which the consumer offsets are committed.

public int? AutoCommitIntervalMs { get; init; }

Property Value

int?

AutoOffsetReset

Gets the action to take when there is no initial offset in the offset store or the desired offset is out of range: Confluent.Kafka.AutoOffsetReset.Earliest to automatically reset to the smallest offset, Confluent.Kafka.AutoOffsetReset.Latest to automatically reset to the largest offset, and Confluent.Kafka.AutoOffsetReset.Error to trigger an error (ERR__AUTO_OFFSET_RESET).

public AutoOffsetReset? AutoOffsetReset { get; init; }

Property Value

AutoOffsetReset?

BackpressureLimit

Gets the maximum number of messages to be consumed and enqueued waiting to be processed. When ProcessPartitionsIndependently is set to true (default) the limit will be applied per partition. The default is 50.

public int BackpressureLimit { get; init; }

Property Value

int

CheckCrcs

Gets a value indicating whether the CRC32 of the consumed messages must be verified, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.

public bool? CheckCrcs { get; init; }

Property Value

bool?

ClientSideOffsetStore

Gets the settings for the IKafkaOffsetStore to be used to store the offsets. The stored offsets will be used during the partitions assignment to determine the starting offset and ensure that each message is consumed only once.

public KafkaOffsetStoreSettings? ClientSideOffsetStore { get; init; }

Property Value

KafkaOffsetStoreSettings

CommitOffsetEach

Gets the number of messages to be processed before committing the offset to the server. The most reliable level is 1, but it reduces throughput.

public int? CommitOffsetEach { get; init; }

Property Value

int?

CommitOffsets

Gets a value indicating whether the offsets must be committed. The default is true.

public bool CommitOffsets { get; init; }

Property Value

bool

ConsumeResultFields

Gets a comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue> objects returned by the Consume(TimeSpan) method. Disabling fields that you do not require will improve throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none.

public string? ConsumeResultFields { get; init; }

Property Value

string

CoordinatorQueryIntervalMs

Gets the interval (in milliseconds) at which the current group coordinator must be queried. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.

public int? CoordinatorQueryIntervalMs { get; init; }

Property Value

int?

EnableAutoCommit

Gets a value indicating whether the offsets must be automatically and periodically committed in the background.
Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). The default is true.

public bool EnableAutoCommit { get; init; }

Property Value

bool

EnableAutoRecovery

Gets a value indicating whether the consumer has to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issues is detected (e.g. a poll timeout is reported). The default is true.

public bool EnableAutoRecovery { get; init; }

Property Value

bool

EnablePartitionEof

Gets a value indicating whether the partition EOF event must be emitted whenever the consumer reaches the end of a partition.

public bool? EnablePartitionEof { get; init; }

Property Value

bool?

Endpoints

Gets the configured endpoints.

public IValueReadOnlyCollection<KafkaConsumerEndpointConfiguration> Endpoints { get; init; }

Property Value

IValueReadOnlyCollection<KafkaConsumerEndpointConfiguration>

EqualityContract

protected override Type EqualityContract { get; }

Property Value

Type

FetchErrorBackoffMs

Gets how long to postpone the next fetch request for a topic and partition in case of a fetch error.

public int? FetchErrorBackoffMs { get; init; }

Property Value

int?

FetchMaxBytes

Gets the maximum amount of data the broker shall return for a fetch request. The messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the fetch request is larger than this value, then the message batch will still be returned to ensure that the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). This value is automatically adjusted upwards to be at least message.max.bytes (consumer config).

public int? FetchMaxBytes { get; init; }

Property Value

int?

FetchMinBytes

Gets the minimum number of bytes that the broker must respond with. If FetchWaitMaxMs expires the accumulated data will be sent to the client regardless of this setting.

public int? FetchMinBytes { get; init; }

Property Value

int?

FetchQueueBackoffMs

Gets the maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (QueuedMinMessages or QueuedMaxMessagesKbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.

public int? FetchQueueBackoffMs { get; init; }

Property Value

int?

FetchWaitMaxMs

Gets the maximum time (in milliseconds) that the broker may wait to fill the fetch response with enough messages to match the size specified by FetchMinBytes.

public int? FetchWaitMaxMs { get; init; }

Property Value

int?

GetMetadataTimeout

Gets the timeout to wait for the metadata to be retrieved from the broker. The default is 30 seconds.

public TimeSpan GetMetadataTimeout { get; init; }

Property Value

TimeSpan

GroupId

Gets the client group id. All clients sharing the same group id belong to the same group. The default is null (which will internally be replaced with "not-set" since the underlying library requires a value).

public string? GroupId { get; init; }

Property Value

string

GroupInstanceId

Gets the static instance id used to enable static group membership. Static group members are able to leave and rejoin a group within the configured SessionTimeoutMs without prompting a group rebalance. This should be used in combination with a larger SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.

public string? GroupInstanceId { get; init; }

Property Value

string

GroupProtocol

Gets the group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848. Default is Confluent.Kafka.GroupProtocol.Classic, but will change to Confluent.Kafka.GroupProtocol.Consumer in next releases.

public GroupProtocol? GroupProtocol { get; init; }

Property Value

GroupProtocol?

GroupProtocolType

Gets the group protocol type.

public string? GroupProtocolType { get; init; }

Property Value

string

GroupRemoteAssignor

Gets the server-side assignor to use. Keep it null to make the server select a suitable assignor for the group. Available assignors: uniform or range.

public string? GroupRemoteAssignor { get; init; }

Property Value

string

HeartbeatIntervalMs

Gets the interval (in milliseconds) at which the heartbeats have to be sent to the broker.

public int? HeartbeatIntervalMs { get; init; }

Property Value

int?

IsolationLevel

Gets a value indicating how to read the messages written inside a transaction: Confluent.Kafka.IsolationLevel.ReadCommitted to only return transactional messages which have been committed, or Confluent.Kafka.IsolationLevel.ReadUncommitted to return all messages, even transactional messages which have been aborted.

public IsolationLevel? IsolationLevel { get; init; }

Property Value

IsolationLevel?

MaxDegreeOfParallelism

Gets the maximum number of incoming message that can be processed concurrently. Up to a message per each subscribed partition can be processed in parallel when processing them independently. The default is 100.

public int MaxDegreeOfParallelism { get; init; }

Property Value

int

MaxPartitionFetchBytes

Gets the initial maximum number of bytes per topic and partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.

public int? MaxPartitionFetchBytes { get; init; }

Property Value

int?

MaxPollIntervalMs

Gets the maximum allowed time (in milliseconds) between calls to consume messages. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.
Warning: Offset commits may be not possible at this point.

public int? MaxPollIntervalMs { get; init; }

Property Value

int?

PartitionAssignmentStrategy

Gets the partition assignment strategy: Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics, Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members, Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and limit minimize the partitions movements. The default is Confluent.Kafka.PartitionAssignmentStrategy.Range.

public PartitionAssignmentStrategy? PartitionAssignmentStrategy { get; init; }

Property Value

PartitionAssignmentStrategy?

PollingTimeout

Gets the timeout to wait for the consumer to poll for new messages before initiating a new poll. The default is 500 milliseconds.

public TimeSpan PollingTimeout { get; init; }

Property Value

TimeSpan

ProcessPartitionsIndependently

Gets a value indicating whether the partitions must be processed independently. When true a stream will be published per each partition and the sequences (ChunkSequence, BatchSequence, ...) cannot span across the partitions. The default is true.

public bool ProcessPartitionsIndependently { get; init; }

Property Value

bool

Remarks

Settings this value to false implicitly sets the MaxDegreeOfParallelism to 1.

QueuedMaxMessagesKbytes

Gets the maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. This setting applies to the single consumer queue, regardless of the number of partitions. This value may be overshot by FetchMaxBytes. This property has higher priority than QueuedMinMessages.

public int? QueuedMaxMessagesKbytes { get; init; }

Property Value

int?

QueuedMinMessages

Gets the minimum number of messages per topic and partition that the underlying library must try to maintain in the local consumer queue.

public int? QueuedMinMessages { get; init; }

Property Value

int?

SendOffsetsToTransaction

Gets a value indicating whether the consumer should commit the consumed offsets in the same transaction of the produced messages. The default is false.

public bool SendOffsetsToTransaction { get; init; }

Property Value

bool

SessionTimeoutMs

Gets the client group session and failure detection timeout (in milliseconds). The consumer sends periodic heartbeats HeartbeatIntervalMs to indicate its liveness to the broker. If no heartbeat is received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. Also see MaxPollIntervalMs.

public int? SessionTimeoutMs { get; init; }

Property Value

int?

StallDetectionThreshold

Gets the maximum time to wait for a message to be consumed before the consumer is considered stale. A stale consumer is automatically restarted. The default is null, which means that the consumer is never considered stale and is never restarted.

public TimeSpan? StallDetectionThreshold { get; init; }

Property Value

TimeSpan?

Methods

Equals(KafkaClientConfiguration<ConsumerConfig>?)

public override sealed bool Equals(KafkaClientConfiguration<ConsumerConfig>? other)

Parameters

other KafkaClientConfiguration<ConsumerConfig>

Returns

bool

Equals(KafkaConsumerConfiguration?)

public bool Equals(KafkaConsumerConfiguration? other)

Parameters

other KafkaConsumerConfiguration

Returns

bool

Equals(object?)

public override bool Equals(object? obj)

Parameters

obj object

Returns

bool

GetHashCode()

public override int GetHashCode()

Returns

int

MapCore()

Maps to the Confluent client configuration.

protected override ConsumerConfig MapCore()

Returns

ConsumerConfig

The Confluent client configuration.

PrintMembers(StringBuilder)

protected override bool PrintMembers(StringBuilder builder)

Parameters

builder StringBuilder

Returns

bool

ToString()

public override string ToString()

Returns

string

Validate()

Throws a SilverbackConfigurationException if the configuration is not valid.

public override void Validate()

Operators

operator ==(KafkaConsumerConfiguration?, KafkaConsumerConfiguration?)

public static bool operator ==(KafkaConsumerConfiguration? left, KafkaConsumerConfiguration? right)

Parameters

left KafkaConsumerConfiguration
right KafkaConsumerConfiguration

Returns

bool

operator !=(KafkaConsumerConfiguration?, KafkaConsumerConfiguration?)

public static bool operator !=(KafkaConsumerConfiguration? left, KafkaConsumerConfiguration? right)

Parameters

left KafkaConsumerConfiguration
right KafkaConsumerConfiguration

Returns

bool