Table of Contents

Class KafkaConsumerConfigurationBuilder

Namespace
Silverback.Messaging.Configuration.Kafka
Assembly
Silverback.Integration.Kafka.dll
public class KafkaConsumerConfigurationBuilder : KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>
Inheritance
KafkaConsumerConfigurationBuilder
Inherited Members

Constructors

KafkaConsumerConfigurationBuilder(IServiceProvider)

Initializes a new instance of the KafkaConsumerConfigurationBuilder class.

public KafkaConsumerConfigurationBuilder(IServiceProvider serviceProvider)

Parameters

serviceProvider IServiceProvider

The IServiceProvider instance.

Properties

This

Gets this instance.

protected override KafkaConsumerConfigurationBuilder This { get; }

Property Value

KafkaConsumerConfigurationBuilder

Remarks

This is necessary to work around casting in the base classes.

Methods

AutoResetOffsetToEarliest()

Specifies that the offset needs to be reset to the smallest offset, when there is no initial offset in the offset store or the desired offset is out of range.

public KafkaConsumerConfigurationBuilder AutoResetOffsetToEarliest()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

AutoResetOffsetToLatest()

Specifies that the offset needs to be reset to the largest offset, when there is no initial offset in the offset store or the desired offset is out of range.

public KafkaConsumerConfigurationBuilder AutoResetOffsetToLatest()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Build()

Builds the KafkaConsumerConfiguration instance.

public KafkaConsumerConfiguration Build()

Returns

KafkaConsumerConfiguration

The KafkaConsumerConfiguration.

BuildCore()

Builds the configuration.

protected override KafkaConsumerConfiguration BuildCore()

Returns

KafkaConsumerConfiguration

The configuration.

CommitOffsetEach(int)

Defines the number of message to be processed before committing the offset to the server. The most reliable level is 1 but it reduces throughput.

public KafkaConsumerConfigurationBuilder CommitOffsetEach(int commitOffsetEach)

Parameters

commitOffsetEach int

The number of message to be processed before committing the offset to the server.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Remarks

Setting this value automatically disables automatic offset commit (see EnableAutoCommit()/DisableAutoCommit()).

Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>>)

Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

public KafkaConsumerConfigurationBuilder Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)

Parameters

configurationBuilderAction Action<KafkaConsumerEndpointConfigurationBuilder<object>>

An Action that takes the KafkaConsumerConfigurationBuilder and configures it.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Consume(string?, Action<KafkaConsumerEndpointConfigurationBuilder<object>>)

Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

public KafkaConsumerConfigurationBuilder Consume(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)

Parameters

name string

The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name will be generated concatenating the topic name(s).

configurationBuilderAction Action<KafkaConsumerEndpointConfigurationBuilder<object>>

An Action that takes the KafkaConsumerConfigurationBuilder and configures it.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)

Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

public KafkaConsumerConfigurationBuilder Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)

Parameters

configurationBuilderAction Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>

An Action that takes the KafkaConsumerConfigurationBuilder and configures it.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Type Parameters

TMessage

The type (or base type) of the messages being consumed. This is used to setup the deserializer and will determine the type of the message parameter in the nested configuration functions.

Consume<TMessage>(string?, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)

Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

public KafkaConsumerConfigurationBuilder Consume<TMessage>(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)

Parameters

name string

The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name will be generated concatenating the topic name(s).

configurationBuilderAction Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>

An Action that takes the KafkaConsumerConfigurationBuilder and configures it.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Type Parameters

TMessage

The type (or base type) of the messages being consumed. This is used to setup the deserializer and will determine the type of the message parameter in the nested configuration functions.

DisableAutoCommit()

Disable automatic offsets commit. Note: setting this does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().

public KafkaConsumerConfigurationBuilder DisableAutoCommit()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Remarks

DisableAutoOffsetReset()

Specifies that an error (ERR__AUTO_OFFSET_RESET) must be triggered, when there is no initial offset in the offset store or the desired offset is out of range.

public KafkaConsumerConfigurationBuilder DisableAutoOffsetReset()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

DisableAutoRecovery()

Specifies that the consumer doesn't have to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issues is detected (e.g. a poll timeout is reported).

public KafkaConsumerConfigurationBuilder DisableAutoRecovery()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

DisableCheckCrcs()

Disables the verification of the CRC32 of the consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.

public KafkaConsumerConfigurationBuilder DisableCheckCrcs()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

DisableOffsetsCommit()

Disables the offsets commit.

public KafkaConsumerConfigurationBuilder DisableOffsetsCommit()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Remarks

Disabling offsets commit will also disable auto commit (see DisableAutoCommit()) and clear the CommitOffsetEach setting (see CommitOffsetEach(int)).

