kafka-consumer capability
kafka-consumer adds a Kafka consumer to your service with a handler interface, consumer group management, and graceful shutdown drain.
Why Kafka for Async Processing
The problem with synchronous processing is coupling: the caller blocks until the work is done, and if the work is slow or the downstream is unavailable, the caller is affected. Kafka breaks that coupling. The producer writes a message and moves on; the consumer reads it when it’s ready. The two sides can scale, deploy, and fail independently.
Consumer groups are the key abstraction. Kafka distributes partitions across consumers in the same group, so you can scale processing horizontally by adding more consumers without changing the producer. Each partition is read by exactly one consumer in a group at a time, which gives you ordered processing within a partition and parallel processing across partitions.
The durability guarantee matters too. Unlike Redis pub/sub (which drops messages if no consumer is connected), Kafka retains messages for a configurable period — days by default. A consumer that falls behind can catch up. A consumer that crashes and restarts re-reads from its last committed offset.
Where Kafka is the wrong tool: if your use case is simple task queuing with no ordering requirements and you don’t need replay, something lighter (a worker backed by a database queue, or a Redis list) is easier to operate. Kafka’s operational overhead is real. Run it when the durability, replay, and fan-out guarantees are worth it.
What You Get
- Consumer group setup with broker and topic configuration
- Message handler interface — implement one function to process messages
- Graceful shutdown with in-flight message drain
- Error re-throw pattern for dead-letter queue routing
- Config for brokers, topics, consumer group ID, and commit strategy
Go Implementation
The Go capability uses segmentio/kafka-go — a pure Go Kafka client with no cgo dependency and a clean API for both low-level reader control and higher-level consumer group abstractions.
The scaffold creates a kafka.Reader bound to a consumer group and wraps it in a Consumer struct that calls your handler for each message. Commits happen after the handler returns without error — at-least-once delivery by default.
```go
// Implement the handler interface for your message type.
type OrderHandler struct{ orderService *OrderService }

func (h *OrderHandler) Handle(ctx context.Context, msg kafka.Message) error {
	var event OrderPlacedEvent
	if err := json.Unmarshal(msg.Value, &event); err != nil {
		return fmt.Errorf("unmarshal order placed event: %w", err)
	}
	return h.orderService.ProcessOrder(ctx, event)
}
```

```go
// The consumer loop runs until context is cancelled.
consumer := kafka.NewConsumer(cfg.Kafka, &OrderHandler{orderService})
consumer.Run(ctx)
```

Returning an error from the handler does not commit the offset. The message will be re-delivered on the next fetch. For messages that consistently fail (poison pills), you need a dead-letter strategy — the scaffold includes a MaxRetries option that routes to a DLQ topic after N failures rather than blocking the partition forever.
TypeScript Implementation
The TypeScript capability uses KafkaJS — the most widely used Kafka client for Node.js, with full TypeScript types, consumer group support, and a clear async API.
The scaffold exports a createConsumer factory function that returns a configured consumer bound to your handler function. The consumer subscribes from the latest offset by default — change to fromBeginning: true if you need replay.
```ts
import { createConsumer } from './messaging/kafka';

const consumer = await createConsumer({
  topics: ['orders.placed'],
  groupId: 'order-processor',
  handler: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value?.toString() ?? '');

    // Throwing here re-throws to KafkaJS, which handles retry/DLQ routing.
    await orderService.processOrder(event);
  },
});

await consumer.connect();
await consumer.run();

// On SIGTERM — drains in-flight messages before disconnecting.
process.on('SIGTERM', async () => {
  await consumer.disconnect();
});
```

The error re-throw pattern is intentional: KafkaJS will handle the retry logic and DLQ routing if configured. Swallowing errors in the handler means offsets get committed for messages that weren’t actually processed.
Exactly-Once vs At-Least-Once
The scaffold implements at-least-once delivery: messages are committed after successful handler execution. If your handler processes a message and then crashes before the commit, the message will be re-delivered and processed again.
If your handler has side effects that must not be duplicated (writing to a database, charging a payment), pair this with idempotency. Store a fingerprint of each message ID and skip processing if it’s already been handled. This is the correct way to get effectively-once semantics without the complexity of Kafka transactions.
Add to Your Service
```sh
verikt new my-service --cap kafka-consumer
# or add to an existing service:
verikt add kafka-consumer
```

Related Capabilities
outbox
idempotency
observability