Producer¶
Quick Start¶
- Setup producer configuration
- Create a message class
- Produce messages
Setup producer configuration¶
Add Kafka producer configuration and outgoing messages:
public class Startup { ... // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddProducerFor<Service>(options => { // configuration settings options.WithBootstrapServers("localhost:9092"); // register outgoing messages (includes outbox messages) options.Register<Test>("test-topic", "test-event", @event => @event.AggregateId); }); } }
Producers are registered as a per service registration
The Service
is registered with the .NET dependency injection, and able to produce Test
messages on the Kafka topic test-topic
.
Services with interfaces¶
If the Service
is injected to consumers with an interface (like IService
), register it like so:
services.AddProducerFor<IService,Service>(options => ...);
Make sure the service/interface is not registered elsewhere
If the service is registered with the interface elsewhere, like services.AddTransient<IService,Service>();
, the Producer
cannot be resolved. Dafda takes care of registering the service, not just the Producer
dependency for it.
Create a message class¶
Create a POCO representation of the Kafka message:
public class Test { public string AggregateId { get; set; } }
Produce messages¶
Inject a dependency on Producer
in the Service
class and call the Produce
method:
public class Service { private readonly Producer _producer; public Service(Producer producer) { _producer = producer; } public async Task DoStuff() { ... await _producer.Produce(new Test { AggregateId = "aggregate-id" }); ... } }
Configuration¶
Kafka Producer Settings (Confluent Docs)
Producer Options¶
Information about some of basic options are available in the general Configuration section.
Message Registration¶
It may seem a bit perculiar that outgoing (produced) messages needs to be registered, however, the current version of Dafda opted for a centralized configuration scheme.
In order to produce message, messages needs to be registered for a given producer, like:
public class Startup { ... // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddProducerFor<Service>(options => { // configuration settings options.WithBootstrapServers("localhost:9092"); // register outgoing messages (includes outbox messages) options.Register<Test>("test-topic", "test-event", @event => @event.AggregateId); }); } }
The class Test
is now register for the Service
producer, and able to send test-event
message on the Kafka topic test-topic
. The lambda function at the end of the Register
call
is used to select the partition key, so that ordering is applicable.
Message Id Generator¶
Each message should contain a unique way of identifying a specific instances of a message. The message id is pass along in the Message Envelope, and consumers should be able to use it for, e.g., deduplication.
The default MessageIdGenerator
generates a GUID
for each call, deriving from the MessageIdGenerator
class and implementing the NextMessageId
method allows for overriding the default behavior, like:
public class Startup { ... // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddProducerFor<Service>(options => { options.WithMessageIdGenerator(new CustomMessageIdGenerator()); }); } }
Message Serialization¶
It some possible to override the default JSON message serialization (here is more on Message Deserialization).
To implement a custom message serializer implement the IPayloadSerializer
interface:
/// <summary> /// Implementations must use the message payload and metadata in the /// <see cref="PayloadDescriptor"/> in order to create a string representation. /// The MIME type format should be described by the <see cref="PayloadFormat"/> /// property. /// </summary> public interface IPayloadSerializer { /// <summary> /// The MIME type of the payload format /// </summary> string PayloadFormat { get; } /// <summary> /// Serialize the payload using the message and metadata in the /// <see cref="PayloadDescriptor"/> /// </summary> /// <param name="payloadDescriptor">The payload description</param> /// <returns>A string representation of the payload</returns> Task<string> Serialize(PayloadDescriptor payloadDescriptor); }
And use the following to replace the default message serialization for all messages:
public class Startup { ... // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddProducerFor<Service>(options => { options.WithDefaultPayloadSerializer(new XmlPayloadSerializer()); }); } }
Or the following to replace message serialization for only a specific topic:
public class Startup { ... // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddProducerFor<Service>(options => { options.WithPayloadSerializer("test-topic", new XmlPayloadSerializer()); }); } }