DevOps ve Ölçeklendirme

Apache Kafka + CaptchaAI: Akış CAPTCHA Görev İşleme

CAPTCHA çözme hacmi saatte binlerce göreve ulaştığında basit bir sıradan daha fazlasına ihtiyacınız olur. Apache Kafka, dayanıklı, düzenli, yüksek verimli mesaj akışı sağlar; CAPTCHA görev gönderimini geniş ölçekte sonuç işlemeden ayırmak için idealdir.

Mimarlık

[Scrapers] → Produce → [Kafka: captcha-tasks topic]
                              ↓
                    [CAPTCHA Worker Group]
                    (consume tasks, solve via CaptchaAI)
                              ↓
                    Produce → [Kafka: captcha-results topic]
                              ↓
                    [Result Consumer Group]
                    (process solutions, update database)

İki Kafka konusu endişeleri birbirinden ayırıyor:

  • captcha-tasks — Çözülmeyi bekleyen CAPTCHA parametreleri
  • captcha-results – Aşağı yönde kullanıma hazır çözülmüş belirteçler

Önkoşullar

# Python
pip install kafka-python requests

# Node.js
npm install kafkajs axios

localhost:9092'de (veya küme adresinizde) çalışan Kafka aracısı.

1. Adım: Konu Oluşturun

kafka-topics.sh --create --topic captcha-tasks \
  --partitions 6 --replication-factor 1 \
  --bootstrap-server localhost:9092

kafka-topics.sh --create --topic captcha-results \
  --partitions 6 --replication-factor 1 \
  --bootstrap-server localhost:9092

Altı bölüm, grup başına altı adede kadar paralel tüketiciye izin verir.

Adım 2: Görev Üreticisi (Kazıyıcı Tarafı)

Python

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8") if k else None,
    acks="all",  # Wait for all replicas to confirm
    retries=3
)


def enqueue_captcha(task_id, sitekey, pageurl, captcha_type="userrecaptcha"):
    """Send a CAPTCHA task to Kafka."""
    task = {
        "task_id": task_id,
        "method": captcha_type,
        "sitekey": sitekey,
        "pageurl": pageurl,
        "submitted_at": __import__("time").time()
    }

    future = producer.send(
        "captcha-tasks",
        key=task_id,  # Key ensures same task goes to same partition
        value=task
    )
    future.get(timeout=10)  # Block until confirmed
    return task_id


# Submit tasks
enqueue_captcha("task_001", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
enqueue_captcha("task_002", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
producer.flush()

JavaScript

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "captcha-producer",
  brokers: ["localhost:9092"],
});

const producer = kafka.producer();

async function enqueueCaptcha(taskId, sitekey, pageurl) {
  await producer.connect();

  const task = {
    task_id: taskId,
    method: "userrecaptcha",
    sitekey: sitekey,
    pageurl: pageurl,
    submitted_at: Date.now(),
  };

  await producer.send({
    topic: "captcha-tasks",
    messages: [{ key: taskId, value: JSON.stringify(task) }],
  });
}

(async () => {
  await enqueueCaptcha(
    "task_001",
    "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
    "https://example.com"
  );
  await producer.disconnect();
})();

Adım 3: CAPTCHA Çalışanı (Tüketici + Çözücü)

Python

import json
import os
import time
import requests
from kafka import KafkaConsumer, KafkaProducer

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

consumer = KafkaConsumer(
    "captcha-tasks",
    bootstrap_servers=["localhost:9092"],
    group_id="captcha-workers",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=False,  # Manual commit after processing
    max_poll_records=10
)

result_producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)


