Tutorials

CaptchaAI ile Python'da CAPTCHA Çözme Kuyruğu Oluşturma

Ölçekli kazıma yaparken, aynı anda birden fazla CAPTCHA çözümüne ihtiyacınız vardır. Bir kuyruk sistemi, CAPTCHA gönderimini sonuç alımından ayırarak yüzlerce görevde paralel çözüme olanak tanır.


Neden kuyruk kullanıyorsunuz?

Tek istek CAPTCHA çözme, bekleme süresini boşa harcar. Bir kuyruk sistemi:

  • Tüm CAPTCHA'ları anında gönderir
  • Birden fazla görev kimliğini paralel olarak yoklar
  • Yeniden denemeler başarısız olursa otomatik olarak çözer
  • API hız sınırlarına uymak için eşzamanlılığı kontrol eder
  • İlerleme takibi ve geri aramalar sağlar

Temel iş parçacığı kuyruğu

import time
import threading
import requests
from queue import Queue, Empty

API_KEY = "YOUR_API_KEY"


class CaptchaQueue:
    """Thread-based CAPTCHA solving queue."""

    def __init__(self, api_key, max_workers=10):
        self.api_key = api_key
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.max_workers = max_workers
        self.workers = []

    def submit(self, method, callback=None, **params):
        """Add a CAPTCHA task to the queue."""
        task = {
            "method": method,
            "params": params,
            "callback": callback,
        }
        self.task_queue.put(task)

    def start(self):
        """Start worker threads."""
        for _ in range(self.max_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)

    def wait(self):
        """Wait for all tasks to complete."""
        self.task_queue.join()

    def get_results(self):
        """Get all available results."""
        results = []
        while not self.result_queue.empty():
            try:
                results.append(self.result_queue.get_nowait())
            except Empty:
                break
        return results

    def _worker(self):
        while True:
            try:
                task = self.task_queue.get(timeout=1)
            except Empty:
                continue

            try:
                result = self._solve(task["method"], **task["params"])
                entry = {"status": "solved", "result": result, "task": task}
                self.result_queue.put(entry)
                if task["callback"]:
                    task["callback"](result)
            except Exception as e:
                entry = {"status": "error", "error": str(e), "task": task}
                self.result_queue.put(entry)
            finally:
                self.task_queue.task_done()

    def _solve(self, method, **params):
        submit = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }, timeout=30).json()

        if submit.get("status") != 1:
            raise Exception(f"Submit error: {submit.get('request')}")

        task_id = submit["request"]
        for _ in range(30):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }, timeout=30).json()
            if result.get("status") == 1:
                return result["request"]
            if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
                raise Exception("CAPTCHA unsolvable")
        raise TimeoutError("Solve timed out")


# Usage
queue = CaptchaQueue(API_KEY, max_workers=5)
queue.start()

# Submit multiple CAPTCHAs
urls_and_sitekeys = [
    ("https://example.com/page1", "SITEKEY_1"),
    ("https://example.com/page2", "SITEKEY_2"),
    ("https://example.com/page3", "SITEKEY_3"),
]

for url, sitekey in urls_and_sitekeys:
    queue.submit("userrecaptcha", googlekey=sitekey, pageurl=url)

queue.wait()
results = queue.get_results()
print(f"Solved {len(results)} CAPTCHAs")
for r in results:
    print(f"  {r['status']}: {r.get('result', r.get('error', ''))[:50]}")

Asyncio ile zaman uyumsuz kuyruk

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"


class AsyncCaptchaQueue:
    """Async CAPTCHA solving queue with concurrency control."""

    def __init__(self, api_key, max_concurrent=10):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def solve_batch(self, tasks):
        """Solve a batch of CAPTCHA tasks concurrently."""
        coros = [self._solve_task(task) for task in tasks]
        self.results = await asyncio.gather(*coros, return_exceptions=True)
        return self.results

    async def _solve_task(self, task):
        async with self.semaphore:
            return await self._solve(task["method"], **task["params"])

    async def _solve(self, method, **params):
        async with aiohttp.ClientSession() as session:
            # Submit
            async with session.post("https://ocr.captchaai.com/in.php", data={
                "key": self.api_key, "method": method, "json": 1, **params,
            }) as resp:
                data = await resp.json(content_type=None)
                if data.get("status") != 1:
                    raise Exception(f"Submit error: {data.get('request')}")
                task_id = data["request"]

            # Poll
            for _ in range(30):
                await asyncio.sleep(5)
                async with session.get("https://ocr.captchaai.com/res.php", params={
                    "key": self.api_key, "action": "get", "id": task_id, "json": 1,
                }) as resp:
                    result = await resp.json(content_type=None)
                    if result.get("status") == 1:
                        return result["request"]
                    if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
                        raise Exception("CAPTCHA unsolvable")

            raise TimeoutError("Solve timed out")


