Statik çalışan havuzları, sessiz zamanlarda para israfına neden olur ve yoğun zamanlarda darboğazlar yaratır. Otomatik ölçeklendirme, çalışan sayısını gerçek taleple eşleştirerek hem maliyeti hem de verimi optimize eder.
Ölçeklendirme Sinyalleri
| Sinyal | Ne Zaman Ölçeklendirin | Ne Zaman Ölçeği Azaltın |
|---|---|---|
| Sıra derinliği | > 20 bekleyen görev | < 5 bekleyen görev |
| İşçi kullanımı | > %80 meşgul | < %20 meşgul |
| Gecikmeyi çözün | P95 > 60 saniye | P95 < 20 saniye |
| Hata oranı | > %5 (yeni işçilere ihtiyaç var) | Kararlı <%1 |
| Denge | N/A | Bakiye < 1 $ (ölçeklendirmeyi durdur) |
Konu Tabanlı Otomatik Ölçekleyici
Çalışan iş parçacıklarını tek bir işlemde ölçeklendirin:
import os
import time
import threading
import requests
import json
import redis
class AutoScalingPool:
"""Dynamically scale CaptchaAI worker threads."""
def __init__(self, api_key, redis_url="redis://localhost:6379"):
self.api_key = api_key
self.redis = redis.from_url(redis_url)
self.base = "https://ocr.captchaai.com"
self.queue_key = "captcha:tasks"
self.results_key = "captcha:results"
self.min_workers = 2
self.max_workers = 20
self.workers = []
self.active_count = 0
self.lock = threading.Lock()
self.running = True
def start(self):
"""Start the pool with minimum workers."""
for _ in range(self.min_workers):
self._add_worker()
# Start scaler in background
scaler = threading.Thread(target=self._scaling_loop, daemon=True)
scaler.start()
print(f"Pool started with {self.min_workers} workers")
def _add_worker(self):
"""Add a worker thread."""
if len(self.workers) >= self.max_workers:
return
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self.workers.append(t)
def _remove_worker(self):
"""Signal one worker to stop (lazy removal)."""
if len(self.workers) <= self.min_workers:
return
self.workers.pop() # Thread will exit on next idle cycle
def _worker_loop(self):
"""Worker loop: fetch and process tasks."""
while self.running and threading.current_thread() in self.workers:
result = self.redis.blpop(self.queue_key, timeout=10)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task["id"]
with self.lock:
self.active_count += 1
try:
token = self._solve(task["method"], task["params"])
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "success", "token": token,
}))
except Exception as e:
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "error", "error": str(e),
}))
finally:
with self.lock:
self.active_count -= 1
def _scaling_loop(self):
"""Periodically adjust worker count."""
while self.running:
time.sleep(10)
queue_depth = self.redis.llen(self.queue_key)
current = len(self.workers)
utilization = (
self.active_count / current * 100 if current > 0 else 0
)
# Scale up: queue growing and workers busy
if queue_depth > 20 and utilization > 70:
new_count = min(current + 2, self.max_workers)
while len(self.workers) < new_count:
self._add_worker()
print(f"Scaled up: {current} → {len(self.workers)} workers")
# Scale down: queue empty and workers idle
elif queue_depth < 5 and utilization < 20:
target = max(current - 1, self.min_workers)
while len(self.workers) > target:
self._remove_worker()
if len(self.workers) < current:
print(f"Scaled down: {current} → {len(self.workers)} workers")
def _solve(self, method, params, timeout=120):
data = {"key": self.api_key, "method": method, "json": 1}
data.update(params)
resp = requests.post(
f"{self.base}/in.php", data=data, timeout=30,
)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
def stats(self):
return {
"workers": len(self.workers),
"active": self.active_count,
"queue": self.redis.llen(self.queue_key),
}
# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()
# Monitor
while True:
print(pool.stats())
time.sleep(30)
Süreç Tabanlı Otomatik Ölçekleyici
CPU izolasyonu için çalışan işlemlerini ölçeklendirin:
import multiprocessing
import time
import redis
import os
class ProcessScaler:
"""Scale worker processes based on queue depth."""
def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
self.worker_fn = worker_fn
self.redis = redis.from_url(redis_url)
self.processes = []
self.min_workers = 2
self.max_workers = 16
def run(self, check_interval=15):
"""Run the scaler loop."""
# Start minimum workers
for _ in range(self.min_workers):
self._spawn()
while True:
time.sleep(check_interval)
self._cleanup_dead()
queue_depth = self.redis.llen("captcha:tasks")
current = len(self.processes)
# Scale up
if queue_depth > current * 5 and current < self.max_workers:
to_add = min(
max(1, queue_depth // 10),
self.max_workers - current,
)
for _ in range(to_add):
self._spawn()
print(f"Scaled up to {len(self.processes)} workers")
# Scale down
elif queue_depth < 3 and current > self.min_workers:
to_remove = min(2, current - self.min_workers)
for _ in range(to_remove):
p = self.processes.pop()
p.terminate()
print(f"Scaled down to {len(self.processes)} workers")
def _spawn(self):
p = multiprocessing.Process(target=self.worker_fn)
p.start()
self.processes.append(p)
def _cleanup_dead(self):
self.processes = [p for p in self.processes if p.is_alive()]
# Ensure minimum
while len(self.processes) < self.min_workers:
self._spawn()
Denge Farkında Ölçeklendirme
Fon azaldığında ölçeklendirmeyi durdurun:
def check_balance(api_key, min_balance=2.0):
"""Check if balance is sufficient for scaling."""
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": api_key,
"action": "getbalance",
"json": 1,
}, timeout=15)
balance = float(resp.json()["request"])
if balance < min_balance:
print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
return False
return True
Ölçeklendirme döngüsüne entegre edin:
# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
if check_balance(self.api_key, min_balance=2.0):
# Scale up
...
else:
print("Scaling paused — low balance")
Ölçeklendirme Stratejileri Karşılaştırıldı
| Strateji | En İyisi | Gecikme | Karmaşıklık |
|---|---|---|---|
| Konu havuzu | I/O-bound (API çağrıları) | Düşük | Düşük |
| İşlem havuzu | CPU'ya bağlı ön işleme | Orta | Orta |
| Kubernetes HPA | Bulutta yerel dağıtımlar | Daha yüksek | Yüksek |
| KEDA | Olay odaklı ölçeklendirme | Orta | Orta |
Sorun giderme
| Sorun | Sebep | Düzeltme |
|---|---|---|
| İşçiler büyümeye devam ediyor | Kuyruk asla tükenmez | İşçilerin gerçekten işlem yapıp yapmadığını kontrol edin |
| Ölçek küçültme çok agresif | Düşük eşik | Ölçek küçültme gecikmesini 30 saniyenin üzerine çıkarın |
| Zombi süreçleri | Temizlenmeyen işlemler | _cleanup_dead()'yi düzenli olarak kullanın |
| Denge hızla tükeniyor | Çok fazla işçi | Ölçeklendirme mantığına denge kontrolü ekleyin |
SSS
Doğru çalışan-kuyruk oranı nedir?
Sıradaki 5-10 görev başına 1 çalışanı hedefleyin. Her çalışan, türe bağlı olarak dakikada ~3-6 CAPTCHA işler.
Konuları veya süreçleri kullanmalı mıyım?
Saf API çağrısına yönelik iş parçacıkları (CaptchaAI, I/O-bound'dir). Çözmenin yanı sıra görüntü ön işleme veya yoğun hesaplama da yaptığınız işlemler.
Ne kadar hızlı ölçek büyütmeliyim?
Hızla ölçeklendirin (her 10-15 saniyede bir kontrol edin), yavaşça ölçeklendirin (düşük yükte 30-60 saniye bekleyin). Bu durum devletler arasındaki çatışmayı önler.
İlgili Kılavuzlar
Akıllıca ölçeklendirin —CaptchaAI anahtarınızı alınBugün.