    Class KafkaConsumerConfigurationBuilder

    Builds the KafkaConsumerConfiguration.

    Inheritance
    object
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>
    KafkaConsumerConfigurationBuilder
    Inherited Members
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslMechanism(SaslMechanism?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithAcks(Acks?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithBootstrapServers(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithMessageMaxBytes(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithMessageCopyMaxBytes(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithReceiveMessageMaxBytes(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithMaxInFlight(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithMetadataRecoveryStrategy(MetadataRecoveryStrategy?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithMetadataRecoveryRebootstrapTriggerMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithTopicMetadataRefreshIntervalMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithMetadataMaxAgeMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithTopicMetadataRefreshFastIntervalMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.EnableSparseTopicMetadataRefresh()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisableSparseTopicMetadataRefresh()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithTopicMetadataPropagationMaxMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithTopicBlacklist(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithDebug(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSocketTimeoutMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSocketSendBufferBytes(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSocketReceiveBufferBytes(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.EnableSocketKeepalive()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisableSocketKeepalive()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisableSocketNagle()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.EnableSocketNagle()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSocketMaxFails(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithBrokerAddressTtl(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithBrokerAddressFamily(BrokerAddressFamily?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSocketConnectionSetupTimeoutMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithConnectionsMaxIdleMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithReconnectBackoffMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithReconnectBackoffMaxMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithStatisticsIntervalMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.EnableApiVersionRequest()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisableApiVersionRequest()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithApiVersionRequestTimeoutMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithApiVersionFallbackMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithBrokerVersionFallback(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.AllowAutoCreateTopics()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisallowAutoCreateTopics()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSecurityProtocol(SecurityProtocol?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCipherSuites(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCurvesList(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslSigalgsList(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslKeyLocation(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslKeyPassword(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslKeyPem(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCertificateLocation(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCertificatePem(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCaLocation(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithHttpsCaLocation(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithHttpsCaPem(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCaPem(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCaCertificateStores(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslCrlLocation(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslKeystoreLocation(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslKeystorePassword(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslProviders(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslEngineLocation(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslEngineId(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.EnableSslCertificateVerification()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisableSslCertificateVerification()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSslEndpointIdentificationAlgorithm(SslEndpointIdentificationAlgorithm?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslKerberosServiceName(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslKerberosPrincipal(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslKerberosKinitCmd(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslKerberosKeytab(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslKerberosMinTimeBeforeRelogin(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslUsername(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslPassword(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerConfig(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.EnableSaslOauthbearerUnsecureJwt()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisableSaslOauthbearerUnsecureJwt()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerMethod(SaslOauthbearerMethod?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerClientId(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerClientSecret(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerScope(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerExtensions(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerTokenEndpointUrl(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerGrantType(SaslOauthbearerGrantType?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionAlgorithm(SaslOauthbearerAssertionAlgorithm?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionPrivateKeyFile(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionPrivateKeyPassphrase(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionPrivateKeyPem(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionFile(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionClaimAud(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionClaimExpSeconds(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionClaimIss(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionClaimJtiInclude(bool?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionClaimNbfSeconds(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionClaimSub(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerAssertionJwtTemplateFile(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithSaslOauthbearerMetadataAuthenticationType(SaslOauthbearerMetadataAuthenticationType?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithPluginLibraryPaths(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithClientRack(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithCancellationDelayMaxMs(int)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithClientId(string)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithRetryBackoffMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithRetryBackoffMaxMs(int?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.WithClientDnsLookup(ClientDnsLookup?)
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.EnableMetricsPush()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.DisableMetricsPush()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.BuildCore()
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.Config
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.This
    object.GetType()
    object.MemberwiseClone()
    object.ToString()
    object.Equals(object)
    object.Equals(object, object)
    object.ReferenceEquals(object, object)
    object.GetHashCode()
    Namespace: Silverback.Messaging.Configuration.Kafka
    Assembly: Silverback.Integration.Kafka.dll
    Syntax
    public class KafkaConsumerConfigurationBuilder : KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>

    Constructors

    KafkaConsumerConfigurationBuilder(IServiceProvider)

    Initializes a new instance of the KafkaConsumerConfigurationBuilder class.

    Declaration
    public KafkaConsumerConfigurationBuilder(IServiceProvider serviceProvider)
    Parameters
    Type Name Description
    IServiceProvider serviceProvider

    The IServiceProvider instance.

    Properties

    This

    Gets this instance.

    Declaration
    protected override KafkaConsumerConfigurationBuilder This { get; }
    Property Value
    Type Description
    KafkaConsumerConfigurationBuilder
    Overrides
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.This
    Remarks

    This is necessary to work around casting in the base classes.

    Methods

    AutoResetOffsetToEarliest()

    Specifies that the offset needs to be reset to the smallest offset, when there is no initial offset in the offset store or the desired offset is out of range.

    Declaration
    public KafkaConsumerConfigurationBuilder AutoResetOffsetToEarliest()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    AutoResetOffsetToLatest()

    Specifies that the offset needs to be reset to the largest offset, when there is no initial offset in the offset store or the desired offset is out of range.

    Declaration
    public KafkaConsumerConfigurationBuilder AutoResetOffsetToLatest()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Build()

    Builds the KafkaConsumerConfiguration instance.

    Declaration
    public KafkaConsumerConfiguration Build()
    Returns
    Type Description
    KafkaConsumerConfiguration

    The KafkaConsumerConfiguration.

    BuildCore()

    Builds the configuration.

    Declaration
    protected override KafkaConsumerConfiguration BuildCore()
    Returns
    Type Description
    KafkaConsumerConfiguration

    The configuration.

    Overrides
    KafkaClientConfigurationBuilder<KafkaConsumerConfiguration, ConsumerConfig, KafkaConsumerConfigurationBuilder>.BuildCore()

    CommitOffsetEach(int)

    Defines the number of messages to be processed before committing the offset to the server. Committing after every message (a value of 1) is the most reliable setting, but it reduces throughput.

    Declaration
    public KafkaConsumerConfigurationBuilder CommitOffsetEach(int commitOffsetEach)
    Parameters
    Type Name Description
    int commitOffsetEach

    The number of messages to be processed before committing the offset to the server.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Remarks

    Setting this value automatically disables automatic offset commit (see EnableAutoCommit()/DisableAutoCommit()).

    Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>>)

    Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

    Declaration
    public KafkaConsumerConfigurationBuilder Consume(Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)
    Parameters
    Type Name Description
    Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction

    An Action that takes the KafkaConsumerEndpointConfigurationBuilder<object> and configures it.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Consume(string?, Action<KafkaConsumerEndpointConfigurationBuilder<object>>)

    Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

    Declaration
    public KafkaConsumerConfigurationBuilder Consume(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction)
    Parameters
    Type Name Description
    string name

    The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name is generated by concatenating the topic name(s).

    Action<KafkaConsumerEndpointConfigurationBuilder<object>> configurationBuilderAction

    An Action that takes the KafkaConsumerEndpointConfigurationBuilder<object> and configures it.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)

    Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

    Declaration
    public KafkaConsumerConfigurationBuilder Consume<TMessage>(Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)
    Parameters
    Type Name Description
    Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction

    An Action that takes the KafkaConsumerEndpointConfigurationBuilder<TMessage> and configures it.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Type Parameters
    Name Description
    TMessage

    The type (or base type) of the messages being consumed. This is used to set up the deserializer and determines the type of the message parameter in the nested configuration functions.

    Consume<TMessage>(string?, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>>)

    Adds a consumer endpoint, which is a topic, a partition, or a group of topics/partitions that share the same configuration (deserializer, error policies, etc.).

    Declaration
    public KafkaConsumerConfigurationBuilder Consume<TMessage>(string? name, Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction)
    Parameters
    Type Name Description
    string name

    The name is used to guarantee that a duplicated configuration is discarded and is also displayed in the logs. By default, the name is generated by concatenating the topic name(s).

    Action<KafkaConsumerEndpointConfigurationBuilder<TMessage>> configurationBuilderAction

    An Action that takes the KafkaConsumerEndpointConfigurationBuilder<TMessage> and configures it.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Type Parameters
    Name Description
    TMessage

    The type (or base type) of the messages being consumed. This is used to set up the deserializer and determines the type of the message parameter in the nested configuration functions.
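
    A minimal sketch of a named, typed endpoint registration, based only on the signatures listed on this page. The OrderCreated type, the "orders-endpoint" name, and the ConsumeSketch helper are hypothetical and exist only for illustration. The members of KafkaConsumerEndpointConfigurationBuilder<TMessage> are not covered here, so the body of the action is left as a comment.

    ```csharp
    using System;
    using Silverback.Messaging.Configuration.Kafka;

    // Hypothetical message type, used only for illustration.
    public class OrderCreated
    {
        public string OrderId { get; set; } = string.Empty;
    }

    public static class ConsumeSketch
    {
        // Returns the builder so that further calls can be chained by the caller.
        public static KafkaConsumerConfigurationBuilder Configure(IServiceProvider serviceProvider) =>
            new KafkaConsumerConfigurationBuilder(serviceProvider)
                .Consume<OrderCreated>(
                    "orders-endpoint", // hypothetical name; shown in the logs and used to discard duplicates
                    endpoint =>
                    {
                        // 'endpoint' is a KafkaConsumerEndpointConfigurationBuilder<OrderCreated>.
                        // The topic(s), deserializer, and error policies would be configured here;
                        // those members are documented on the endpoint builder, not on this page.
                    });
    }
    ```

    Passing null as the name (or using the overload without it) falls back to the default name derived from the topic name(s).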

    DisableAutoCommit()

    Disables the automatic offsets commit. Note: setting this does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour, set specific start offsets per partition in the call to assign().

    Declaration
    public KafkaConsumerConfigurationBuilder DisableAutoCommit()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Remarks

    See also CommitOffsetEach(int).

    DisableAutoOffsetReset()

    Specifies that an error (ERR__AUTO_OFFSET_RESET) must be triggered, when there is no initial offset in the offset store or the desired offset is out of range.

    Declaration
    public KafkaConsumerConfigurationBuilder DisableAutoOffsetReset()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    DisableAutoRecovery()

    Specifies that the consumer doesn't have to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issue is detected (e.g. a poll timeout is reported).

    Declaration
    public KafkaConsumerConfigurationBuilder DisableAutoRecovery()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    DisableCheckCrcs()

    Disables the verification of the CRC32 of the consumed messages. The verification ensures that no on-the-wire or on-disk corruption of the messages occurred, at the cost of slightly increased CPU usage.

    Declaration
    public KafkaConsumerConfigurationBuilder DisableCheckCrcs()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    DisableOffsetsCommit()

    Disables the offsets commit.

    Declaration
    public KafkaConsumerConfigurationBuilder DisableOffsetsCommit()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Remarks

    Disabling offsets commit will also disable auto commit (see DisableAutoCommit()) and clear the CommitOffsetEach setting (see CommitOffsetEach(int)).
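
    The remarks on CommitOffsetEach(int), EnableAutoCommit(), and DisableOffsetsCommit() describe three mutually exclusive commit modes. The sketch below shows one builder chain per mode, using only methods listed on this page. The CommitModesSketch class, its method names, and the numeric values are hypothetical.

    ```csharp
    using System;
    using Silverback.Messaging.Configuration.Kafka;

    public static class CommitModesSketch
    {
        // Commit after every 10 processed messages.
        // Per the remarks, this automatically disables the automatic offset commit.
        public static KafkaConsumerConfigurationBuilder CommitEveryTenMessages(IServiceProvider serviceProvider) =>
            new KafkaConsumerConfigurationBuilder(serviceProvider)
                .CommitOffsetEach(10);

        // Commit periodically in the background (here every 5000 ms, an arbitrary value).
        // Per the remarks, enabling auto commit clears the CommitOffsetEach setting.
        public static KafkaConsumerConfigurationBuilder CommitPeriodically(IServiceProvider serviceProvider) =>
            new KafkaConsumerConfigurationBuilder(serviceProvider)
                .EnableAutoCommit()
                .WithAutoCommitIntervalMs(5000);

        // Never commit offsets to the broker.
        // Per the remarks, this disables auto commit and clears CommitOffsetEach.
        public static KafkaConsumerConfigurationBuilder NeverCommit(IServiceProvider serviceProvider) =>
            new KafkaConsumerConfigurationBuilder(serviceProvider)
                .DisableOffsetsCommit();
    }
    ```

    Because each of these calls resets the other settings, the order of the calls in a single chain matters: the last commit-related call is presumably the one that takes effect.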

    DisablePartitionEof()

    Don't invoke the IKafkaPartitionEofCallback when a partition end of file is reached.

    Declaration
    public KafkaConsumerConfigurationBuilder DisablePartitionEof()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    DisableSendOffsetsToTransaction()

    Specifies that the consumer should not commit the consumed offsets in the same transaction as the produced messages. This is the default.

    Declaration
    public KafkaConsumerConfigurationBuilder DisableSendOffsetsToTransaction()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    EnableAutoCommit()

    Automatically and periodically commit offsets in the background.

    Declaration
    public KafkaConsumerConfigurationBuilder EnableAutoCommit()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    Remarks

    Enabling automatic offsets commit will clear the CommitOffsetEach setting (see CommitOffsetEach(int)).

    EnableAutoRecovery()

    Specifies that the consumer has to be automatically recycled when a Confluent.Kafka.KafkaException is thrown while polling/consuming or an issue is detected (e.g. a poll timeout is reported). This is the default.

    Declaration
    public KafkaConsumerConfigurationBuilder EnableAutoRecovery()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    EnableCheckCrcs()

    Enables the verification of the CRC32 of the consumed messages, ensuring that no on-the-wire or on-disk corruption of the messages occurred. This check comes at the cost of slightly increased CPU usage.

    Declaration
    public KafkaConsumerConfigurationBuilder EnableCheckCrcs()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    EnableOffsetsCommit()

    Enables the offsets commit. This is the default.

    Declaration
    public KafkaConsumerConfigurationBuilder EnableOffsetsCommit()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    EnablePartitionEof()

    Invoke the IKafkaPartitionEofCallback whenever a partition end of file is reached.

    Declaration
    public KafkaConsumerConfigurationBuilder EnablePartitionEof()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    LimitBackpressure(int)

    Sets the maximum number of messages to be consumed and enqueued waiting to be processed. The limit will be applied per partition when processing the partitions independently (default). The default limit is 2.

    Declaration
    public KafkaConsumerConfigurationBuilder LimitBackpressure(int backpressureLimit)
    Parameters
    Type Name Description
    int backpressureLimit

    The maximum number of messages to be enqueued.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    LimitParallelism(int)

    Sets the maximum number of incoming messages that can be processed concurrently. At most one message per subscribed partition can be processed in parallel. The default limit is 100.

    Declaration
    public KafkaConsumerConfigurationBuilder LimitParallelism(int maxDegreeOfParallelism)
    Parameters
    Type Name Description
    int maxDegreeOfParallelism

    The maximum number of incoming messages that can be processed concurrently.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
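
    A short sketch of how LimitBackpressure(int) and LimitParallelism(int) might be combined with the default per-partition processing. The ThroughputSketch class and the values 10 and 4 are hypothetical and chosen only for illustration.

    ```csharp
    using System;
    using Silverback.Messaging.Configuration.Kafka;

    public static class ThroughputSketch
    {
        public static KafkaConsumerConfigurationBuilder Configure(IServiceProvider serviceProvider) =>
            new KafkaConsumerConfigurationBuilder(serviceProvider)
                // Explicitly select the default mode: one stream per partition.
                .ProcessPartitionsIndependently()
                // Process messages from at most 4 partitions concurrently (default: 100).
                .LimitParallelism(4)
                // Enqueue at most 10 consumed messages waiting to be processed (default: 2).
                // In per-partition mode the limit applies to each partition.
                .LimitBackpressure(10);
    }
    ```

    Under these assumed values, up to 4 messages are processed at the same time, while each assigned partition may hold up to 10 consumed messages in its queue.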

    ProcessAllPartitionsTogether()

    Specifies that all partitions must be processed together. This means that a single stream will be published for the messages from all the partitions and the sequences (ChunkSequence, BatchSequence, ...) can span across the partitions.

    Declaration
    public KafkaConsumerConfigurationBuilder ProcessAllPartitionsTogether()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    ProcessPartitionsIndependently()

    Specifies that the partitions must be processed independently. This means that a stream will be published for each partition and the sequences (ChunkSequence, BatchSequence, ...) cannot span across the partitions. This option is enabled by default. Use ProcessAllPartitionsTogether() to disable it.

    Declaration
    public KafkaConsumerConfigurationBuilder ProcessPartitionsIndependently()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    SendOffsetsToTransaction()

    Specifies that the consumer should commit the consumed offsets in the same transaction as the produced messages.

    Declaration
    public KafkaConsumerConfigurationBuilder SendOffsetsToTransaction()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    StoreOffsetsClientSide(KafkaOffsetStoreSettings)

    Specifies that the offsets have to be stored in the specified client store, in addition to or instead of being committed to the message broker.

    Declaration
    public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(KafkaOffsetStoreSettings settings)
    Parameters
    Type Name Description
    KafkaOffsetStoreSettings settings

    The offset store settings.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder>)

    Specifies that the offsets have to be stored in the specified client store, in addition to or instead of being committed to the message broker.

    Declaration
    public KafkaConsumerConfigurationBuilder StoreOffsetsClientSide(Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder> settingsBuilderFunc)
    Parameters
    Type Name Description
    Func<KafkaOffsetStoreSettingsBuilder, IKafkaOffsetStoreSettingsImplementationBuilder> settingsBuilderFunc

    A Func<T, TResult> that takes the KafkaOffsetStoreSettingsBuilder and configures it.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithAutoCommitIntervalMs(int?)

    Sets the frequency in milliseconds at which the consumer offsets are committed.

    Declaration
    public KafkaConsumerConfigurationBuilder WithAutoCommitIntervalMs(int? autoCommitIntervalMs)
    Parameters
    Type Name Description
    int? autoCommitIntervalMs

    The frequency at which the consumer offsets are committed.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithAutoOffsetReset(AutoOffsetReset?)

    Sets the action to take when there is no initial offset in the offset store or the desired offset is out of range: Confluent.Kafka.AutoOffsetReset.Earliest to automatically reset to the smallest offset, Confluent.Kafka.AutoOffsetReset.Latest to automatically reset to the largest offset, and Confluent.Kafka.AutoOffsetReset.Error to trigger an error (ERR__AUTO_OFFSET_RESET).

    Declaration
    public KafkaConsumerConfigurationBuilder WithAutoOffsetReset(AutoOffsetReset? autoOffsetReset)
    Parameters
    Type Name Description
    AutoOffsetReset? autoOffsetReset

    The action to take when there is no initial offset in the offset store or the desired offset is out of range.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
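
    A short sketch of choosing the reset behavior for a consumer group that has no committed offsets yet. The group id and the helper class are hypothetical, and the Silverback namespace is an assumption; AutoOffsetReset is the Confluent.Kafka enum named above.

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class OffsetResetExample
{
    // Hypothetical helper: 'builder' is supplied by the surrounding configuration code.
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder) =>
        builder
            .WithGroupId("orders-consumer") // hypothetical group id
            // With no stored offset (or an out-of-range one), start from the smallest offset.
            .WithAutoOffsetReset(AutoOffsetReset.Earliest);
}
```

    Using AutoOffsetReset.Latest instead would skip the messages already in the topic, while AutoOffsetReset.Error surfaces the condition as an error rather than picking a position silently.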

    WithConsumeResultFields(string?)

    Sets a comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue> objects returned by the Consume(TimeSpan) method. Disabling fields that you do not require will improve throughput and reduce memory consumption. Allowed values: headers, timestamp, topic, all, none.

    Declaration
    public KafkaConsumerConfigurationBuilder WithConsumeResultFields(string? consumeResultFields)
    Parameters
    Type Name Description
    string consumeResultFields

    A comma-separated list of fields that may be optionally set in Confluent.Kafka.ConsumeResult<TKey, TValue>.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithCooperativeStickyPartitionAssignmentStrategy()

    Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize partition movements.

    Declaration
    public KafkaConsumerConfigurationBuilder WithCooperativeStickyPartitionAssignmentStrategy()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithCoordinatorQueryIntervalMs(int?)

    Sets the interval (in milliseconds) at which the current group coordinator must be queried. If the currently assigned coordinator is down, the configured query interval will be divided by ten to recover more quickly in case of coordinator reassignment.

    Declaration
    public KafkaConsumerConfigurationBuilder WithCoordinatorQueryIntervalMs(int? coordinatorQueryIntervalMs)
    Parameters
    Type Name Description
    int? coordinatorQueryIntervalMs

    The interval at which the current group coordinator must be queried.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithFetchErrorBackoffMs(int?)

    Sets how long to postpone the next fetch request for a topic and partition in case of a fetch error.

    Declaration
    public KafkaConsumerConfigurationBuilder WithFetchErrorBackoffMs(int? fetchErrorBackoffMs)
    Parameters
    Type Name Description
    int? fetchErrorBackoffMs

    How long to postpone the next fetch request in case of a fetch error.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithFetchMaxBytes(int?)

    Sets the maximum amount of data the broker shall return for a fetch request. The messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the fetch request is larger than this value, then the message batch will still be returned to ensure that the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). This value is automatically adjusted upwards to be at least message.max.bytes (consumer config).

    Declaration
    public KafkaConsumerConfigurationBuilder WithFetchMaxBytes(int? fetchMaxBytes)
    Parameters
    Type Name Description
    int? fetchMaxBytes

    The maximum amount of data the broker shall return for a fetch request.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithFetchMinBytes(int?)

    Sets the minimum number of bytes that the broker must respond with. If FetchWaitMaxMs expires the accumulated data will be sent to the client regardless of this setting.

    Declaration
    public KafkaConsumerConfigurationBuilder WithFetchMinBytes(int? fetchMinBytes)
    Parameters
    Type Name Description
    int? fetchMinBytes

    The minimum number of bytes that the broker must respond with.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithFetchQueueBackoffMs(int?)

    Sets the maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds (QueuedMinMessages or QueuedMaxMessagesKbytes) have been exceeded. This property may need to be decreased if the queue thresholds are set low and the application is experiencing long (~1s) delays between messages. Low values may increase CPU utilization.

    Declaration
    public KafkaConsumerConfigurationBuilder WithFetchQueueBackoffMs(int? fetchQueueBackoffMs)
    Parameters
    Type Name Description
    int? fetchQueueBackoffMs

    The maximum time in milliseconds to postpone the next fetch request for a topic+partition in case the current fetch queue thresholds have been exceeded.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithFetchWaitMaxMs(int?)

    Sets the maximum time (in milliseconds) that the broker may wait to fill the fetch response with enough messages to match the size specified by FetchMinBytes.

    Declaration
    public KafkaConsumerConfigurationBuilder WithFetchWaitMaxMs(int? fetchWaitMaxMs)
    Parameters
    Type Name Description
    int? fetchWaitMaxMs

    The maximum time that the broker may wait to fill the fetch response.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
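
    FetchMinBytes and FetchWaitMaxMs work as a pair: the broker replies as soon as either threshold is reached. The sketch below shows them set together; the values and the helper class are purely illustrative, and the Silverback namespace is an assumption.

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class FetchBatchingExample
{
    // Hypothetical helper: 'builder' is supplied by the surrounding configuration code.
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder) =>
        builder
            // Ask the broker to accumulate at least 64 KiB per fetch response...
            .WithFetchMinBytes(64 * 1024)
            // ...but to reply after 250 ms at the latest, whatever has accumulated.
            .WithFetchWaitMaxMs(250);
}
```

    Larger values tend to trade latency for fewer, fuller fetch responses.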

    WithGetMetadataTimeout(TimeSpan?)

    Sets the timeout used to wait for the metadata to be retrieved from the broker. The default is 30 seconds.

    Declaration
    public KafkaConsumerConfigurationBuilder WithGetMetadataTimeout(TimeSpan? getMetadataTimeout)
    Parameters
    Type Name Description
    TimeSpan? getMetadataTimeout

    The timeout used to wait for the metadata to be retrieved from the broker.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithGroupId(string?)

    Sets the client group id string. All clients sharing the same group.id belong to the same group.

    Declaration
    public KafkaConsumerConfigurationBuilder WithGroupId(string? groupId)
    Parameters
    Type Name Description
    string groupId

    The client group id string. All clients sharing the same group.id belong to the same group.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithGroupInstanceId(string?)

    Sets the static instance id used to enable static group membership. Static group members are able to leave and rejoin a group within the configured SessionTimeoutMs without prompting a group rebalance. This should be used in combination with a larger SessionTimeoutMs to avoid group rebalances caused by transient unavailability (e.g. process restarts). Requires broker version >= 2.3.0.

    Declaration
    public KafkaConsumerConfigurationBuilder WithGroupInstanceId(string? groupInstanceId)
    Parameters
    Type Name Description
    string groupInstanceId

    The static instance id used to enable static group membership.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
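
    A minimal sketch of static group membership as described above: a stable instance id combined with a larger session timeout. The group id, the timeout value, and the helper class are hypothetical; the Silverback namespace is an assumption.

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class StaticMembershipExample
{
    // Hypothetical helper. 'instanceId' must be unique per consumer instance and
    // stable across restarts (for example, derived from the host or pod name).
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder,
        string instanceId) =>
        builder
            .WithGroupId("orders-consumer") // hypothetical group id
            .WithGroupInstanceId(instanceId)
            // Illustrative value: long enough to cover a typical process restart.
            .WithSessionTimeoutMs(60_000);
}
```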

    WithGroupProtocol(GroupProtocol?)

    Sets the group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848. The default is Confluent.Kafka.GroupProtocol.Classic, but it will change to Confluent.Kafka.GroupProtocol.Consumer in a future release.

    Declaration
    public KafkaConsumerConfigurationBuilder WithGroupProtocol(GroupProtocol? groupProtocol)
    Parameters
    Type Name Description
    GroupProtocol? groupProtocol

    The group protocol to use. Use Confluent.Kafka.GroupProtocol.Classic for the original protocol and Confluent.Kafka.GroupProtocol.Consumer for the new protocol introduced in KIP-848.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithGroupProtocolType(string?)

    Sets the group protocol type.

    Declaration
    public KafkaConsumerConfigurationBuilder WithGroupProtocolType(string? groupProtocolType)
    Parameters
    Type Name Description
    string groupProtocolType

    The group protocol type.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithGroupRemoteAssignor(string?)

    Sets the server-side assignor to use. Leave it null to let the server select a suitable assignor for the group. Available assignors: uniform or range.

    Declaration
    public KafkaConsumerConfigurationBuilder WithGroupRemoteAssignor(string? groupRemoteAssignor)
    Parameters
    Type Name Description
    string groupRemoteAssignor

    The server-side assignor to use.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
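
    A hedged sketch of opting in to the KIP-848 protocol together with an explicit server-side assignor. The helper class is hypothetical and the Silverback namespace is an assumption; whether the brokers in use support this protocol has to be checked separately.

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class GroupProtocolExample
{
    // Hypothetical helper: 'builder' is supplied by the surrounding configuration code.
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder) =>
        builder
            .WithGroupProtocol(GroupProtocol.Consumer)
            // "uniform" and "range" are the assignors listed above;
            // passing null would leave the choice to the server.
            .WithGroupRemoteAssignor("uniform");
}
```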

    WithHeartbeatIntervalMs(int?)

    Sets the interval (in milliseconds) at which the heartbeats have to be sent to the broker.

    Declaration
    public KafkaConsumerConfigurationBuilder WithHeartbeatIntervalMs(int? heartbeatIntervalMs)
    Parameters
    Type Name Description
    int? heartbeatIntervalMs

    The interval at which the heartbeats have to be sent.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithIsolationLevel(IsolationLevel?)

    Sets a value indicating how to read messages written inside a transaction: Confluent.Kafka.IsolationLevel.ReadCommitted to only return transactional messages which have been committed, or Confluent.Kafka.IsolationLevel.ReadUncommitted to return all messages, even transactional messages which have been aborted.

    Declaration
    public KafkaConsumerConfigurationBuilder WithIsolationLevel(IsolationLevel? isolationLevel)
    Parameters
    Type Name Description
    IsolationLevel? isolationLevel

    A value indicating how to read messages written inside a transaction.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
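
    For a consumer that should never observe messages from aborted transactions, the setting could look like the sketch below. The helper class is hypothetical and the Silverback namespace is an assumption.

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class IsolationLevelExample
{
    // Hypothetical helper: 'builder' is supplied by the surrounding configuration code.
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder) =>
        // Only return transactional messages that have been committed.
        builder.WithIsolationLevel(IsolationLevel.ReadCommitted);
}
```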

    WithMaxPartitionFetchBytes(int?)

    Sets the initial maximum number of bytes per topic and partition to request when fetching messages from the broker. If the client encounters a message larger than this value it will gradually try to increase it until the entire message can be fetched.

    Declaration
    public KafkaConsumerConfigurationBuilder WithMaxPartitionFetchBytes(int? maxPartitionFetchBytes)
    Parameters
    Type Name Description
    int? maxPartitionFetchBytes

    The initial maximum number of bytes per topic and partition to request when fetching messages.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithMaxPollIntervalMs(int?)

    Sets the maximum allowed time (in milliseconds) between calls to consume messages. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member.
    Warning: Offset commits may not be possible at this point.

    Declaration
    public KafkaConsumerConfigurationBuilder WithMaxPollIntervalMs(int? maxPollIntervalMs)
    Parameters
    Type Name Description
    int? maxPollIntervalMs

    The maximum allowed time between calls to consume messages.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithPartitionAssignmentStrategy(PartitionAssignmentStrategy?)

    Sets the partition assignment strategy: Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics, Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members, or Confluent.Kafka.PartitionAssignmentStrategy.CooperativeSticky to evenly distribute the partitions and minimize partition movements. The default is Confluent.Kafka.PartitionAssignmentStrategy.Range.

    Declaration
    public KafkaConsumerConfigurationBuilder WithPartitionAssignmentStrategy(PartitionAssignmentStrategy? partitionAssignmentStrategy)
    Parameters
    Type Name Description
    PartitionAssignmentStrategy? partitionAssignmentStrategy

    The partition assignment strategy.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
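
    The strategy can be passed as an enum value or selected through one of the dedicated shortcut methods documented on this page. The sketch below shows both forms side by side; the helper class is hypothetical, the Silverback namespace is an assumption, and the two forms are expected to be equivalent based on their descriptions.

```csharp
using Confluent.Kafka;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class AssignmentStrategyExample
{
    // Hypothetical helper: passes the enum value explicitly.
    public static KafkaConsumerConfigurationBuilder ViaEnum(
        KafkaConsumerConfigurationBuilder builder) =>
        builder.WithPartitionAssignmentStrategy(PartitionAssignmentStrategy.CooperativeSticky);

    // Hypothetical helper: uses the shortcut method for the same strategy.
    public static KafkaConsumerConfigurationBuilder ViaShortcut(
        KafkaConsumerConfigurationBuilder builder) =>
        builder.WithCooperativeStickyPartitionAssignmentStrategy();
}
```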

    WithPollingTimeout(TimeSpan)

    Sets the timeout to wait for the consumer to poll for new messages before initiating a new poll. The default is 100 milliseconds.

    Declaration
    public KafkaConsumerConfigurationBuilder WithPollingTimeout(TimeSpan pollingTimeout)
    Parameters
    Type Name Description
    TimeSpan pollingTimeout

    The timeout to wait for the consumer to poll for new messages.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithQueuedMaxMessagesKbytes(int?)

    Sets the maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue. This setting applies to the single consumer queue, regardless of the number of partitions. This value may be overshot by FetchMaxBytes. This property has higher priority than QueuedMinMessages.

    Declaration
    public KafkaConsumerConfigurationBuilder WithQueuedMaxMessagesKbytes(int? queuedMaxMessagesKbytes)
    Parameters
    Type Name Description
    int? queuedMaxMessagesKbytes

    The maximum number of kilobytes of queued pre-fetched messages to store in the local consumer queue.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithQueuedMinMessages(int?)

    Sets the minimum number of messages per topic and partition that the underlying library must try to maintain in the local consumer queue.

    Declaration
    public KafkaConsumerConfigurationBuilder WithQueuedMinMessages(int? queuedMinMessages)
    Parameters
    Type Name Description
    int? queuedMinMessages

    The minimum number of messages that must be maintained in the local consumer queue.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
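
    The local queue thresholds and the fetch queue backoff interact, so they are often tuned together. The following is a sketch with illustrative values only (not recommendations); the helper class is hypothetical and the Silverback namespace is an assumption.

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class PrefetchQueueExample
{
    // Hypothetical helper: 'builder' is supplied by the surrounding configuration code.
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder) =>
        builder
            // Try to keep at least 1000 messages per topic and partition in the local queue.
            .WithQueuedMinMessages(1_000)
            // Cap the pre-fetched data at roughly 16 MB; this limit takes priority.
            .WithQueuedMaxMessagesKbytes(16 * 1024)
            // Wait at most 100 ms before fetching again once a threshold has been exceeded.
            .WithFetchQueueBackoffMs(100);
}
```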

    WithRangePartitionAssignmentStrategy()

    Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.Range to co-localize the partitions of several topics.

    Declaration
    public KafkaConsumerConfigurationBuilder WithRangePartitionAssignmentStrategy()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithRoundRobinPartitionAssignmentStrategy()

    Sets the partition assignment strategy to Confluent.Kafka.PartitionAssignmentStrategy.RoundRobin to evenly distribute the partitions among the consumer group members.

    Declaration
    public KafkaConsumerConfigurationBuilder WithRoundRobinPartitionAssignmentStrategy()
    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.

    WithSessionTimeoutMs(int?)

    Sets the client group session and failure detection timeout (in milliseconds). The consumer sends periodic heartbeats (see HeartbeatIntervalMs) to indicate its liveness to the broker. If no heartbeat is received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. Also see MaxPollIntervalMs.

    Declaration
    public KafkaConsumerConfigurationBuilder WithSessionTimeoutMs(int? sessionTimeoutMs)
    Parameters
    Type Name Description
    int? sessionTimeoutMs

    The client group session and failure detection timeout.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
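
    The three liveness settings are easier to reason about side by side. The sketch below uses illustrative values in which the heartbeat interval is well below the session timeout, so several heartbeats can be missed before the member is removed. The helper class is hypothetical and the Silverback namespace is an assumption.

```csharp
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class LivenessExample
{
    // Hypothetical helper: 'builder' is supplied by the surrounding configuration code.
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder) =>
        builder
            // Send a heartbeat every 3 seconds.
            .WithHeartbeatIntervalMs(3_000)
            // Remove the member if the broker receives no heartbeat for 45 seconds.
            .WithSessionTimeoutMs(45_000)
            // Consider the consumer failed if it does not consume for 5 minutes.
            .WithMaxPollIntervalMs(300_000);
}
```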

    WithStallDetectionThreshold(TimeSpan?)

    Sets the maximum time to wait for a message to be consumed before the consumer is considered stale. A stale consumer is automatically restarted. The default is null, which means that the consumer is never considered stale and is never restarted.

    Declaration
    public KafkaConsumerConfigurationBuilder WithStallDetectionThreshold(TimeSpan? stallDetectionThreshold)
    Parameters
    Type Name Description
    TimeSpan? stallDetectionThreshold

    The maximum time to wait for a message to be consumed before the consumer is considered stale.

    Returns
    Type Description
    KafkaConsumerConfigurationBuilder

    The KafkaConsumerConfigurationBuilder so that additional calls can be chained.
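
    Both TimeSpan-based settings can be sketched together. The five-minute threshold is an arbitrary illustrative value, the 100 ms polling timeout simply restates the documented default, the helper class is hypothetical, and the Silverback namespace is an assumption.

```csharp
using System;
using Silverback.Messaging.Configuration.Kafka; // assumed namespace of the builder

public static class StallDetectionExample
{
    // Hypothetical helper: 'builder' is supplied by the surrounding configuration code.
    public static KafkaConsumerConfigurationBuilder Configure(
        KafkaConsumerConfigurationBuilder builder) =>
        builder
            // Explicitly set the polling timeout to the documented default of 100 ms.
            .WithPollingTimeout(TimeSpan.FromMilliseconds(100))
            // Restart the consumer if no message is consumed within 5 minutes.
            .WithStallDetectionThreshold(TimeSpan.FromMinutes(5));
}
```

    On a topic with little traffic, a short threshold would presumably cause frequent restarts, so the value should reflect the expected gap between messages.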

    © 2026 Sergio Aquilini