data.mdx.frontmatter.hero_image

Apache Kafka - Buforowanie wiadomości w pamięci

2024-02-25 | .NET, Apache Kafka | bd90

Jakiś czas temu trafiłem na artykuł, który niezmiernie mnie zaintrygował: Capturing a Billion Emo(j)i-ons. Została w nim opisana technika pozwalająca na zwiększenie wydajności naszego API poprzez buforowanie wiadomości w pamięci tak, aby wysyłać batch-e na kolejkę. Idea zrobiła na mnie ogromne, pozytywne wrażenie. Postanowiłem sprawdzić jak mogłaby wyglądać implementacja takiego mechanizmu na platformie dotnet. Natomiast artykuł, który czytasz, jest zwieńczeniem mojego krótkiego "researchu" w tym temacie.

Na wstępie chciałbym zaznaczyć kilka rzeczy:

  • Nie jestem ekspertem w wykorzystaniu Kafki w projektach, dlatego postanowiłem zostać przy defaultowej konfiguracji.
  • Jako, że infrastruktura, na której testowałem czy wszystko działa, miała dość mocne ograniczenia (mój lokalny komputer M1 Max, 64GB ramu) postanowiłem nie bawić się w partycjonowanie wiadomości, aby wygenerować jak największy load na jednej partycji.
  • Cały kod, który zobaczycie poniżej, trzeba traktować w formie "Proof of concept". Nie jest to produkcyjna implementacja (nigdy nie miała być). Celem kodu było sprawdzenie z jakimi "zagrożeniami" trzeba się mierzyć w przypadku takiego podejścia.

Skoro wszystko mamy uzgodnione (hip hip hura) możemy przejść przez do implementacji.

Zasada działania mechanizmu

Gdy do naszego api przychodzą żądania HTTP, zamiast od razu wysyłać je na kolejkę, to w pierwszej kolejności zapisujemy je do bufora w pamięci. Następnie, po jasno określonym czasie (lub wystarczającej ilości wiadomości), bierzemy batch danych, wysyłamy na kolejkę i czyścimy nasz bufor.

Myślę że całkiem nieźle udało mi się to zobrazować na poniższym diagramie (mistrz skromności):

Trochę żartem, trochę serio - nazwa Fetch-Push-Flush bardzo pasuje mi do takiego rodzaju zachowania. Mam nadzieje, że nazwa się przyjmie i kiedyś będę mógł spotkać w kodzie FPFService 😅

Uruchomienie Apache Kafka lokalnie

Dobrze, jak już mamy opisaną zasadę działania to trzeba przejść do implementacji, co nie? Do tego jednak potrzebujemy lokalnej instancji kafki. Jeżeli pracujecie na Mac-ach to zadanie jest banalne, wystarczą dwie proste komendy.

➜ brew install confluent-cli
➜ confluent local kafka start

Powyższe komendy odpalą nam kontener confluentinc/confluent-local

Następnie, przy pomocy cli confluent-a, możemy utworzyć topic i zacząć produkować wiadomości:

➜ confluent local kafka topic create quickstart
➜ confluent local kafka topic produce quickstart
# Teraz wszystko co wpiszemy będzie wysłane jako wiadomość
➜ wiadomosc-1
➜ wiadomosc-2
➜ wiadomosc-3

Aby wyjść z trybu wysyłania wiadomości trzeba po prostu nacisnąć cmd + c lub crtl + c.

Consumer

Mając postawioną infrastrukturę trzeba przejść do implementacji aplikacji testowej. Zacznijmy od Consumer-a. Nie będzie to nic specjalnego, zwykła aplikacja konsolowa, której zadaniem będzie odebranie wiadomości z Kafki i wypisanie jej na konsolę naszego programu.

using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:58395",
    GroupId = $"my-group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("quickstart");

while (true)
{
    var consumeResult = consumer.Consume(TimeSpan.FromSeconds(1));

    if (consumeResult is not null)
    {
        Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
    }
}

Producer

Skoro w konsumencie nic interesującego się nie dzieje to zobaczymy jak będą wyglądały implementację producent-ów naszych wiadomości. Aby przetestować założenia z wyżej wymienionego artykułu będziemy potrzebowali trzech endpoint-ów.

