Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Padrões de Integração AsyncIO em Python

O SDK Python do paebiru usa um runtime Tokio bloqueante para simplicidade e compatibilidade. Este documento mostra as melhores práticas para usá-lo com o asyncio do Python para I/O não bloqueante.

Visão Geral

Design Atual:

  • Os métodos do PaebiruClient são síncronos
  • Cada chamada de método bloqueia até a conclusão
  • O runtime do Tokio está oculto (por instância)
  • Seguro para threads via Arc<Mutex<Runtime>>

Por que bloqueante?

  1. Evita conflitos de versão do PyO3 (incompatibilidades entre pyo3-asyncio e pyo3 0.21)
  2. Comportamento mais simples e previsível
  3. Funciona perfeitamente com threading e multiprocessamento
  4. Atualização para async real possível no PyO3 0.22+ com melhor suporte

Padrão de Zero-Copy: Acesso ao Estado Cognitivo (Layer 9)

Conforme estabelecido na RFC 025, o SDK Python expõe BiologyState e Layer9Snapshot via FFI local com zero-copy. O estado cognitivo reside na memória do Rust; o Python recebe apenas uma vista (snapshot) ou realiza consultas cirúrgicas via ponteiros opacos, sem serialização JSON intermediária.

import asyncio
from paebiru import BiologyState, Layer9Snapshot

async def monitor_cognitive_state():
    bio = BiologyState()

    # Leituras cirúrgicas zero-copy — nenhuma alocação de string JSON
    print(f"Modo: {bio.layer9_mode()}")
    print(f"Temperatura: {bio.layer9_temperature()}")
    print(f"Limiar: {bio.layer9_threshold()}")

    # Snapshot completo (struct copiada, sem parse JSON)
    snap: Layer9Snapshot = bio.layer9_snapshot()
    print(f"Snapshot → modo={snap.mode}, T={snap.temperature}")

    # Avaliação assíncrona (sem bloquear o loop de eventos)
    changed = await asyncio.to_thread(bio.evaluate_layer9, 2.0)
    if changed:
        print("Transição Fine → Coarse detectada")

asyncio.run(monitor_cognitive_state())

Vantagens:

  • ✅ Sem overhead de serialização/deserialização JSON
  • ✅ Latência sub-milissegundo para consultas de estado
  • ✅ Compatível com asyncio.to_thread() para avaliações que exigem &mut

Nota: evaluate_layer9 requer &mut self (exclusão do GIL). Use asyncio.to_thread() ou garanta que apenas uma corrotina o chame por vez.

Padrão 1: asyncio.to_thread() (Recomendado)

Python 3.9+ — Execute chamadas do SDK bloqueantes em um pool de threads:

import asyncio
from paebiru import PaebiruClient

client = PaebiruClient("http://localhost:8080")

async def main():
    # Chamada não bloqueante para função bloqueante
    status = await asyncio.to_thread(client.get_status)
    print(status)

    # Enviar trabalho (não bloqueante)
    job = '{"task": "compute"}'
    result = await asyncio.to_thread(
        client.submit_compute_job,
        job,
        "ML-DSA"
    )
    print(result)

asyncio.run(main())

Vantagens:

  • ✅ Realmente não bloqueante do ponto de vista do loop de eventos do Python
  • ✅ Não é necessário gerenciamento de pool de threads
  • ✅ Embutido no Python 3.9+
  • ✅ Funciona com streaming, provas ZK

Desvantagens:

  • Usa thread do SO (não leve como async/await)
  • Overhead de inicialização de thread (~1ms por chamada)

Padrão 2: Executor (Python 3.7+)

Executor de pool de threads explícito:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from paebiru import PaebiruClient

client = PaebiruClient("http://localhost:8080")
executor = ThreadPoolExecutor(max_workers=10)

async def main():
    loop = asyncio.get_event_loop()

    # Executar no pool de threads
    status = await loop.run_in_executor(
        executor,
        client.get_status
    )
    print(status)

    # Limpeza
    executor.shutdown(wait=True)

asyncio.run(main())

Vantagens:

  • ✅ Controle explícito do pool de threads
  • ✅ Reutiliza threads (sem overhead de inicialização)
  • ✅ Funciona em todo Python 3.7+

Desvantagens:

  • Mais boilerplate
  • Precisa gerenciar o ciclo de vida do executor

Padrão 3: Múltiplas Chamadas Simultâneas

Buscar status de múltiplos nós em paralelo:

import asyncio
from paebiru import PaebiruClient

clients = [
    PaebiruClient(f"http://node{i}.local:8080")
    for i in range(3)
]

async def main():
    # Executar todas as chamadas simultaneamente no pool de threads
    statuses = await asyncio.gather(
        *[asyncio.to_thread(c.get_status) for c in clients]
    )

    for i, status in enumerate(statuses):
        print(f"Nó {i}: {status}")

asyncio.run(main())

Padrão 4: Streaming com asyncio

Processar saída de streaming de forma assíncrona:

import asyncio
import json
from paebiru import PaebiruClient

client = PaebiruClient("http://localhost:8080")

async def process_stream():
    job = '{"task": "stream_compute", "iterations": 100}'

    # Obter resultado de streaming em thread
    chunks_json = await asyncio.to_thread(
        client.submit_compute_job_streaming,
        job
    )

    # Analisar e processar
    chunks = json.loads(chunks_json)
    for chunk in chunks:
        print(f"Seq {chunk['sequence']}: {chunk['data']}")
        await asyncio.sleep(0)  # Ceder ao loop de eventos

asyncio.run(process_stream())

