Class KafkaConsumerConfiguration
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Inheritance
object
KafkaClientConfiguration<ConsumerConfig>
KafkaConsumerConfiguration
Assembly: Silverback.Integration.Kafka.dll
Syntax
public sealed record KafkaConsumerConfiguration : KafkaClientConfiguration<ConsumerConfig>, IValidatableSettings, IEquatable<KafkaClientConfiguration<ConsumerConfig>>, IEquatable<KafkaConsumerConfiguration>
Constructors
KafkaConsumerConfiguration()
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public KafkaConsumerConfiguration()
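Since the type is a sealed record whose properties are all init-only, an instance is built with an object initializer. A minimal sketch (the using directive is an assumption; this page only names the assembly):

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

// Create the configuration via the parameterless constructor and init-only setters.
KafkaConsumerConfiguration configuration = new()
{
    GroupId = "order-processor" // if omitted, null is internally replaced with "not-set"
};
```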
Properties
AutoCommitIntervalMs
Gets the frequency in milliseconds at which the consumer offsets are committed.
Declaration
public int? AutoCommitIntervalMs { get; init; }
Property Value
AutoOffsetReset
Gets the action to take when there is no initial offset in the offset store or the desired offset is out of range:
Confluent.Kafka.AutoOffsetReset.Earliest to automatically reset to the smallest offset,
Confluent.Kafka.AutoOffsetReset.Latest to automatically reset to the largest offset, and
Confluent.Kafka.AutoOffsetReset.Error to trigger an error (ERR__AUTO_OFFSET_RESET).
Declaration
public AutoOffsetReset? AutoOffsetReset { get; init; }
Property Value
| Type | Description |
| --- | --- |
| AutoOffsetReset? | |
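For example, to fall back to the smallest available offset when no committed offset exists, the Confluent.Kafka enum referenced above can be assigned directly (a sketch; the Silverback namespace is an assumption):

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    // Reset to the smallest offset when the offset store has no initial offset
    // or the desired offset is out of range.
    AutoOffsetReset = AutoOffsetReset.Earliest
};
```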
BackpressureLimit
Gets the maximum number of messages to be consumed and enqueued waiting to be processed.
When ProcessPartitionsIndependently is set to true (default) the limit will be applied per partition.
The default is 50.
Declaration
public int BackpressureLimit { get; init; }
Property Value
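A sketch of lowering the backpressure limit; per the description above, with independent partition processing (the default) the limit applies per partition:

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    // With ProcessPartitionsIndependently left at its default (true),
    // at most 10 messages per partition are enqueued awaiting processing.
    BackpressureLimit = 10
};
```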
CheckCrcs
Gets a value indicating whether the CRC32 of the consumed messages must be verified, ensuring that no on-the-wire or on-disk corruption
of the messages occurred. This check comes at the cost of slightly increased CPU usage.
Declaration
public bool? CheckCrcs { get; init; }
Property Value
ClientSideOffsetStore
Gets the settings for the IKafkaOffsetStore to be used to store the offsets. The stored offsets will be used during
the partitions assignment to determine the starting offset and ensure that each message is consumed only once.
Declaration
public KafkaOffsetStoreSettings? ClientSideOffsetStore { get; init; }
Property Value
CommitOffsetEach
Gets the number of messages to be processed before committing the offset to the server. The most
reliable level is 1, but it reduces throughput.
Declaration
public int? CommitOffsetEach { get; init; }
Property Value
CommitOffsets
Gets a value indicating whether the offsets must be committed. The default is true.
Declaration
public bool CommitOffsets { get; init; }
Property Value
ConsumeResultFields
Gets a comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue> objects returned by
the Consume(TimeSpan) method. Disabling fields that you do not require will improve
throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none.
Declaration
public string? ConsumeResultFields { get; init; }
Property Value
CoordinatorQueryIntervalMs
Gets the interval (in milliseconds) at which the current group coordinator must be queried. If the currently assigned coordinator
is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
Declaration
public int? CoordinatorQueryIntervalMs { get; init; }
Property Value
EnableAutoCommit
Gets a value indicating whether the offsets must be automatically and periodically committed in the background.
Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this
behaviour set specific start offsets per partition in the call to assign(). The default is true.
Declaration
public bool EnableAutoCommit { get; init; }
Property Value
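A sketch combining the commit-related settings above. The pairing of CommitOffsetEach with EnableAutoCommit = false is an inference from the property descriptions (a fixed-interval background commit and a per-message-count commit are alternative strategies), not something stated explicitly on this page:

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    EnableAutoCommit = false, // disable the periodic background commit
    CommitOffsetEach = 1      // commit after each processed message (most reliable, lowest throughput)
};
```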
EnableAutoRecovery
Gets a value indicating whether the consumer has to be automatically recycled when a Confluent.Kafka.KafkaException
is thrown while polling/consuming or an issue is detected (e.g. a poll timeout is reported). The default is true.
Declaration
public bool EnableAutoRecovery { get; init; }
Property Value
EnablePartitionEof
Gets a value indicating whether the partition EOF event must be emitted whenever the consumer reaches the end of a partition.
Declaration
public bool? EnablePartitionEof { get; init; }
Property Value
Endpoints
Gets the configured endpoints.
Declaration
public IValueReadOnlyCollection<KafkaConsumerEndpointConfiguration> Endpoints { get; init; }
Property Value
EqualityContract
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
protected override Type EqualityContract { get; }
Property Value
Overrides
FetchErrorBackoffMs
Gets how long to postpone the next fetch request for a topic and partition in case of a fetch error.
Declaration
public int? FetchErrorBackoffMs { get; init; }
Property Value
FetchMaxBytes
Gets the maximum amount of data the broker shall return for a fetch request. The messages are fetched in batches by the consumer
and if the first message batch in the first non-empty partition of the fetch request is larger than this value, then the message
batch will still be returned to ensure that the consumer can make progress. The maximum message batch size accepted by the broker
is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). This value is automatically
adjusted upwards to be at least message.max.bytes (consumer config).
Declaration
public int? FetchMaxBytes { get; init; }
Property Value
FetchMinBytes
Gets the minimum number of bytes that the broker must respond with. If FetchWaitMaxMs expires the accumulated data
will be sent to the client regardless of this setting.
Declaration
public int? FetchMinBytes { get; init; }
Property Value
FetchQueueBackoffMs
Gets the maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (QueuedMinMessages
or QueuedMaxMessagesKbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s)
delays between messages. Low values may increase CPU utilization.
Declaration
public int? FetchQueueBackoffMs { get; init; }
Property Value
FetchWaitMaxMs
Gets the maximum time (in milliseconds) that the broker may wait to fill the fetch response with enough messages to match the
size specified by FetchMinBytes.
Declaration
public int? FetchWaitMaxMs { get; init; }
Property Value
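The fetch settings above trade latency for throughput; a sketch with illustrative values (the numbers are examples, not recommendations):

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    FetchMinBytes = 1024,      // broker responds once at least 1 KB has accumulated...
    FetchWaitMaxMs = 100,      // ...or after 100 ms, whichever comes first
    FetchMaxBytes = 52_428_800 // cap a single fetch response at 50 MB
};
```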
GetMetadataTimeout
Gets the timeout to wait for the metadata to be retrieved from the broker. The default is 30 seconds.
Declaration
public TimeSpan GetMetadataTimeout { get; init; }
Property Value
GroupId
Gets the client group id. All clients sharing the same group id belong to the same group. The default is null
(which will internally be replaced with "not-set" since the underlying library requires a value).
Declaration
public string? GroupId { get; init; }
Property Value
GroupInstanceId
Gets the static instance id used to enable static group membership. Static group members are able to leave and rejoin a group within
the configured SessionTimeoutMs without prompting a group rebalance. This should be used in combination with a larger
SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts).
Requires broker version >= 2.3.0.
Declaration
public string? GroupInstanceId { get; init; }
Property Value
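A sketch of static group membership as described above, pairing a stable instance id with a larger session timeout so that restarts don't trigger a rebalance (requires broker version >= 2.3.0; the values are illustrative):

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "order-processor",
    GroupInstanceId = "order-processor-1", // unique and stable per process instance
    SessionTimeoutMs = 60_000              // larger timeout to ride out transient restarts
};
```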
GroupProtocol
Gets the group protocol to use: Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848.
The default is Confluent.Kafka.GroupProtocol.Classic, but it will change to Confluent.Kafka.GroupProtocol.Consumer in a future release.
Declaration
public GroupProtocol? GroupProtocol { get; init; }
Property Value
| Type | Description |
| --- | --- |
| GroupProtocol? | |
GroupProtocolType
Gets the group protocol type.
Declaration
public string? GroupProtocolType { get; init; }
Property Value
GroupRemoteAssignor
Gets the server-side assignor to use. Keep it null to let the server select a suitable assignor for the group. Available assignors: uniform or range.
Declaration
public string? GroupRemoteAssignor { get; init; }
Property Value
HeartbeatIntervalMs
Gets the interval (in milliseconds) at which the heartbeats have to be sent to the broker.
Declaration
public int? HeartbeatIntervalMs { get; init; }
Property Value
IsolationLevel
Gets a value indicating how to read the messages written inside a transaction: Confluent.Kafka.IsolationLevel.ReadCommitted
to only return transactional messages which have been committed, or Confluent.Kafka.IsolationLevel.ReadUncommitted to
return all messages, even transactional messages which have been aborted.
Declaration
public IsolationLevel? IsolationLevel { get; init; }
Property Value
| Type | Description |
| --- | --- |
| IsolationLevel? | |
MaxDegreeOfParallelism
Gets the maximum number of incoming messages that can be processed concurrently. When the partitions are processed
independently, up to one message per assigned partition can be processed in parallel.
The default is 100.
Declaration
public int MaxDegreeOfParallelism { get; init; }
Property Value
MaxPartitionFetchBytes
Gets the initial maximum number of bytes per topic and partition to request when fetching messages from the broker. If the client
encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
Declaration
public int? MaxPartitionFetchBytes { get; init; }
Property Value
MaxPollIntervalMs
Gets the maximum allowed time (in milliseconds) between calls to consume messages. If this interval is exceeded the consumer is
considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.
Warning: Offset commits may be not possible at this point.
Declaration
public int? MaxPollIntervalMs { get; init; }
Property Value
PartitionAssignmentStrategy
Gets the partition assignment strategy: Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions
of several topics, Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among
the consumer group members, or Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the
partitions while minimizing partition movements. The default is Confluent.Kafka.PartitionAssignmentStrategy.Range.
Declaration
public PartitionAssignmentStrategy? PartitionAssignmentStrategy { get; init; }
Property Value
| Type | Description |
| --- | --- |
| PartitionAssignmentStrategy? | |
PollingTimeout
Gets the timeout to wait for the consumer to poll for new messages before initiating a new poll. The default is 500 milliseconds.
Declaration
public TimeSpan PollingTimeout { get; init; }
Property Value
ProcessPartitionsIndependently
Gets a value indicating whether the partitions must be processed independently.
When true a stream will be published for each partition and the sequences (ChunkSequence,
BatchSequence, ...) cannot span across the partitions.
The default is true.
Declaration
public bool ProcessPartitionsIndependently { get; init; }
Property Value
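The two properties above interact: independent partition processing enables per-partition parallelism, while MaxDegreeOfParallelism caps it. A sketch (values are illustrative):

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    ProcessPartitionsIndependently = true, // one stream per partition (the default)
    MaxDegreeOfParallelism = 16            // but never more than 16 messages processed concurrently
};
```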
QueuedMaxMessagesKbytes
Gets the maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. This setting applies to
the single consumer queue, regardless of the number of partitions. This value may be overshot by FetchMaxBytes. This
property has higher priority than QueuedMinMessages.
Declaration
public int? QueuedMaxMessagesKbytes { get; init; }
Property Value
QueuedMinMessages
Gets the minimum number of messages per topic and partition that the underlying library must try to maintain in the local consumer queue.
Declaration
public int? QueuedMinMessages { get; init; }
Property Value
SendOffsetsToTransaction
Gets a value indicating whether the consumer should commit the consumed offsets within the same transaction as the produced
messages. The default is false.
Declaration
public bool SendOffsetsToTransaction { get; init; }
Property Value
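A sketch of a transactional consume-transform-produce setup combining SendOffsetsToTransaction with the IsolationLevel property documented earlier (whether further producer-side settings are required is outside the scope of this page):

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    IsolationLevel = IsolationLevel.ReadCommitted, // skip aborted transactional messages
    SendOffsetsToTransaction = true                // commit offsets within the producer transaction
};
```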
SessionTimeoutMs
Gets the client group session and failure detection timeout (in milliseconds). The consumer sends periodic heartbeats
(see HeartbeatIntervalMs) to indicate its liveness to the broker. If no heartbeat is received by the broker for a group
member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. See also
MaxPollIntervalMs.
Declaration
public int? SessionTimeoutMs { get; init; }
Property Value
StallDetectionThreshold
Gets the maximum time to wait for a message to be consumed before the consumer is considered stale. A stale consumer is
automatically restarted.
The default is null, which means that the consumer is never considered stale and is never restarted.
Declaration
public TimeSpan? StallDetectionThreshold { get; init; }
Property Value
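A sketch enabling stall detection; note that the description above suggests the threshold is measured against message consumption, so on a genuinely idle topic a low value could presumably cause unnecessary restarts (an assumption, verify before enabling):

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace

KafkaConsumerConfiguration configuration = new()
{
    // Restart the consumer if no message is consumed for 5 minutes
    // (null, the default, disables stall detection entirely).
    StallDetectionThreshold = TimeSpan.FromMinutes(5)
};
```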
Methods
Equals(KafkaClientConfiguration<ConsumerConfig>?)
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public override sealed bool Equals(KafkaClientConfiguration<ConsumerConfig>? other)
Parameters
Returns
Overrides
Equals(KafkaConsumerConfiguration?)
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public bool Equals(KafkaConsumerConfiguration? other)
Parameters
Returns
Equals(object?)
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public override bool Equals(object? obj)
Parameters
| Type | Name | Description |
| --- | --- | --- |
| object | obj | |
Returns
Overrides
GetHashCode()
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public override int GetHashCode()
Returns
Overrides
MapCore()
Maps to the Confluent client configuration.
Declaration
protected override ConsumerConfig MapCore()
Returns
| Type | Description |
| --- | --- |
| ConsumerConfig | The Confluent client configuration. |
Overrides
PrintMembers(StringBuilder)
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
protected override bool PrintMembers(StringBuilder builder)
Parameters
Returns
Overrides
ToString()
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public override string ToString()
Returns
Overrides
Validate()
Validates the configuration.
Declaration
public override void Validate()
Overrides
Operators
operator ==(KafkaConsumerConfiguration?, KafkaConsumerConfiguration?)
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public static bool operator ==(KafkaConsumerConfiguration? left, KafkaConsumerConfiguration? right)
Parameters
Returns
operator !=(KafkaConsumerConfiguration?, KafkaConsumerConfiguration?)
Wraps the Confluent.Kafka.ConsumerConfig adding the Silverback specific settings.
Declaration
public static bool operator !=(KafkaConsumerConfiguration? left, KafkaConsumerConfiguration? right)
Parameters
Returns
Implements
IValidatableSettings
IEquatable<KafkaClientConfiguration<ConsumerConfig>>
IEquatable<KafkaConsumerConfiguration>