# Usage
async def main():
    queue = AsyncCaptchaQueue(API_KEY, max_concurrent=5)

    tasks = [
        {"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
        for i in range(10)
    ]

    results = await queue.solve_batch(tasks)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i}: ERROR — {result}")
        else:
            print(f"Task {i}: {result[:50]}...")


asyncio.run(main())

Üretici-tüketici modeli

Sayfaların dinamik olarak keşfedildiği sürekli kazıma iş yükleri için:

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"


class ProducerConsumerQueue:
    """Continuous CAPTCHA solving with producer-consumer pattern."""

    def __init__(self, api_key, queue_size=100, num_consumers=5):
        self.api_key = api_key
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.num_consumers = num_consumers
        self.solved_count = 0
        self.error_count = 0
        self.running = True

    async def produce(self, tasks):
        """Producer: feed CAPTCHA tasks into the queue."""
        for task in tasks:
            await self.queue.put(task)
        # Signal consumers to stop
        for _ in range(self.num_consumers):
            await self.queue.put(None)

    async def consume(self, result_handler):
        """Consumer: solve CAPTCHAs and call result handler."""
        async with aiohttp.ClientSession() as session:
            while True:
                task = await self.queue.get()
                if task is None:
                    self.queue.task_done()
                    break

                try:
                    result = await self._solve(session, task["method"], **task["params"])
                    self.solved_count += 1
                    if result_handler:
                        await result_handler(task, result)
                except Exception as e:
                    self.error_count += 1
                    print(f"Error: {e}")
                finally:
                    self.queue.task_done()

    async def run(self, tasks, result_handler=None):
        """Run the producer-consumer pipeline."""
        # Start producer
        producer = asyncio.create_task(self.produce(tasks))

        # Start consumers
        consumers = [
            asyncio.create_task(self.consume(result_handler))
            for _ in range(self.num_consumers)
        ]

        # Wait for everything to finish
        await producer
        await asyncio.gather(*consumers)

        print(f"Complete: {self.solved_count} solved, {self.error_count} errors")

    async def _solve(self, session, method, **params):
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(f"Submit: {data.get('request')}")
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
        raise TimeoutError("Timed out")


# Usage
async def handle_result(task, token):
    url = task["params"]["pageurl"]
    print(f"Solved for {url}: {token[:30]}...")