Padrão 5: Geração de Prova ZK

Gerar provas sem bloquear o loop de eventos:

import asyncio
from paebiru import PaebiruClient

client = PaebiruClient("http://localhost:8080")

async def prove_and_verify():
    # Gerar prova em thread
    proof = await asyncio.to_thread(
        client.prove_range,
        42,      # valor
        8        # n (bits)
    )
    print(f"Prova gerada: {proof[:50]}...")

    # Verificar em thread
    is_valid = await asyncio.to_thread(
        client.verify_proof,
        proof
    )
    print(f"Válido: {is_valid}")

asyncio.run(prove_and_verify())

Padrão 6: Gerenciador de Contexto para Múltiplas Operações

Reutilizar o cliente em operações assíncronas:

import asyncio
from paebiru import PaebiruClient

class AsyncPaebiruNode:
    """Wrapper amigável a async em torno do PaebiruClient bloqueante."""

    def __init__(self, base_url: str):
        self.client = PaebiruClient(base_url)

    async def get_status(self):
        return await asyncio.to_thread(self.client.get_status)

    async def submit_job(self, job, crypto_suite=None):
        return await asyncio.to_thread(
            self.client.submit_compute_job,
            job,
            crypto_suite
        )

    async def get_metabolism(self):
        return await asyncio.to_thread(self.client.get_metabolism)

# Uso
async def main():
    node = AsyncPaebiruNode("http://localhost:8080")
    status = await node.get_status()
    metabolism = await node.get_metabolism()
    print(f"Status: {status}")
    print(f"Metabolismo: {metabolism}")

asyncio.run(main())

Considerações de Performance

Dimensionamento do Pool de Threads

  • Padrão: min(32, os.cpu_count() + 4) threads
  • Para PAEBIRU: Defina como o número de trabalhos simultâneos
    executor = ThreadPoolExecutor(max_workers=10)
    loop.run_in_executor(executor, ...)
    

Latência

  • Chamada do SDK: 1-5ms (I/O de rede dominante)
  • Inicialização da thread: ~0.5ms (uma única vez, reutilizada com executor)
  • Despacho do loop de eventos: <0.1ms

Memória

  • Por thread: ~8MB (overhead de thread Python)
  • Runtime Tokio: ~1MB (por instância de cliente)
  • Total por cliente: Base de ~10MB

Comparação: Bloqueante vs Padrões Assíncronos

AspectoBloqueanteto_thread()Executor
SintaxeChamadas diretasasync/awaitasync/await
Loop de eventosBloqueia ❌Não bloqueante ✅Não bloqueante ✅
Overhead de threadOcultoExplícitoControlado
ConcorrênciaSequencialParalelaParalela
Python 3.7❌ (precisa de 3.9+)
ComplexidadeBaixaMédiaMédia

Quando Usar Cada Um

Usar o SDK Bloqueante Diretamente

  • Scripts de thread única
  • Processamento em lote (trabalhos sequenciais)
  • Quando o GIL já estiver retido (dentro de contexto síncrono)

Usar to_thread()

  • Recomendado para a maioria dos códigos assíncronos
  • Migração simples de síncrono para assíncrono
  • Aplicações Python 3.9+
  • Quando o tamanho do pool de threads não for crítico

Usar Executor

  • Necessidade de controle refinado do pool de threads
  • Compatibilidade com Python 3.7/3.8
  • Ajuste da contagem de threads para a carga de trabalho
  • Ambientes com recursos limitados

Futuro: SDK Assíncrono Real

Quando o PyO3 0.22+ estabilizar o pyo3-asyncio:

# API futura (ainda não disponível)
from paebiru import AsyncPaebiruClient

async def future_main():
    client = AsyncPaebiruClient("http://localhost:8080")
    status = await client.get_status()  # Async nativo
    print(status)

Isso irá:

  • ✅ Eliminar o overhead de thread
  • ✅ Integração direta com asyncio
  • ✅ Sem necessidade de wrapper to_thread() explícito
  • ✅ Leve e eficiente

Disponibilidade estimada: PyO3 0.22+ (Q3-Q4 2026)

Solução de Problemas

“Event loop is closed”

Causa: Reutilizar o cliente após o fechamento do loop de eventos

Solução:

# Errado
client = PaebiruClient("http://...")
asyncio.run(main())
asyncio.run(main())  # Erro!

# Correto
async def main(client):
    ...

client = PaebiruClient("http://...")
asyncio.run(main(client))

Alta latência em chamadas simultâneas

Causa: Não há workers suficientes no pool de threads

Solução:

executor = ThreadPoolExecutor(max_workers=20)  # Aumentar workers

RuntimeError: No running event loop

Causa: Chamar função assíncrona fora de asyncio.run()

Solução:

# Executar com asyncio
await some_async_function()  # Apenas em contexto assíncrono

# Ou envolver:
def sync_wrapper():
    return asyncio.run(some_async_function())

Configuração Recomendada

Para uma aplicação assíncrona de produção:

import asyncio
from paebiru import PaebiruClient

class PaebiruAsyncClient:
    """Wrapper assíncrono pronto para produção."""

    def __init__(self, base_url: str, executor=None):
        self.client = PaebiruClient(base_url)
        self.executor = executor  # None = pool de threads padrão

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass

    async def get_status(self):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            self.client.get_status
        )

    async def submit_job(self, job, suite=None):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            self.client.submit_compute_job,
            job,
            suite
        )

# Uso
async def main():
    async with PaebiruAsyncClient("http://localhost:8080") as node:
        status = await node.get_status()
        print(status)

asyncio.run(main())

Referências