Class KafkaConsumerConfiguration
- Namespace: Silverback.Messaging.Configuration.Kafka
- Assembly: Silverback.Integration.Kafka.dll
Wraps the Confluent.Kafka.ConsumerConfig, adding the Silverback-specific settings.
public sealed record KafkaConsumerConfiguration : KafkaClientConfiguration<ConsumerConfig>, IValidatableSettings, IEquatable<KafkaClientConfiguration<ConsumerConfig>>, IEquatable<KafkaConsumerConfiguration>
- Inheritance
  KafkaClientConfiguration<ConsumerConfig> → KafkaConsumerConfiguration
- Implements
  IValidatableSettings, IEquatable<KafkaClientConfiguration<ConsumerConfig>>, IEquatable<KafkaConsumerConfiguration>
Constructors
KafkaConsumerConfiguration()
public KafkaConsumerConfiguration()
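Being a sealed record with init-only properties, the configuration is typically created with an object initializer and can be refined with `with` expressions. A minimal sketch using only properties documented on this page (the group id and values are illustrative; the endpoint and broker connection settings are omitted):

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka;

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "order-processor",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    ProcessPartitionsIndependently = true
};

// Records support non-destructive mutation via `with`
KafkaConsumerConfiguration tuned = configuration with { BackpressureLimit = 100 };
```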
Properties
AutoCommitIntervalMs
Gets the frequency in milliseconds at which the consumer offsets are committed.
public int? AutoCommitIntervalMs { get; init; }
Property Value
- int?
AutoOffsetReset
Gets the action to take when there is no initial offset in the offset store or the desired offset is out of range: Confluent.Kafka.AutoOffsetReset.Earliest to automatically reset to the smallest offset, Confluent.Kafka.AutoOffsetReset.Latest to automatically reset to the largest offset, and Confluent.Kafka.AutoOffsetReset.Error to trigger an error (ERR__AUTO_OFFSET_RESET).
public AutoOffsetReset? AutoOffsetReset { get; init; }
Property Value
- AutoOffsetReset?
BackpressureLimit
Gets the maximum number of messages that can be consumed and enqueued while waiting to be processed.
When ProcessPartitionsIndependently is set to true (the default), the limit is applied per partition.
The default is 50.
public int BackpressureLimit { get; init; }
Property Value
- int
CheckCrcs
Gets a value indicating whether the CRC32 of the consumed messages must be verified, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
public bool? CheckCrcs { get; init; }
Property Value
- bool?
ClientSideOffsetStore
Gets the settings for the IKafkaOffsetStore to be used to store the offsets. The stored offsets will be used during the partition assignment to determine the starting offset and ensure that each message is consumed only once.
public KafkaOffsetStoreSettings? ClientSideOffsetStore { get; init; }
Property Value
- KafkaOffsetStoreSettings?
CommitOffsetEach
Gets the number of messages to be processed before committing the offset to the server. The most reliable level is 1, but it reduces throughput.
public int? CommitOffsetEach { get; init; }
Property Value
- int?
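The commit-related properties can be combined to trade throughput for reliability. A sketch that disables the periodic background commit in favor of committing after every processed message (the group id is illustrative; see also EnableAutoCommit and CommitOffsets below):

```csharp
using Silverback.Messaging.Configuration.Kafka;

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "payments",
    EnableAutoCommit = false, // disable the periodic background commit
    CommitOffsetEach = 1      // commit after each processed message (most reliable, lowest throughput)
};
```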
CommitOffsets
Gets a value indicating whether the offsets must be committed. The default is true.
public bool CommitOffsets { get; init; }
Property Value
- bool
ConsumeResultFields
Gets a comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue> objects returned by
the Consume(TimeSpan) method. Disabling fields that you do not require will improve
throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none.
public string? ConsumeResultFields { get; init; }
Property Value
- string?
CoordinatorQueryIntervalMs
Gets the interval (in milliseconds) at which the current group coordinator must be queried. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
public int? CoordinatorQueryIntervalMs { get; init; }
Property Value
- int?
EnableAutoCommit
Gets a value indicating whether the offsets must be automatically and periodically committed in the background.
Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this
behaviour set specific start offsets per partition in the call to assign(). The default is true.
public bool EnableAutoCommit { get; init; }
Property Value
- bool
EnableAutoRecovery
Gets a value indicating whether the consumer has to be automatically recycled when a Confluent.Kafka.KafkaException
is thrown while polling/consuming or an issue is detected (e.g. a poll timeout is reported). The default is true.
public bool EnableAutoRecovery { get; init; }
Property Value
- bool
EnablePartitionEof
Gets a value indicating whether the partition EOF event must be emitted whenever the consumer reaches the end of a partition.
public bool? EnablePartitionEof { get; init; }
Property Value
- bool?
Endpoints
Gets the configured endpoints.
public IValueReadOnlyCollection<KafkaConsumerEndpointConfiguration> Endpoints { get; init; }
Property Value
- IValueReadOnlyCollection<KafkaConsumerEndpointConfiguration>
EqualityContract
protected override Type EqualityContract { get; }
Property Value
- Type
FetchErrorBackoffMs
Gets how long to postpone the next fetch request for a topic and partition in case of a fetch error.
public int? FetchErrorBackoffMs { get; init; }
Property Value
- int?
FetchMaxBytes
Gets the maximum amount of data the broker shall return for a fetch request. The messages are fetched in batches by the consumer
and if the first message batch in the first non-empty partition of the fetch request is larger than this value, then the message
batch will still be returned to ensure that the consumer can make progress. The maximum message batch size accepted by the broker
is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). This value is automatically
adjusted upwards to be at least message.max.bytes (consumer config).
public int? FetchMaxBytes { get; init; }
Property Value
- int?
FetchMinBytes
Gets the minimum number of bytes that the broker must respond with. If FetchWaitMaxMs expires the accumulated data will be sent to the client regardless of this setting.
public int? FetchMinBytes { get; init; }
Property Value
- int?
FetchQueueBackoffMs
Gets the maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (QueuedMinMessages or QueuedMaxMessagesKbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.
public int? FetchQueueBackoffMs { get; init; }
Property Value
- int?
FetchWaitMaxMs
Gets the maximum time (in milliseconds) that the broker may wait to fill the fetch response with enough messages to match the size specified by FetchMinBytes.
public int? FetchWaitMaxMs { get; init; }
Property Value
- int?
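The fetch settings above interact: FetchMinBytes and FetchWaitMaxMs together decide when the broker responds, while FetchMaxBytes caps the response size. A sketch that favors larger batches over latency (all values are illustrative only):

```csharp
using Silverback.Messaging.Configuration.Kafka;

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "analytics",
    FetchMinBytes = 1_048_576,  // wait for at least 1 MiB per fetch...
    FetchWaitMaxMs = 500,       // ...but never wait longer than 500 ms
    FetchMaxBytes = 52_428_800  // cap the total fetch response at 50 MiB
};
```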
GetMetadataTimeout
Gets the timeout to wait for the metadata to be retrieved from the broker. The default is 30 seconds.
public TimeSpan GetMetadataTimeout { get; init; }
Property Value
- TimeSpan
GroupId
Gets the client group id. All clients sharing the same group id belong to the same group. The default is null
(which will internally be replaced with "not-set" since the underlying library requires a value).
public string? GroupId { get; init; }
Property Value
- string?
GroupInstanceId
Gets the static instance id used to enable static group membership. Static group members are able to leave and rejoin a group within the configured SessionTimeoutMs without prompting a group rebalance. This should be used in combination with a larger SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.
public string? GroupInstanceId { get; init; }
Property Value
- string?
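Static group membership is typically configured by deriving the instance id from a stable host or pod name, combined with a larger session timeout. A sketch (the environment variable name and the values are assumptions for illustration):

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka;

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "inventory",
    GroupInstanceId = Environment.GetEnvironmentVariable("POD_NAME"), // stable per instance
    SessionTimeoutMs = 300_000 // tolerate up to 5 minutes of downtime without a rebalance
};
```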
GroupProtocol
Gets the group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848. The default is Confluent.Kafka.GroupProtocol.Classic, but it will change to Confluent.Kafka.GroupProtocol.Consumer in a future release.
public GroupProtocol? GroupProtocol { get; init; }
Property Value
- GroupProtocol?
GroupProtocolType
Gets the group protocol type.
public string? GroupProtocolType { get; init; }
Property Value
- string?
GroupRemoteAssignor
Gets the server-side assignor to use. Keep it null to make the server select a suitable assignor for the group. Available assignors: uniform or range.
public string? GroupRemoteAssignor { get; init; }
Property Value
- string?
HeartbeatIntervalMs
Gets the interval (in milliseconds) at which the heartbeats have to be sent to the broker.
public int? HeartbeatIntervalMs { get; init; }
Property Value
- int?
IsolationLevel
Gets a value indicating how to read the messages written inside a transaction: Confluent.Kafka.IsolationLevel.ReadCommitted to only return transactional messages which have been committed, or Confluent.Kafka.IsolationLevel.ReadUncommitted to return all messages, even transactional messages which have been aborted.
public IsolationLevel? IsolationLevel { get; init; }
Property Value
- IsolationLevel?
MaxDegreeOfParallelism
Gets the maximum number of incoming messages that can be processed concurrently. Up to one message per subscribed partition can be processed in parallel when the partitions are processed independently. The default is 100.
public int MaxDegreeOfParallelism { get; init; }
Property Value
- int
MaxPartitionFetchBytes
Gets the initial maximum number of bytes per topic and partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
public int? MaxPartitionFetchBytes { get; init; }
Property Value
- int?
MaxPollIntervalMs
Gets the maximum allowed time (in milliseconds) between calls to consume messages. If this interval is exceeded the consumer is
considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.
Warning: offset commits may not be possible at this point.
public int? MaxPollIntervalMs { get; init; }
Property Value
- int?
PartitionAssignmentStrategy
Gets the partition assignment strategy: Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics, Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members, Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions while minimizing the partition movements. The default is Confluent.Kafka.PartitionAssignmentStrategy.Range.
public PartitionAssignmentStrategy? PartitionAssignmentStrategy { get; init; }
Property Value
- PartitionAssignmentStrategy?
PollingTimeout
Gets the maximum time to wait for new messages during a single poll, before a new poll is initiated. The default is 500 milliseconds.
public TimeSpan PollingTimeout { get; init; }
Property Value
- TimeSpan
ProcessPartitionsIndependently
Gets a value indicating whether the partitions must be processed independently.
When true, a stream will be published for each partition and the sequences (ChunkSequence,
BatchSequence, ...) cannot span across partitions.
The default is true.
public bool ProcessPartitionsIndependently { get; init; }
Property Value
- bool
Remarks
Setting this value to false implicitly sets the MaxDegreeOfParallelism to 1.
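The partition-handling settings interact with the parallelism and backpressure settings. A sketch showing independent partition processing with a bounded degree of parallelism (values are illustrative only):

```csharp
using Silverback.Messaging.Configuration.Kafka;

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "events",
    ProcessPartitionsIndependently = true, // one stream per partition
    MaxDegreeOfParallelism = 16,           // at most 16 messages processed concurrently
    BackpressureLimit = 50                 // applied per partition, since partitions are independent
};
```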
QueuedMaxMessagesKbytes
Gets the maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. This setting applies to the single consumer queue, regardless of the number of partitions. This value may be overshot by FetchMaxBytes. This property has higher priority than QueuedMinMessages.
public int? QueuedMaxMessagesKbytes { get; init; }
Property Value
- int?
QueuedMinMessages
Gets the minimum number of messages per topic and partition that the underlying library must try to maintain in the local consumer queue.
public int? QueuedMinMessages { get; init; }
Property Value
- int?
SendOffsetsToTransaction
Gets a value indicating whether the consumer should commit the consumed offsets in the same transaction as the produced
messages. The default is false.
public bool SendOffsetsToTransaction { get; init; }
Property Value
- bool
SessionTimeoutMs
Gets the client group session and failure detection timeout (in milliseconds). The consumer sends periodic heartbeats (see HeartbeatIntervalMs) to indicate its liveness to the broker. If no heartbeat is received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. Also see MaxPollIntervalMs.
public int? SessionTimeoutMs { get; init; }
Property Value
- int?
StallDetectionThreshold
Gets the maximum time to wait for a message to be consumed before the consumer is considered stalled. A stalled consumer is
automatically restarted.
The default is null, which means that the consumer is never considered stalled and is never restarted.
public TimeSpan? StallDetectionThreshold { get; init; }
Property Value
- TimeSpan?
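Together with EnableAutoRecovery, this threshold lets a hung consumer heal itself. A sketch (the threshold value is illustrative only):

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka;

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "audit",
    EnableAutoRecovery = true,                        // recycle on KafkaException or poll timeout
    StallDetectionThreshold = TimeSpan.FromMinutes(5) // restart if nothing is consumed for 5 minutes
};
```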
Methods
Equals(KafkaClientConfiguration<ConsumerConfig>?)
public override sealed bool Equals(KafkaClientConfiguration<ConsumerConfig>? other)
Parameters
- other: KafkaClientConfiguration<ConsumerConfig>?
Returns
- bool
Equals(KafkaConsumerConfiguration?)
public bool Equals(KafkaConsumerConfiguration? other)
Parameters
- other: KafkaConsumerConfiguration?
Returns
- bool
Equals(object?)
public override bool Equals(object? obj)
Parameters
- obj: object?
Returns
- bool
GetHashCode()
public override int GetHashCode()
Returns
- int
MapCore()
Maps to the Confluent client configuration.
protected override ConsumerConfig MapCore()
Returns
- ConsumerConfig
The Confluent client configuration.
PrintMembers(StringBuilder)
protected override bool PrintMembers(StringBuilder builder)
Parameters
- builder: StringBuilder
Returns
- bool
ToString()
public override string ToString()
Returns
- string
Validate()
Throws a SilverbackConfigurationException if the configuration is not valid.
public override void Validate()
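Validation can be invoked eagerly, e.g. at startup, to fail fast on an inconsistent configuration. A sketch (the exception's namespace is omitted here and the configuration values are illustrative only):

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka;

KafkaConsumerConfiguration configuration = new()
{
    GroupId = "orders",
    CommitOffsets = true
};

try
{
    configuration.Validate(); // throws SilverbackConfigurationException when invalid
}
catch (Exception ex) // catch SilverbackConfigurationException in real code
{
    Console.Error.WriteLine($"Invalid Kafka consumer configuration: {ex.Message}");
    throw;
}
```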
Operators
operator ==(KafkaConsumerConfiguration?, KafkaConsumerConfiguration?)
public static bool operator ==(KafkaConsumerConfiguration? left, KafkaConsumerConfiguration? right)
Parameters
- left: KafkaConsumerConfiguration?
- right: KafkaConsumerConfiguration?
Returns
- bool
operator !=(KafkaConsumerConfiguration?, KafkaConsumerConfiguration?)
public static bool operator !=(KafkaConsumerConfiguration? left, KafkaConsumerConfiguration? right)
Parameters
- left: KafkaConsumerConfiguration?
- right: KafkaConsumerConfiguration?
Returns
- bool