DisablePartitionEof()

Don't invoke the IKafkaPartitionEofCallback when a partition end of file is reached.

public KafkaConsumerConfigurationBuilder DisablePartitionEof()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

DisableSendOffsetsToTransaction()

Specifies that the consumer should not commit the consumed offsets in the same transaction of the produced messages. This is the default.

public KafkaConsumerConfigurationBuilder DisableSendOffsetsToTransaction()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

EnableAutoCommit()

Automatically and periodically commit offsets in the background.

public KafkaConsumerConfigurationBuilder EnableAutoCommit()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

Remarks

Enabling automatic offsets commit will clear the CommitOffsetEach setting (see CommitOffsetEach(int)).

EnableAutoRecovery()

Specifies that the consumer has to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issues is detected (e.g. a poll timeout is reported). This is the default.

public KafkaConsumerConfigurationBuilder EnableAutoRecovery()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

EnableCheckCrcs()

Enables the verification of the CRC32 of the consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.

public KafkaConsumerConfigurationBuilder EnableCheckCrcs()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

EnableOffsetsCommit()

Enable the offsets commit. This is the default.

public KafkaConsumerConfigurationBuilder EnableOffsetsCommit()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

EnablePartitionEof()

Invoke the IKafkaPartitionEofCallback whenever a partition end of file is reached.

public KafkaConsumerConfigurationBuilder EnablePartitionEof()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

LimitBackpressure(int)

Sets the maximum number of messages to be consumed and enqueued waiting to be processed. The limit will be applied per partition when processing the partitions independently (default). The default limit is 2.

public KafkaConsumerConfigurationBuilder LimitBackpressure(int backpressureLimit)

Parameters

backpressureLimit int

The maximum number of messages to be enqueued.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

LimitParallelism(int)

Sets the maximum number of incoming message that can be processed concurrently. Up to a message per each subscribed partition can be processed in parallel. The default limit is 100.

public KafkaConsumerConfigurationBuilder LimitParallelism(int maxDegreeOfParallelism)

Parameters

maxDegreeOfParallelism int

The maximum number of incoming message that can be processed concurrently.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

ProcessAllPartitionsTogether()

Specifies that all partitions must be processed together. This means that a single stream will published for the messages from all the partitions and the sequences (ChunkSequence, BatchSequence, ...) can span across the partitions.

public KafkaConsumerConfigurationBuilder ProcessAllPartitionsTogether()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

ProcessPartitionsIndependently()

Specifies that the partitions must be processed independently. This means that a stream will published per each partition and the sequences (ChunkSequence, BatchSequence, ...) cannot span across the partitions. This option is enabled by default. Use ProcessAllPartitionsTogether() to disable it.

public KafkaConsumerConfigurationBuilder ProcessPartitionsIndependently()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

SendOffsetsToTransaction()

Specifies that the consumer should commit the consumed offsets in the same transaction of the produced messages.

public KafkaConsumerConfigurationBuilder SendOffsetsToTransaction()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

StoreOffsetsClientSide(KafkaOffsetStoreSettings)

Specifies that the offsets have to stored in the specified client store, additionally to or instead of being committed to the message broker.

public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(KafkaOffsetStoreSettings settings)

Parameters

settings KafkaOffsetStoreSettings

The offset store settings.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder>)

Specifies that the offsets have to stored in the specified client store, additionally to or instead of being committed to the message broker.

public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder> settingsBuilderFunc)

Parameters

settingsBuilderFunc Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder>

A Func<TResult> that takes the KafkaOffsetStoreSettingsBuilder and configures it.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithAutoCommitIntervalMs(int?)

Sets the frequency in milliseconds at which the consumer offsets are committed.

public KafkaConsumerConfigurationBuilder WithAutoCommitIntervalMs(int? autoCommitIntervalMs)

Parameters

autoCommitIntervalMs int?

The frequency at which the consumer offsets are committed.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithAutoOffsetReset(AutoOffsetReset?)

Sets the action to take when there is no initial offset in the offset store or the desired offset is out of range: Confluent.Kafka.AutoOffsetReset.Earliest to automatically reset to the smallest offset, Confluent.Kafka.AutoOffsetReset.Latest to automatically reset to the largest offset, and Confluent.Kafka.AutoOffsetReset.Error to trigger an error (ERR__AUTO_OFFSET_RESET).

public KafkaConsumerConfigurationBuilder WithAutoOffsetReset(AutoOffsetReset? autoOffsetReset)

Parameters

autoOffsetReset AutoOffsetReset?

The action to take when there is no initial offset in the offset store or the desired offset is out of range.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithConsumeResultFields(string?)

Sets a comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue> objects returned by the Consume(TimeSpan) method. Disabling fields that you do not require will improve throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none.