Natychmiastowe wysyłanie wiadomości

Pierwszy z nich będzie miał za zadanie natychmiastowe wysłanie wiadomości na kafkę. Tutaj też nic nie powinno nas zaskoczyć. Dla uproszczenia użyłem minima-api i od razu z handler-a bijemy do kafki.

app.MapPost("/record-event", async ([FromBody] Event @event) =>
{
    var config = new ProducerConfig
    {
        BootstrapServers = "localhost:58395"
    };

    using (var producer = new ProducerBuilder<string, string>(config).Build())
    {
        var result = await producer.ProduceAsync("quickstart", new Message<string, string>
        {
            Value = @event.Message
        });

        return Results.Ok(result.Status.ToString());
    }
});

Poniżej można zobaczyć jak wyglądała przepustowość api na mojej maszynie. Prawie 700 żądań na sekundę! Sądze, że nie jest najgorszy wynik. Czasem to wręcz wynik wystarczający dla aplikacji produkcyjnych.

Re-użycie obiektu producer-a

Prawdopodobnie tworzenie nowej instancji producer-a, w celu wysłania pojedynczej wiadomość, nie jest zbyt wydajnym rozwiązaniem. Sprawdźmy jak wpłynie na wyniki zmiana endpoint-a w taki sposób. aby obiekt producer-a był współdzielony pomiędzy request-ami.

var config = new ProducerConfig
{
    BootstrapServers = "localhost:58395"
};

builder.Services.AddSingleton<IProducer<string, string>>(_ => new ProducerBuilder<string, string>(config)
    .Build());

app.MapPost("/record-event/", async ([FromBody] Event @event, [FromServices] IProducer<string, string> producer) =>
{
    var result = await producer.ProduceAsync("quickstart", new Message<string, string>
    {
        Value = @event.Message
    });
    
    return Results.Ok(result.Status.ToString());
});

Nie mam pewności co do poprawności implementacji, aczkolwiek tworzenie instancji producer-a per request jest dość kosztowną operacją. Nowa implementacja okazała się 7 razy wydajniejsza. Pozwoliło nam to obsłużyć w okolicach 5k żądań na sekundę. (Jak widać na screenie poniżej, powyżej 5k na sekundę część żądań kończyła się błędem). Nadal wzrost wydajności jest naprawdę spory.

Opóźnione Wysyłanie

Przejdźmy do ostatniego testu opisanego w tym artykule: jak na wydajność naszego api wpłynie wysyłanie wiadomości w oddzielnym procesie i jak mogłoby to wyglądać z poziomu implementacji w C#.

Zacznijmy od naszego endpoint-a. Ulegnie on znacznemu uproszczeniu ponieważ nie będziemy używać producent-a. Musimy stworzyć miejsce do przechowywania wiadomości. Dodatkowo będziemy potrzebowali background service, aby cyklicznie zaczytywał wiadomości i publikował je na kolejkę. Ja taki service nazwałem FlushService.

builder.Services.AddSingleton<InMemoryStorage>();
builder.Services.AddHostedService<FlushService>();

app.MapPost("/record-event/", async ([FromBody] Event @event, [FromServices] InMemoryStorage storage) =>
{
    storage.Add(@event.Message);
    
    return Results.Ok();
});

Skoro jesteśmy przy przechowywaniu danych w pamięci naszej aplikacji to pojawiają nam się pierwsze decyzje / trade-offy do podjęcia. Musimy ustalić strategię zachowania naszego "schowka".

Pierwszym napotkanym przeze mnie problemem było upewnienie się, że pomiędzy cyklami FlushService wiadomość nie trafiła przez przypadek do dwóch różnych batch-y. Jako, że jest to implementacja PoC to zdecydowałem się na grupowanie wiadomości przy pomocy znacznika czasu. W uproszczeniu, wiadomości, które będzie musiał wysłać mój serwis, będą trafiały do tego samego kubełka. jeżeli były nadane w tej samej sekundzie.

Najprostsza implementacja takiego mechanizmu wygląda następująco:

public class InMemoryStorage
{
    private readonly Dictionary<string, ConcurrentQueue<string>> _storage = new();
    
