NATS hafif, yüksek performanslı bir mesajlaşma sistemidir - JVM içermez, varsayılan olarak disk kalıcılığı yoktur, milisaniyeden kısa gecikme süresi. Kafka'nın dayanıklılığından ziyade hız ve basitliğe ihtiyaç duyduğunuz CAPTCHA görev dağıtımı için NATS ideal bir seçimdir.
Neden CAPTCHA Görevleri için NATS
| Özellik | NATS | Kafka'nın | TavşanMQ |
|---|---|---|---|
| Gecikme | < 1 ms | 5-10 ms | 1-5 ms |
| Kurulum karmaşıklığı | Tek ikili | Küme + ZooKeeper | Orta |
| Bellek ayak izi | ~20MB | ~1GB+ | ~200MB |
| Kalıcılık | İsteğe bağlı (JetStream) | Yerleşik | Yerleşik |
| Şunun için en iyisi: | Geçici görevler, düşük gecikme | Dayanıklı akış | Karmaşık yönlendirme |
CAPTCHA görevleri geçicidir; bir görev kaybolursa yeniden gönderirsiniz. NATS'ın basitliği ve hızı onu doğal bir seçim haline getiriyor.
Mimarlık
[Scrapers] → Publish → [NATS: captcha.tasks]
↓
Queue Group: captcha-workers
├── Worker 1 (solve via CaptchaAI)
├── Worker 2
└── Worker 3
↓
Publish → [NATS: captcha.results]
↓
[Result Subscribers]
NATS kuyruk grupları, mesajları çalışanlar arasında otomatik olarak dağıtır; her görev tam olarak bir çalışana gider.
Önkoşullar
# Install NATS server
# macOS
brew install nats-server
# Linux
curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz | tar xz
# Start
nats-server
# Python client
pip install nats-py
# Node.js client
npm install nats
Görev Yayımlayıcı (Kazıyıcı)
Python
import asyncio
import json
import nats
async def publish_captcha_tasks():
nc = await nats.connect("nats://localhost:4222")
tasks = [
{
"task_id": f"task_{i}",
"method": "userrecaptcha",
"sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
"pageurl": f"https://example.com/page/{i}"
}
for i in range(100)
]
for task in tasks:
await nc.publish("captcha.tasks", json.dumps(task).encode())
print(f"Published: {task['task_id']}")
await nc.flush()
await nc.close()
asyncio.run(publish_captcha_tasks())
JavaScript
const { connect, StringCodec } = require("nats");
const sc = StringCodec();
async function publishCaptchaTasks() {
const nc = await connect({ servers: "nats://localhost:4222" });
for (let i = 0; i < 100; i++) {
const task = {
task_id: `task_${i}`,
method: "userrecaptcha",
sitekey: "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
pageurl: `https://example.com/page/${i}`,
};
nc.publish("captcha.tasks", sc.encode(JSON.stringify(task)));
console.log(`Published: ${task.task_id}`);
}
await nc.flush();
await nc.close();
}
publishCaptchaTasks();
CAPTCHA Çalışanı (Kuyruk Grubu Abonesi)
Kuyruk grupları, birden fazla çalışan çalışıyor olsa bile her mesajın tam olarak bir çalışana gitmesini sağlar.
Python
import asyncio
import json
import os
import nats
import aiohttp
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
async def solve_captcha(session, task):
"""Submit to CaptchaAI and poll for result."""
# Submit
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": task["method"],
"googlekey": task["sitekey"],
"pageurl": task["pageurl"],
"json": 1
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
return {"task_id": task["task_id"], "error": data.get("request")}
captcha_id = data["request"]
# Poll for result
for _ in range(60):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY, "action": "get", "id": captcha_id, "json": 1
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return {"task_id": task["task_id"], "solution": result["request"]}
if result.get("request") != "CAPCHA_NOT_READY":
return {"task_id": task["task_id"], "error": result.get("request")}
return {"task_id": task["task_id"], "error": "TIMEOUT"}
async def worker(worker_id):
nc = await nats.connect("nats://localhost:4222")
# Subscribe with queue group — each message goes to one worker only
sub = await nc.subscribe("captcha.tasks", queue="captcha-workers")
print(f"Worker {worker_id} listening...")
async with aiohttp.ClientSession() as session:
async for msg in sub.messages:
task = json.loads(msg.data.decode())
print(f"Worker {worker_id} processing {task['task_id']}")
result = await solve_captcha(session, task)
# Publish result
await nc.publish(
"captcha.results",
json.dumps(result).encode()
)
status = "solved" if "solution" in result else result.get("error")
print(f" → {task['task_id']}: {status}")
asyncio.run(worker(1))
JavaScript
const { connect, StringCodec } = require("nats");
const axios = require("axios");
const sc = StringCodec();
const API_KEY = process.env.CAPTCHAAI_API_KEY;
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
async function solveCaptcha(task) {
const submitResp = await axios.post(
"https://ocr.captchaai.com/in.php",
null,
{
params: {
key: API_KEY,
method: task.method,
googlekey: task.sitekey,
pageurl: task.pageurl,
json: 1,
},
}
);
if (submitResp.data.status !== 1) {
return { task_id: task.task_id, error: submitResp.data.request };
}
const captchaId = submitResp.data.request;
for (let i = 0; i < 60; i++) {
await sleep(5000);
const result = await axios.get("https://ocr.captchaai.com/res.php", {
params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
});
if (result.data.status === 1) {
return { task_id: task.task_id, solution: result.data.request };
}
if (result.data.request !== "CAPCHA_NOT_READY") {
return { task_id: task.task_id, error: result.data.request };
}
}
return { task_id: task.task_id, error: "TIMEOUT" };
}
async function worker(workerId) {
const nc = await connect({ servers: "nats://localhost:4222" });
// Queue group subscription — load-balanced across workers
const sub = nc.subscribe("captcha.tasks", { queue: "captcha-workers" });
console.log(`Worker ${workerId} listening...`);
for await (const msg of sub) {
const task = JSON.parse(sc.decode(msg.data));
console.log(`Worker ${workerId} processing ${task.task_id}`);
const result = await solveCaptcha(task);
nc.publish("captcha.results", sc.encode(JSON.stringify(result)));
const status = result.solution ? "solved" : result.error;
console.log(` → ${task.task_id}: ${status}`);
}
}
worker(1);
Sonuç Toplayıcı
async def collect_results():
nc = await nats.connect("nats://localhost:4222")
sub = await nc.subscribe("captcha.results")
solved = 0
failed = 0
async for msg in sub.messages:
result = json.loads(msg.data.decode())
if "solution" in result:
solved += 1
print(f"[SOLVED] {result['task_id']} — {result['solution'][:30]}...")
else:
failed += 1
print(f"[FAILED] {result['task_id']} — {result['error']}")
print(f" Stats: {solved} solved, {failed} failed")
asyncio.run(collect_results())
Dayanıklılık için NATS JetStream
Kaybolmaması gereken görevler için JetStream kalıcılığını etkinleştirin:
async def durable_publisher():
nc = await nats.connect("nats://localhost:4222")
js = nc.jetstream()
# Create stream (one-time setup)
await js.add_stream(name="CAPTCHA", subjects=["captcha.>"])
# Publish with acknowledgment
ack = await js.publish("captcha.tasks", json.dumps(task).encode())
print(f"Published to stream, seq={ack.seq}")
JetStream, Kafka'ya benzer ancak NATS'nin basitliğiyle disk kalıcılığı, yeniden oynatma yeteneği ve tam olarak bir kez teslimat ekler.
Ölçeklendirme Çalışanları
# Run multiple workers — NATS distributes automatically via queue groups
python worker.py --id=1 &
python worker.py --id=2 &
python worker.py --id=3 &
# Each task goes to exactly one worker
# Add more workers to increase throughput
Sorun giderme
| Sorun | Sebep | Düzeltme |
|---|---|---|
| İletiler bırakıldı | NATS core pub/sub yavaş tüketiciler için ara belleğe alma işlemi yapmıyor | Kalıcılık için JetStream'i kullanın veya tüketici kapasitesini artırın |
| Çalışan mesaj almıyor | Yanlış kuyruk grubu adı veya konusu | Konu ve kuyruk grubunun yayıncıyla eşleştiğini doğrulayın |
| Bağlantı sıfırlama | NATS sunucusu yeniden başlatılıyor | İstemci seçeneklerinde otomatik yeniden bağlanmayı etkinleştir |
| Düzensiz dağıtım | Bir çalışan diğerlerinden daha hızlı işlem yapıyor | Normal – NATS mevcut çalışanlara dağıtılır; daha hızlı çalışanlar daha fazlasını alır |
SSS
Redis veya Kafka yerine NATS'yi ne zaman kullanmalıyım?
Minimum düzeyde altyapı (tek ikili dosya, bağımlılık yok), milisaniyenin altında gecikme süresi istediğinizde ve kalıcı mesaj depolamaya ihtiyaç duymadığınızda NATS'yi kullanın. Dayanıklı akış için Kafka'yı, önbelleğe alma + pub/sub birleşimi için Redis'i kullanın.
NATS saatte 10.000'den fazla CAPTCHA görevini gerçekleştirebilir mi?
Kolayca. NATS saniyede milyonlarca mesajı işler. Darboğaz NATS verimi değil, CaptchaAI çözüm süreleri olacaktır.
JetStream'e ihtiyacım var mı?
Sadece ısrara ihtiyacınız varsa. Çoğu CAPTCHA iş akışı için temel NATS yeterlidir; bir görev kaybolursa yeniden gönderebilirsiniz. Denetim izleri veya tam olarak bir kez işleme gereksinimleri için JetStream'i etkinleştirin.
Sonraki Adımlar
CAPTCHA görevlerini NATS ile dağıtın —CaptchaAI API anahtarınızı alınve hafif sıklet işçilerini harekete geçirin.
İlgili kılavuzlar: