Files
libnovel/backend/internal/runner/asynq_runner.go
Admin 8e611840d1
All checks were successful
Release / Test backend (push) Successful in 32s
Release / Docker / caddy (push) Successful in 42s
Release / Check ui (push) Successful in 44s
Release / Docker / ui (push) Successful in 2m32s
Release / Docker / backend (push) Successful in 2m49s
Release / Docker / runner (push) Successful in 3m26s
Release / Gitea Release (push) Successful in 15s
fix: add 30s timeout to PB HTTP client; halve heartbeat tick interval
- storage/pocketbase.go: replace http.DefaultClient (no timeout) with a
  dedicated pbHTTPClient{Timeout: 30s} so a slow/hung PocketBase cannot
  stall the backend or runner indefinitely
- runner/asynq_runner.go: heartbeat ticker was firing at StaleTaskThreshold
  (2 min) == the Docker healthcheck deadline, so a single missed tick would
  mark the container unhealthy; halved to StaleTaskThreshold/2 (1 min)
2026-04-02 18:49:04 +05:00

237 lines
7.5 KiB
Go

package runner
// asynq_runner.go — Asynq-based task dispatch for the runner.
//
// When cfg.RedisAddr is set, Run() calls runAsynq() instead of runPoll().
// The Asynq server replaces the polling loop: it listens on Redis for tasks
// enqueued by the backend Producer and delivers them immediately.
//
// Handlers in this file decode Asynq job payloads and call the existing
// runScrapeTask / runAudioTask methods, keeping all execution logic in one place.
import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/hibiken/asynq"
	asynqmetrics "github.com/hibiken/asynq/x/metrics"

	"github.com/libnovel/backend/internal/asynqqueue"
	"github.com/libnovel/backend/internal/domain"
)
// runAsynq starts an Asynq server that replaces the PocketBase poll loop.
// It also starts the periodic catalogue refresh ticker, a heartbeat file
// writer for the Docker healthcheck, and a PocketBase poll loop for
// translation tasks (which do not flow through Redis).
// Blocks until ctx is cancelled, then shuts the Asynq server down and waits
// for in-flight translation tasks to finish.
func (r *Runner) runAsynq(ctx context.Context) error {
	redisOpt, err := r.redisConnOpt()
	if err != nil {
		return fmt.Errorf("runner: parse redis addr: %w", err)
	}
	srv := asynq.NewServer(redisOpt, asynq.Config{
		// Allocate concurrency slots for each task type.
		// Total concurrency = scrape + audio slots.
		Concurrency: r.cfg.MaxConcurrentScrape + r.cfg.MaxConcurrentAudio,
		Queues: map[string]int{
			asynqqueue.QueueDefault: 1,
		},
		// Let Asynq handle retries with exponential back-off.
		RetryDelayFunc: asynq.DefaultRetryDelayFunc,
		// Log errors from handlers via the existing structured logger.
		ErrorHandler: asynq.ErrorHandlerFunc(func(_ context.Context, task *asynq.Task, err error) {
			r.deps.Log.Error("runner: asynq task failed",
				"type", task.Type(),
				"err", err,
			)
		}),
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc(asynqqueue.TypeAudioGenerate, r.handleAudioTask)
	// Book and catalogue scrapes share one handler; the payload distinguishes
	// the two kinds downstream.
	mux.HandleFunc(asynqqueue.TypeScrapeBook, r.handleScrapeTask)
	mux.HandleFunc(asynqqueue.TypeScrapeCatalogue, r.handleScrapeTask)
	// Register Asynq queue metrics with the default Prometheus registry so
	// the /metrics endpoint (metrics.go) can expose them. Registration failure
	// is logged but non-fatal — metrics are best-effort.
	inspector := asynq.NewInspector(redisOpt)
	collector := asynqmetrics.NewQueueMetricsCollector(inspector)
	if err := r.metricsRegistry.Register(collector); err != nil {
		r.deps.Log.Warn("runner: could not register asynq prometheus collector", "err", err)
	}
	// Start the periodic catalogue refresh.
	catalogueTick := time.NewTicker(r.cfg.CatalogueRefreshInterval)
	defer catalogueTick.Stop()
	if !r.cfg.SkipInitialCatalogueRefresh {
		go r.runCatalogueRefresh(ctx)
	} else {
		r.deps.Log.Info("runner: skipping initial catalogue refresh (RUNNER_SKIP_INITIAL_CATALOGUE_REFRESH=true)")
	}
	r.deps.Log.Info("runner: asynq mode active", "redis_addr", r.cfg.RedisAddr)
	// ── Heartbeat goroutine ──────────────────────────────────────────────
	// Touch /tmp/runner.alive at half the stale-task threshold so the Docker
	// healthcheck (whose deadline is the full threshold) tolerates one missed
	// tick. This mirrors the heartbeat file behavior from the poll() loop.
	go func() {
		heartbeatTick := time.NewTicker(r.cfg.StaleTaskThreshold / 2)
		defer heartbeatTick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-heartbeatTick.C:
				// os.Create truncates/creates the file, updating its mtime.
				if f, err := os.Create("/tmp/runner.alive"); err != nil {
					r.deps.Log.Warn("runner: could not write heartbeat file", "err", err)
				} else {
					f.Close()
				}
			}
		}
	}()
	// ── Translation polling goroutine ────────────────────────────────────
	// Translation tasks live in PocketBase (not Redis), so we need a separate
	// poll loop to claim and dispatch them. This runs alongside the Asynq server.
	// translationSem bounds concurrent translation work; translationWg lets
	// shutdown wait for in-flight tasks.
	translationSem := make(chan struct{}, r.cfg.MaxConcurrentTranslation)
	var translationWg sync.WaitGroup
	go func() {
		tick := time.NewTicker(r.cfg.PollInterval)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				r.pollTranslationTasks(ctx, translationSem, &translationWg)
			}
		}
	}()
	// Run catalogue refresh ticker in the background.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-catalogueTick.C:
				go r.runCatalogueRefresh(ctx)
			}
		}
	}()
	// Start Asynq server (non-blocking).
	if err := srv.Start(mux); err != nil {
		return fmt.Errorf("runner: asynq server start: %w", err)
	}
	// Block until context is cancelled, then gracefully stop.
	<-ctx.Done()
	r.deps.Log.Info("runner: context cancelled, shutting down asynq server")
	srv.Shutdown()
	// Wait for translation tasks to complete. Asynq's Shutdown already waits
	// for its own in-flight handlers; this covers the PocketBase-side tasks.
	translationWg.Wait()
	return nil
}
// redisConnOpt parses cfg.RedisAddr into an asynq.RedisConnOpt.
// Supports full "redis://" / "rediss://" URLs and plain "host:port".
func (r *Runner) redisConnOpt() (asynq.RedisConnOpt, error) {
	addr := r.cfg.RedisAddr
	// ParseRedisURI handles redis:// and rediss:// schemes.
	//
	// Bug fix: the previous check sliced addr[:9] after only guarding
	// len(addr) > 7, which panics (slice out of range) for any 8-character
	// address that is not exactly "redis://". strings.HasPrefix is both
	// length-safe and clearer.
	if strings.HasPrefix(addr, "redis://") || strings.HasPrefix(addr, "rediss://") {
		return asynq.ParseRedisURI(addr)
	}
	// Plain "host:port" — use RedisClientOpt directly. The password from
	// config is applied here; URL forms carry credentials in the URI itself.
	return asynq.RedisClientOpt{
		Addr:     addr,
		Password: r.cfg.RedisPassword,
	}, nil
}
// handleScrapeTask is the Asynq handler for TypeScrapeBook and TypeScrapeCatalogue.
// It decodes the payload into a domain.ScrapeTask and delegates execution to
// runScrapeTask, tracking the in-flight task count for the duration.
func (r *Runner) handleScrapeTask(ctx context.Context, t *asynq.Task) error {
	var payload asynqqueue.ScrapePayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return fmt.Errorf("unmarshal scrape payload: %w", err)
	}

	r.tasksRunning.Add(1)
	defer r.tasksRunning.Add(-1)

	r.runScrapeTask(ctx, domain.ScrapeTask{
		ID:          payload.PBTaskID,
		Kind:        payload.Kind,
		TargetURL:   payload.TargetURL,
		FromChapter: payload.FromChapter,
		ToChapter:   payload.ToChapter,
	})
	return nil
}
// handleAudioTask is the Asynq handler for TypeAudioGenerate.
// It decodes the payload into a domain.AudioTask and delegates execution to
// runAudioTask, tracking the in-flight task count for the duration.
func (r *Runner) handleAudioTask(ctx context.Context, t *asynq.Task) error {
	var payload asynqqueue.AudioPayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return fmt.Errorf("unmarshal audio payload: %w", err)
	}

	r.tasksRunning.Add(1)
	defer r.tasksRunning.Add(-1)

	r.runAudioTask(ctx, domain.AudioTask{
		ID:      payload.PBTaskID,
		Slug:    payload.Slug,
		Chapter: payload.Chapter,
		Voice:   payload.Voice,
	})
	return nil
}
// pollTranslationTasks claims all available translation tasks from PocketBase
// and dispatches each to its own goroutine. Translation tasks don't go through
// Redis/Asynq because they're stored in PocketBase, so this separate poll loop
// is needed alongside the Asynq server.
func (r *Runner) pollTranslationTasks(ctx context.Context, translationSem chan struct{}, wg *sync.WaitGroup) {
	// Reap orphaned tasks (same logic as poll() in runner.go).
	reaped, err := r.deps.Consumer.ReapStaleTasks(ctx, r.cfg.StaleTaskThreshold)
	switch {
	case err != nil:
		r.deps.Log.Warn("runner: reap stale translation tasks failed", "err", err)
	case reaped > 0:
		r.deps.Log.Info("runner: reaped stale translation tasks", "count", reaped)
	}

	for ctx.Err() == nil {
		// Try to grab a concurrency slot without blocking; if all slots are
		// busy, leave the remaining pending tasks for the next tick.
		select {
		case translationSem <- struct{}{}:
		default:
			return
		}

		task, claimed, err := r.deps.Consumer.ClaimNextTranslationTask(ctx, r.cfg.WorkerID)
		if err != nil {
			<-translationSem
			r.deps.Log.Error("runner: ClaimNextTranslationTask failed", "err", err)
			return
		}
		if !claimed {
			// Nothing pending — release the slot and wait for the next tick.
			<-translationSem
			return
		}

		r.tasksRunning.Add(1)
		wg.Add(1)
		go func(t domain.TranslationTask) {
			defer wg.Done()
			defer func() { <-translationSem }()
			defer r.tasksRunning.Add(-1)
			r.runTranslationTask(ctx, t)
		}(task)
	}
}