Class KafkaConsumerConfigurationBuilder
Builds the KafkaConsumerConfiguration.
Inheritance
Inherited Members
Namespace: Silverback.Messaging.Configuration.Kafka
Assembly: Silverback.Integration.Kafka.dll
Syntax
public class KafkaConsumerConfigurationBuilder : KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>
Constructors
KafkaConsumerConfigurationBuilder(IServiceProvider)
Initializes a new instance of the KafkaConsumerConfigurationBuilder class.
Declaration
public KafkaConsumerConfigurationBuilder(IServiceProvider serviceProvider)
Parameters
| Type | Name | Description |
|---|---|---|
| IServiceProvider | serviceProvider | The IServiceProvider instance. |
Properties
This
Gets this instance.
Declaration
protected override KafkaConsumerConfigurationBuilder This { get; }
Property Value
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder |
Overrides
Remarks
This is necessary to work around casting in the base classes.
Methods
AutoResetOffsetToEarliest()
Specifies that the offset needs to be reset to the smallest offset, when there is no initial offset in the offset store or the desired offset is out of range.
Declaration
public KafkaConsumerConfigurationBuilder AutoResetOffsetToEarliest()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
AutoResetOffsetToLatest()
Specifies that the offset needs to be reset to the largest offset, when there is no initial offset in the offset store or the desired offset is out of range.
Declaration
public KafkaConsumerConfigurationBuilder AutoResetOffsetToLatest()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Build()
Builds the KafkaConsumerConfiguration instance.
Declaration
public KafkaConsumerConfiguration Build()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfiguration |
BuildCore()
Builds the configuration.
Declaration
protected override KafkaConsumerConfiguration BuildCore()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfiguration | The configuration. |
Overrides
CommitOffsetEach(int)
Defines the number of message to be processed before committing the offset to the server. The most reliable level is 1 but it reduces throughput.
Declaration
public KafkaConsumerConfigurationBuilder CommitOffsetEach(int commitOffsetEach)
Parameters
| Type | Name | Description |
|---|---|---|
| int | commitOffsetEach | The number of message to be processed before committing the offset to the server. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Remarks
Setting this value automatically disables automatic offset commit (see EnableAutoCommit()/DisableAutoCommit()).
Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
Declaration
public KafkaConsumerConfigurationBuilder Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)
Parameters
| Type | Name | Description |
|---|---|---|
| Action<KafkaConsumerEndpointConfigurationBuilder<object>> | configurationBuilderAction | An Action that takes the KafkaConsumerConfigurationBuilder and configures it. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Consume(string?, Action<KafkaConsumerEndpointConfigurationBuilder<object>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
Declaration
public KafkaConsumerConfigurationBuilder Consume(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)
Parameters
| Type | Name | Description |
|---|---|---|
| string | name | The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name will be generated concatenating the topic name(s). |
| Action<KafkaConsumerEndpointConfigurationBuilder<object>> | configurationBuilderAction | An Action that takes the KafkaConsumerConfigurationBuilder and configures it. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
Declaration
public KafkaConsumerConfigurationBuilder Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)
Parameters
| Type | Name | Description |
|---|---|---|
| Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> | configurationBuilderAction | An Action that takes the KafkaConsumerConfigurationBuilder and configures it. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Type Parameters
| Name | Description |
|---|---|
| TMessage | The type (or base type) of the messages being consumed. This is used to setup the deserializer and will determine the type of the message parameter in the nested configuration functions. |
Consume<TMessage>(string?, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)
Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).
Declaration
public KafkaConsumerConfigurationBuilder Consume<TMessage>(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)
Parameters
| Type | Name | Description |
|---|---|---|
| string | name | The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name will be generated concatenating the topic name(s). |
| Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> | configurationBuilderAction | An Action that takes the KafkaConsumerConfigurationBuilder and configures it. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Type Parameters
| Name | Description |
|---|---|
| TMessage | The type (or base type) of the messages being consumed. This is used to setup the deserializer and will determine the type of the message parameter in the nested configuration functions. |
DisableAutoCommit()
Disable automatic offsets commit. Note: setting this does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().
Declaration
public KafkaConsumerConfigurationBuilder DisableAutoCommit()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Remarks
See also CommitOffsetEach(int).
DisableAutoOffsetReset()
Specifies that an error (ERR__AUTO_OFFSET_RESET) must be triggered, when there is no initial offset in the offset store or the desired offset is out of range.
Declaration
public KafkaConsumerConfigurationBuilder DisableAutoOffsetReset()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
DisableAutoRecovery()
Specifies that the consumer doesn't have to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issues is detected (e.g. a poll timeout is reported).
Declaration
public KafkaConsumerConfigurationBuilder DisableAutoRecovery()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
DisableCheckCrcs()
Disables the verification of the CRC32 of the consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
Declaration
public KafkaConsumerConfigurationBuilder DisableCheckCrcs()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
DisableOffsetsCommit()
Disables the offsets commit.
Declaration
public KafkaConsumerConfigurationBuilder DisableOffsetsCommit()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Remarks
Disabling offsets commit will also disable auto commit (see DisableAutoCommit()) and clear the CommitOffsetEach setting (see CommitOffsetEach(int)).
DisablePartitionEof()
Don't invoke the IKafkaPartitionEofCallback when a partition end of file is reached.
Declaration
public KafkaConsumerConfigurationBuilder DisablePartitionEof()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
DisableSendOffsetsToTransaction()
Specifies that the consumer should not commit the consumed offsets in the same transaction of the produced messages. This is the default.
Declaration
public KafkaConsumerConfigurationBuilder DisableSendOffsetsToTransaction()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
EnableAutoCommit()
Automatically and periodically commit offsets in the background.
Declaration
public KafkaConsumerConfigurationBuilder EnableAutoCommit()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
Remarks
Enabling automatic offsets commit will clear the CommitOffsetEach setting (see CommitOffsetEach(int)).
EnableAutoRecovery()
Specifies that the consumer has to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issues is detected (e.g. a poll timeout is reported). This is the default.
Declaration
public KafkaConsumerConfigurationBuilder EnableAutoRecovery()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
EnableCheckCrcs()
Enables the verification of the CRC32 of the consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
Declaration
public KafkaConsumerConfigurationBuilder EnableCheckCrcs()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
EnableOffsetsCommit()
Enable the offsets commit. This is the default.
Declaration
public KafkaConsumerConfigurationBuilder EnableOffsetsCommit()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
EnablePartitionEof()
Invoke the IKafkaPartitionEofCallback whenever a partition end of file is reached.
Declaration
public KafkaConsumerConfigurationBuilder EnablePartitionEof()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
LimitBackpressure(int)
Sets the maximum number of messages to be consumed and enqueued waiting to be processed. The limit will be applied per partition when processing the partitions independently (default). The default limit is 2.
Declaration
public KafkaConsumerConfigurationBuilder LimitBackpressure(int backpressureLimit)
Parameters
| Type | Name | Description |
|---|---|---|
| int | backpressureLimit | The maximum number of messages to be enqueued. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
LimitParallelism(int)
Sets the maximum number of incoming message that can be processed concurrently. Up to a message per each subscribed partition can be processed in parallel. The default limit is 100.
Declaration
public KafkaConsumerConfigurationBuilder LimitParallelism(int maxDegreeOfParallelism)
Parameters
| Type | Name | Description |
|---|---|---|
| int | maxDegreeOfParallelism | The maximum number of incoming message that can be processed concurrently. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
ProcessAllPartitionsTogether()
Specifies that all partitions must be processed together. This means that a single stream will published for the messages from all the partitions and the sequences (ChunkSequence, BatchSequence, ...) can span across the partitions.
Declaration
public KafkaConsumerConfigurationBuilder ProcessAllPartitionsTogether()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
ProcessPartitionsIndependently()
Specifies that the partitions must be processed independently. This means that a stream will published per each partition and the sequences (ChunkSequence, BatchSequence, ...) cannot span across the partitions. This option is enabled by default. Use ProcessAllPartitionsTogether() to disable it.
Declaration
public KafkaConsumerConfigurationBuilder ProcessPartitionsIndependently()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
SendOffsetsToTransaction()
Specifies that the consumer should commit the consumed offsets in the same transaction of the produced messages.
Declaration
public KafkaConsumerConfigurationBuilder SendOffsetsToTransaction()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
StoreOffsetsClientSide(KafkaOffsetStoreSettings)
Specifies that the offsets have to stored in the specified client store, additionally to or instead of being committed to the message broker.
Declaration
public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(KafkaOffsetStoreSettings settings)
Parameters
| Type | Name | Description |
|---|---|---|
| KafkaOffsetStoreSettings | settings | The offset store settings. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder>)
Specifies that the offsets have to stored in the specified client store, additionally to or instead of being committed to the message broker.
Declaration
public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder> settingsBuilderFunc)
Parameters
| Type | Name | Description |
|---|---|---|
| Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder> | settingsBuilderFunc | A Func<TResult> that takes the KafkaOffsetStoreSettingsBuilder and configures it. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithAutoCommitIntervalMs(int?)
Sets the frequency in milliseconds at which the consumer offsets are committed.
Declaration
public KafkaConsumerConfigurationBuilder WithAutoCommitIntervalMs(int? autoCommitIntervalMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | autoCommitIntervalMs | The frequency at which the consumer offsets are committed. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithAutoOffsetReset(AutoOffsetReset?)
Sets the action to take when there is no initial offset in the offset store or the desired offset is out of range: Confluent.Kafka.AutoOffsetReset.Earliest to automatically reset to the smallest offset, Confluent.Kafka.AutoOffsetReset.Latest to automatically reset to the largest offset, and Confluent.Kafka.AutoOffsetReset.Error to trigger an error (ERR__AUTO_OFFSET_RESET).
Declaration
public KafkaConsumerConfigurationBuilder WithAutoOffsetReset(AutoOffsetReset? autoOffsetReset)
Parameters
| Type | Name | Description |
|---|---|---|
| AutoOffsetReset? | autoOffsetReset | The action to take when there is no initial offset in the offset store or the desired offset is out of range. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithConsumeResultFields(string?)
Sets a comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue> objects returned by
the Consume(TimeSpan) method. Disabling fields that you do not require will improve
throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none.
Declaration
public KafkaConsumerConfigurationBuilder WithConsumeResultFields(string? consumeResultFields)
Parameters
| Type | Name | Description |
|---|---|---|
| string | consumeResultFields | A comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue>. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithCooperativeStickyPartitionAssignmentStrategy()
Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize the partitions movements.
Declaration
public KafkaConsumerConfigurationBuilder WithCooperativeStickyPartitionAssignmentStrategy()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithCoordinatorQueryIntervalMs(int?)
Sets the interval (in milliseconds) at which the current group coordinator must be queried. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
Declaration
public KafkaConsumerConfigurationBuilder WithCoordinatorQueryIntervalMs(int? coordinatorQueryIntervalMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | coordinatorQueryIntervalMs | The interval at which the current group coordinator must be queried. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithFetchErrorBackoffMs(int?)
Sets how long to postpone the next fetch request for a topic and partition in case of a fetch error.
Declaration
public KafkaConsumerConfigurationBuilder WithFetchErrorBackoffMs(int? fetchErrorBackoffMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | fetchErrorBackoffMs | How long to postpone the next fetch request in case of a fetch error. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithFetchMaxBytes(int?)
Sets the maximum amount of data the broker shall return for a fetch request. The messages are fetched in batches by the consumer
and if the first message batch in the first non-empty partition of the fetch request is larger than this value, then the message
batch will still be returned to ensure that the consumer can make progress. The maximum message batch size accepted by the broker
is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). This value is automatically
adjusted upwards to be at least message.max.bytes (consumer config).
Declaration
public KafkaConsumerConfigurationBuilder WithFetchMaxBytes(int? fetchMaxBytes)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | fetchMaxBytes | The maximum amount of data the broker shall return for a fetch request. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithFetchMinBytes(int?)
Sets the minimum number of bytes that the broker must respond with. If FetchWaitMaxMs expires the accumulated data will be sent to the client regardless of this setting.
Declaration
public KafkaConsumerConfigurationBuilder WithFetchMinBytes(int? fetchMinBytes)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | fetchMinBytes | The minimum number of bytes that the broker must respond with. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithFetchQueueBackoffMs(int?)
Sets the maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (QueuedMinMessages or QueuedMaxMessagesKbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.
Declaration
public KafkaConsumerConfigurationBuilder WithFetchQueueBackoffMs(int? fetchQueueBackoffMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | fetchQueueBackoffMs | The maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds have been exceeded. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithFetchWaitMaxMs(int?)
Sets the maximum time (in milliseconds) that the broker may wait to fill the fetch response with enough messages to match the size specified by FetchMinBytes.
Declaration
public KafkaConsumerConfigurationBuilder WithFetchWaitMaxMs(int? fetchWaitMaxMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | fetchWaitMaxMs | The maximum time that the broker may wait to fill the fetch response. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithGetMetadataTimeout(TimeSpan?)
Sets the timeout used to wait for the metadata to be retrieved from the broker. The default is 30 seconds.
Declaration
public KafkaConsumerConfigurationBuilder WithGetMetadataTimeout(TimeSpan? getMetadataTimeout)
Parameters
| Type | Name | Description |
|---|---|---|
| TimeSpan? | getMetadataTimeout | The timeout used to wait for the metadata to be retrieved from the broker. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithGroupId(string?)
Sets the client group id string. All clients sharing the same group.id belong to the same group.
Declaration
public KafkaConsumerConfigurationBuilder WithGroupId(string? groupId)
Parameters
| Type | Name | Description |
|---|---|---|
| string | groupId | The client group id string. All clients sharing the same group.id belong to the same group. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithGroupInstanceId(string?)
Sets the static instance id used to enable static group membership. Static group members are able to leave and rejoin a group within the configured SessionTimeoutMs without prompting a group rebalance. This should be used in combination with a larger SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.
Declaration
public KafkaConsumerConfigurationBuilder WithGroupInstanceId(string? groupInstanceId)
Parameters
| Type | Name | Description |
|---|---|---|
| string | groupInstanceId | The static instance id used to enable static group membership. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithGroupProtocol(GroupProtocol?)
Sets the group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848. Default is Confluent.Kafka.GroupProtocol.Classic, but will change to Confluent.Kafka.GroupProtocol.Consumer in next releases.
Declaration
public KafkaConsumerConfigurationBuilder WithGroupProtocol(GroupProtocol? groupProtocol)
Parameters
| Type | Name | Description |
|---|---|---|
| GroupProtocol? | groupProtocol | The group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithGroupProtocolType(string?)
Sets the group protocol type.
Declaration
public KafkaConsumerConfigurationBuilder WithGroupProtocolType(string? groupProtocolType)
Parameters
| Type | Name | Description |
|---|---|---|
| string | groupProtocolType | The group protocol type. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithGroupRemoteAssignor(string?)
Sets the server-side assignor to use. Keep it null to make the server select a suitable assignor for the group.
Available assignors: uniform or range.
Declaration
public KafkaConsumerConfigurationBuilder WithGroupRemoteAssignor(string? groupRemoteAssignor)
Parameters
| Type | Name | Description |
|---|---|---|
| string | groupRemoteAssignor | The server-side assignor to use. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithHeartbeatIntervalMs(int?)
Sets the interval (in milliseconds) at which the heartbeats have to be sent to the broker.
Declaration
public KafkaConsumerConfigurationBuilder WithHeartbeatIntervalMs(int? heartbeatIntervalMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | heartbeatIntervalMs | The interval at which the heartbeats have to be sent. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithIsolationLevel(IsolationLevel?)
Sets a value indicating how to read messages written inside a transaction: Confluent.Kafka.IsolationLevel.ReadCommitted to only return transactional messages which have been committed, or Confluent.Kafka.IsolationLevel.ReadUncommitted to return all messages, even transactional messages which have been aborted.
Declaration
public KafkaConsumerConfigurationBuilder WithIsolationLevel(IsolationLevel? isolationLevel)
Parameters
| Type | Name | Description |
|---|---|---|
| IsolationLevel? | isolationLevel | A value indicating how to read messages written inside a transaction. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithMaxPartitionFetchBytes(int?)
Sets the initial maximum number of bytes per topic and partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
Declaration
public KafkaConsumerConfigurationBuilder WithMaxPartitionFetchBytes(int? maxPartitionFetchBytes)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | maxPartitionFetchBytes | The initial maximum number of bytes per topic and partition to request when fetching messages. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithMaxPollIntervalMs(int?)
Sets the maximum allowed time (in milliseconds) between calls to consume messages. If this interval is exceeded the consumer is
considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.
Warning: Offset commits may be not possible at this point.
Declaration
public KafkaConsumerConfigurationBuilder WithMaxPollIntervalMs(int? maxPollIntervalMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | maxPollIntervalMs | The maximum allowed time between calls to consume messages. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithPartitionAssignmentStrategy(PartitionAssignmentStrategy?)
Sets the partition assignment strategy: Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics, Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members, Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize the partitions movements. The default is Confluent.Kafka.PartitionAssignmentStrategy.Range.
Declaration
public KafkaConsumerConfigurationBuilder WithPartitionAssignmentStrategy(PartitionAssignmentStrategy? partitionAssignmentStrategy)
Parameters
| Type | Name | Description |
|---|---|---|
| PartitionAssignmentStrategy? | partitionAssignmentStrategy | The partition assignment strategy. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithPollingTimeout(TimeSpan)
Sets the timeout to wait for the consumer to poll for new messages before initiating a new poll. The default is 100 milliseconds.
Declaration
public KafkaConsumerConfigurationBuilder WithPollingTimeout(TimeSpan pollingTimeout)
Parameters
| Type | Name | Description |
|---|---|---|
| TimeSpan | pollingTimeout | The timeout to wait for the consumer to poll for new messages. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithQueuedMaxMessagesKbytes(int?)
Sets the maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. This setting applies to the single consumer queue, regardless of the number of partitions. This value may be overshot by FetchMaxBytes. This property has higher priority than QueuedMinMessages.
Declaration
public KafkaConsumerConfigurationBuilder WithQueuedMaxMessagesKbytes(int? queuedMaxMessagesKbytes)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | queuedMaxMessagesKbytes | The maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithQueuedMinMessages(int?)
Sets the minimum number of messages per topic and partition that the underlying library must try to maintain in the local consumer queue.
Declaration
public KafkaConsumerConfigurationBuilder WithQueuedMinMessages(int? queuedMinMessages)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | queuedMinMessages | The minimum number of messages that must be maintained in the local consumer queue. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithRangePartitionAssignmentStrategy()
Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics.
Declaration
public KafkaConsumerConfigurationBuilder WithRangePartitionAssignmentStrategy()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithRoundRobinPartitionAssignmentStrategy()
Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members.
Declaration
public KafkaConsumerConfigurationBuilder WithRoundRobinPartitionAssignmentStrategy()
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithSessionTimeoutMs(int?)
Sets the client group session and failure detection timeout (in milliseconds). The consumer sends periodic heartbeats HeartbeatIntervalMs to indicate its liveness to the broker. If no heartbeat is received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. Also see MaxPollIntervalMs.
Declaration
public KafkaConsumerConfigurationBuilder WithSessionTimeoutMs(int? sessionTimeoutMs)
Parameters
| Type | Name | Description |
|---|---|---|
| int? | sessionTimeoutMs | The client group session and failure detection timeout. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |
WithStallDetectionThreshold(TimeSpan?)
Sets the maximum time to wait for a message to be consumed before the consumer is considered stale. A stale consumer is
automatically restarted.
The default is null, which means that the consumer is never considered stale and is never restarted.
Declaration
public KafkaConsumerConfigurationBuilder WithStallDetectionThreshold(TimeSpan? stallDetectionThreshold)
Parameters
| Type | Name | Description |
|---|---|---|
| TimeSpan? | stallDetectionThreshold | The maximum time to wait for a message to be consumed before the consumer is considered stale. |
Returns
| Type | Description |
|---|---|
| KafkaConsumerConfigurationBuilder | The KafkaConsumerConfigurationBuilder so that additional calls can be chained. |