public KafkaConsumerConfigurationBuilder WithConsumeResultFields(string? consumeResultFields)

Parameters

consumeResultFields string

A comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue>.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithCooperativeStickyPartitionAssignmentStrategy()

Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize the partitions movements.

public KafkaConsumerConfigurationBuilder WithCooperativeStickyPartitionAssignmentStrategy()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithCoordinatorQueryIntervalMs(int?)

Sets the interval (in milliseconds) at which the current group coordinator must be queried. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.

public KafkaConsumerConfigurationBuilder WithCoordinatorQueryIntervalMs(int? coordinatorQueryIntervalMs)

Parameters

coordinatorQueryIntervalMs int?

The interval at which the current group coordinator must be queried.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithFetchErrorBackoffMs(int?)

Sets how long to postpone the next fetch request for a topic and partition in case of a fetch error.

public KafkaConsumerConfigurationBuilder WithFetchErrorBackoffMs(int? fetchErrorBackoffMs)

Parameters

fetchErrorBackoffMs int?

How long to postpone the next fetch request in case of a fetch error.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithFetchMaxBytes(int?)

Sets the maximum amount of data the broker shall return for a fetch request. The messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the fetch request is larger than this value, then the message batch will still be returned to ensure that the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). This value is automatically adjusted upwards to be at least message.max.bytes (consumer config).

public KafkaConsumerConfigurationBuilder WithFetchMaxBytes(int? fetchMaxBytes)

Parameters

fetchMaxBytes int?

The maximum amount of data the broker shall return for a fetch request.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithFetchMinBytes(int?)

Sets the minimum number of bytes that the broker must respond with. If FetchWaitMaxMs expires the accumulated data will be sent to the client regardless of this setting.

public KafkaConsumerConfigurationBuilder WithFetchMinBytes(int? fetchMinBytes)

Parameters

fetchMinBytes int?

The minimum number of bytes that the broker must respond with.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithFetchQueueBackoffMs(int?)

