Arquitectura del host: events, settings, scheduler
Tres decisiones cambiaron cómo pensé el host. Ninguna es espectacular por separado. Juntas, hacen toda la diferencia.
Tres decisiones cambiaron cómo pienso este host. Ninguna es espectacular por separado — todas son “lo que tocaba hacer”. Juntas, han evitado tres meses de bugs que ya sé que no tendré.
El contexto
El host tiene tres tareas simultáneas que hay que coordinar:
- Leer medios (imágenes, vídeos) y convertirlos en frames RGB con la resolución de la tira.
- Aplicar transformaciones de color (gamma, kelvin, brightness) frame a frame.
- Encodear y enviar por serie al ritmo que marca el scheduler.
Y encima, debe poder ser controlado por dos clientes distintos: un proceso Unity por stdin (legacy contract), y una UI web por WebSocket (nuevo). Sin duplicar lógica entre las dos vías.
Si todo eso vive en un solo thread con print() para
informar, estás en la misma película de antes. Tres
decisiones cambian la mecánica.
La decisión
1. EventBus en lugar de print().
scripts/core/events.py expone un pub/sub thread-safe con
14 tipos de eventos bien definidos:
PLAYBACK_*— started, paused, stopped, frame-sent, metrics.TRANSPORT_*— connected, disconnected, error, rx.RESOURCE_*— loaded, error.CACHE_*— evicted, cleared.SETTINGS_CHANGED.
Los productores (orchestrator, scheduler, transport)
emiten eventos. Los consumidores (cli/ para Unity,
server/ para la UI web) los formatean. El core no
sabe a quién habla — solo sabe qué ha pasado. Añadir un
cliente nuevo no requiere tocar ni una línea del core.
2. Settings inmutables.
Settings es un pydantic.BaseModel con frozen=True. No
se muta — se sustituye. SettingsStore.patch(...) hace
eso: toma las claves del patch, las fusiona con el
current, re-valida el conjunto entero y commuta
atómicamente la referencia. Después emite
SETTINGS_CHANGED y programa una escritura debounced
(0.5 s) a disco.
El debounce no es cosmética: es lo que evita que mil movimientos de un slider de FPS provoquen mil writes al disco. La UI envía patches a 60 Hz; el disco recibe como mucho 2/s.
3. FrameScheduler con time.perf_counter.
El código antiguo hacía time.sleep(1/fps) entre frames.
Parece correcto. No lo es: time.time() no es
monotónico (puede saltar atrás con NTP) y sleep acumula
deriva. Al principio es invisible; después de una hora de
playback, el frame 100 000 ya no cae donde debería.
El FrameScheduler calcula
target_t = start_t + idx / fps con time.perf_counter()
(monotónico, alta resolución) y sleep hasta exactamente
ese momento. Si llegamos tarde — porque el frame
anterior ha tardado más del presupuesto —, no dormimos
y contamos el frame como “dropped” en la ventana de
métricas.
Cada segundo emite PLAYBACK_METRICS con measured_fps,
dropped, latency_p50_ms, latency_p99_ms. La UI dibuja
un gráfico a partir de estos eventos sin saber nada del
scheduler.
class Settings(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")
requested_fps: int = Field(default=30, gt=0, le=120)
gamma: float = Field(default=0.4, gt=0)
# ...
def patch(self, partial: dict[str, Any]) -> Settings:
with self._lock:
merged = {**self._current.model_dump(), **partial}
new = Settings.model_validate(merged) # validators run
self._current = new
self._bus.publish(EventType.SETTINGS_CHANGED, ...)
if self._persist_path:
self._schedule_write() # debounced
return new
Por qué el EventBus no se rompe si un subscriber revienta. Los subscribers se llaman en el thread del publisher (cero allocations de threads por evento), pero dentro de un try/except: si un subscriber lanza, la excepción se captura y se re-emite como
TRANSPORT_ERROR. La cascada está protegida — unTRANSPORT_ERRORque también pete no genera otroTRANSPORT_ERROR, simplemente se descarta. Así un mal listener (un WebSocket caído a mitad de mensaje, por ejemplo) nunca puede impedir que los demás reciban su evento.
Lo que viene
Tenemos un host estructurado. Pero todo esto habría sido inviable sin una manera de planificar el trabajo antes de escribirlo. Próximo post: BMAD y la disciplina de ramas por historia.