def solve_captcha(task):
    """Submit to CaptchaAI and poll for result."""
    # Submit
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": task["method"],
        "googlekey": task["sitekey"],
        "pageurl": task["pageurl"],
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        return {"error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }).json()

        if result.get("status") == 1:
            return {"solution": result["request"]}
        if result.get("request") != "CAPCHA_NOT_READY":
            return {"error": result.get("request")}

    return {"error": "TIMEOUT"}


# Main consumer loop
print("CAPTCHA worker started. Waiting for tasks...")
for message in consumer:
    task = message.value
    print(f"Processing {task['task_id']}...")

    result = solve_captcha(task)
    result["task_id"] = task["task_id"]
    result["solved_at"] = time.time()

    # Publish result
    result_producer.send("captcha-results", value=result)
    result_producer.flush()

    # Commit offset after successful processing
    consumer.commit()
    print(f"  → {task['task_id']}: {'solved' if 'solution' in result else result.get('error')}")

JavaScript

const { Kafka } = require("kafkajs");
const axios = require("axios");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

const kafka = new Kafka({
  clientId: "captcha-worker",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "captcha-workers" });
const producer = kafka.producer();

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function solveCaptcha(task) {
  const submitResp = await axios.post(
    "https://ocr.captchaai.com/in.php",
    null,
    {
      params: {
        key: API_KEY,
        method: task.method,
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    }
  );

  if (submitResp.data.status !== 1) {
    return { error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;

  for (let i = 0; i < 60; i++) {
    await sleep(5000);
    const result = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (result.data.status === 1) return { solution: result.data.request };
    if (result.data.request !== "CAPCHA_NOT_READY")
      return { error: result.data.request };
  }

  return { error: "TIMEOUT" };
}

async function run() {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: "captcha-tasks", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const task = JSON.parse(message.value.toString());
      console.log(`Processing ${task.task_id}...`);

      const result = await solveCaptcha(task);
      result.task_id = task.task_id;
      result.solved_at = Date.now();

      await producer.send({
        topic: "captcha-results",
        messages: [{ value: JSON.stringify(result) }],
      });

      console.log(
        `  → ${task.task_id}: ${result.solution ? "solved" : result.error}`
      );
    },
  });
}

run();

Ölçeklendirme Çalışanları

Kafka tüketici grupları, bölümleri çalışanlar arasında otomatik olarak dağıtır:

# 6 partitions, 3 workers → each worker gets 2 partitions
Worker-1: partitions 0, 1
Worker-2: partitions 2, 3
Worker-3: partitions 4, 5

# Add Worker-4 → rebalance
Worker-1: partitions 0, 1
Worker-2: partitions 2
Worker-3: partitions 3, 4
Worker-4: partition 5

Bölüm sayısına kadar ölçeklendirin. Bunun ötesinde daha fazla bölüm ekleyin.

İzleme

Temel metrikleri Kafka tüketici gecikmesi yoluyla izleyin:

kafka-consumer-groups.sh --describe --group captcha-workers \
  --bootstrap-server localhost:9092
Metrik Sağlıklı Uyarı
Tüketici gecikmesi < 100 > 1000 (işçi ekle)
Mesajlar/sec in Kazıyıcı hızıyla eşleşir Sivri uçlar patlamayı gösteriyor
Mesajlar/sec çıktı Orandaki eşleşmeler Geride kalmak = darboğaz

Üretici doğrulama kuralları

  • Çözücü türünü, hedef meta verilerini veya yanıt yönlendirme ayrıntılarını gözden kaçıran iletileri, konuya gelmeden önce reddedin.
  • Yeniden denemelerin aynı sorun için yinelenen çözüm denemeleri yaratmaması için eş güç anahtarları ekleyin.
  • Hatalı biçimlendirilmiş kayıtları, daha sonra tekrar oynatmak için yeterli bağlama sahip, geçersiz bir yola gönderin.

Sorun giderme

Sorun Sebep Düzeltme
Tüketici gecikmesi artıyor İşçiler görev hızına ayak uyduramıyor Daha fazla çalışan örneği ekleyin (bölüm sayısına kadar)
Yinelenen sonuçlar İşçi mahsup işlemi yapmadan kaza yapıyor Sonuç tüketicisinde görev_id'ye yetkisizlik kontrolü ekleyin
Çok sık yeniden dengeleme İşçiler kaza yapıyor/restarting session.timeout.ms'yi artırın; OOM'u kontrol et
Görevler eşit dağıtılmıyor Kötü anahtar dağıtımı Rastgeleleştirilmiş anahtarlar veya daha fazla bölüm kullanın

SSS

Neden Redis veya RabbitMQ yerine Kafka?

Kafka, mesaj dayanıklılığına (tekrar oynatma yeteneği), yüksek verime (100.000'den fazla mesaj/sec) ve tüketici grubu ölçeklendirmesine ihtiyaç duyduğunuzda idealdir. 1.000 görevin altındaki daha basit kurulumlar için/hour, Redis veya RabbitMQ yeterlidir.

Bir konu mu yoksa iki konu mu kullanmalıyım?

İki konu (görevler + sonuçlar) üreticileri ve tüketicileri net bir şekilde ayırıyor. Görev üreticisinin sonuç tüketicileri hakkında bilgi sahibi olmasına gerek yoktur ve bunun tersi de geçerlidir.

Zehirli mesajları (çözülemeyen CAPTCHA'lar) nasıl ele alırım?

Çalışanda yeniden deneme sınırı ayarlayın. Maksimum yeniden denemeden sonra, manuel inceleme için bir captcha-dead-letter konusuna yayınlayın. Bölümü sonsuz yeniden denemelerle engellemeyin.

İlgili Makaleler

Sonraki Adımlar

Akış CAPTCHA ardışık düzenleri oluşturun —CaptchaAI API anahtarınızı alınve yüksek verimli işleme için Kafka'yı bağlayın.

İlgili kılavuzlar:

Bu makale için yorumlar devre dışı bırakılmıştır.