RabbitMQ – Bezbolesna integracja z .NET Core

RabbitMQ – Bezbolesna integracja z .NET Core7 min read

Od mojego ostatniego artykułu, w którym wspominałem o rabbicie, minął prawie rok. Chyba najwyższa pora, aby napisać coś więcej. Szczególnie, że ostatnio w moje łapki wpadła bardzo ciekawa książka:

Tak off-topic – jeżeli chcielibyście, abym skrobnął drobną recenzję tej książki, dajcie mi znać 🙂

Moją nową miłością okazało się tworzenie aplikacji rozproszonych przy wykorzystaniu DDD, TDD i jeszcze kilku innych skrótów, których nie wymienię 🙂 Mają one ważną cechę – muszą się ze sobą komunikować. Nie ma znaczenia, czy robią to synchronicznie za pomocą request-ów HTTP, czy też asynchronicznie za pomocą kolejki wiadomości. Są jak zespół, bez komunikacji praca jest bez sensu. W dzisiejszym artykule chciałbym wam przedstawić jak w bezbolesny sposób zintegrować message broker-a RabbitMQ z aplikacją .NET Core MVC. Wykorzystamy do tego bibliotekę Raw Rabbit. Poznałem ją dzięki Piotrowi Gankiewiczowi podczas prowadzonych przez niego devWarsztatów.

Konfiguracja Raw Rabbit-a

No dobra, przejdźmy do mięska. Nie będę wam opisywał jak uruchomić kolejkę RabbitMQ, ponieważ zrobiłem to w poprzednim artykule. Zaczniemy od instalacji paczkek nuget-a.

<PackageReference Include="RawRabbit" Version="1.10.4" />
<PackageReference Include="RawRabbit.vNext" Version="1.10.4" />

Raw rabbit do swojego działania potrzebuje dwóch paczek. Na chwilę obecną polecam wam instalację numerów 1.10.4, ponieważ wersja 2+ jest jeszcze oznaczona jako release candidate. Stabilność projektu ważna sprawa!

Przejdźmy do plik appsettings.json, w którym wklejamy konfigurację biblioteki.

{
  "rabbitmq": {
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "Port": 5672,
    "Hostnames": [ "localhost" ],
    "RequestTimeout": "00:00:10",
    "PublishConfirmTimeout": "00:00:01",
    "RecoveryInterval": "00:00:10",
    "PersistentDeliveryMode": true,
    "AutoCloseConnection": true,
    "AutomaticRecovery": true,
    "TopologyRecovery": true,
    "Exchange": {
      "Durable": true,
      "AutoDelete": true,
      "Type": "Topic"
    },
    "Queue": {
      "AutoDelete": true,
      "Durable": true,
      "Exclusive": true
    }
  }
}

No i mamy ścianę informacji. Co z tego możemy wyciągnąć? Na przykład konfiguracje kolejek czy centrali Exchange, w które na szczęście dla was, jak i dla mnie, nie ma teraz sensu się zagłębiać. Pewnie, za jakiś czas, na moim blogu, ukaże się artykuł, który przybliży wam znaczenie tej ściany. Obecnie najważniejszymi kluczami są:

  • Username i Password, wiadomo chyba co robią.
  • Port czyli, na którym porcie nasłuchuje nasza instancja RabbitMQ. Standardowy port to 5672.

Tworzenie klienta

Stwórzmy teraz klienta, czyli obiektu, za pomocą którego będziemy komunikować się z RabbitMQ. Osobiście uwielbiam zamykać je w mechanizmie extension methods. Umożliwia to łatwe i przyjemne wyniesienie kodu do zewnętrznej biblioteki i używanie w większej ilości projektów.

public static class ServiceCollectionExtensions
{
    public static void AddRabbitMq(this IServiceCollection services, IConfigurationSection section)
    {
        // RabbitMQ Configuration
        var options = new RawRabbitConfiguration();
        section.Bind(options);
        
        var client = BusClientFactory.CreateDefault(options);
        services.AddSingleton<IBusClient>(_ => client);
    }
}