    public InMemoryStorage()
    {
        // create buckets
        for (var i = 0; i <= 10; i++)
        {
            var key = DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(i)).ToString("mm/dd/yy HH:MM:ss");
            _storage.TryAdd(key, new ConcurrentQueue<string>());
        }
        for (var i = 0; i <= 300; i++)
        {
            var key = DateTime.UtcNow.Add(TimeSpan.FromSeconds(i)).ToString("mm/dd/yy HH:MM:ss");
            _storage.TryAdd(key, new ConcurrentQueue<string>());
        }
    }

    public void Add(string message)
    {
        var key = DateTime.UtcNow.ToString("mm/dd/yy HH:MM:ss");
        
        _storage[key].Enqueue(message);
        
        Console.WriteLine($"{message} - added to bucket {key}");
    }

    public ConcurrentQueue<string> Get(string key)
    {
        return _storage[key];
    }
}

Mamy tutaj klasę InMemoryStorage, która zawiera w sobie słownik: klucz, kolejka. Wszystkie klucze to znaczniki czasowe z dokładnością do sekundy. Jako, że to tylko PoC, nie planowałem dodawać złożoności, aby w dynamiczny sposób tworzyć ten słownik (wtedy też co sekundę byśmy mieli drop wydajności w czasie dodawania wiadomości do schowka). Zrobiłem tak głównie dlatego, że nie uważam, aby to było najlepsze podejście do batchowania wiadomości.

Następnie trzeba przejść do implementacji samego FlushService, który jest najprostszym BackgroundService jaki możecie sobie wyobrazić.

public class FlushService : BackgroundService 
{
    private readonly IServiceProvider _serviceProvider;

    public FlushService(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            using var scope = _serviceProvider.CreateScope();
            var storage = scope.ServiceProvider.GetRequiredService<InMemoryStorage>();
            var key = DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(1)).ToString("mm/dd/yy HH:MM:ss");
            
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:58395"
            };

            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                Console.WriteLine($"----Flushing {key}");

                foreach (var msg in storage.Get(key))
                {
                    Console.WriteLine(msg);

                    var result = await producer.ProduceAsync("quickstart", new Message<string, string>
                    {
                        Value = msg
                    });
                }
            }

            Console.WriteLine($"----Flushed {key}");
            
            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        }
    }
}

Bierzemy wiadomości z ostatniej sekundy i w pętli wysyłamy je na kolejkę.

Oczywiście, jak w przypadku implementacji schowka, muszę tutaj zaznaczyć, że to jest bardzo uproszczona implementacja. W realnym scenariuszu zamiast opierać mechanizm na czasie wysłania mielibyśmy API w schowku, które pozwalałoby na pobranie najstarszego, nie wysłanego jeszcze batch-a i oznaczenie go jako wysłanego. Nie mniej dla testu byłby to przerost formy nad treścią.

Jak to się ma do wydajności? Zobaczcie sami:

Jako, że zadaniem endpoint-a jest tylko zapisanie wiadomości do pamięci, to jest on w stanie obsłużyć naprawdę olbrzymi ruch.

Podsumowanie

Niestety nie wszystko złoto co się świeci. W przypadku implementacji takiego mechanizmu trzeba bardzo uważać na sposób grupowania wiadomości. Jeżeli w naszym schowku będzie narastała ilość danych to bardzo łatwo o sytuację, gdy zabraknie nam RAM-u na maszynie. Tak więc ta decyzja będzie miała diametralne znaczenie dla stabilności tego rozwiązania.

Innym problemem jest możliwa utrata danych, więc to nie będzie pattern do użycia w miejscach, gdzie nie możemy sobie na to pozwolić.

Nie mniej uważam ten pattern za całkiem ciekawy sposób na przyspieszenie API. Warto jednak nadmienić, iż gorzej wygląda sprawa z użytecznością w aplikacjach enterprise. W większości miejsc, gdzie pracowałem, nie mogłem sobie pozwolić na utratę danych... Nie mówię jednak, że poznałem wszystkie przypadki. Jeżeli nacisk na prędkość działania miałby pierwszeństwo nad spójnością danych to myślę że ten pattern może wam się przydać!

Jak zwykle mam nadzieje że artykuł się podobał!

Do Następnego!

Cześć 🙂

Referencje

By Bd90 | 25-02-2024 | .NET, Apache Kafka