SAGA-PATTERN DISTRIBUTED-TRANSACTIONS MICROSERVICES EVENT-DRIVEN DATA-CONSISTENCY WORKFLOW-MANAGEMENT SYSTEM-DESIGN BACKEND ARCHITECTURE NODEJS RABBITMQ TRANSACTION

Mengimplementasikan Saga Pattern: Panduan Praktis Transaksi Terdistribusi untuk Microservices Anda

⏱️ 14 menit baca
👨‍💻

Mengimplementasikan Saga Pattern: Panduan Praktis Transaksi Terdistribusi untuk Microservices Anda

1. Pendahuluan

Di dunia monolit, menjaga konsistensi data sangat mudah. Anda cukup menggunakan transaksi database (ACID) dan semuanya beres. Namun, begitu Anda memecah aplikasi menjadi microservices, transaksi tunggal yang mencakup beberapa layanan menjadi mustahil. Setiap layanan memiliki databasenya sendiri, dan tidak ada lagi “dua fase commit” yang ajaib di seluruh sistem.

Inilah masalah klasik dalam arsitektur terdistribusi: bagaimana menjaga konsistensi data ketika sebuah operasi bisnis melibatkan beberapa layanan yang independen? Jika salah satu bagian operasi gagal, semua bagian lainnya harus dibatalkan (rollback) agar data tetap konsisten.

Memperkenalkan Saga Pattern: sebuah pola desain yang membantu kita mengelola transaksi bisnis yang mencakup beberapa layanan, dengan setiap layanan melakukan transaksi lokalnya sendiri dan mempublikasikan event. Jika terjadi kegagalan, Saga akan mengorkestrasi serangkaian “kompensasi” untuk membatalkan transaksi yang sudah berhasil. Ini adalah salah satu kunci untuk membangun sistem microservices yang tangguh dan andal.

Artikel ini akan membawa Anda melampaui teori dan langsung ke praktik. Kita akan mengimplementasikan Saga Pattern dengan contoh konkret menggunakan Node.js dan RabbitMQ.

2. Memahami Kembali Saga Pattern (Singkat)

Sebelum menyelam ke kode, mari kita segarkan ingatan tentang Saga Pattern. Intinya, Saga adalah urutan transaksi lokal. Setiap transaksi lokal memperbarui database layanan dan mempublikasikan event yang memicu transaksi lokal berikutnya dalam saga.

Ada dua pendekatan utama untuk mengimplementasikan Saga:

  1. Choreography: Setiap layanan mempublikasikan event dan layanan lain mendengarkan event tersebut untuk memicu transaksi lokalnya sendiri. Ini bersifat desentralisasi dan lebih sederhana untuk saga yang ringan, tetapi bisa sulit untuk dilacak dan didebug jika saga menjadi kompleks.
  2. Orchestration: Sebuah layanan terpusat (Saga Orchestrator) mengelola dan mengoordinasikan seluruh alur saga. Orchestrator memberi tahu layanan mana yang harus melakukan apa, dan bertanggung jawab untuk memicu langkah kompensasi jika terjadi kegagalan. Ini lebih kompleks untuk diimplementasikan di awal, tetapi memberikan kontrol dan visibilitas yang lebih baik.

Untuk panduan praktis ini, kita akan fokus pada Orchestration karena memberikan kontrol yang lebih jelas dan lebih mudah ditunjukkan dengan contoh kode.

3. Studi Kasus: Proses Pemesanan E-commerce

Mari kita ambil studi kasus yang umum: proses pemesanan di sebuah e-commerce. Ketika pelanggan membuat pesanan, beberapa hal perlu terjadi secara berurutan:

  1. Order Service: Membuat entri pesanan baru.
  2. Payment Service: Memproses pembayaran.
  3. Inventory Service: Mengurangi stok produk yang dipesan.

Jika salah satu dari langkah ini gagal (misalnya, pembayaran ditolak atau stok tidak cukup), seluruh pesanan harus dibatalkan.

Alur Saga (Orchestration):

  1. Order Service menerima permintaan pesanan.
  2. Order Service membuat pesanan dengan status PENDING dan mengirim event OrderCreated ke Order Saga Orchestrator.
  3. Order Saga Orchestrator menerima OrderCreated dan memerintahkan Payment Service untuk memproses pembayaran.
  4. Payment Service memproses pembayaran dan merespons dengan PaymentProcessed (berhasil) atau PaymentFailed (gagal).
  5. Jika PaymentProcessed: Order Saga Orchestrator memerintahkan Inventory Service untuk mengurangi stok.
  6. Jika PaymentFailed: Order Saga Orchestrator memerintahkan Order Service untuk membatalkan pesanan (kompensasi).
  7. Inventory Service mengurangi stok dan merespons dengan InventoryUpdated (berhasil) atau InventoryFailed (gagal).
  8. Jika InventoryUpdated: Order Saga Orchestrator memperbarui status pesanan menjadi COMPLETED di Order Service.
  9. Jika InventoryFailed: Order Saga Orchestrator memerintahkan Payment Service untuk mengembalikan dana (kompensasi) dan Order Service untuk membatalkan pesanan (kompensasi).