Sets the maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (QueuedMinMessages or QueuedMaxMessagesKbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.

public KafkaConsumerConfigurationBuilder WithFetchQueueBackoffMs(int? fetchQueueBackoffMs)

Parameters

fetchQueueBackoffMs int?

The maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds have been exceeded.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithFetchWaitMaxMs(int?)

Sets the maximum time (in milliseconds) that the broker may wait to fill the fetch response with enough messages to match the size specified by FetchMinBytes.

public KafkaConsumerConfigurationBuilder WithFetchWaitMaxMs(int? fetchWaitMaxMs)

Parameters

fetchWaitMaxMs int?

The maximum time that the broker may wait to fill the fetch response.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithGetMetadataTimeout(TimeSpan?)

Sets the timeout used to wait for the metadata to be retrieved from the broker. The default is 30 seconds.

public KafkaConsumerConfigurationBuilder WithGetMetadataTimeout(TimeSpan? getMetadataTimeout)

Parameters

getMetadataTimeout TimeSpan?

The timeout used to wait for the metadata to be retrieved from the broker.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithGroupId(string?)

Sets the client group id string. All clients sharing the same group.id belong to the same group.

public KafkaConsumerConfigurationBuilder WithGroupId(string? groupId)

Parameters

groupId string

The client group id string. All clients sharing the same group.id belong to the same group.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithGroupInstanceId(string?)

Sets the static instance id used to enable static group membership. Static group members are able to leave and rejoin a group within the configured SessionTimeoutMs without prompting a group rebalance. This should be used in combination with a larger SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.

public KafkaConsumerConfigurationBuilder WithGroupInstanceId(string? groupInstanceId)

Parameters

groupInstanceId string

The static instance id used to enable static group membership.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithGroupProtocol(GroupProtocol?)

Sets the group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848. Default is Confluent.Kafka.GroupProtocol.Classic, but will change to Confluent.Kafka.GroupProtocol.Consumer in next releases.

public KafkaConsumerConfigurationBuilder WithGroupProtocol(GroupProtocol? groupProtocol)

Parameters

groupProtocol GroupProtocol?

The group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithGroupProtocolType(string?)

Sets the group protocol type.

public KafkaConsumerConfigurationBuilder WithGroupProtocolType(string? groupProtocolType)

Parameters

groupProtocolType string

The group protocol type.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithGroupRemoteAssignor(string?)

Sets the server-side assignor to use. Keep it null to make the server select a suitable assignor for the group. Available assignors: uniform or range.

public KafkaConsumerConfigurationBuilder WithGroupRemoteAssignor(string? groupRemoteAssignor)

Parameters

groupRemoteAssignor string

The server-side assignor to use.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithHeartbeatIntervalMs(int?)

Sets the interval (in milliseconds) at which the heartbeats have to be sent to the broker.

public KafkaConsumerConfigurationBuilder WithHeartbeatIntervalMs(int? heartbeatIntervalMs)

Parameters

heartbeatIntervalMs int?

The interval at which the heartbeats have to be sent.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithIsolationLevel(IsolationLevel?)

Sets a value indicating how to read messages written inside a transaction: Confluent.Kafka.IsolationLevel.ReadCommitted to only return transactional messages which have been committed, or Confluent.Kafka.IsolationLevel.ReadUncommitted to return all messages, even transactional messages which have been aborted.

public KafkaConsumerConfigurationBuilder WithIsolationLevel(IsolationLevel? isolationLevel)

Parameters

isolationLevel IsolationLevel?

A value indicating how to read messages written inside a transaction.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithMaxPartitionFetchBytes(int?)

Sets the initial maximum number of bytes per topic and partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.

public KafkaConsumerConfigurationBuilder WithMaxPartitionFetchBytes(int? maxPartitionFetchBytes)

Parameters

maxPartitionFetchBytes int?

The initial maximum number of bytes per topic and partition to request when fetching messages.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithMaxPollIntervalMs(int?)

Sets the maximum allowed time (in milliseconds) between calls to consume messages. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.
Warning: Offset commits may be not possible at this point.

public KafkaConsumerConfigurationBuilder WithMaxPollIntervalMs(int? maxPollIntervalMs)

Parameters

maxPollIntervalMs int?

The maximum allowed time between calls to consume messages.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithPartitionAssignmentStrategy(PartitionAssignmentStrategy?)

Sets the partition assignment strategy: Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics, Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members, Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize the partitions movements. The default is Confluent.Kafka.PartitionAssignmentStrategy.Range.

public KafkaConsumerConfigurationBuilder WithPartitionAssignmentStrategy(PartitionAssignmentStrategy? partitionAssignmentStrategy)

Parameters

partitionAssignmentStrategy PartitionAssignmentStrategy?

The partition assignment strategy.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithPollingTimeout(TimeSpan)

Sets the timeout to wait for the consumer to poll for new messages before initiating a new poll. The default is 100 milliseconds.

public KafkaConsumerConfigurationBuilder WithPollingTimeout(TimeSpan pollingTimeout)

Parameters

pollingTimeout TimeSpan

The timeout to wait for the consumer to poll for new messages.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithQueuedMaxMessagesKbytes(int?)

Sets the maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. This setting applies to the single consumer queue, regardless of the number of partitions. This value may be overshot by FetchMaxBytes. This property has higher priority than QueuedMinMessages.

public KafkaConsumerConfigurationBuilder WithQueuedMaxMessagesKbytes(int? queuedMaxMessagesKbytes)

Parameters

queuedMaxMessagesKbytes int?

The maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithQueuedMinMessages(int?)

Sets the minimum number of messages per topic and partition that the underlying library must try to maintain in the local consumer queue.

public KafkaConsumerConfigurationBuilder WithQueuedMinMessages(int? queuedMinMessages)

Parameters

queuedMinMessages int?

The minimum number of messages that must be maintained in the local consumer queue.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithRangePartitionAssignmentStrategy()

Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics.

public KafkaConsumerConfigurationBuilder WithRangePartitionAssignmentStrategy()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithRoundRobinPartitionAssignmentStrategy()

Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members.

public KafkaConsumerConfigurationBuilder WithRoundRobinPartitionAssignmentStrategy()

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithSessionTimeoutMs(int?)

Sets the client group session and failure detection timeout (in milliseconds). The consumer sends periodic heartbeats HeartbeatIntervalMs to indicate its liveness to the broker. If no heartbeat is received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. Also see MaxPollIntervalMs.

public KafkaConsumerConfigurationBuilder WithSessionTimeoutMs(int? sessionTimeoutMs)

Parameters

sessionTimeoutMs int?

The client group session and failure detection timeout.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

WithStallDetectionThreshold(TimeSpan?)

Sets the maximum time to wait for a message to be consumed before the consumer is considered stale. A stale consumer is automatically restarted. The default is null, which means that the consumer is never considered stale and is never restarted.

public KafkaConsumerConfigurationBuilder WithStallDetectionThreshold(TimeSpan? stallDetectionThreshold)

Parameters

stallDetectionThreshold TimeSpan?

The maximum time to wait for a message to be consumed before the consumer is considered stale.

Returns

KafkaConsumerConfigurationBuilder

The KafkaConsumerConfigurationBuilder so that additional calls can be chained.