CAPTCHA çözme hacmi saatte binlerce göreve ulaştığında basit bir sıradan daha fazlasına ihtiyacınız olur. Apache Kafka, dayanıklı, düzenli, yüksek verimli mesaj akışı sağlar; CAPTCHA görev gönderimini geniş ölçekte sonuç işlemeden ayırmak için idealdir.
Mimarlık
[Scrapers] → Produce → [Kafka: captcha-tasks topic]
↓
[CAPTCHA Worker Group]
(consume tasks, solve via CaptchaAI)
↓
Produce → [Kafka: captcha-results topic]
↓
[Result Consumer Group]
(process solutions, update database)
İki Kafka konusu endişeleri birbirinden ayırıyor:
captcha-tasks— Çözülmeyi bekleyen CAPTCHA parametrelericaptcha-results– Aşağı yönde kullanıma hazır çözülmüş belirteçler
Önkoşullar
# Python
pip install kafka-python requests
# Node.js
npm install kafkajs axios
localhost:9092'de (veya küme adresinizde) çalışan Kafka aracısı.
1. Adım: Konu Oluşturun
kafka-topics.sh --create --topic captcha-tasks \
--partitions 6 --replication-factor 1 \
--bootstrap-server localhost:9092
kafka-topics.sh --create --topic captcha-results \
--partitions 6 --replication-factor 1 \
--bootstrap-server localhost:9092
Altı bölüm, grup başına altı adede kadar paralel tüketiciye izin verir.
Adım 2: Görev Üreticisi (Kazıyıcı Tarafı)
Python
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
acks="all", # Wait for all replicas to confirm
retries=3
)
def enqueue_captcha(task_id, sitekey, pageurl, captcha_type="userrecaptcha"):
"""Send a CAPTCHA task to Kafka."""
task = {
"task_id": task_id,
"method": captcha_type,
"sitekey": sitekey,
"pageurl": pageurl,
"submitted_at": __import__("time").time()
}
future = producer.send(
"captcha-tasks",
key=task_id, # Key ensures same task goes to same partition
value=task
)
future.get(timeout=10) # Block until confirmed
return task_id
# Submit tasks
enqueue_captcha("task_001", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
enqueue_captcha("task_002", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
producer.flush()
JavaScript
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "captcha-producer",
brokers: ["localhost:9092"],
});
const producer = kafka.producer();
async function enqueueCaptcha(taskId, sitekey, pageurl) {
await producer.connect();
const task = {
task_id: taskId,
method: "userrecaptcha",
sitekey: sitekey,
pageurl: pageurl,
submitted_at: Date.now(),
};
await producer.send({
topic: "captcha-tasks",
messages: [{ key: taskId, value: JSON.stringify(task) }],
});
}
(async () => {
await enqueueCaptcha(
"task_001",
"6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
"https://example.com"
);
await producer.disconnect();
})();
Adım 3: CAPTCHA Çalışanı (Tüketici + Çözücü)
Python
import json
import os
import time
import requests
from kafka import KafkaConsumer, KafkaProducer
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
consumer = KafkaConsumer(
"captcha-tasks",
bootstrap_servers=["localhost:9092"],
group_id="captcha-workers",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
auto_offset_reset="earliest",
enable_auto_commit=False, # Manual commit after processing
max_poll_records=10
)
result_producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
def solve_captcha(task):
"""Submit to CaptchaAI and poll for result."""
# Submit
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": task["method"],
"googlekey": task["sitekey"],
"pageurl": task["pageurl"],
"json": 1
})
data = resp.json()
if data.get("status") != 1:
return {"error": data.get("request")}
captcha_id = data["request"]
# Poll for result
for _ in range(60):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY,
"action": "get",
"id": captcha_id,
"json": 1
}).json()
if result.get("status") == 1:
return {"solution": result["request"]}
if result.get("request") != "CAPCHA_NOT_READY":
return {"error": result.get("request")}
return {"error": "TIMEOUT"}
# Main consumer loop
print("CAPTCHA worker started. Waiting for tasks...")
for message in consumer:
task = message.value
print(f"Processing {task['task_id']}...")
result = solve_captcha(task)
result["task_id"] = task["task_id"]
result["solved_at"] = time.time()
# Publish result
result_producer.send("captcha-results", value=result)
result_producer.flush()
# Commit offset after successful processing
consumer.commit()
print(f" → {task['task_id']}: {'solved' if 'solution' in result else result.get('error')}")
JavaScript
const { Kafka } = require("kafkajs");
const axios = require("axios");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
const kafka = new Kafka({
clientId: "captcha-worker",
brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "captcha-workers" });
const producer = kafka.producer();
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function solveCaptcha(task) {
const submitResp = await axios.post(
"https://ocr.captchaai.com/in.php",
null,
{
params: {
key: API_KEY,
method: task.method,
googlekey: task.sitekey,
pageurl: task.pageurl,
json: 1,
},
}
);
if (submitResp.data.status !== 1) {
return { error: submitResp.data.request };
}
const captchaId = submitResp.data.request;
for (let i = 0; i < 60; i++) {
await sleep(5000);
const result = await axios.get("https://ocr.captchaai.com/res.php", {
params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
});
if (result.data.status === 1) return { solution: result.data.request };
if (result.data.request !== "CAPCHA_NOT_READY")
return { error: result.data.request };
}
return { error: "TIMEOUT" };
}
async function run() {
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: "captcha-tasks", fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const task = JSON.parse(message.value.toString());
console.log(`Processing ${task.task_id}...`);
const result = await solveCaptcha(task);
result.task_id = task.task_id;
result.solved_at = Date.now();
await producer.send({
topic: "captcha-results",
messages: [{ value: JSON.stringify(result) }],
});
console.log(
` → ${task.task_id}: ${result.solution ? "solved" : result.error}`
);
},
});
}
run();
Ölçeklendirme Çalışanları
Kafka tüketici grupları, bölümleri çalışanlar arasında otomatik olarak dağıtır:
# 6 partitions, 3 workers → each worker gets 2 partitions
Worker-1: partitions 0, 1
Worker-2: partitions 2, 3
Worker-3: partitions 4, 5
# Add Worker-4 → rebalance
Worker-1: partitions 0, 1
Worker-2: partitions 2
Worker-3: partitions 3, 4
Worker-4: partition 5
Bölüm sayısına kadar ölçeklendirin. Bunun ötesinde daha fazla bölüm ekleyin.
İzleme
Temel metrikleri Kafka tüketici gecikmesi yoluyla izleyin:
kafka-consumer-groups.sh --describe --group captcha-workers \
--bootstrap-server localhost:9092
| Metrik | Sağlıklı | Uyarı |
|---|---|---|
| Tüketici gecikmesi | < 100 | > 1000 (işçi ekle) |
| Mesajlar/sec in | Kazıyıcı hızıyla eşleşir | Sivri uçlar patlamayı gösteriyor |
| Mesajlar/sec çıktı | Orandaki eşleşmeler | Geride kalmak = darboğaz |
Üretici doğrulama kuralları
- Çözücü türünü, hedef meta verilerini veya yanıt yönlendirme ayrıntılarını gözden kaçıran iletileri, konuya gelmeden önce reddedin.
- Yeniden denemelerin aynı sorun için yinelenen çözüm denemeleri yaratmaması için eş güç anahtarları ekleyin.
- Hatalı biçimlendirilmiş kayıtları, daha sonra tekrar oynatmak için yeterli bağlama sahip, geçersiz bir yola gönderin.
Sorun giderme
| Sorun | Sebep | Düzeltme |
|---|---|---|
| Tüketici gecikmesi artıyor | İşçiler görev hızına ayak uyduramıyor | Daha fazla çalışan örneği ekleyin (bölüm sayısına kadar) |
| Yinelenen sonuçlar | İşçi mahsup işlemi yapmadan kaza yapıyor | Sonuç tüketicisinde görev_id'ye yetkisizlik kontrolü ekleyin |
| Çok sık yeniden dengeleme | İşçiler kaza yapıyor/restarting | session.timeout.ms'yi artırın; OOM'u kontrol et |
| Görevler eşit dağıtılmıyor | Kötü anahtar dağıtımı | Rastgeleleştirilmiş anahtarlar veya daha fazla bölüm kullanın |
SSS
Neden Redis veya RabbitMQ yerine Kafka?
Kafka, mesaj dayanıklılığına (tekrar oynatma yeteneği), yüksek verime (100.000'den fazla mesaj/sec) ve tüketici grubu ölçeklendirmesine ihtiyaç duyduğunuzda idealdir. 1.000 görevin altındaki daha basit kurulumlar için/hour, Redis veya RabbitMQ yeterlidir.
Bir konu mu yoksa iki konu mu kullanmalıyım?
İki konu (görevler + sonuçlar) üreticileri ve tüketicileri net bir şekilde ayırıyor. Görev üreticisinin sonuç tüketicileri hakkında bilgi sahibi olmasına gerek yoktur ve bunun tersi de geçerlidir.
Zehirli mesajları (çözülemeyen CAPTCHA'lar) nasıl ele alırım?
Çalışanda yeniden deneme sınırı ayarlayın. Maksimum yeniden denemeden sonra, manuel inceleme için bir captcha-dead-letter konusuna yayınlayın. Bölümü sonsuz yeniden denemelerle engellemeyin.
İlgili Makaleler
Sonraki Adımlar
Akış CAPTCHA ardışık düzenleri oluşturun —CaptchaAI API anahtarınızı alınve yüksek verimli işleme için Kafka'yı bağlayın.
İlgili kılavuzlar: