DevOps ve Ölçeklendirme

NATS Mesajlaşma + CaptchaAI: Basit CAPTCHA Görev Dağıtımı

NATS hafif, yüksek performanslı bir mesajlaşma sistemidir - JVM içermez, varsayılan olarak disk kalıcılığı yoktur, milisaniyeden kısa gecikme süresi. Kafka'nın dayanıklılığından ziyade hız ve basitliğe ihtiyaç duyduğunuz CAPTCHA görev dağıtımı için NATS ideal bir seçimdir.

Neden CAPTCHA Görevleri için NATS

Özellik NATS Kafka'nın TavşanMQ
Gecikme < 1 ms 5-10 ms 1-5 ms
Kurulum karmaşıklığı Tek ikili Küme + ZooKeeper Orta
Bellek ayak izi ~20MB ~1GB+ ~200MB
Kalıcılık İsteğe bağlı (JetStream) Yerleşik Yerleşik
Şunun için en iyisi: Geçici görevler, düşük gecikme Dayanıklı akış Karmaşık yönlendirme

CAPTCHA görevleri geçicidir; bir görev kaybolursa yeniden gönderirsiniz. NATS'ın basitliği ve hızı onu doğal bir seçim haline getiriyor.

Mimarlık

[Scrapers] → Publish → [NATS: captcha.tasks]
                              ↓
                    Queue Group: captcha-workers
                    ├── Worker 1 (solve via CaptchaAI)
                    ├── Worker 2
                    └── Worker 3
                              ↓
                    Publish → [NATS: captcha.results]
                              ↓
                    [Result Subscribers]

NATS kuyruk grupları, mesajları çalışanlar arasında otomatik olarak dağıtır; her görev tam olarak bir çalışana gider.

Önkoşullar

# Install NATS server
# macOS
brew install nats-server

# Linux
curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz | tar xz

# Start
nats-server

# Python client
pip install nats-py

# Node.js client
npm install nats

Görev Yayımlayıcı (Kazıyıcı)

Python

import asyncio
import json
import nats


async def publish_captcha_tasks():
    nc = await nats.connect("nats://localhost:4222")

    tasks = [
        {
            "task_id": f"task_{i}",
            "method": "userrecaptcha",
            "sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
            "pageurl": f"https://example.com/page/{i}"
        }
        for i in range(100)
    ]

    for task in tasks:
        await nc.publish("captcha.tasks", json.dumps(task).encode())
        print(f"Published: {task['task_id']}")

    await nc.flush()
    await nc.close()


asyncio.run(publish_captcha_tasks())

JavaScript

const { connect, StringCodec } = require("nats");

const sc = StringCodec();

async function publishCaptchaTasks() {
  const nc = await connect({ servers: "nats://localhost:4222" });

  for (let i = 0; i < 100; i++) {
    const task = {
      task_id: `task_${i}`,
      method: "userrecaptcha",
      sitekey: "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
      pageurl: `https://example.com/page/${i}`,
    };

    nc.publish("captcha.tasks", sc.encode(JSON.stringify(task)));
    console.log(`Published: ${task.task_id}`);
  }

  await nc.flush();
  await nc.close();
}

publishCaptchaTasks();

CAPTCHA Çalışanı (Kuyruk Grubu Abonesi)

Kuyruk grupları, birden fazla çalışan çalışıyor olsa bile her mesajın tam olarak bir çalışana gitmesini sağlar.

Python

import asyncio
import json
import os
import nats
import aiohttp

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


async def solve_captcha(session, task):
    """Submit to CaptchaAI and poll for result."""
    # Submit
    async with session.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": task["method"],
        "googlekey": task["sitekey"],
        "pageurl": task["pageurl"],
        "json": 1
    }) as resp:
        data = await resp.json(content_type=None)

    if data.get("status") != 1:
        return {"task_id": task["task_id"], "error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        await asyncio.sleep(5)
        async with session.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get", "id": captcha_id, "json": 1
        }) as resp:
            result = await resp.json(content_type=None)

        if result.get("status") == 1:
            return {"task_id": task["task_id"], "solution": result["request"]}
        if result.get("request") != "CAPCHA_NOT_READY":
            return {"task_id": task["task_id"], "error": result.get("request")}

    return {"task_id": task["task_id"], "error": "TIMEOUT"}


async def worker(worker_id):
    nc = await nats.connect("nats://localhost:4222")

    # Subscribe with queue group — each message goes to one worker only
    sub = await nc.subscribe("captcha.tasks", queue="captcha-workers")

    print(f"Worker {worker_id} listening...")

    async with aiohttp.ClientSession() as session:
        async for msg in sub.messages:
            task = json.loads(msg.data.decode())
            print(f"Worker {worker_id} processing {task['task_id']}")

            result = await solve_captcha(session, task)

            # Publish result
            await nc.publish(
                "captcha.results",
                json.dumps(result).encode()
            )

            status = "solved" if "solution" in result else result.get("error")
            print(f"  → {task['task_id']}: {status}")


asyncio.run(worker(1))

JavaScript

const { connect, StringCodec } = require("nats");
const axios = require("axios");

const sc = StringCodec();
const API_KEY = process.env.CAPTCHAAI_API_KEY;

function sleep(ms) {
  return new Promise((r) => setTimeout(r, ms));
}

async function solveCaptcha(task) {
  const submitResp = await axios.post(
    "https://ocr.captchaai.com/in.php",
    null,
    {
      params: {
        key: API_KEY,
        method: task.method,
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    }
  );

  if (submitResp.data.status !== 1) {
    return { task_id: task.task_id, error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;

  for (let i = 0; i < 60; i++) {
    await sleep(5000);
    const result = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (result.data.status === 1) {
      return { task_id: task.task_id, solution: result.data.request };
    }
    if (result.data.request !== "CAPCHA_NOT_READY") {
      return { task_id: task.task_id, error: result.data.request };
    }
  }

  return { task_id: task.task_id, error: "TIMEOUT" };
}

async function worker(workerId) {
  const nc = await connect({ servers: "nats://localhost:4222" });

  // Queue group subscription — load-balanced across workers
  const sub = nc.subscribe("captcha.tasks", { queue: "captcha-workers" });

  console.log(`Worker ${workerId} listening...`);

  for await (const msg of sub) {
    const task = JSON.parse(sc.decode(msg.data));
    console.log(`Worker ${workerId} processing ${task.task_id}`);

    const result = await solveCaptcha(task);
    nc.publish("captcha.results", sc.encode(JSON.stringify(result)));

    const status = result.solution ? "solved" : result.error;
    console.log(`  → ${task.task_id}: ${status}`);
  }
}

worker(1);

Sonuç Toplayıcı

async def collect_results():
    nc = await nats.connect("nats://localhost:4222")
    sub = await nc.subscribe("captcha.results")

    solved = 0
    failed = 0

    async for msg in sub.messages:
        result = json.loads(msg.data.decode())

        if "solution" in result:
            solved += 1
            print(f"[SOLVED] {result['task_id']} — {result['solution'][:30]}...")
        else:
            failed += 1
            print(f"[FAILED] {result['task_id']} — {result['error']}")

        print(f"  Stats: {solved} solved, {failed} failed")

asyncio.run(collect_results())

Dayanıklılık için NATS JetStream

Kaybolmaması gereken görevler için JetStream kalıcılığını etkinleştirin:

async def durable_publisher():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create stream (one-time setup)
    await js.add_stream(name="CAPTCHA", subjects=["captcha.>"])

    # Publish with acknowledgment
    ack = await js.publish("captcha.tasks", json.dumps(task).encode())
    print(f"Published to stream, seq={ack.seq}")

JetStream, Kafka'ya benzer ancak NATS'nin basitliğiyle disk kalıcılığı, yeniden oynatma yeteneği ve tam olarak bir kez teslimat ekler.

Ölçeklendirme Çalışanları

# Run multiple workers — NATS distributes automatically via queue groups
python worker.py --id=1 &
python worker.py --id=2 &
python worker.py --id=3 &

# Each task goes to exactly one worker
# Add more workers to increase throughput

Sorun giderme

Sorun Sebep Düzeltme
İletiler bırakıldı NATS core pub/sub yavaş tüketiciler için ara belleğe alma işlemi yapmıyor Kalıcılık için JetStream'i kullanın veya tüketici kapasitesini artırın
Çalışan mesaj almıyor Yanlış kuyruk grubu adı veya konusu Konu ve kuyruk grubunun yayıncıyla eşleştiğini doğrulayın
Bağlantı sıfırlama NATS sunucusu yeniden başlatılıyor İstemci seçeneklerinde otomatik yeniden bağlanmayı etkinleştir
Düzensiz dağıtım Bir çalışan diğerlerinden daha hızlı işlem yapıyor Normal – NATS mevcut çalışanlara dağıtılır; daha hızlı çalışanlar daha fazlasını alır

SSS

Redis veya Kafka yerine NATS'yi ne zaman kullanmalıyım?

Minimum düzeyde altyapı (tek ikili dosya, bağımlılık yok), milisaniyenin altında gecikme süresi istediğinizde ve kalıcı mesaj depolamaya ihtiyaç duymadığınızda NATS'yi kullanın. Dayanıklı akış için Kafka'yı, önbelleğe alma + pub/sub birleşimi için Redis'i kullanın.

NATS saatte 10.000'den fazla CAPTCHA görevini gerçekleştirebilir mi?

Kolayca. NATS saniyede milyonlarca mesajı işler. Darboğaz NATS verimi değil, CaptchaAI çözüm süreleri olacaktır.

JetStream'e ihtiyacım var mı?

Sadece ısrara ihtiyacınız varsa. Çoğu CAPTCHA iş akışı için temel NATS yeterlidir; bir görev kaybolursa yeniden gönderebilirsiniz. Denetim izleri veya tam olarak bir kez işleme gereksinimleri için JetStream'i etkinleştirin.

Sonraki Adımlar

CAPTCHA görevlerini NATS ile dağıtın —CaptchaAI API anahtarınızı alınve hafif sıklet işçilerini harekete geçirin.

İlgili kılavuzlar:

Bu makale için yorumlar devre dışı bırakılmıştır.