Class KafkaConsumerConfigurationBuilder
- Namespace
- Silverback.Messaging.Configuration.Kafka
- Assembly
- Silverback.Integration.Kafka.dll
Builds the KafkaConsumerConfiguration.
public class KafkaConsumerConfigurationBuilder : KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>
- Inheritance
- KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder> → KafkaConsumerConfigurationBuilder
- Inherited Members
Constructors
KafkaConsumerConfigurationBuilder(IServiceProvider)
Initializes a new instance of the KafkaConsumerConfigurationBuilder class.
public KafkaConsumerConfigurationBuilder(IServiceProvider serviceProvider)
Parameters
serviceProvider (IServiceProvider): The IServiceProvider instance.
Properties
This
Gets this instance.
protected override KafkaConsumerConfigurationBuilder This { get; }
Property Value
Remarks
This is necessary to work around casting in the base classes.
Methods
AutoResetOffsetToEarliest()
Specifies that the offset needs to be reset to the smallest offset, when there is no initial offset in the offset store or the desired offset is out of range.
public KafkaConsumerConfigurationBuilder AutoResetOffsetToEarliest()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
AutoResetOffsetToLatest()
Specifies that the offset needs to be reset to the largest offset, when there is no initial offset in the offset store or the desired offset is out of range.
public KafkaConsumerConfigurationBuilder AutoResetOffsetToLatest()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
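For example, a consumer that starts from the earliest available offset when no committed offset exists could be configured as in this minimal sketch; the consumer variable and the ConsumeFrom endpoint method are assumed from typical Silverback usage:

```csharp
// Start from the earliest offset when no committed offset exists
// (equivalent to setting auto.offset.reset=earliest).
consumer
    .WithGroupId("my-consumer-group")
    .AutoResetOffsetToEarliest()
    .Consume(endpoint => endpoint.ConsumeFrom("my-topic"));
```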
Build()
Builds the KafkaConsumerConfiguration instance.
public KafkaConsumerConfiguration Build()
Returns
- KafkaConsumerConfiguration
The KafkaConsumerConfiguration instance.
BuildCore()
Builds the configuration.
protected override KafkaConsumerConfiguration BuildCore()
Returns
- KafkaConsumerConfiguration
The configuration.
CommitOffsetEach(int)
Defines the number of messages to be processed before committing the offset to the server. The most reliable level is 1, but it reduces throughput.
public KafkaConsumerConfigurationBuilder CommitOffsetEach(int commitOffsetEach)
Parameters
commitOffsetEach (int): The number of messages to be processed before committing the offset to the server.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Remarks
Setting this value automatically disables automatic offset commit (see EnableAutoCommit()/DisableAutoCommit()).
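Since CommitOffsetEach(int) implicitly disables automatic commits, the most reliable configuration can be expressed in a single call (a sketch; the consumer variable is assumed from the surrounding registration):

```csharp
// Commit the offset after each processed message: most reliable,
// but at the cost of throughput. Auto commit is disabled implicitly.
consumer
    .WithGroupId("orders-consumer")
    .CommitOffsetEach(1);
```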
Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
public KafkaConsumerConfigurationBuilder Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)
Parameters
configurationBuilderAction (Action<KafkaConsumerEndpointConfigurationBuilder<object>>): An Action that takes the KafkaConsumerEndpointConfigurationBuilder<object> and configures it.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Consume(string?, Action<KafkaConsumerEndpointConfigurationBuilder<object>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
public KafkaConsumerConfigurationBuilder Consume(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)
Parameters
name (string): The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name will be generated by concatenating the topic name(s).
configurationBuilderAction (Action<KafkaConsumerEndpointConfigurationBuilder<object>>): An Action that takes the KafkaConsumerEndpointConfigurationBuilder<object> and configures it.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
public KafkaConsumerConfigurationBuilder Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)
Parameters
configurationBuilderAction (Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>): An Action that takes the KafkaConsumerEndpointConfigurationBuilder<TMessage> and configures it.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Type Parameters
TMessage: The type (or base type) of the messages being consumed. This is used to set up the deserializer and will determine the type of the message parameter in the nested configuration functions.
Consume<TMessage>(string?, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
public KafkaConsumerConfigurationBuilder Consume<TMessage>(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)
Parameters
name (string): The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name will be generated by concatenating the topic name(s).
configurationBuilderAction (Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>): An Action that takes the KafkaConsumerEndpointConfigurationBuilder<TMessage> and configures it.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Type Parameters
TMessage: The type (or base type) of the messages being consumed. This is used to set up the deserializer and will determine the type of the message parameter in the nested configuration functions.
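A typed endpoint ties the deserializer to a known message type. A hypothetical sketch (OrderEvent and ConsumeFrom are illustrative names):

```csharp
// Consume OrderEvent messages from the "orders" topic; TMessage drives
// the deserializer and the message type in the nested configuration.
consumer.Consume<OrderEvent>(
    "orders-endpoint", // explicit name, shown in logs and used for deduplication
    endpoint => endpoint.ConsumeFrom("orders"));
```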
DisableAutoCommit()
Disables automatic offset commits. Note: setting this does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour, set specific start offsets per partition in the call to assign().
public KafkaConsumerConfigurationBuilder DisableAutoCommit()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Remarks
See also CommitOffsetEach(int).
DisableAutoOffsetReset()
Specifies that an error (ERR__AUTO_OFFSET_RESET) must be triggered, when there is no initial offset in the offset store or the desired offset is out of range.
public KafkaConsumerConfigurationBuilder DisableAutoOffsetReset()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
DisableAutoRecovery()
Specifies that the consumer doesn't have to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issue is detected (e.g. a poll timeout is reported).
public KafkaConsumerConfigurationBuilder DisableAutoRecovery()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
DisableCheckCrcs()
Disables the verification of the CRC32 of the consumed messages. This check ensures that no on-the-wire or on-disk corruption of the messages occurred, at the cost of slightly increased CPU usage.
public KafkaConsumerConfigurationBuilder DisableCheckCrcs()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
DisableOffsetsCommit()
Disables the offsets commit.
public KafkaConsumerConfigurationBuilder DisableOffsetsCommit()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Remarks
Disabling offsets commit will also disable auto commit (see DisableAutoCommit()) and clear the CommitOffsetEach setting (see CommitOffsetEach(int)).
DisablePartitionEof()
Don't invoke the IKafkaPartitionEofCallback when a partition end of file is reached.
public KafkaConsumerConfigurationBuilder DisablePartitionEof()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
DisableSendOffsetsToTransaction()
Specifies that the consumer should not commit the consumed offsets in the same transaction as the produced messages. This is the default.
public KafkaConsumerConfigurationBuilder DisableSendOffsetsToTransaction()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
EnableAutoCommit()
Automatically and periodically commit offsets in the background.
public KafkaConsumerConfigurationBuilder EnableAutoCommit()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
Remarks
Enabling automatic offsets commit will clear the CommitOffsetEach setting (see CommitOffsetEach(int)).
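Automatic background commits are typically combined with WithAutoCommitIntervalMs(int?) to control the commit frequency (a sketch; the consumer variable is assumed from the surrounding registration):

```csharp
// Commit offsets automatically in the background, every 5 seconds.
consumer
    .EnableAutoCommit()
    .WithAutoCommitIntervalMs(5000);
```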
EnableAutoRecovery()
Specifies that the consumer has to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issue is detected (e.g. a poll timeout is reported). This is the default.
public KafkaConsumerConfigurationBuilder EnableAutoRecovery()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
EnableCheckCrcs()
Enables the verification of the CRC32 of the consumed messages, ensuring that no on-the-wire or on-disk corruption of the messages occurred. This check comes at slightly increased CPU usage.
public KafkaConsumerConfigurationBuilder EnableCheckCrcs()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
EnableOffsetsCommit()
Enables the offsets commit. This is the default.
public KafkaConsumerConfigurationBuilder EnableOffsetsCommit()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
EnablePartitionEof()
Invoke the IKafkaPartitionEofCallback whenever a partition end of file is reached.
public KafkaConsumerConfigurationBuilder EnablePartitionEof()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
LimitBackpressure(int)
Sets the maximum number of messages to be consumed and enqueued waiting to be processed. The limit will be applied per partition when processing the partitions independently (default). The default limit is 2.
public KafkaConsumerConfigurationBuilder LimitBackpressure(int backpressureLimit)
Parameters
backpressureLimit (int): The maximum number of messages to be enqueued.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
LimitParallelism(int)
Sets the maximum number of incoming messages that can be processed concurrently. Up to one message per subscribed partition can be processed in parallel. The default limit is 100.
public KafkaConsumerConfigurationBuilder LimitParallelism(int maxDegreeOfParallelism)
Parameters
maxDegreeOfParallelism (int): The maximum number of incoming messages that can be processed concurrently.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
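The two limits work together: backpressure caps how many messages are enqueued per partition, while parallelism caps how many are processed concurrently (a sketch; the consumer variable is assumed from the surrounding registration):

```csharp
// Enqueue at most 10 messages per partition, and process
// at most 4 messages concurrently across all partitions.
consumer
    .LimitBackpressure(10)
    .LimitParallelism(4);
```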
ProcessAllPartitionsTogether()
Specifies that all partitions must be processed together. This means that a single stream will be published for the messages from all the partitions, and the sequences (ChunkSequence, BatchSequence, ...) can span across the partitions.
public KafkaConsumerConfigurationBuilder ProcessAllPartitionsTogether()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
ProcessPartitionsIndependently()
Specifies that the partitions must be processed independently. This means that a stream will be published for each partition, and the sequences (ChunkSequence, BatchSequence, ...) cannot span across the partitions. This option is enabled by default. Use ProcessAllPartitionsTogether() to disable it.
public KafkaConsumerConfigurationBuilder ProcessPartitionsIndependently()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
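The two processing modes are mutually exclusive and can be toggled on the builder (a sketch; the consumer variable is assumed from the surrounding registration):

```csharp
// Merge all partitions into a single stream, allowing sequences
// (e.g. batches) to span across partitions.
consumer.ProcessAllPartitionsTogether();

// Or keep the default: one independent stream per partition.
consumer.ProcessPartitionsIndependently();
```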
SendOffsetsToTransaction()
Specifies that the consumer should commit the consumed offsets in the same transaction as the produced messages.
public KafkaConsumerConfigurationBuilder SendOffsetsToTransaction()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
StoreOffsetsClientSide(KafkaOffsetStoreSettings)
Specifies that the offsets have to be stored in the specified client-side store, in addition to or instead of being committed to the message broker.
public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(KafkaOffsetStoreSettings settings)
Parameters
settings (KafkaOffsetStoreSettings): The offset store settings.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder>)
Specifies that the offsets have to be stored in the specified client-side store, in addition to or instead of being committed to the message broker.
public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder> settingsBuilderFunc)
Parameters
settingsBuilderFunc (Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder>): A Func that takes the KafkaOffsetStoreSettingsBuilder and configures it.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
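The builder overload lets the store be configured fluently. The UseMemory() method in this sketch is illustrative only; the actual store implementations available on KafkaOffsetStoreSettingsBuilder depend on the installed Silverback packages:

```csharp
// Store the offsets in a client-side store in addition to
// (or instead of) committing them to the broker.
// UseMemory() is a hypothetical store implementation.
consumer.StoreOffsetsClientSide(store => store.UseMemory());
```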
WithAutoCommitIntervalMs(int?)
Sets the frequency in milliseconds at which the consumer offsets are committed.
public KafkaConsumerConfigurationBuilder WithAutoCommitIntervalMs(int? autoCommitIntervalMs)
Parameters
autoCommitIntervalMs (int?): The frequency in milliseconds at which the consumer offsets are committed.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithAutoOffsetReset(AutoOffsetReset?)
Sets the action to take when there is no initial offset in the offset store or the desired offset is out of range: Confluent.Kafka.AutoOffsetReset.Earliest to automatically reset to the smallest offset, Confluent.Kafka.AutoOffsetReset.Latest to automatically reset to the largest offset, and Confluent.Kafka.AutoOffsetReset.Error to trigger an error (ERR__AUTO_OFFSET_RESET).
public KafkaConsumerConfigurationBuilder WithAutoOffsetReset(AutoOffsetReset? autoOffsetReset)
Parameters
autoOffsetReset (AutoOffsetReset?): The action to take when there is no initial offset in the offset store or the desired offset is out of range.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithConsumeResultFields(string?)
Sets a comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue> objects returned by
the Consume(TimeSpan) method. Disabling fields that you do not require will improve
throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none.
public KafkaConsumerConfigurationBuilder WithConsumeResultFields(string? consumeResultFields)
Parameters
consumeResultFields (string): A comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue>.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithCooperativeStickyPartitionAssignmentStrategy()
Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize the partitions movements.
public KafkaConsumerConfigurationBuilder WithCooperativeStickyPartitionAssignmentStrategy()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithCoordinatorQueryIntervalMs(int?)
Sets the interval (in milliseconds) at which the current group coordinator must be queried. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
public KafkaConsumerConfigurationBuilder WithCoordinatorQueryIntervalMs(int? coordinatorQueryIntervalMs)
Parameters
coordinatorQueryIntervalMs (int?): The interval in milliseconds at which the current group coordinator must be queried.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithFetchErrorBackoffMs(int?)
Sets how long to postpone the next fetch request for a topic and partition in case of a fetch error.
public KafkaConsumerConfigurationBuilder WithFetchErrorBackoffMs(int? fetchErrorBackoffMs)
Parameters
fetchErrorBackoffMs (int?): How long to postpone the next fetch request in case of a fetch error.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithFetchMaxBytes(int?)
Sets the maximum amount of data the broker shall return for a fetch request. The messages are fetched in batches by the consumer
and if the first message batch in the first non-empty partition of the fetch request is larger than this value, then the message
batch will still be returned to ensure that the consumer can make progress. The maximum message batch size accepted by the broker
is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). This value is automatically
adjusted upwards to be at least message.max.bytes (consumer config).
public KafkaConsumerConfigurationBuilder WithFetchMaxBytes(int? fetchMaxBytes)
Parameters
fetchMaxBytes (int?): The maximum amount of data the broker shall return for a fetch request.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithFetchMinBytes(int?)
Sets the minimum number of bytes that the broker must respond with. If FetchWaitMaxMs expires the accumulated data will be sent to the client regardless of this setting.
public KafkaConsumerConfigurationBuilder WithFetchMinBytes(int? fetchMinBytes)
Parameters
fetchMinBytes (int?): The minimum number of bytes that the broker must respond with.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithFetchQueueBackoffMs(int?)
Sets the maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (QueuedMinMessages or QueuedMaxMessagesKbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.
public KafkaConsumerConfigurationBuilder WithFetchQueueBackoffMs(int? fetchQueueBackoffMs)
Parameters
fetchQueueBackoffMs (int?): The maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds have been exceeded.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithFetchWaitMaxMs(int?)
Sets the maximum time (in milliseconds) that the broker may wait to fill the fetch response with enough messages to match the size specified by FetchMinBytes.
public KafkaConsumerConfigurationBuilder WithFetchWaitMaxMs(int? fetchWaitMaxMs)
Parameters
fetchWaitMaxMs (int?): The maximum time that the broker may wait to fill the fetch response.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithGetMetadataTimeout(TimeSpan?)
Sets the timeout used to wait for the metadata to be retrieved from the broker. The default is 30 seconds.
public KafkaConsumerConfigurationBuilder WithGetMetadataTimeout(TimeSpan? getMetadataTimeout)
Parameters
getMetadataTimeout (TimeSpan?): The timeout used to wait for the metadata to be retrieved from the broker.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithGroupId(string?)
Sets the client group id string. All clients sharing the same group.id belong to the same group.
public KafkaConsumerConfigurationBuilder WithGroupId(string? groupId)
Parameters
groupId (string): The client group id string. All clients sharing the same group.id belong to the same group.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithGroupInstanceId(string?)
Sets the static instance id used to enable static group membership. Static group members are able to leave and rejoin a group within the configured SessionTimeoutMs without prompting a group rebalance. This should be used in combination with a larger SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.
public KafkaConsumerConfigurationBuilder WithGroupInstanceId(string? groupInstanceId)
Parameters
groupInstanceId (string): The static instance id used to enable static group membership.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
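Static membership is normally paired with a larger session timeout, so the member can restart without triggering a rebalance (a sketch; the consumer variable is assumed from the surrounding registration):

```csharp
// Static group membership: this member can leave and rejoin within
// the session timeout (here 60 s) without causing a group rebalance.
consumer
    .WithGroupId("billing-consumer")
    .WithGroupInstanceId("billing-consumer-1")
    .WithSessionTimeoutMs(60000);
```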
WithGroupProtocol(GroupProtocol?)
Sets the group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848. The default is Confluent.Kafka.GroupProtocol.Classic, but it will change to Confluent.Kafka.GroupProtocol.Consumer in a future release.
public KafkaConsumerConfigurationBuilder WithGroupProtocol(GroupProtocol? groupProtocol)
Parameters
groupProtocol (GroupProtocol?): The group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithGroupProtocolType(string?)
Sets the group protocol type.
public KafkaConsumerConfigurationBuilder WithGroupProtocolType(string? groupProtocolType)
Parameters
groupProtocolType (string): The group protocol type.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithGroupRemoteAssignor(string?)
Sets the server-side assignor to use. Keep it null to make the server select a suitable assignor for the group.
Available assignors: uniform or range.
public KafkaConsumerConfigurationBuilder WithGroupRemoteAssignor(string? groupRemoteAssignor)
Parameters
groupRemoteAssignor (string): The server-side assignor to use.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithHeartbeatIntervalMs(int?)
Sets the interval (in milliseconds) at which the heartbeats have to be sent to the broker.
public KafkaConsumerConfigurationBuilder WithHeartbeatIntervalMs(int? heartbeatIntervalMs)
Parameters
heartbeatIntervalMs (int?): The interval in milliseconds at which the heartbeats have to be sent.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithIsolationLevel(IsolationLevel?)
Sets a value indicating how to read messages written inside a transaction: Confluent.Kafka.IsolationLevel.ReadCommitted to only return transactional messages which have been committed, or Confluent.Kafka.IsolationLevel.ReadUncommitted to return all messages, even transactional messages which have been aborted.
public KafkaConsumerConfigurationBuilder WithIsolationLevel(IsolationLevel? isolationLevel)
Parameters
isolationLevel (IsolationLevel?): A value indicating how to read messages written inside a transaction.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithMaxPartitionFetchBytes(int?)
Sets the initial maximum number of bytes per topic and partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
public KafkaConsumerConfigurationBuilder WithMaxPartitionFetchBytes(int? maxPartitionFetchBytes)
Parameters
maxPartitionFetchBytes (int?): The initial maximum number of bytes per topic and partition to request when fetching messages.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithMaxPollIntervalMs(int?)
Sets the maximum allowed time (in milliseconds) between calls to consume messages. If this interval is exceeded the consumer is
considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.
Warning: Offset commits may not be possible at this point.
public KafkaConsumerConfigurationBuilder WithMaxPollIntervalMs(int? maxPollIntervalMs)
Parameters
maxPollIntervalMs (int?): The maximum allowed time in milliseconds between calls to consume messages.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithPartitionAssignmentStrategy(PartitionAssignmentStrategy?)
Sets the partition assignment strategy: Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics, Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members, Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize the partitions movements. The default is Confluent.Kafka.PartitionAssignmentStrategy.Range.
public KafkaConsumerConfigurationBuilder WithPartitionAssignmentStrategy(PartitionAssignmentStrategy? partitionAssignmentStrategy)
Parameters
partitionAssignmentStrategy (PartitionAssignmentStrategy?): The partition assignment strategy.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
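The dedicated shorthand methods documented above are equivalent to passing the corresponding enum value (a sketch; the consumer variable is assumed from the surrounding registration):

```csharp
// These two calls select the same cooperative sticky strategy.
consumer.WithCooperativeStickyPartitionAssignmentStrategy();
consumer.WithPartitionAssignmentStrategy(PartitionAssignmentStrategy.CooperativeSticky);
```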
WithPollingTimeout(TimeSpan)
Sets the timeout to wait for the consumer to poll for new messages before initiating a new poll. The default is 100 milliseconds.
public KafkaConsumerConfigurationBuilder WithPollingTimeout(TimeSpan pollingTimeout)
Parameters
pollingTimeout (TimeSpan): The timeout to wait for the consumer to poll for new messages.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithQueuedMaxMessagesKbytes(int?)
Sets the maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. This setting applies to the single consumer queue, regardless of the number of partitions. This value may be overshot by FetchMaxBytes. This property has higher priority than QueuedMinMessages.
public KafkaConsumerConfigurationBuilder WithQueuedMaxMessagesKbytes(int? queuedMaxMessagesKbytes)
Parameters
queuedMaxMessagesKbytes (int?): The maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithQueuedMinMessages(int?)
Sets the minimum number of messages per topic and partition that the underlying library must try to maintain in the local consumer queue.
public KafkaConsumerConfigurationBuilder WithQueuedMinMessages(int? queuedMinMessages)
Parameters
queuedMinMessages (int?): The minimum number of messages that must be maintained in the local consumer queue.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithRangePartitionAssignmentStrategy()
Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics.
public KafkaConsumerConfigurationBuilder WithRangePartitionAssignmentStrategy()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithRoundRobinPartitionAssignmentStrategy()
Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members.
public KafkaConsumerConfigurationBuilder WithRoundRobinPartitionAssignmentStrategy()
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithSessionTimeoutMs(int?)
Sets the client group session and failure detection timeout (in milliseconds). The consumer sends periodic heartbeats (see HeartbeatIntervalMs) to indicate its liveness to the broker. If no heartbeat is received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. Also see MaxPollIntervalMs.
public KafkaConsumerConfigurationBuilder WithSessionTimeoutMs(int? sessionTimeoutMs)
Parameters
sessionTimeoutMs (int?): The client group session and failure detection timeout.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
WithStallDetectionThreshold(TimeSpan?)
Sets the maximum time to wait for a message to be consumed before the consumer is considered stale. A stale consumer is
automatically restarted.
The default is null, which means that the consumer is never considered stale and is never restarted.
public KafkaConsumerConfigurationBuilder WithStallDetectionThreshold(TimeSpan? stallDetectionThreshold)
Parameters
stallDetectionThreshold (TimeSpan?): The maximum time to wait for a message to be consumed before the consumer is considered stale.
Returns
- KafkaConsumerConfigurationBuilder
The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
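Putting several of the options together, a complete consumer registration could look like this sketch; the AddSilverback/AddKafkaClients registration, WithBootstrapServers, and ConsumeFrom are assumed from typical Silverback usage:

```csharp
services
    .AddSilverback()
    .AddKafkaClients(clients => clients
        .WithBootstrapServers("PLAINTEXT://localhost:9092")
        .AddConsumer(consumer => consumer
            .WithGroupId("demo-consumer")
            .AutoResetOffsetToEarliest()
            .CommitOffsetEach(100) // commit every 100 processed messages
            .WithStallDetectionThreshold(TimeSpan.FromMinutes(5))
            .Consume(endpoint => endpoint.ConsumeFrom("demo-topic"))));
```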