Class KafkaProducerConfiguration

Namespace
Silverback.Messaging.Configuration.Kafka
Assembly
Silverback.Integration.Kafka.dll

The KafkaProducer configuration.

public sealed record KafkaProducerConfiguration : KafkaClientConfiguration<ProducerConfig>, IValidatableSettings, IEquatable<KafkaClientConfiguration<ProducerConfig>>, IEquatable<KafkaProducerConfiguration>
Inheritance
KafkaClientConfiguration<ProducerConfig>
KafkaProducerConfiguration

Constructors

KafkaProducerConfiguration()

public KafkaProducerConfiguration()

Properties

AreDeliveryReportsEnabled

Gets a value indicating whether delivery reports are enabled according to the explicit configuration and Kafka defaults.

public bool AreDeliveryReportsEnabled { get; }

Property Value

bool

BatchNumMessages

Gets the maximum number of messages batched in one message set. The total message set size is also limited by BatchSize and MessageMaxBytes.

public int? BatchNumMessages { get; init; }

Property Value

int?

BatchSize

Gets the maximum size (in bytes) of all messages batched in one message set, including the protocol framing overhead. This limit is applied after the first message has been added to the batch, regardless of the first message's size, to ensure that messages exceeding the BatchSize are still produced. The total message set size is also limited by BatchNumMessages and MessageMaxBytes.

public int? BatchSize { get; init; }

Property Value

int?

CompressionLevel

Gets the compression level parameter for the algorithm selected by the CompressionType property. Higher values result in better compression at the cost of higher CPU usage. The usable range is algorithm-dependent: [0-9] for gzip, [0-12] for lz4, only 0 for snappy. A value of -1 selects the codec-dependent default compression level.

public int? CompressionLevel { get; init; }

Property Value

int?

CompressionType

Gets the compression codec used to compress message sets. This is the default value for all topics; it may be overridden by the topic configuration property compression.codec.

public CompressionType? CompressionType { get; init; }

Property Value

CompressionType?
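As a minimal sketch of how the two compression properties fit together, the following sets both through the object initializer. The codec and level are illustrative values chosen for this example, not recommendations from the library.

```csharp
using Silverback.Messaging.Configuration.Kafka;

// Illustrative values: gzip accepts levels in the range [0-9].
var config = new KafkaProducerConfiguration
{
    CompressionType = Confluent.Kafka.CompressionType.Gzip,
    CompressionLevel = 6
};
```

Leaving CompressionLevel unset (or setting it to -1) keeps the codec-dependent default.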

DisposeOnException

Gets a value indicating whether the producer has to be disposed and recreated if a Confluent.Kafka.KafkaException is thrown. The default is true.

public bool DisposeOnException { get; init; }

Property Value

bool

EnableDeliveryReports

Gets a value indicating whether delivery reports must be sent. Typically you should set this parameter to true. Set it to false for "fire and forget" semantics and a small boost in performance.

public bool? EnableDeliveryReports { get; init; }

Property Value

bool?
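The "fire and forget" setup described above might look like the sketch below. ThrowIfNotAcknowledged is also turned off here, on the assumption that checking for an acknowledgement makes little sense when no delivery report is requested; verify this against Validate() for your version.

```csharp
using Silverback.Messaging.Configuration.Kafka;

// Sketch only: trades delivery confirmation for a small performance gain.
var fireAndForget = new KafkaProducerConfiguration
{
    EnableDeliveryReports = false,
    ThrowIfNotAcknowledged = false // assumption: kept consistent with disabled reports
};
```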

EnableGaplessGuarantee

Gets a value indicating whether an error that could result in a gap in the produced message series when a batch of messages fails must raise a fatal error (ERR_GAPLESS_GUARANTEE) and stop the producer. Messages failing due to MessageTimeoutMs are not covered by this guarantee. Requires EnableIdempotence to be true.

public bool? EnableGaplessGuarantee { get; init; }

Property Value

bool?

EnableIdempotence

Gets a value indicating whether the producer must ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: MaxInFlight to 5 (must be less than or equal to 5), MessageSendMaxRetries to Int32.MaxValue (must be greater than 0), Acks to Confluent.Kafka.Acks.All. The producer instantiation will fail if user-supplied configuration is incompatible.

public bool? EnableIdempotence { get; init; }

Property Value

bool?
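A minimal sketch of an idempotent producer configuration follows. It deliberately leaves the automatically adjusted settings (such as MessageSendMaxRetries) unset so that the adjustments described above can apply. Enabling EnableGaplessGuarantee alongside it is optional and shown only because it requires idempotence.

```csharp
using Silverback.Messaging.Configuration.Kafka;

var idempotent = new KafkaProducerConfiguration
{
    EnableIdempotence = true,
    // Optional: only valid together with EnableIdempotence = true.
    EnableGaplessGuarantee = true
};
```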

Endpoints

Gets the configured endpoints.

public IValueReadOnlyCollection<KafkaProducerEndpointConfiguration> Endpoints { get; init; }

Property Value

IValueReadOnlyCollection<KafkaProducerEndpointConfiguration>

EqualityContract

protected override Type EqualityContract { get; }

Property Value

Type

FlushTimeout

Gets the flush operation timeout. The default is 30 seconds.

public TimeSpan FlushTimeout { get; init; }

Property Value

TimeSpan

LingerMs

Gets the delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.

public double? LingerMs { get; init; }

Property Value

double?
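The batching-related properties (BatchNumMessages, BatchSize and LingerMs) are typically tuned together. The sketch below favors throughput over latency; the numbers are arbitrary illustrations and should be measured against a real workload.

```csharp
using Silverback.Messaging.Configuration.Kafka;

// Illustrative values: wait up to 20 ms to fill a batch of at most
// 5,000 messages or 500,000 bytes, whichever limit is reached first.
var batching = new KafkaProducerConfiguration
{
    BatchNumMessages = 5_000,
    BatchSize = 500_000,
    LingerMs = 20.0
};
```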

MessageSendMaxRetries

Gets how many times to retry sending a failing message.
Note: retrying may cause reordering unless EnableIdempotence is set to true.

public int? MessageSendMaxRetries { get; init; }

Property Value

int?

MessageTimeoutMs

Gets the local message timeout (in milliseconds). This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time to deliver a message (including retries), and a delivery error will occur when either the retry count or the message timeout is exceeded. The message timeout is automatically adjusted to TransactionTimeoutMs if TransactionalId is set.

public int? MessageTimeoutMs { get; init; }

Property Value

int?
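Since a delivery error occurs when either the retry count or the message timeout is exceeded, the two limits are usually chosen as a pair. A sketch with illustrative values:

```csharp
using Silverback.Messaging.Configuration.Kafka;

// Illustrative values: give up after 3 retries or 30 seconds,
// whichever limit is hit first.
var bounded = new KafkaProducerConfiguration
{
    MessageSendMaxRetries = 3,
    MessageTimeoutMs = 30_000
};
```

Note that retrying may reorder messages unless EnableIdempotence is set to true.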

Partitioner

Gets the partitioner to be used to decide the target partition for a message: Confluent.Kafka.Partitioner.Random to randomly distribute the messages, Confluent.Kafka.Partitioner.Consistent to use the CRC32 hash of the message key (empty and null keys are mapped to a single partition), Confluent.Kafka.Partitioner.ConsistentRandom to use the CRC32 hash of the message key (but empty and null keys are randomly partitioned), Confluent.Kafka.Partitioner.Murmur2 to use a Java Producer compatible Murmur2 hash of the message key (null keys are mapped to a single partition), or Confluent.Kafka.Partitioner.Murmur2Random to use a Java Producer compatible Murmur2 hash of the message key (but null keys are randomly partitioned).
The default is Confluent.Kafka.Partitioner.ConsistentRandom, while Confluent.Kafka.Partitioner.Murmur2Random is functionally equivalent to the default partitioner in the Java Producer.

public Partitioner? Partitioner { get; init; }

Property Value

Partitioner?
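For example, to partition messages the same way as the default Java producer, the partitioner could be selected as in this sketch:

```csharp
using Silverback.Messaging.Configuration.Kafka;

// Murmur2Random: Java-compatible Murmur2 hash of the key,
// with null keys randomly partitioned.
var javaCompatible = new KafkaProducerConfiguration
{
    Partitioner = Confluent.Kafka.Partitioner.Murmur2Random
};
```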

QueueBufferingBackpressureThreshold

Gets the threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with LingerMs) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines.

public int? QueueBufferingBackpressureThreshold { get; init; }

Property Value

int?

QueueBufferingMaxKbytes

Gets the maximum total size (in kilobytes) of the messages allowed on the producer queue. This queue is shared by all topics and partitions. This property has higher priority than QueueBufferingMaxMessages.

public int? QueueBufferingMaxKbytes { get; init; }

Property Value

int?

QueueBufferingMaxMessages

Gets the maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions.

public int? QueueBufferingMaxMessages { get; init; }

Property Value

int?
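The two queue limits can be combined, as in the sketch below. The values are illustrative only. Because QueueBufferingMaxKbytes has higher priority, the size limit applies even if the message count limit has not been reached.

```csharp
using Silverback.Messaging.Configuration.Kafka;

// Illustrative values: cap the shared producer queue at
// 50,000 messages or 262,144 KB (256 MiB).
var queueLimits = new KafkaProducerConfiguration
{
    QueueBufferingMaxMessages = 50_000,
    QueueBufferingMaxKbytes = 262_144
};
```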

RequestTimeoutMs

Gets the ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on request.required.acks being != 0.

public int? RequestTimeoutMs { get; init; }

Property Value

int?

SendOffsetToTransactionTimeout

Gets the timeout for sending the offset to the transaction. The default is 10 seconds.

public TimeSpan SendOffsetToTransactionTimeout { get; init; }

Property Value

TimeSpan

StickyPartitioningLingerMs

Gets the delay in milliseconds to wait to assign new sticky partitions for each topic. By default this is set to double the time of LingerMs. To disable sticky behavior, set it to 0. This behavior affects messages with a null key in all cases, and messages with a zero-length key when the Confluent.Kafka.Partitioner.ConsistentRandom partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.

public int? StickyPartitioningLingerMs { get; init; }

Property Value

int?

ThrowIfNotAcknowledged

Gets a value indicating whether an exception must be thrown by the producer if the persistence is not acknowledged by the broker. The default is true.

public bool ThrowIfNotAcknowledged { get; init; }

Property Value

bool

TransactionAbortTimeout

Gets the transaction abort operation timeout. The default is 30 seconds.

public TimeSpan TransactionAbortTimeout { get; init; }

Property Value

TimeSpan

TransactionCommitTimeout

Gets the transaction commit operation timeout. The default is 30 seconds.

public TimeSpan TransactionCommitTimeout { get; init; }

Property Value

TimeSpan

TransactionTimeoutMs

Gets the maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the transaction.max.timeout.ms setting in the broker, the init transaction call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts MessageTimeoutMs and SocketTimeoutMs unless they are explicitly configured, in which case they must not exceed the transaction timeout (SocketTimeoutMs must be at least 100ms lower than TransactionTimeoutMs).

public int? TransactionTimeoutMs { get; init; }

Property Value

int?

TransactionalId

Gets the identifier to be used to identify the same transactional producer instance across process restarts. This is required to enable the transactional producer and it allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transaction, and that any zombie instances are fenced off. If no TransactionalId is provided, then the producer is limited to idempotent delivery (see EnableIdempotence). Requires broker version >= 0.11.0.

public string? TransactionalId { get; init; }

Property Value

string
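A transactional producer could be configured along these lines. The identifier "orders-producer-1" is a hypothetical name invented for this sketch. It must stay stable across restarts of the same logical producer instance, and the timeout value is illustrative.

```csharp
using Silverback.Messaging.Configuration.Kafka;

var transactional = new KafkaProducerConfiguration
{
    // Hypothetical identifier: keep it stable across process restarts.
    TransactionalId = "orders-producer-1",
    // Illustrative value: must not exceed the broker's transaction.max.timeout.ms.
    TransactionTimeoutMs = 60_000
};
```

MessageTimeoutMs is left unset here so that it can be adjusted automatically to the transaction timeout.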

TransactionsInitTimeout

Gets the transactions init operation timeout. The default is 30 seconds.

public TimeSpan TransactionsInitTimeout { get; init; }

Property Value

TimeSpan

Methods

Equals(KafkaClientConfiguration<ProducerConfig>?)

public override sealed bool Equals(KafkaClientConfiguration<ProducerConfig>? other)

Parameters

other KafkaClientConfiguration<ProducerConfig>

Returns

bool

Equals(KafkaProducerConfiguration?)

public bool Equals(KafkaProducerConfiguration? other)

Parameters

other KafkaProducerConfiguration

Returns

bool

Equals(object?)

public override bool Equals(object? obj)

Parameters

obj object

Returns

bool

GetHashCode()

public override int GetHashCode()

Returns

int

MapCore()

Maps to the Confluent client configuration.

protected override ProducerConfig MapCore()

Returns

ProducerConfig

The Confluent client configuration.

PrintMembers(StringBuilder)

protected override bool PrintMembers(StringBuilder builder)

Parameters

builder StringBuilder

Returns

bool

ToString()

public override string ToString()

Returns

string

Validate()

Throws a SilverbackConfigurationException if the configuration is not valid.

public override void Validate()
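A hedged sketch of calling Validate() directly follows. The namespace of SilverbackConfigurationException is not shown on this page, so the example catches the base Exception type rather than guessing it. Whether an otherwise empty configuration fails validation is also an assumption.

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka;

var config = new KafkaProducerConfiguration { LingerMs = 5.0 };

try
{
    // Throws if the configuration is not valid.
    config.Validate();
}
catch (Exception ex) // expected to be a SilverbackConfigurationException
{
    Console.WriteLine($"Invalid producer configuration: {ex.Message}");
}
```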

Operators

operator ==(KafkaProducerConfiguration?, KafkaProducerConfiguration?)

public static bool operator ==(KafkaProducerConfiguration? left, KafkaProducerConfiguration? right)

Parameters

left KafkaProducerConfiguration
right KafkaProducerConfiguration

Returns

bool

operator !=(KafkaProducerConfiguration?, KafkaProducerConfiguration?)

public static bool operator !=(KafkaProducerConfiguration? left, KafkaProducerConfiguration? right)

Parameters

left KafkaProducerConfiguration
right KafkaProducerConfiguration

Returns

bool
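Because KafkaProducerConfiguration is a record with init-only properties, a modified copy can be derived with a `with` expression and compared to the original using the equality members listed above. This is a sketch of standard C# record behavior rather than a documented Silverback usage pattern.

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka;

var baseline = new KafkaProducerConfiguration { LingerMs = 5.0 };

// Non-destructive mutation: baseline itself is left unchanged.
var tuned = baseline with { LingerMs = 50.0 };

// Uses the record's equality operator instead of reference equality.
bool sameSettings = baseline == tuned;
Console.WriteLine(sameSettings);
```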