4. Merancang Orchestrator Saga

Orchestrator adalah jantung dari Saga Orchestration. Ia adalah sebuah state machine yang melacak status saga dan menentukan langkah selanjutnya berdasarkan event yang diterima.

Kita bisa merepresentasikan status saga seperti ini:

graph TD
    A[OrderCreated] --> B{Process Payment};
    B -- PaymentProcessed --> C{Update Inventory};
    B -- PaymentFailed --> D[CancelOrder];
    C -- InventoryUpdated --> E[CompleteOrder];
    C -- InventoryFailed --> F[RefundPayment];
    F --> D;
    D --> G[OrderCancelled];
    E --> H[OrderCompleted];

Setiap transisi akan dipicu oleh sebuah event dan akan menghasilkan perintah ke layanan mikro atau event internal saga.

5. Mengimplementasikan Layanan Mikro (Node.js & RabbitMQ)

Kita akan menggunakan Node.js untuk layanan mikro dan RabbitMQ sebagai broker pesan untuk komunikasi. Setiap layanan akan memiliki:

Mari kita lihat struktur kode intinya.

Order Service

// order-service/index.js
const express = require('express');
const amqp = require('amqplib');
const app = express();
app.use(express.json());

let channel;
const orders = {}; // Simpan pesanan di memori untuk demo

async function connectRabbitMQ() {
    const connection = await amqp.connect('amqp://localhost');
    channel = await connection.createChannel();
    await channel.assertQueue('order_commands'); // Untuk menerima perintah dari orchestrator
    await channel.assertQueue('order_events');   // Untuk mempublikasikan event
    console.log('Order Service connected to RabbitMQ');

    channel.consume('order_commands', async (msg) => {
        const { type, payload } = JSON.parse(msg.content.toString());
        console.log(`Order Service received command: ${type} for order ${payload.orderId}`);

        if (type === 'CANCEL_ORDER') {
            // Logika kompensasi: membatalkan pesanan
            const order = orders[payload.orderId];
            if (order && order.status !== 'CANCELLED') {
                order.status = 'CANCELLED';
                console.log(`Order ${payload.orderId} cancelled.`);
                // Publikasikan event OrderCancelled
                channel.publish('', 'order_events', Buffer.from(JSON.stringify({
                    type: 'OrderCancelled',
                    payload: { orderId: payload.orderId }
                })));
            }
        } else if (type === 'COMPLETE_ORDER') {
            const order = orders[payload.orderId];
            if (order) {
                order.status = 'COMPLETED';
                console.log(`Order ${payload.orderId} completed.`);
            }
        }
        channel.ack(msg);
    });
}

app.post('/orders', async (req, res) => {
    const { items, total } = req.body;
    const orderId = `ORD-${Date.now()}`;
    orders[orderId] = { orderId, items, total, status: 'PENDING' };
    console.log(`Order ${orderId} created with status PENDING.`);

    // Publikasikan event OrderCreated ke orchestrator
    channel.publish('', 'order_events', Buffer.from(JSON.stringify({
        type: 'OrderCreated',
        payload: { orderId, total }
    })));

    res.status(202).json({ message: 'Order received, processing...', orderId });
});

connectRabbitMQ().then(() => {
    app.listen(3000, () => console.log('Order Service listening on port 3000'));
});

Payment Service

// payment-service/index.js
const amqp = require('amqplib');

let channel;
const payments = {}; // Simpan pembayaran di memori