Sama implementacja nie wymaga wiele omawiania. Metoda rozszerzająca (tak to się tłumaczy?) otrzymuje sekcję konfiguracyjną z pliku appsettings.json. Następnie wiąże ją z konfiguracją Raw Rabbit-a, by na końcu stworzyć default-owy obiekt BusClient.

Wisienką na torcie jest wpis w pliku Startup.cs.

public void ConfigureServices(IServiceCollection services)
{          
    services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
    
    // RabbitMQ Configuration
    services.AddRabbitMq(Configuration.GetSection("rabbitmq"));
}

Pierwsza zdefiniowana wiadomość

Kiedy klient został już odpowiednio skonfigurowany możemy przejść do zdefiniowania pierwszej wiadomości w systemie. Stwórzmy interfejs znacznikowy, którym będziemy naznaczać nasze wiadomości.

public interface IMessage { }

Oczywiście interfejs będzie pusty w środku. Teraz definiujemy POCO (Plain Old C# Object). Nie chcąc komplikować tego tutorialu, będzie to bardzo prosta klasa, która zawiera tylko jedno pole Message.

public class SendMessage : IMessage
{
    public string Message { get; }

    public SendForgetPasswordEmailCommand(string message)
    {
        Message = message;
    }
}

Ciekawym szczegółem implementacji jest fakt, że obiekt klasy SendMessage jest immutable. Oznacza to, że nie można zmienić jego stanu 🙂 Takie tam fanaberię można robić 🙂

Pierwszy handler wiadomości

To jest moment, w którym zazwyczaj pisze jakieś durnoty o fascynacji napojami mlecznymi (w szczególności czekoladowymi). I tym razem nie zawiodę, bo od kiedy temperatura spadła to i kakałko stało się znów moim napojem numer jeden. Więc nastawiajcie kakałko, trochę pracy jeszcze przed nami. Po wszystkim wypijecie na skołatane nerwy ;). Wracając jednak do tematu: skoro sama wiadomość posiada swój interfejs, to dlaczego nie miałby go posiadać handler? Możemy stworzyć taki dość uproszczony rodzaj handler-a na podstawie poniższego interfejsu.

public interface IHandler<in T> where T : IMessage
{
    Task HandleAsync(T message, CancellationToken token);
}

Będzie on definiował przymus implementowania metody HandleAsync, która w swoich argumentach przyjmuje wiadomość jak i CancellationToken. W tym artykule pominiemy implementację wykorzystania tego mechanizmu. Warto jednak wiedzieć o takiej możliwość.
Przejdźmy do przykładowej implementacji handler-a, niewiele dłuższej od samego, zdefiniowanego interfejsu.

public class SendMessageHandler : IHandler<SendMessage>
{   
    public async Task HandleAsync(SendMessage @event, CancellationToken token)
    {          
        Console.WriteLine($"Receive: {@event.Message}");
        return Task.CompletedTask;
    }
}

W nim po prostu wypisujemy na konsolę to co otrzymaliśmy w wiadomości.

Następnie wystarczy że dodamy nasz handler do kontenera DI. W tym celu użyjemy standardowego kontenera .NET Core MVC.

services.AddTransient<IHandler<SomeMessage>>, SendMessageHandler>();

Zapisujemy się na przychodzące wiadomości

Jako że RabbitMQ powstał z myślą o aplikacjach rozproszonych. Poprawną praktyko jest tworzenie kodu nadającego się do użycia w kilku aplikacjach naraz. Za pomocą mechanizmu Extension Methods możemy zrobić rozszerzenie do interfejsu IApplicationBuilder, które najpierw pobierze nam instancje obiektu BusClient, a następnie wywoła na nim metodę SubscribeAsync. Dodatkowo, jeżeli nasze rozszerzenie zwraca typ IApplicationBuilder, otrzymujemy przydatne FluetAPI.  Wykorzystując app.ApplicationServices.GetService dynamicznie pobieramy instancję handler-a. Cała implementacja wygląda następująco:

public static class ApplicationBuilderExtensions
{
    public static IApplicationBuilder AddHandler<T>(this IApplicationBuilder app, IBusClient client) 
        where T : IMessage
    {
        if (!(app.ApplicationServices.GetService(typeof(IHandler<T>)) is IHandler<T> handler))
            throw new NullReferenceException();
        
        client
            .SubscribeAsync<T>(async (msg, context) =>
            {
                await handler.HandleAsync(msg, CancellationToken.None);
            });

        return app;
    }

    public static IApplicationBuilder AddHandler<T>(this IApplicationBuilder app)
        where T : IMessage
    {
        if (!(app.ApplicationServices.GetService(typeof(IBusClient)) is IBusClient busClient))
            throw new NullReferenceException();
        
        return AddHandler<T>(app, busClient);
    }
}

Łatwo zauważyć, że w parametrze SubscribeAsync jest wbite na sztywno wyrażenie lambda. Dla zwiększenia elastyczności implementacji logiki wystarczy przekazać wyrażenie lambda w argumentach metody. Na potrzeby tego artykułu oszczędzałem ilość znaków, stąd sztywna Lambda.

Nie pozostało nam nic innego jak tylko wykorzystać ładnie stworzone API do zapisywania się na nadchodzące wiadomości.

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    // ...
    app.UseMvc();

    app.AddHandler<SomeMessage>()
        .AddHandler<SomeAnotherMessage>()
        .AddHandler<MayberOneMoreMessage>();
}

Wysłanie wiadomości z poziomu kontrolera

Na zakończenie przejdźmy jeszcze do implementacji wysyłania wiadomości z poziomu kontrolera. Nie ma tutaj większej magii. Pobieramy obiekt BusClient z naszego kontenera DI i w jego akcji możemy wywołać metodę PublishAsync.

public class MessageController : Controller
{
    private readonly IBusClient _client;

    public ProfilesController(IBusClient client)
    {
        _client = client;
    }

    [HttpGet]
    [Route("Create")]
    public async Task<IActionResult> Create()
    {
        await _client.PublishAsync(new SomeMessage("Test Message"));
        
        return Accepted();
    }
}

Robimy to w tej samej aplikacji, jednak polecam wygenerowanie nowej. Pozwoli nam to poczuć, jak wygląda komunikacja pomiędzy dwoma aplikacjami. Oczywiście, musicie mieć na uwadze, że tworząc nową aplikację musi ona zawierać konfigurację połączenia z RabbitMQ.

Podsumowanie

Jeżeli jakimś cudem dobrnęliście na koniec tego artykułu należą wam się szczere gratulacje. W sumie ostatnio przyzwyczaiłem Was drodzy czytelnicy do krótszych form, acz nie całą wiedzę można przekazać w krótki, prosty sposób. Stworzyliśmy w naszym projekcie uproszczoną w obsłudze implementacje wykorzystania biblioteki Raw Rabbit Podobną można znaleźć we wspólnym projekcie edukacyjnym Piotra Gankiewicza i Dariusza Pawlukiewicza pod szyldem DevMentors. Główną różnicą mojej wersji jest wykorzystanie standardowego konteneru DI oraz przyjęte nazewnictwo.

Mam nadzieje, że udało mi się przybliżyć sposób wykorzystania brokera wiadomości RabbitMQ w sposób wystarczająco klarowny, abyście mogli zacząć stosować go w swoim przyszłych projektach.

Dzięki za poświęcony czas na czytanie tego artykułu.

Jak zwykle do następnego!

Cześć 🙂

By |2018-08-23T15:54:33+00:00Sierpień 23rd, 2018|.NET|1 Comment