Files
libnovel/backend/internal/runner/runner.go
Admin aaa008ac99
All checks were successful
Release / Test backend (push) Successful in 43s
Release / Check ui (push) Successful in 43s
Release / Docker / caddy (push) Successful in 46s
Release / Docker / backend (push) Successful in 2m45s
Release / Docker / runner (push) Successful in 2m53s
Release / Docker / ui (push) Successful in 2m5s
Release / Gitea Release (push) Successful in 41s
feat: add Cloudflare AI TTS engine (aura-2-en) with voice grouping in UI
2026-04-04 11:12:55 +05:00

602 lines
20 KiB
Go

// Package runner implements the worker process that picks up pending scrape,
// audio and translation tasks, executes them, and reports results back.
// Tasks arrive either through Asynq (when Config.RedisAddr is set) or by
// polling PocketBase (legacy mode, when RedisAddr is empty).
//
// Design:
//   - In polling mode, Run(ctx) loops on a ticker; each tick claims and
//     dispatches pending tasks.
//   - Scrape tasks are dispatched to the Orchestrator (one goroutine per task,
//     up to MaxConcurrentScrape).
//   - Audio tasks fetch chapter text, generate speech with pocket-tts,
//     Cloudflare Workers AI or Kokoro (chosen from the voice name), upload the
//     audio to MinIO, and report the result back (up to MaxConcurrentAudio
//     goroutines).
//   - Translation tasks run in up to MaxConcurrentTranslation goroutines.
//   - The runner is stateless between ticks; all state lives in PocketBase.
//   - Atomic task counters are exposed via /metrics (see metrics.go).
//   - Books are indexed in Meilisearch via an orchestrator.Config.PostMetadata
//     hook that is wired in whenever an orchestrator is built.
package runner
import (
"context"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"github.com/libnovel/backend/internal/bookstore"
"github.com/libnovel/backend/internal/cfai"
"github.com/libnovel/backend/internal/domain"
"github.com/libnovel/backend/internal/kokoro"
"github.com/libnovel/backend/internal/libretranslate"
"github.com/libnovel/backend/internal/meili"
"github.com/libnovel/backend/internal/orchestrator"
"github.com/libnovel/backend/internal/pockettts"
"github.com/libnovel/backend/internal/scraper"
"github.com/libnovel/backend/internal/taskqueue"
"github.com/prometheus/client_golang/prometheus"
)
// Config tunes the runner behaviour.
//
// Zero or non-positive values are replaced with defaults by New; the default
// for each field is noted below. OrchestratorWorkers, SkipInitialCatalogueRefresh,
// RedisAddr and RedisPassword are not defaulted by New.
type Config struct {
// WorkerID uniquely identifies this runner instance in PocketBase records.
// It is passed to every Consumer.ClaimNext*Task call. Defaults to "runner".
WorkerID string
// PollInterval is how often the runner checks for new tasks.
// Only used in PocketBase-polling mode (RedisAddr == ""). Defaults to 30s.
PollInterval time.Duration
// MaxConcurrentScrape limits simultaneous book-scrape goroutines.
// Defaults to 2.
MaxConcurrentScrape int
// MaxConcurrentAudio limits simultaneous audio-generation goroutines.
// Defaults to 1.
MaxConcurrentAudio int
// MaxConcurrentTranslation limits simultaneous translation goroutines.
// Defaults to 1.
MaxConcurrentTranslation int
// OrchestratorWorkers is the chapter-scraping parallelism inside each book run.
// It is passed unchanged to orchestrator.Config.Workers; New applies no
// default here (any default is up to the orchestrator package).
OrchestratorWorkers int
// HeartbeatInterval is how often active tasks PATCH their heartbeat_at
// timestamp to signal they are still alive. Defaults to 30s when 0.
// Only used in PocketBase-polling mode.
HeartbeatInterval time.Duration
// StaleTaskThreshold is how old a heartbeat must be (or absent) before the
// task is considered orphaned and reset to pending. Defaults to 2m when 0.
// Only used in PocketBase-polling mode.
StaleTaskThreshold time.Duration
// CatalogueRefreshInterval is how often the runner walks the full catalogue,
// scrapes per-book metadata, downloads covers, and re-indexes everything in
// Meilisearch. Defaults to 24h (expensive — full catalogue walk).
CatalogueRefreshInterval time.Duration
// CatalogueRequestDelay is the base inter-request pause during a catalogue
// refresh metadata walk. Jitter of up to 50% is added on top.
// Defaults to 2s. Set via RUNNER_CATALOGUE_REQUEST_DELAY.
CatalogueRequestDelay time.Duration
// SkipInitialCatalogueRefresh suppresses the immediate catalogue walk that
// otherwise fires at startup. The periodic ticker (CatalogueRefreshInterval)
// still fires normally. Set RUNNER_SKIP_INITIAL_CATALOGUE_REFRESH=true for
// quick restarts where the catalogue is already up to date.
SkipInitialCatalogueRefresh bool
// MetricsAddr is the HTTP listen address for the /metrics endpoint.
// New replaces an empty value with ":9091", so "" selects the default
// address; it does NOT disable the endpoint. A Runner built by New therefore
// always starts the metrics server.
MetricsAddr string
// RedisAddr is the address of the Redis instance used for Asynq task
// dispatch. When set the runner switches from PocketBase-polling mode to
// Asynq ServeMux mode (immediate task delivery, no polling).
// Supports plain "host:port" or a full "rediss://..." URL.
// When empty the runner falls back to PocketBase polling.
RedisAddr string
// RedisPassword is the Redis AUTH password.
// Not required when RedisAddr is a full URL that includes credentials.
RedisPassword string
}
// Dependencies are the external services the runner depends on.
//
// New substitutes defaults only for Log and SearchIndex; every other field is
// used as supplied.
type Dependencies struct {
// Consumer claims tasks from PocketBase. In polling mode it also reaps stale
// tasks; task goroutines use it to send heartbeats and report final results.
Consumer taskqueue.Consumer
// BookWriter persists scraped data (passed to every orchestrator).
BookWriter bookstore.BookWriter
// BookReader reads chapter text for audio generation.
BookReader bookstore.BookReader
// AudioStore persists generated audio and checks key existence.
// Audio tasks use AudioObjectKey to name the object and PutAudio to store it.
AudioStore bookstore.AudioStore
// TranslationStore persists translated markdown and checks key existence.
TranslationStore bookstore.TranslationStore
// CoverStore stores book cover images in MinIO.
CoverStore bookstore.CoverStore
// SearchIndex indexes books in Meilisearch after scraping.
// If nil, New substitutes meili.NoopClient{}.
SearchIndex meili.Client
// Novel is the scraper implementation. It backs every orchestrator and is
// also used directly for catalogue walks.
Novel scraper.NovelScraper
// Kokoro is the Kokoro-FastAPI TTS client (GPU, OpenAI-compatible voices).
// It handles every voice that is neither a pocket-tts nor a CF AI voice.
// If nil, such audio tasks fail with a clear error.
Kokoro kokoro.Client
// PocketTTS is the pocket-tts client (CPU, kyutai voices: alba, marius, etc.).
// If nil, pocket-tts voice tasks will fail with a clear error.
PocketTTS pockettts.Client
// CFAI is the Cloudflare Workers AI TTS client (cfai:* prefixed voices).
// If nil, CF AI voice tasks will fail with a clear error.
CFAI cfai.Client
// LibreTranslate is the machine translation client.
// If nil, translation tasks will fail with a clear error.
LibreTranslate libretranslate.Client
// Log is the structured logger. If nil, New substitutes slog.Default().
Log *slog.Logger
}
// Runner is the main worker process.
//
// Build it with New, which fills in configuration defaults, and then call Run.
// A Runner holds sync/atomic counters, so it must not be copied; always use
// the *Runner returned by New.
type Runner struct {
cfg Config
deps Dependencies
// metricsRegistry is a Prometheus registry owned by this Runner (created in New).
metricsRegistry *prometheus.Registry
// Atomic task counters — read by /metrics without locking.
// tasksRunning is incremented when a task goroutine is dispatched and
// decremented when it returns; tasksCompleted and tasksFailed count outcomes.
tasksRunning atomic.Int64
tasksCompleted atomic.Int64
tasksFailed atomic.Int64
// startedAt records when New created the Runner.
startedAt time.Time
}
// New builds a Runner from cfg and deps, substituting a default for every
// setting that is left at its zero value (or is non-positive):
//
//   - PollInterval 30s, HeartbeatInterval 30s, StaleTaskThreshold 2m
//   - MaxConcurrentScrape 2, MaxConcurrentAudio 1, MaxConcurrentTranslation 1
//   - CatalogueRefreshInterval 24h, CatalogueRequestDelay 2s
//   - WorkerID "runner", MetricsAddr ":9091"
//
// A nil deps.Log becomes slog.Default() and a nil deps.SearchIndex becomes
// meili.NoopClient{}. The returned Runner gets its own Prometheus registry.
func New(cfg Config, deps Dependencies) *Runner {
	orDuration := func(d *time.Duration, fallback time.Duration) {
		if *d <= 0 {
			*d = fallback
		}
	}
	orInt := func(n *int, fallback int) {
		if *n <= 0 {
			*n = fallback
		}
	}
	orString := func(s *string, fallback string) {
		if *s == "" {
			*s = fallback
		}
	}

	// Timing.
	orDuration(&cfg.PollInterval, 30 * time.Second)
	orDuration(&cfg.HeartbeatInterval, 30 * time.Second)
	orDuration(&cfg.StaleTaskThreshold, 2 * time.Minute)
	orDuration(&cfg.CatalogueRefreshInterval, 24 * time.Hour)
	orDuration(&cfg.CatalogueRequestDelay, 2 * time.Second)

	// Concurrency limits.
	orInt(&cfg.MaxConcurrentScrape, 2)
	orInt(&cfg.MaxConcurrentAudio, 1)
	orInt(&cfg.MaxConcurrentTranslation, 1)

	// Identity and endpoints.
	orString(&cfg.WorkerID, "runner")
	orString(&cfg.MetricsAddr, ":9091")

	// Optional collaborators.
	if deps.Log == nil {
		deps.Log = slog.Default()
	}
	if deps.SearchIndex == nil {
		deps.SearchIndex = meili.NoopClient{}
	}

	return &Runner{
		cfg:             cfg,
		deps:            deps,
		startedAt:       time.Now(),
		metricsRegistry: prometheus.NewRegistry(),
	}
}
// Run starts the /metrics HTTP server (when MetricsAddr is non-empty) in the
// background and then blocks in the task-dispatch loop until ctx is cancelled.
//
// The dispatch loop depends on cfg.RedisAddr: when it is set, tasks are
// delivered immediately through Asynq; otherwise the runner falls back to
// polling PocketBase (legacy mode). Run returns whatever the chosen loop
// returns. Metrics-server errors are only logged.
func (r *Runner) Run(ctx context.Context) error {
	cfg, log := r.cfg, r.deps.Log

	log.Info("runner: starting",
		"worker_id", cfg.WorkerID,
		"mode", r.mode(),
		"max_scrape", cfg.MaxConcurrentScrape,
		"max_audio", cfg.MaxConcurrentAudio,
		"max_translation", cfg.MaxConcurrentTranslation,
		"catalogue_refresh_interval", cfg.CatalogueRefreshInterval,
		"metrics_addr", cfg.MetricsAddr,
	)

	if addr := cfg.MetricsAddr; addr != "" {
		srv := newMetricsServer(addr, r, log)
		go func() {
			// ListenAndServe receives ctx, so the server's lifetime follows Run's.
			err := srv.ListenAndServe(ctx)
			if err != nil {
				log.Error("runner: metrics server error", "err", err)
			}
		}()
	}

	if cfg.RedisAddr == "" {
		return r.runPoll(ctx)
	}
	return r.runAsynq(ctx)
}
// mode names the active dispatch mode for log output: "asynq" when a Redis
// address is configured, "poll" otherwise.
func (r *Runner) mode() string {
	name := "poll"
	if r.cfg.RedisAddr != "" {
		name = "asynq"
	}
	return name
}
// runPoll is the legacy PocketBase-polling dispatch loop, used when
// cfg.RedisAddr is empty.
//
// It polls once right away and then again after every PollInterval tick and
// every catalogue-refresh tick. A background catalogue refresh starts at
// startup unless SkipInitialCatalogueRefresh is set, and again every
// CatalogueRefreshInterval. After ctx is cancelled, runPoll waits up to two
// minutes for in-flight task goroutines and then returns nil, even if some
// are still running.
func (r *Runner) runPoll(ctx context.Context) error {
	var (
		scrapeSem      = make(chan struct{}, r.cfg.MaxConcurrentScrape)
		audioSem       = make(chan struct{}, r.cfg.MaxConcurrentAudio)
		translationSem = make(chan struct{}, r.cfg.MaxConcurrentTranslation)
		inflight       sync.WaitGroup
	)

	pollTicker := time.NewTicker(r.cfg.PollInterval)
	defer pollTicker.Stop()
	refreshTicker := time.NewTicker(r.cfg.CatalogueRefreshInterval)
	defer refreshTicker.Stop()

	if r.cfg.SkipInitialCatalogueRefresh {
		r.deps.Log.Info("runner: skipping initial catalogue refresh (RUNNER_SKIP_INITIAL_CATALOGUE_REFRESH=true)")
	} else {
		go r.runCatalogueRefresh(ctx)
	}

	r.deps.Log.Info("runner: poll mode active", "poll_interval", r.cfg.PollInterval)

	for {
		r.poll(ctx, scrapeSem, audioSem, translationSem, &inflight)

		select {
		case <-pollTicker.C:
			// Time for the next poll.
		case <-refreshTicker.C:
			go r.runCatalogueRefresh(ctx)
		case <-ctx.Done():
			r.deps.Log.Info("runner: context cancelled, draining active tasks")

			drained := make(chan struct{})
			go func() {
				defer close(drained)
				inflight.Wait()
			}()

			deadline := time.NewTimer(2 * time.Minute)
			defer deadline.Stop()

			select {
			case <-drained:
				r.deps.Log.Info("runner: all tasks drained, exiting")
			case <-deadline.C:
				r.deps.Log.Warn("runner: drain timeout exceeded, forcing exit")
			}
			return nil
		}
	}
}
// poll performs one polling pass. It:
//
//  1. touches /tmp/runner.alive so the Docker health check can confirm the
//     runner is actively polling (a write failure is only logged);
//  2. resets orphaned tasks via Consumer.ReapStaleTasks using
//     cfg.StaleTaskThreshold;
//  3. claims and dispatches pending scrape, audio and translation tasks, in
//     that order, for as long as the matching semaphore has a free slot and
//     the queue is non-empty.
//
// Each dispatched task runs in its own goroutine registered with wg. That
// goroutine holds one semaphore slot and one tasksRunning count until the task
// returns. poll returns early if ctx is cancelled.
func (r *Runner) poll(ctx context.Context, scrapeSem, audioSem, translationSem chan struct{}, wg *sync.WaitGroup) {
	// ── Heartbeat file ────────────────────────────────────────────────────
	// Failure is non-fatal — just log it.
	if f, err := os.Create("/tmp/runner.alive"); err != nil {
		r.deps.Log.Warn("runner: could not write heartbeat file", "err", err)
	} else {
		f.Close()
	}

	// ── Reap orphaned tasks ───────────────────────────────────────────────
	if n, err := r.deps.Consumer.ReapStaleTasks(ctx, r.cfg.StaleTaskThreshold); err != nil {
		r.deps.Log.Warn("runner: reap stale tasks failed", "err", err)
	} else if n > 0 {
		r.deps.Log.Info("runner: reaped stale tasks", "count", n)
	}

	// Every loop below reserves a semaphore slot BEFORE claiming a task.
	// Claiming moves a task to "running", so a task claimed without a free slot
	// would sit orphaned until the reaper reset it. Reserving first guarantees
	// every claimed task is dispatched at once. The slot is handed back when the
	// claim fails or the queue is empty. The "full" exits use labeled breaks
	// because a bare `break` inside a select only leaves the select.

	// ── Scrape tasks ──────────────────────────────────────────────────────
scrapeLoop:
	for {
		if ctx.Err() != nil {
			return
		}
		select {
		case scrapeSem <- struct{}{}:
			// Slot acquired — proceed to claim a task.
		default:
			// All slots busy; leave remaining pending tasks for next tick.
			break scrapeLoop
		}
		task, ok, err := r.deps.Consumer.ClaimNextScrapeTask(ctx, r.cfg.WorkerID)
		if err != nil {
			<-scrapeSem // release the pre-acquired slot
			r.deps.Log.Error("runner: ClaimNextScrapeTask failed", "err", err)
			break
		}
		if !ok {
			<-scrapeSem // release the pre-acquired slot; queue empty
			break
		}
		r.tasksRunning.Add(1)
		wg.Add(1)
		go func(t domain.ScrapeTask) {
			defer wg.Done()
			defer func() { <-scrapeSem }()
			defer r.tasksRunning.Add(-1)
			r.runScrapeTask(ctx, t)
		}(task)
	}

	// ── Audio tasks ───────────────────────────────────────────────────────
audioLoop:
	for {
		if ctx.Err() != nil {
			return
		}
		select {
		case audioSem <- struct{}{}:
			// Slot acquired — proceed to claim a task.
		default:
			// All slots busy; leave remaining pending tasks for next tick.
			break audioLoop
		}
		task, ok, err := r.deps.Consumer.ClaimNextAudioTask(ctx, r.cfg.WorkerID)
		if err != nil {
			<-audioSem // release the pre-acquired slot
			r.deps.Log.Error("runner: ClaimNextAudioTask failed", "err", err)
			break
		}
		if !ok {
			<-audioSem // release the pre-acquired slot; queue empty
			break
		}
		r.tasksRunning.Add(1)
		wg.Add(1)
		go func(t domain.AudioTask) {
			defer wg.Done()
			defer func() { <-audioSem }()
			defer r.tasksRunning.Add(-1)
			r.runAudioTask(ctx, t)
		}(task)
	}

	// ── Translation tasks ─────────────────────────────────────────────────
translationLoop:
	for {
		if ctx.Err() != nil {
			return
		}
		select {
		case translationSem <- struct{}{}:
			// Slot acquired — proceed to claim a task.
		default:
			// All slots busy; leave remaining pending tasks for next tick.
			break translationLoop
		}
		task, ok, err := r.deps.Consumer.ClaimNextTranslationTask(ctx, r.cfg.WorkerID)
		if err != nil {
			<-translationSem // release the pre-acquired slot
			r.deps.Log.Error("runner: ClaimNextTranslationTask failed", "err", err)
			break
		}
		if !ok {
			<-translationSem // release the pre-acquired slot; queue empty
			break
		}
		r.tasksRunning.Add(1)
		wg.Add(1)
		go func(t domain.TranslationTask) {
			defer wg.Done()
			defer func() { <-translationSem }()
			defer r.tasksRunning.Add(-1)
			r.runTranslationTask(ctx, t)
		}(task)
	}
}
// newOrchestrator returns a fresh orchestrator that uses cfg.OrchestratorWorkers
// workers, the runner's scraper, book writer and logger. Its PostMetadata hook
// upserts each book's metadata into the search index; an upsert failure is
// logged as a warning and otherwise ignored.
func (r *Runner) newOrchestrator() *orchestrator.Orchestrator {
	indexBook := func(ctx context.Context, meta domain.BookMeta) {
		err := r.deps.SearchIndex.UpsertBook(ctx, meta)
		if err == nil {
			return
		}
		r.deps.Log.Warn("runner: meilisearch upsert failed",
			"slug", meta.Slug, "err", err)
	}

	return orchestrator.New(
		orchestrator.Config{
			Workers:      r.cfg.OrchestratorWorkers,
			PostMetadata: indexBook,
		},
		r.deps.Novel,
		r.deps.BookWriter,
		r.deps.Log,
	)
}
// runScrapeTask executes a single scrape task end-to-end inside a
// "runner.scrape_task" tracing span and reports the outcome through
// Consumer.FinishScrapeTask.
//
// "catalogue" tasks are handled by runCatalogueTask. "book" and "book_range"
// tasks go to the orchestrator's RunBook. Any other kind is reported with an
// "unknown task kind" error. A result with a non-empty ErrorMessage counts
// towards tasksFailed; otherwise tasksCompleted is incremented. While the task
// runs, a background goroutine calls Consumer.HeartbeatTask every
// cfg.HeartbeatInterval until this function returns or ctx is cancelled.
func (r *Runner) runScrapeTask(ctx context.Context, task domain.ScrapeTask) {
	ctx, span := otel.Tracer("runner").Start(ctx, "runner.scrape_task")
	defer span.End()
	span.SetAttributes(
		attribute.String("task.id", task.ID),
		attribute.String("task.kind", task.Kind),
		attribute.String("task.url", task.TargetURL),
	)

	log := r.deps.Log.With("task_id", task.ID, "kind", task.Kind, "url", task.TargetURL)
	log.Info("runner: scrape task starting")

	beatCtx, stopBeat := context.WithCancel(ctx)
	defer stopBeat()
	go func() {
		ticker := time.NewTicker(r.cfg.HeartbeatInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := r.deps.Consumer.HeartbeatTask(ctx, task.ID); err != nil {
					log.Warn("runner: heartbeat failed", "err", err)
				}
			case <-beatCtx.Done():
				return
			}
		}
	}()

	orch := r.newOrchestrator()

	var result domain.ScrapeResult
	switch kind := task.Kind; kind {
	case "book", "book_range":
		result = orch.RunBook(ctx, task)
	case "catalogue":
		result = r.runCatalogueTask(ctx, task, orch, log)
	default:
		result.ErrorMessage = fmt.Sprintf("unknown task kind: %q", kind)
		log.Warn("runner: unknown task kind")
	}

	if err := r.deps.Consumer.FinishScrapeTask(ctx, task.ID, result); err != nil {
		log.Error("runner: FinishScrapeTask failed", "err", err)
	}

	if msg := result.ErrorMessage; msg == "" {
		r.tasksCompleted.Add(1)
		span.SetStatus(codes.Ok, "")
	} else {
		r.tasksFailed.Add(1)
		span.SetStatus(codes.Error, msg)
	}

	log.Info("runner: scrape task finished",
		"scraped", result.ChaptersScraped,
		"skipped", result.ChaptersSkipped,
		"errors", result.Errors,
	)
}
// runCatalogueTask runs a full catalogue scrape. It walks Novel.ScrapeCatalogue
// and calls o.RunBook for every entry, using task.ID and the entry URL as a
// "book" task. Per-book counters are accumulated into a single ScrapeResult.
//
// Each entry adds one to BooksFound on top of the BooksFound that RunBook
// itself reports. NOTE(review): confirm RunBook does not also count the book,
// or this double-counts.
//
// Once ctx is cancelled, the remaining entries are skipped but the channel is
// still read until it is closed, and only then is errCh read. Stopping the read
// early could leave the producer blocked on a send nobody receives, so
// `<-errCh` would never return. (This presumes ScrapeCatalogue closes entries
// before delivering its final error, which the original read order also
// required — confirm against the scraper contract.) A non-nil catalogue error
// increments Errors and becomes the result's ErrorMessage.
func (r *Runner) runCatalogueTask(ctx context.Context, task domain.ScrapeTask, o *orchestrator.Orchestrator, log *slog.Logger) domain.ScrapeResult {
	entries, errCh := r.deps.Novel.ScrapeCatalogue(ctx)

	var result domain.ScrapeResult
	for entry := range entries {
		if ctx.Err() != nil {
			// Cancelled: keep draining so the producer can finish, but do no work.
			continue
		}
		bookTask := domain.ScrapeTask{
			ID:        task.ID,
			Kind:      "book",
			TargetURL: entry.URL,
		}
		bookResult := o.RunBook(ctx, bookTask)
		result.BooksFound += bookResult.BooksFound + 1
		result.ChaptersScraped += bookResult.ChaptersScraped
		result.ChaptersSkipped += bookResult.ChaptersSkipped
		result.Errors += bookResult.Errors
	}

	if err := <-errCh; err != nil {
		log.Warn("runner: catalogue scrape finished with error", "err", err)
		result.Errors++
		if result.ErrorMessage == "" {
			result.ErrorMessage = err.Error()
		}
	}
	return result
}
// runAudioTask generates audio for one chapter inside a "runner.audio_task"
// tracing span and reports the outcome through Consumer.FinishAudioTask.
//
// Steps:
//  1. read the chapter with BookReader.ReadChapter and strip its markdown;
//  2. pick the TTS engine from the voice: pocket-tts voices first, then
//     Cloudflare AI (cfai) voices, otherwise Kokoro;
//  3. generate the audio and store it under AudioStore.AudioObjectKey.
//
// Any failure is logged, counted in tasksFailed and reported with an
// ErrorMessage. This includes an empty chapter and an unconfigured (nil) engine
// client. Success is counted in tasksCompleted and reported with the object key.
// A background goroutine heartbeats the task every cfg.HeartbeatInterval
// until the function returns or ctx is cancelled.
func (r *Runner) runAudioTask(ctx context.Context, task domain.AudioTask) {
	ctx, span := otel.Tracer("runner").Start(ctx, "runner.audio_task")
	defer span.End()
	span.SetAttributes(
		attribute.String("task.id", task.ID),
		attribute.String("book.slug", task.Slug),
		attribute.Int("chapter.number", task.Chapter),
		attribute.String("audio.voice", task.Voice),
	)

	log := r.deps.Log.With("task_id", task.ID, "slug", task.Slug, "chapter", task.Chapter, "voice", task.Voice)
	log.Info("runner: audio task starting")

	beatCtx, stopBeat := context.WithCancel(ctx)
	defer stopBeat()
	go func() {
		ticker := time.NewTicker(r.cfg.HeartbeatInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := r.deps.Consumer.HeartbeatTask(ctx, task.ID); err != nil {
					log.Warn("runner: heartbeat failed", "err", err)
				}
			case <-beatCtx.Done():
				return
			}
		}
	}()

	report := func(res domain.AudioResult) {
		if err := r.deps.Consumer.FinishAudioTask(ctx, task.ID, res); err != nil {
			log.Error("runner: FinishAudioTask failed", "err", err)
		}
	}
	fail := func(reason string) {
		log.Error("runner: audio task failed", "reason", reason)
		r.tasksFailed.Add(1)
		span.SetStatus(codes.Error, reason)
		report(domain.AudioResult{ErrorMessage: reason})
	}

	raw, err := r.deps.BookReader.ReadChapter(ctx, task.Slug, task.Chapter)
	if err != nil {
		fail(fmt.Sprintf("read chapter: %v", err))
		return
	}
	text := stripMarkdown(raw)
	if text == "" {
		fail("chapter text is empty after stripping markdown")
		return
	}

	var (
		audioData []byte
		genErr    error
		engine    string // prefix of the "<engine> generate: ..." failure reason
		doneMsg   string // log message emitted after successful generation
	)
	switch {
	case pockettts.IsPocketTTSVoice(task.Voice):
		if r.deps.PocketTTS == nil {
			fail("pocket-tts client not configured (POCKET_TTS_URL is empty)")
			return
		}
		engine, doneMsg = "pocket-tts", "runner: audio generated via pocket-tts"
		audioData, genErr = r.deps.PocketTTS.GenerateAudio(ctx, text, task.Voice)
	case cfai.IsCFAIVoice(task.Voice):
		if r.deps.CFAI == nil {
			fail("cloudflare AI client not configured (CFAI_ACCOUNT_ID/CFAI_API_TOKEN empty)")
			return
		}
		engine, doneMsg = "cfai", "runner: audio generated via cloudflare AI"
		audioData, genErr = r.deps.CFAI.GenerateAudio(ctx, text, task.Voice)
	default:
		if r.deps.Kokoro == nil {
			fail("kokoro client not configured (KOKORO_URL is empty)")
			return
		}
		engine, doneMsg = "kokoro", "runner: audio generated via kokoro-fastapi"
		audioData, genErr = r.deps.Kokoro.GenerateAudio(ctx, text, task.Voice)
	}
	if genErr != nil {
		fail(fmt.Sprintf("%s generate: %v", engine, genErr))
		return
	}
	log.Info(doneMsg, "voice", task.Voice)

	key := r.deps.AudioStore.AudioObjectKey(task.Slug, task.Chapter, task.Voice)
	if err := r.deps.AudioStore.PutAudio(ctx, key, audioData); err != nil {
		fail(fmt.Sprintf("put audio: %v", err))
		return
	}

	r.tasksCompleted.Add(1)
	span.SetStatus(codes.Ok, "")
	report(domain.AudioResult{ObjectKey: key})
	log.Info("runner: audio task finished", "key", key)
}