Class KafkaProducer

Namespace
Silverback.Messaging.Broker
Assembly
Silverback.Integration.Kafka.dll

Produces the messages to an endpoint.

public sealed class KafkaProducer : Producer, IProducer, IDisposable
Inheritance
Producer
KafkaProducer
Implements
IProducer
IDisposable

Constructors

KafkaProducer(string, IConfluentProducerWrapper, KafkaProducerConfiguration, IBrokerBehaviorsProvider<IProducerBehavior>, IServiceProvider, ISilverbackLogger<KafkaProducer>)

Initializes a new instance of the KafkaProducer class.

public KafkaProducer(string name, IConfluentProducerWrapper client, KafkaProducerConfiguration configuration, IBrokerBehaviorsProvider<IProducerBehavior> behaviorsProvider, IServiceProvider serviceProvider, ISilverbackLogger<KafkaProducer> logger)

Parameters

name string

The producer identifier.

client IConfluentProducerWrapper

The IConfluentProducerWrapper.

configuration KafkaProducerConfiguration

The configuration containing only the actual endpoint.

behaviorsProvider IBrokerBehaviorsProvider<IProducerBehavior>

The IBrokerBehaviorsProvider<TBehavior>.

serviceProvider IServiceProvider

The IServiceProvider to be used to resolve the required services.

logger ISilverbackLogger<KafkaProducer>

The ISilverbackLogger<TCategoryName>.

Properties

Client

Gets the related IBrokerClient.

public IConfluentProducerWrapper Client { get; }

Property Value

IConfluentProducerWrapper

Configuration

Gets the producer configuration.

public KafkaProducerConfiguration Configuration { get; }

Property Value

KafkaProducerConfiguration

EndpointConfiguration

Gets the endpoint configuration.

public KafkaProducerEndpointConfiguration EndpointConfiguration { get; }

Property Value

KafkaProducerEndpointConfiguration

Methods

ProduceCore(IOutboundEnvelope)

Publishes the specified message and returns its identifier.

protected override IBrokerMessageIdentifier? ProduceCore(IOutboundEnvelope envelope)

Parameters

envelope IOutboundEnvelope

The envelope containing the message to be produced.

Returns

IBrokerMessageIdentifier

The message identifier assigned by the broker (the Kafka offset or similar).

ProduceCoreAsync(IOutboundEnvelope, CancellationToken)

Publishes the specified message and returns its identifier.

protected override ValueTask<IBrokerMessageIdentifier?> ProduceCoreAsync(IOutboundEnvelope envelope, CancellationToken cancellationToken)

Parameters

envelope IOutboundEnvelope

The envelope containing the message to be produced.

cancellationToken CancellationToken

The cancellation token that can be used to cancel the operation.

Returns

ValueTask<IBrokerMessageIdentifier>

A ValueTask<TResult> representing the asynchronous operation. The task result contains the message identifier assigned by the broker (the Kafka offset or similar).

ProduceCore<TState>(IOutboundEnvelope, Action<IBrokerMessageIdentifier, TState>, Action<Exception, TState>, TState)

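Publishes the specified message and reports the outcome through the callbacks: the message identifier is passed to onSuccess, or the exception is passed to onError.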

protected override void ProduceCore<TState>(IOutboundEnvelope envelope, Action<IBrokerMessageIdentifier, TState> onSuccess, Action<Exception, TState> onError, TState state)

Parameters

envelope IOutboundEnvelope

The envelope containing the message to be produced.

onSuccess Action<IBrokerMessageIdentifier, TState>

The callback to be invoked when the message is successfully produced.

onError Action<Exception, TState>

The callback to be invoked when the produce operation fails.

state TState

The state object that will be passed to the callbacks.

Type Parameters

TState

The type of the state object that will be passed to the callbacks.

Remarks

In this implementation, the message is enqueued synchronously but produced asynchronously. The callbacks are invoked once the message has actually been produced or the produce operation has failed.
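
The following is a minimal, self-contained sketch of the callback-with-state pattern described in these remarks. It does not use Silverback or Kafka: SketchProducer, SketchIdentifier, Deliver, and ProduceAsTask are hypothetical names introduced only for illustration, and the thread pool stands in for the real delivery mechanism. It shows how a call can return right after enqueuing, and how the state argument lets the callbacks stay allocation-free static lambdas.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the identifier assigned by the broker (e.g. a Kafka offset).
public sealed record SketchIdentifier(long Offset);

// Hypothetical producer that mimics the shape of ProduceCore<TState>.
// This is an illustration of the pattern, not Silverback's implementation.
public sealed class SketchProducer
{
    private long _nextOffset = -1;

    // Returns as soon as the work is queued; exactly one callback runs later
    // on a thread-pool thread, receiving the caller's state object.
    public void Produce<TState>(
        string message,
        Action<SketchIdentifier, TState> onSuccess,
        Action<Exception, TState> onError,
        TState state)
    {
        Task.Run(() =>
        {
            SketchIdentifier identifier;
            try
            {
                identifier = Deliver(message);
            }
            catch (Exception ex)
            {
                onError(ex, state);
                return;
            }

            onSuccess(identifier, state);
        });
    }

    // Simulated delivery: rejects empty messages, otherwise assigns the next offset.
    private SketchIdentifier Deliver(string message)
    {
        if (string.IsNullOrEmpty(message))
            throw new ArgumentException("The message must not be empty.", nameof(message));

        return new SketchIdentifier(Interlocked.Increment(ref _nextOffset));
    }
}

public static class SketchProducerExtensions
{
    // Bridges the callbacks to a Task by passing the TaskCompletionSource as the state,
    // so the lambdas capture nothing and can be declared static.
    public static Task<SketchIdentifier> ProduceAsTask(this SketchProducer producer, string message)
    {
        var completion = new TaskCompletionSource<SketchIdentifier>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        producer.Produce(
            message,
            static (identifier, tcs) => tcs.SetResult(identifier),
            static (exception, tcs) => tcs.SetException(exception),
            completion);

        return completion.Task;
    }
}
```

In this sketch, awaiting `new SketchProducer().ProduceAsTask("hello")` completes once the success callback has run, while an empty message surfaces the exception through the error callback. Passing the state explicitly, rather than capturing it in a closure, is a common reason for an overload shaped like this one.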