Consumer¶
Quick Start¶
- Setup consumer configuration
- Create a message class
- Create a message handler
Setup consumer configuration¶
Add Kafka consumer configuration and message handlers in Startup
's ConfigureServices()
:
public class Startup { ... // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { // configure messaging: consumer services.AddConsumer(options => { // configuration settings options.WithBootstrapServers("http://localhost:9092"); options.WithGroupId("consumer-group-id"); // register message handlers options.RegisterMessageHandler<Test, TestHandler>("test-topic", "test-event"); }); } ... }
Multiple consumers
It is possible to add multiple consumers (with different configuration) using the AddConsumer()
extension method, which
might be helpful when consuming from different topic with a different auto.offset.reset
.
Create a message class¶
Create a POCO representation of the Kafka message:
public class Test { public string AggregateId { get; set; } }
Private/public properties
Due to the nature of System.Text.Json
having private
setters will result in properties being skipped by the default deserializer.
Constructors
Please use an public
parameterless constructor to be on the safe side of System.Text.Json
.
Create a message handler¶
Create a message handler that implements the IMessageHandler<Test>
interface, and use the Handle()
method to do all the needed business logic to handle the message:
public class TestHandler : IMessageHandler<Test> { private readonly ILogger<TestHandler> _logger; public TestHandler(ILogger<TestHandler> logger) { _logger = logger; } public Task Handle(Test message, MessageHandlerContext context) { _logger.LogInformation(@"Handled: {@Message}", message); return Task.CompletedTask; } }
MessageHandlerContext
The MessageHandlerContext
contains information about the message, such as:
- Message Identifier
- Correlation Identifier
- Message Type
- Message Headers
Dependency Injection¶
Types registered in the IServiceCollection
are available as constructor arguments. The IMessageHandler<T>
implementation and any dependencies are resolved as part of the same scope, and disposed at the end of the Handle()
method invocation.
DbContext
Therefore, it is entirely possible to resolve DbContext
instances and have them behave as expected, however, transaction management and calls to SaveChangesAsync()
are left up to the user.
Service Lifetimes
As always, with dependency injection, be mindful of service lifetimes.
Configuration¶
Kafka Consumer Settings (Confluent Docs)
Consumer Options¶
Information about some of basic options are available in the general Configuration section.
Message Handler Registration¶
Adding message handlers during service configuration:
public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddConsumer(options => { // register outgoing messages (includes outbox messages) options.RegisterMessageHandler<Test, TestHandler>("test-topic", "test-event"); }); }
Will ensure that all messages with the type1 test-event
on the Kafka topic named test-topic
will be deserialized as an instance of the POCO Test
and handed to a transiently resolved instance of TestHandler
. This is all handled by the .NET Core dependency injection, and Dafda clients need only concern themselves with creating simple messages and matching message handlers.
Unconfigured Messages¶
By default, a consumer will throw a MissingMessageHandlerRegistrationException
if it receives a message where the type has not been configured with a handler. This can be overridden by providing a different IUnconfiguredMessageHandlingStrategy
:
public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddConsumer(options => { // register outgoing messages (includes outbox messages) options.RegisterMessageHandler<Test, TestHandler>("test-topic", "test-event"); // log, but perform no other action for other messages options.WithUnconfiguredMessageHandlingStrategy<UseNoOpHandler>(); }); }
UseNoOpHandler
is built in, and uses an ILogger
to log an information message about having received the message and then considers it processed.
Message Deserialization¶
In order to gain controler over the deserialization of the message handled by Dafda use WithIncomingMessageFactory
, like:
public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddConsumer(options => { // register outgoing messages (includes outbox messages) options.WithIncomingMessageFactory(new XmlMessageSerializer()); }); }
To override the default JSON deserializer, and supply a custom implementation of the IIncomingMessageFactory
interface.
Unit of Work Factory¶
It is possible to override the default Unit of Work behavior for each consumed message, using configuration options and custom implementations of IHandlerUnitOfWorkFactory
and IHandlerUnitOfWork
. However, the default implementation adheres to the scoped service lifetime of .NET Core's dependency injection, allowing it to work in tandem with the scopes of, e.g., EF Core.
For more information see ServiceProviderUnitOfWorkFactory.cs
Unit of Work Factory¶
When a consumer starts Dafda continues consuming from the last committed offset on each topic, if a committed offset exists.
To read all topics from the beginning of all partitions, use ReadFromBeginning
:
public void ConfigureServices(IServiceCollection services) { // configure messaging: producer services.AddConsumer(options => { options.ReadFromBeginning(); }); }
-
The message type is part of the Message Envelope ↩