async def main():
    queue = ProducerConsumerQueue(API_KEY, num_consumers=5)

    tasks = [
        {"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
        for i in range(20)
    ]

    await queue.run(tasks, result_handler=handle_result)


asyncio.run(main())

Öncelik kuyruğu

Bazı CAPTCHA'lar diğerlerinden daha önemli olduğunda:

import asyncio
from dataclasses import dataclass, field

API_KEY = "YOUR_API_KEY"


@dataclass(order=True)
class PriorityTask:
    priority: int
    task: dict = field(compare=False)


class PriorityCaptchaQueue:
    """CAPTCHA queue with priority levels."""

    def __init__(self, api_key, num_workers=5):
        self.api_key = api_key
        self.queue = asyncio.PriorityQueue()
        self.num_workers = num_workers
        self.results = {}

    async def submit(self, task_id, method, priority=5, **params):
        """Submit with priority (lower number = higher priority)."""
        await self.queue.put(PriorityTask(
            priority=priority,
            task={"id": task_id, "method": method, "params": params},
        ))

    async def process(self):
        """Process all queued tasks by priority."""
        workers = [asyncio.create_task(self._worker()) for _ in range(self.num_workers)]

        # Wait for queue to drain
        await self.queue.join()

        # Cancel workers
        for w in workers:
            w.cancel()

        return self.results

    async def _worker(self):
        import aiohttp
        async with aiohttp.ClientSession() as session:
            while True:
                item = await self.queue.get()
                task = item.task
                try:
                    result = await self._solve(session, task["method"], **task["params"])
                    self.results[task["id"]] = {"status": "solved", "token": result}
                except Exception as e:
                    self.results[task["id"]] = {"status": "error", "error": str(e)}
                finally:
                    self.queue.task_done()

    async def _solve(self, session, method, **params):
        import aiohttp
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(data.get("request"))
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
        raise TimeoutError()


# Usage
async def main():
    pq = PriorityCaptchaQueue(API_KEY, num_workers=3)

    # High priority — checkout pages
    await pq.submit("checkout_1", "turnstile", priority=1, sitekey="KEY", pageurl="https://shop.com/checkout")

    # Normal priority — product pages
    for i in range(5):
        await pq.submit(f"product_{i}", "userrecaptcha", priority=5, googlekey="KEY", pageurl=f"https://shop.com/p/{i}")

    # Low priority — info pages
    for i in range(3):
        await pq.submit(f"info_{i}", "userrecaptcha", priority=10, googlekey="KEY", pageurl=f"https://shop.com/info/{i}")

    results = await pq.process()
    for task_id, result in results.items():
        print(f"{task_id}: {result['status']}")


asyncio.run(main())

İzleme ve raporlama

import time
from dataclasses import dataclass, field


@dataclass
class QueueMetrics:
    submitted: int = 0
    solved: int = 0
    failed: int = 0
    total_solve_time: float = 0.0
    start_time: float = field(default_factory=time.time)

    @property
    def avg_solve_time(self):
        return self.total_solve_time / self.solved if self.solved else 0

    @property
    def success_rate(self):
        total = self.solved + self.failed
        return (self.solved / total * 100) if total else 0

    @property
    def throughput(self):
        elapsed = time.time() - self.start_time
        return self.solved / elapsed * 60 if elapsed > 0 else 0

    def report(self):
        return (
            f"Submitted: {self.submitted} | "
            f"Solved: {self.solved} | "
            f"Failed: {self.failed} | "
            f"Avg time: {self.avg_solve_time:.1f}s | "
            f"Success: {self.success_rate:.1f}% | "
            f"Throughput: {self.throughput:.0f}/min"
        )

Sorun giderme

Belirti Sebep Düzeltme
Kuyruk büyüyor ancak görevler tamamlanmıyor Çok fazla çalışan API'yi bunaltıyor max_workers / max_concurrent'yi azaltın
ERROR_NO_SLOT_AVAILABLE API eşzamanlılık sınırına ulaşıldı Gönderimler arasına gecikme ekleyin
Görevler kuyrukta kaldı Çalışan iş parçacıkları istisna nedeniyle öldü Çalışan döngüsünü try/except'ye sarın
Hafıza zamanla büyür Sonuçlar tüketilmedi get_results()'yi periyodik olarak arayın
Eşzamansız kuyruk blokları await eksik Tüm eşzamansız çağrıların beklendiğinden emin olun

Sık sorulan sorular

Kaç tane eş zamanlı çözüm çalıştırabilirim?

CaptchaAI, sunucu tarafında eşzamanlılığı yönetir. 10 eş zamanlı çalışanla başlayın ve plan limitlerinize göre artırın. Ne zaman kısılacağını öğrenmek için ERROR_NO_SLOT_AVAILABLE'yi kontrol edin.

İş parçacığı mı yoksa asyncio mu kullanmalıyım?

Yeni projeler için asyncio kullanın; I/O-bound CAPTCHA çözümünü daha verimli bir şekilde gerçekleştirir. Mevcut senkronize koda entegre ediliyorsa iş parçacığı kullanın.

API hız sınırlarını nasıl yönetirim?

Eşzamanlı istekleri sınırlamak için bir semafor (zaman uyumsuz) veya sınırlı kuyruk (iş parçacığı) kullanın. ERROR_NO_SLOT_AVAILABLE'ye basarsanız gönderimler arasına kısa bir gecikme ekleyin.


Özet

CAPTCHA çözme kuyruğu, gönderimi oylamadan ayırır ve CAPTCHA ile paralel çözüme olanak tanır.CaptchaAI. Senkronize kod için iş parçacığını, modern Python için asyncio'yu ve sürekli iş yükleri için üretici-tüketiciyi seçin.

İlgili Makaleler

Bu makale için yorumlar devre dışı bırakılmıştır.

İlgili Yazılar

DevOps & Scaling CAPTCHA Çözme Altyapısı için Mavi-Yeşil Dağıtım
Üretimdeki Captcha AI iş akışlarına yönelik mimari kararları, işletim hususlarını ve otomasyon modellerini içeren CAPTCHA Çözme Altyapısı için Mavi-Yeşil Dağıtı...

Üretimdeki Captcha AI iş akışlarına yönelik mimari kararları, işletim hususlarını ve otomasyon modellerini içe...

Apr 27, 2026
DevOps & Scaling Azure İşlevleri + CaptchaAI: Bulut Entegrasyonu
Azure İşlevleri + Captcha AI: Bulut Entegrasyonu için Dev Ops kılavuzu, üretimdeki Captcha AI iş akışlarına yönelik mimari kararları, işletim hususlarını ve oto...

Azure İşlevleri + Captcha AI: Bulut Entegrasyonu için Dev Ops kılavuzu, üretimdeki Captcha AI iş akışlarına yö...

Apr 23, 2026
DevOps & Scaling CaptchaAI Çalışan Dağıtımı için Ansible Playbook'lar
Ansible Playbook'lar için Captcha AI Çalışan Dağıtımı için Dev Ops kılavuzu, üretimdeki Captcha AI iş akışlarına yönelik mimari kararları, işletim hususlarını v...

Ansible Playbook'lar için Captcha AI Çalışan Dağıtımı için Dev Ops kılavuzu, üretimdeki Captcha AI iş akışları...

Apr 19, 2026