async function connectRabbitMQ() {
    const connection = await amqp.connect('amqp://localhost');
    channel = await connection.createChannel();
    await channel.assertQueue('payment_commands'); // Untuk menerima perintah dari orchestrator
    await channel.assertQueue('payment_events');   // Untuk mempublikasikan event
    console.log('Payment Service connected to RabbitMQ');

    channel.consume('payment_commands', async (msg) => {
        const { type, payload } = JSON.parse(msg.content.toString());
        console.log(`Payment Service received command: ${type} for order ${payload.orderId}`);

        if (type === 'PROCESS_PAYMENT') {
            // Logika proses pembayaran
            const success = Math.random() > 0.1; // 90% sukses
            payments[payload.orderId] = { orderId: payload.orderId, amount: payload.total, status: success ? 'PAID' : 'FAILED' };

            if (success) {
                console.log(`Payment for order ${payload.orderId} processed successfully.`);
                channel.publish('', 'payment_events', Buffer.from(JSON.stringify({
                    type: 'PaymentProcessed',
                    payload: { orderId: payload.orderId }
                })));
            } else {
                console.log(`Payment for order ${payload.orderId} failed.`);
                channel.publish('', 'payment_events', Buffer.from(JSON.stringify({
                    type: 'PaymentFailed',
                    payload: { orderId: payload.orderId }
                })));
            }
        } else if (type === 'REFUND_PAYMENT') {
            // Logika kompensasi: mengembalikan dana
            const payment = payments[payload.orderId];
            if (payment && payment.status === 'PAID') {
                payment.status = 'REFUNDED';
                console.log(`Payment for order ${payload.orderId} refunded.`);
                // Publikasikan event PaymentRefunded
                channel.publish('', 'payment_events', Buffer.from(JSON.stringify({
                    type: 'PaymentRefunded',
                    payload: { orderId: payload.orderId }
                })));
            }
        }
        channel.ack(msg);
    });
}

connectRabbitMQ();

Inventory Service

// inventory-service/index.js
const amqp = require('amqplib');

let channel;
const inventory = {
    'product-1': 10,
    'product-2': 5
}; // Stok produk

async function connectRabbitMQ() {
    const connection = await amqp.connect('amqp://localhost');
    channel = await connection.createChannel();
    await channel.assertQueue('inventory_commands'); // Untuk menerima perintah dari orchestrator
    await channel.assertQueue('inventory_events');   // Untuk mempublikasikan event
    console.log('Inventory Service connected to RabbitMQ');

    channel.consume('inventory_commands', async (msg) => {
        const { type, payload } = JSON.parse(msg.content.toString());
        console.log(`Inventory Service received command: ${type} for order ${payload.orderId}`);

        if (type === 'UPDATE_INVENTORY') {
            // Logika mengurangi stok
            // Untuk demo, asumsikan pesanan selalu 'product-1'
            if (inventory['product-1'] > 0) {
                inventory['product-1']--;
                console.log(`Inventory for product-1 updated. Remaining: ${inventory['product-1']}`);
                channel.publish('', 'inventory_events', Buffer.from(JSON.stringify({
                    type: 'InventoryUpdated',
                    payload: { orderId: payload.orderId }
                })));
            } else {
                console.log(`Inventory for product-1 insufficient for order ${payload.orderId}.`);
                channel.publish('', 'inventory_events', Buffer.from(JSON.stringify({
                    type: 'InventoryFailed',
                    payload: { orderId: payload.orderId }
                })));
            }
        }
        channel.ack(msg);
    });
}

connectRabbitMQ();

6. Mengimplementasikan Saga Orchestrator

Orchestrator akan mendengarkan event dari semua layanan dan mengirimkan perintah.

// saga-orchestrator/index.js
const amqp = require('amqplib');

let channel;
const sagaStates = {}; // Menyimpan status saga untuk setiap orderId

async function connectRabbitMQ() {
    const connection = await amqp.connect('amqp://localhost');
    channel = await connection.createChannel();

    // Pastikan semua antrian perintah ada
    await channel.assertQueue('order_commands');
    await channel.assertQueue('payment_commands');
    await channel.assertQueue('inventory_commands');

    // Buat antrian untuk mendengarkan semua event
    await channel.assertQueue('saga_events');

    // Bind ke semua antrian event layanan
    await channel.assertQueue('order_events');
    await channel.assertQueue('payment_events');
    await channel.assertQueue('inventory_events');

    channel.consume('order_events', handleEvent);
    channel.consume('payment_events', handleEvent);
    channel.consume('inventory_events', handleEvent);
    console.log('Saga Orchestrator connected to RabbitMQ and listening for events');
}

async function handleEvent(msg) {
    const { type, payload } = JSON.parse(msg.content.toString());
    const { orderId, total } = payload;

    console.log(`Orchestrator received event: ${type} for order ${orderId}`);

    if (!sagaStates[orderId]) {
        sagaStates[orderId] = { status: 'INITIAL' };
    }

    const currentState = sagaStates[orderId].status;

    switch (type) {
        case 'OrderCreated':
            if (currentState === 'INITIAL') {
                sagaStates[orderId].status = 'PAYMENT_PENDING';
                channel.publish('', 'payment_commands', Buffer.from(JSON.stringify({
                    type: 'PROCESS_PAYMENT',