Class ConfluentConsumerConfigProxy
Wraps the Confluent.Kafka.ConsumerConfig.
Implements
IValidatableEndpointSettings
Namespace: Silverback.Messaging.Configuration.Kafka
Assembly: Silverback.Integration.Kafka.dll
Syntax
public abstract class ConfluentConsumerConfigProxy : ConfluentClientConfigProxy, IValidatableEndpointSettings
Constructors
ConfluentConsumerConfigProxy(ClientConfig?)
Initializes a new instance of the ConfluentConsumerConfigProxy class.
Declaration
protected ConfluentConsumerConfigProxy(ClientConfig? clientConfig = null)
Parameters
Type | Name | Description |
---|---|---|
ClientConfig | clientConfig | The Confluent.Kafka.ClientConfig to be used to initialize the Confluent.Kafka.ConsumerConfig. |
Properties
AutoCommitIntervalMs
The frequency in milliseconds that the consumer offsets are committed (written) to offset storage. (0 = disable). This setting is used by the high-level consumer.
default: 5000
importance: medium
Declaration
public int? AutoCommitIntervalMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
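A minimal sketch of tuning this interval, assuming config is any concrete ConfluentConsumerConfigProxy exposed by the endpoint configuration; the helper class and values are illustrative only:

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class AutoCommitExample
{
    // Sketch: commit offsets more frequently than the 5000 ms default,
    // trading extra broker traffic for a smaller replay window on restart.
    public static void Apply(ConfluentConsumerConfigProxy config)
    {
        config.EnableAutoCommit = true;      // keep the background commit loop
        config.AutoCommitIntervalMs = 1000;  // commit every second instead of every 5 s
    }
}
```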
AutoOffsetReset
Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
default: largest
importance: high
Declaration
public AutoOffsetReset? AutoOffsetReset { get; set; }
Property Value
Type | Description |
---|---|
AutoOffsetReset? |
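A hedged sketch showing the typical override, with config standing in for a concrete ConfluentConsumerConfigProxy instance:

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka;

internal static class OffsetResetExample
{
    // Sketch: start from the beginning of the topic when the group has no
    // committed offset yet (the default, Latest, would skip the backlog).
    public static void Apply(ConfluentConsumerConfigProxy config) =>
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
}
```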
CheckCrcs
Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
default: false
importance: medium
Declaration
public bool? CheckCrcs { get; set; }
Property Value
Type | Description |
---|---|
bool? |
ConfluentConfig
Gets the Confluent.Kafka.ConsumerConfig instance being wrapped.
Declaration
protected ConsumerConfig ConfluentConfig { get; }
Property Value
Type | Description |
---|---|
ConsumerConfig |
ConsumeResultFields
A comma separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult&lt;TKey, TValue&gt; objects returned by the Consume(TimeSpan) method. Disabling fields that you do not require will improve throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none
default: all
importance: low
Declaration
public string ConsumeResultFields { set; }
Property Value
Type | Description |
---|---|
string |
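An illustrative sketch, assuming config is a concrete ConfluentConsumerConfigProxy and that the headers field is still required by the application (Silverback typically carries message metadata in the Kafka headers):

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class ConsumeResultFieldsExample
{
    // Sketch: drop the timestamp from each ConsumeResult when it is not read,
    // reducing per-message overhead. Note that the property is write-only on the proxy.
    public static void Apply(ConfluentConsumerConfigProxy config) =>
        config.ConsumeResultFields = "topic,headers";
}
```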
CoordinatorQueryIntervalMs
How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
default: 600000
importance: low
Declaration
public int? CoordinatorQueryIntervalMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
EnableAutoCommit
Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign().
default: true
importance: high
Declaration
public bool? EnableAutoCommit { get; set; }
Property Value
Type | Description |
---|---|
bool? |
EnableAutoOffsetStore
Automatically store offset of last message provided to application. The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
default: true
importance: high
Declaration
public bool? EnableAutoOffsetStore { get; set; }
Property Value
Type | Description |
---|---|
bool? |
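The sketch below combines the two preceding flags into a common pattern: keep the background commit loop but store offsets manually, so that an offset is only ever committed after its message has been processed. The helper and parameter name are illustrative only.

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class ManualOffsetExample
{
    // Sketch: disable the in-memory offset store so that only explicitly
    // stored offsets are picked up by the auto-commit loop, avoiding
    // at-most-once gaps when processing fails mid-message.
    public static void Apply(ConfluentConsumerConfigProxy config)
    {
        config.EnableAutoCommit = true;        // background commit loop stays on...
        config.EnableAutoOffsetStore = false;  // ...but commits only what was explicitly stored
    }
}
```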
EnablePartitionEof
Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.
default: false
importance: low
Declaration
public bool? EnablePartitionEof { get; set; }
Property Value
Type | Description |
---|---|
bool? |
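A brief sketch, with config standing in for a concrete ConfluentConsumerConfigProxy:

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class PartitionEofExample
{
    // Sketch: surface the end-of-partition event, useful e.g. for batch-style
    // jobs that want to stop once the current backlog has been drained.
    public static void Apply(ConfluentConsumerConfigProxy config) =>
        config.EnablePartitionEof = true;
}
```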
FetchErrorBackoffMs
How long to postpone the next fetch request for a topic+partition in case of a fetch error.
default: 500
importance: medium
Declaration
public int? FetchErrorBackoffMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
FetchMaxBytes
Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). fetch.max.bytes is automatically adjusted upwards to be at least message.max.bytes (consumer config).
default: 52428800
importance: medium
Declaration
public int? FetchMaxBytes { get; set; }
Property Value
Type | Description |
---|---|
int? |
FetchMinBytes
Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting.
default: 1
importance: low
Declaration
public int? FetchMinBytes { get; set; }
Property Value
Type | Description |
---|---|
int? |
FetchQueueBackoffMs
How long to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (queued.min.messages or queued.max.messages.kbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.
default: 1000
importance: medium
Declaration
public int? FetchQueueBackoffMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
FetchWaitMaxMs
Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.
default: 500
importance: low
Declaration
public int? FetchWaitMaxMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
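The sketch below ties together the fetch-related settings above (FetchMinBytes, FetchWaitMaxMs, FetchMaxBytes) for a throughput-oriented consumer; the helper class and the concrete numbers are illustrative assumptions, not recommended defaults.

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class FetchTuningExample
{
    // Sketch: favour larger, less frequent fetches for a latency-tolerant
    // consumer. The broker responds once it has ~64 KB of data or after 1 s,
    // whichever comes first.
    public static void Apply(ConfluentConsumerConfigProxy config)
    {
        config.FetchMinBytes = 65536;      // wait for at least 64 KB...
        config.FetchWaitMaxMs = 1000;      // ...or at most 1 second
        config.FetchMaxBytes = 104857600;  // allow up to 100 MB per fetch response
    }
}
```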
GroupId
Client group id string. All clients sharing the same group.id belong to the same group.
default: ''
importance: high
Declaration
public abstract string GroupId { get; set; }
Property Value
Type | Description |
---|---|
string |
GroupInstanceId
Enable static group membership. Static group members are able to leave and rejoin a group within the configured session.timeout.ms without prompting a group rebalance. This should be used in combination with a larger session.timeout.ms to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.
default: ''
importance: medium
Declaration
public string GroupInstanceId { get; set; }
Property Value
Type | Description |
---|---|
string |
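A hedged sketch of static membership, combining GroupId, GroupInstanceId and a larger SessionTimeoutMs; the group name and the instance-id scheme (machine name) are illustrative assumptions.

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka;

internal static class StaticMembershipExample
{
    // Sketch: each instance keeps a stable group.instance.id so that a quick
    // restart within the session timeout does not trigger a rebalance.
    public static void Apply(ConfluentConsumerConfigProxy config)
    {
        config.GroupId = "order-processor";
        config.GroupInstanceId = $"order-processor-{Environment.MachineName}";
        config.SessionTimeoutMs = 120000; // give restarts 2 minutes to rejoin quietly
    }
}
```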
GroupProtocol
Group protocol type. NOTE: Currently, the only supported group protocol type is consumer.
default: consumer
importance: low
Declaration
public GroupProtocol? GroupProtocol { get; set; }
Property Value
Type | Description |
---|---|
GroupProtocol? |
GroupProtocolType
Group protocol type. NOTE: Currently, the only supported group protocol type is consumer.
default: consumer
importance: low
Declaration
public string GroupProtocolType { get; set; }
Property Value
Type | Description |
---|---|
string |
GroupRemoteAssignor
Server side assignor to use. Keep it null to let the server select a suitable assignor for the group. Available assignors: uniform or range. Default is null.
default: ''
importance: medium
Declaration
public string GroupRemoteAssignor { get; set; }
Property Value
Type | Description |
---|---|
string |
HeartbeatIntervalMs
Group session keepalive heartbeat interval.
default: 3000
importance: low
Declaration
public int? HeartbeatIntervalMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
IsolationLevel
Controls how to read messages written transactionally: read_committed - only return transactional messages which have been committed. read_uncommitted - return all messages, even transactional messages which have been aborted.
default: read_committed
importance: high
Declaration
public IsolationLevel? IsolationLevel { get; set; }
Property Value
Type | Description |
---|---|
IsolationLevel? |
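A brief sketch, with config standing in for a concrete ConfluentConsumerConfigProxy:

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka;

internal static class IsolationLevelExample
{
    // Sketch: explicitly opt out of transactional isolation to also see
    // messages from aborted transactions (the default is ReadCommitted).
    public static void Apply(ConfluentConsumerConfigProxy config) =>
        config.IsolationLevel = IsolationLevel.ReadUncommitted;
}
```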
MaxPartitionFetchBytes
Initial maximum number of bytes per topic+partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.
default: 1048576
importance: medium
Declaration
public int? MaxPartitionFetchBytes { get; set; }
Property Value
Type | Description |
---|---|
int? |
MaxPollIntervalMs
Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may not be possible at this point. Note: It is recommended to set enable.auto.offset.store=false for long-time processing applications and then explicitly store offsets (using offsets_store()) after message processing, to make sure offsets are not auto-committed before processing has finished. The interval is checked two times per second. See KIP-62 for more information.
default: 300000
importance: high
Declaration
public int? MaxPollIntervalMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
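A sketch of the long-running-processing scenario described above; config, the helper class and the 15-minute figure are assumptions for illustration.

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class SlowProcessingExample
{
    // Sketch: a consumer whose handlers may legitimately take several minutes
    // per message. The poll interval is raised accordingly and the offset
    // store is disabled as recommended above, so offsets are never committed
    // before processing has finished.
    public static void Apply(ConfluentConsumerConfigProxy config)
    {
        config.MaxPollIntervalMs = 900000;     // tolerate up to 15 minutes between polls
        config.EnableAutoOffsetStore = false;  // store offsets explicitly after processing
    }
}
```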
PartitionAssignmentStrategy
The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky.
default: range,roundrobin
importance: medium
Declaration
public PartitionAssignmentStrategy? PartitionAssignmentStrategy { get; set; }
Property Value
Type | Description |
---|---|
PartitionAssignmentStrategy? |
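A minimal sketch, assuming config is a concrete ConfluentConsumerConfigProxy:

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka;

internal static class AssignmentStrategyExample
{
    // Sketch: switch the group to incremental cooperative rebalancing.
    // All members must use a compatible strategy; cooperative and eager
    // strategies must not be mixed.
    public static void Apply(ConfluentConsumerConfigProxy config) =>
        config.PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky;
}
```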
QueuedMaxMessagesKbytes
Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue. If using the high-level consumer this setting applies to the single consumer queue, regardless of the number of partitions. When using the legacy simple consumer or when separate partition queues are used this setting applies per partition. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages.
default: 65536
importance: medium
Declaration
public int? QueuedMaxMessagesKbytes { get; set; }
Property Value
Type | Description |
---|---|
int? |
QueuedMinMessages
Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.
default: 100000
importance: medium
Declaration
public int? QueuedMinMessages { get; set; }
Property Value
Type | Description |
---|---|
int? |
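The sketch below combines the two queue thresholds above for a memory-constrained consumer; the helper class and numbers are illustrative assumptions.

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class PrefetchQueueExample
{
    // Sketch: shrink the local pre-fetch queue when handling large messages.
    // The kilobyte limit has higher priority than the message-count target.
    public static void Apply(ConfluentConsumerConfigProxy config)
    {
        config.QueuedMinMessages = 1000;        // pre-fetch target of ~1000 messages per partition
        config.QueuedMaxMessagesKbytes = 16384; // cap the queue at 16 MB
    }
}
```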
SessionTimeoutMs
Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms. Also see max.poll.interval.ms.
default: 45000
importance: high
Declaration
public int? SessionTimeoutMs { get; set; }
Property Value
Type | Description |
---|---|
int? |
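A final sketch tying SessionTimeoutMs to HeartbeatIntervalMs; config, the helper class and the chosen values are illustrative assumptions.

```csharp
using Silverback.Messaging.Configuration.Kafka;

internal static class FailureDetectionExample
{
    // Sketch: faster failure detection at the cost of more heartbeat traffic.
    // heartbeat.interval.ms should stay well below session.timeout.ms
    // (roughly one third is a common rule of thumb).
    public static void Apply(ConfluentConsumerConfigProxy config)
    {
        config.SessionTimeoutMs = 15000;    // consider a silent member dead after 15 s
        config.HeartbeatIntervalMs = 5000;  // heartbeat every 5 s
    }
}
```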