- Replace bell dropdown with full-screen NotificationsModal (mirrors SearchModal pattern) - Notifications visible to all logged-in users (not just admin) - Admin users excluded from new-chapter fan-out (dedup vs Scrape Complete notification) - Users with notify_new_chapters=false opted out of new-chapter in-app notifications - Toggle in profile page to enable/disable in-app new-chapter notifications - PATCH /api/profile endpoint to save notification preferences - User-facing /notifications page (admin redirects to /admin/notifications)
885 lines
30 KiB
Go
885 lines
30 KiB
Go
// Package runner implements the worker loop that polls PocketBase for pending
|
|
// scrape and audio tasks, executes them, and reports results back.
|
|
//
|
|
// Design:
|
|
// - Run(ctx) loops on a ticker; each tick claims and dispatches pending tasks.
|
|
// - Scrape tasks are dispatched to the Orchestrator (one goroutine per task,
|
|
// up to MaxConcurrentScrape).
|
|
// - Audio tasks fetch chapter text, call Kokoro, upload to MinIO, and report
|
|
// the result back (up to MaxConcurrentAudio goroutines).
|
|
// - The runner is stateless between ticks; all state lives in PocketBase.
|
|
// - Atomic task counters are exposed via /metrics (see metrics.go).
|
|
// - Books are indexed in Meilisearch via an orchestrator.Config.PostMetadata
|
|
// hook injected at construction time.
|
|
package runner
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
|
|
"github.com/libnovel/backend/internal/bookstore"
|
|
"github.com/libnovel/backend/internal/cfai"
|
|
"github.com/libnovel/backend/internal/domain"
|
|
"github.com/libnovel/backend/internal/kokoro"
|
|
"github.com/libnovel/backend/internal/libretranslate"
|
|
"github.com/libnovel/backend/internal/meili"
|
|
"github.com/libnovel/backend/internal/orchestrator"
|
|
"github.com/libnovel/backend/internal/pockettts"
|
|
"github.com/libnovel/backend/internal/scraper"
|
|
"github.com/libnovel/backend/internal/storage"
|
|
"github.com/libnovel/backend/internal/taskqueue"
|
|
"github.com/libnovel/backend/internal/webpush"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
// Notifier creates in-app notifications for users.
//
// Implementations persist a notification record (title, message, and an
// optional in-app link) addressed to userID. Call sites in this package pass
// the sentinel userID "admin" for operational notifications — presumably
// routed to admin users by the implementation; confirm there.
type Notifier interface {
	CreateNotification(ctx context.Context, userID, title, message, link string) error
}
|
|
|
|
// ChapterIngester persists imported chapters for a book.
//
// IngestChapters stores the given chapters under the book identified by slug;
// it is called by runImportTask after chapters have been extracted.
type ChapterIngester interface {
	IngestChapters(ctx context.Context, slug string, chapters []bookstore.Chapter) error
}
|
|
|
|
// ImportChapterStore retrieves pre-parsed chapter JSON blobs from object storage.
//
// GetImportChapters returns the raw JSON bytes stored under key; the caller
// (runImportTask) unmarshals them into []bookstore.Chapter.
type ImportChapterStore interface {
	GetImportChapters(ctx context.Context, key string) ([]byte, error)
}
|
|
|
|
// Config tunes the runner behaviour.
//
// Zero values are replaced with sensible defaults by New; see each field for
// its default.
type Config struct {
	// WorkerID uniquely identifies this runner instance in PocketBase records.
	// Defaults to "runner" when empty.
	WorkerID string
	// PollInterval is how often the runner checks for new tasks.
	// Only used in PocketBase-polling mode (RedisAddr == ""). Defaults to 30s.
	PollInterval time.Duration
	// MaxConcurrentScrape limits simultaneous book-scrape goroutines.
	// Defaults to 2.
	MaxConcurrentScrape int
	// MaxConcurrentAudio limits simultaneous audio-generation goroutines.
	// Defaults to 1.
	MaxConcurrentAudio int
	// MaxConcurrentTranslation limits simultaneous translation goroutines.
	// Defaults to 1.
	MaxConcurrentTranslation int
	// OrchestratorWorkers is the chapter-scraping parallelism inside each book run.
	OrchestratorWorkers int
	// HeartbeatInterval is how often active tasks PATCH their heartbeat_at
	// timestamp to signal they are still alive. Defaults to 30s when 0.
	// Only used in PocketBase-polling mode.
	HeartbeatInterval time.Duration
	// StaleTaskThreshold is how old a heartbeat must be (or absent) before the
	// task is considered orphaned and reset to pending. Defaults to 2m when 0.
	// Only used in PocketBase-polling mode.
	StaleTaskThreshold time.Duration
	// CatalogueRefreshInterval is how often the runner walks the full catalogue,
	// scrapes per-book metadata, downloads covers, and re-indexes everything in
	// Meilisearch. Defaults to 24h (expensive — full catalogue walk).
	CatalogueRefreshInterval time.Duration
	// CatalogueRequestDelay is the base inter-request pause during a catalogue
	// refresh metadata walk. Jitter of up to 50% is added on top.
	// Defaults to 2s. Set via RUNNER_CATALOGUE_REQUEST_DELAY.
	CatalogueRequestDelay time.Duration
	// SkipInitialCatalogueRefresh suppresses the immediate catalogue walk that
	// otherwise fires at startup. The periodic ticker (CatalogueRefreshInterval)
	// still fires normally. Set RUNNER_SKIP_INITIAL_CATALOGUE_REFRESH=true for
	// quick restarts where the catalogue is already up to date.
	SkipInitialCatalogueRefresh bool
	// MetricsAddr is the HTTP listen address for the /metrics endpoint.
	// Defaults to ":9091". Set to "" to disable.
	MetricsAddr string
	// RedisAddr is the address of the Redis instance used for Asynq task
	// dispatch. When set the runner switches from PocketBase-polling mode to
	// Asynq ServeMux mode (immediate task delivery, no polling).
	// Supports plain "host:port" or a full "rediss://..." URL.
	// When empty the runner falls back to PocketBase polling.
	RedisAddr string
	// RedisPassword is the Redis AUTH password.
	// Not required when RedisAddr is a full URL that includes credentials.
	RedisPassword string
}
|
|
|
|
// Dependencies are the external services the runner depends on.
//
// Unless noted otherwise, fields are required; nil optional fields are either
// replaced with no-ops by New (SearchIndex, Log) or cause the corresponding
// task kind to fail with a clear error at runtime.
type Dependencies struct {
	// Consumer claims tasks from PocketBase.
	Consumer taskqueue.Consumer
	// BookWriter persists scraped data (used by orchestrator).
	BookWriter bookstore.BookWriter
	// BookReader reads chapter text for audio generation.
	BookReader bookstore.BookReader
	// AudioStore persists generated audio and checks key existence.
	AudioStore bookstore.AudioStore
	// TranslationStore persists translated markdown and checks key existence.
	TranslationStore bookstore.TranslationStore
	// CoverStore stores book cover images in MinIO.
	CoverStore bookstore.CoverStore
	// BookImport handles PDF/EPUB file parsing and chapter extraction.
	// Kept for backward compatibility when ChaptersKey is not set.
	BookImport bookstore.BookImporter
	// ImportChapterStore retrieves pre-parsed chapter JSON blobs from MinIO.
	// When set and the task has a ChaptersKey, the runner reads from here
	// instead of calling BookImport.Import() (the new preferred path).
	ImportChapterStore ImportChapterStore
	// ChapterIngester persists extracted chapters into MinIO/PocketBase.
	ChapterIngester ChapterIngester
	// Notifier creates notifications for users.
	Notifier Notifier
	// WebPush sends browser push notifications to subscribed users.
	// If nil, push notifications are disabled.
	WebPush *webpush.Sender
	// Store is the underlying *storage.Store; used for push subscription lookups
	// and new-chapter notification fan-out.
	// Only needed when WebPush is non-nil.
	Store *storage.Store
	// SearchIndex indexes books in Meilisearch after scraping.
	// If nil a no-op is used.
	SearchIndex meili.Client
	// Novel is the scraper implementation.
	Novel scraper.NovelScraper
	// Kokoro is the Kokoro-FastAPI TTS client (GPU, OpenAI-compatible voices).
	Kokoro kokoro.Client
	// PocketTTS is the pocket-tts client (CPU, kyutai voices: alba, marius, etc.).
	// If nil, pocket-tts voice tasks will fail with a clear error.
	PocketTTS pockettts.Client
	// CFAI is the Cloudflare Workers AI TTS client (cfai:* prefixed voices).
	// If nil, CF AI voice tasks will fail with a clear error.
	CFAI cfai.Client
	// LibreTranslate is the machine translation client.
	// If nil, translation tasks will fail with a clear error.
	LibreTranslate libretranslate.Client
	// Log is the structured logger.
	Log *slog.Logger
}
|
|
|
|
// Runner is the main worker process.
//
// A Runner is stateless between poll ticks; all durable state lives in
// PocketBase. Construct with New and start with Run.
type Runner struct {
	cfg  Config
	deps Dependencies

	// metricsRegistry backs the /metrics endpoint (see metrics.go).
	metricsRegistry *prometheus.Registry

	// Atomic task counters — read by /metrics without locking.
	tasksRunning   atomic.Int64
	tasksCompleted atomic.Int64
	tasksFailed    atomic.Int64

	// startedAt records process start time for uptime reporting.
	startedAt time.Time
}
|
|
|
|
// New creates a Runner from cfg and deps.
|
|
func New(cfg Config, deps Dependencies) *Runner {
|
|
if cfg.PollInterval <= 0 {
|
|
cfg.PollInterval = 30 * time.Second
|
|
}
|
|
if cfg.MaxConcurrentScrape <= 0 {
|
|
cfg.MaxConcurrentScrape = 2
|
|
}
|
|
if cfg.MaxConcurrentAudio <= 0 {
|
|
cfg.MaxConcurrentAudio = 1
|
|
}
|
|
if cfg.MaxConcurrentTranslation <= 0 {
|
|
cfg.MaxConcurrentTranslation = 1
|
|
}
|
|
if cfg.WorkerID == "" {
|
|
cfg.WorkerID = "runner"
|
|
}
|
|
if cfg.HeartbeatInterval <= 0 {
|
|
cfg.HeartbeatInterval = 30 * time.Second
|
|
}
|
|
if cfg.StaleTaskThreshold <= 0 {
|
|
cfg.StaleTaskThreshold = 2 * time.Minute
|
|
}
|
|
if cfg.CatalogueRefreshInterval <= 0 {
|
|
cfg.CatalogueRefreshInterval = 24 * time.Hour
|
|
}
|
|
if cfg.CatalogueRequestDelay <= 0 {
|
|
cfg.CatalogueRequestDelay = 2 * time.Second
|
|
}
|
|
if cfg.MetricsAddr == "" {
|
|
cfg.MetricsAddr = ":9091"
|
|
}
|
|
if deps.Log == nil {
|
|
deps.Log = slog.Default()
|
|
}
|
|
if deps.SearchIndex == nil {
|
|
deps.SearchIndex = meili.NoopClient{}
|
|
}
|
|
return &Runner{cfg: cfg, deps: deps, startedAt: time.Now(), metricsRegistry: prometheus.NewRegistry()}
|
|
}
|
|
|
|
// Run starts the worker loop and the metrics HTTP server, blocking until ctx
|
|
// is cancelled.
|
|
//
|
|
// When cfg.RedisAddr is set the runner uses Asynq (immediate task delivery).
|
|
// Otherwise it falls back to PocketBase polling (legacy mode).
|
|
func (r *Runner) Run(ctx context.Context) error {
|
|
r.deps.Log.Info("runner: starting",
|
|
"worker_id", r.cfg.WorkerID,
|
|
"mode", r.mode(),
|
|
"max_scrape", r.cfg.MaxConcurrentScrape,
|
|
"max_audio", r.cfg.MaxConcurrentAudio,
|
|
"max_translation", r.cfg.MaxConcurrentTranslation,
|
|
"catalogue_refresh_interval", r.cfg.CatalogueRefreshInterval,
|
|
"metrics_addr", r.cfg.MetricsAddr,
|
|
)
|
|
|
|
// Start metrics HTTP server in background if configured.
|
|
if r.cfg.MetricsAddr != "" {
|
|
ms := newMetricsServer(r.cfg.MetricsAddr, r, r.deps.Log)
|
|
go func() {
|
|
if err := ms.ListenAndServe(ctx); err != nil {
|
|
r.deps.Log.Error("runner: metrics server error", "err", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
if r.cfg.RedisAddr != "" {
|
|
return r.runAsynq(ctx)
|
|
}
|
|
return r.runPoll(ctx)
|
|
}
|
|
|
|
// mode returns a short string describing the active dispatch mode.
|
|
func (r *Runner) mode() string {
|
|
if r.cfg.RedisAddr != "" {
|
|
return "asynq"
|
|
}
|
|
return "poll"
|
|
}
|
|
|
|
// runPoll is the legacy PocketBase-polling dispatch loop.
// Used when cfg.RedisAddr is empty.
//
// It polls once immediately at startup and then on every PollInterval tick.
// A full catalogue refresh is started at startup (unless suppressed via
// SkipInitialCatalogueRefresh) and again on every CatalogueRefreshInterval
// tick. On ctx cancellation it waits up to two minutes for in-flight task
// goroutines to drain before returning.
func (r *Runner) runPoll(ctx context.Context) error {
	// Per-category semaphores bound the number of concurrent task goroutines.
	scrapeSem := make(chan struct{}, r.cfg.MaxConcurrentScrape)
	audioSem := make(chan struct{}, r.cfg.MaxConcurrentAudio)
	translationSem := make(chan struct{}, r.cfg.MaxConcurrentTranslation)
	importSem := make(chan struct{}, 1) // Limit concurrent imports
	var wg sync.WaitGroup

	tick := time.NewTicker(r.cfg.PollInterval)
	defer tick.Stop()

	catalogueTick := time.NewTicker(r.cfg.CatalogueRefreshInterval)
	defer catalogueTick.Stop()

	// Run one catalogue refresh immediately on startup (unless skipped by flag).
	if !r.cfg.SkipInitialCatalogueRefresh {
		go r.runCatalogueRefresh(ctx)
	} else {
		r.deps.Log.Info("runner: skipping initial catalogue refresh (RUNNER_SKIP_INITIAL_CATALOGUE_REFRESH=true)")
	}

	r.deps.Log.Info("runner: poll mode active", "poll_interval", r.cfg.PollInterval)

	// Run one poll immediately on startup, then on each tick.
	for {
		r.poll(ctx, scrapeSem, audioSem, translationSem, importSem, &wg)

		select {
		case <-ctx.Done():
			// Graceful shutdown: give active task goroutines up to two
			// minutes to finish; then exit regardless.
			r.deps.Log.Info("runner: context cancelled, draining active tasks")
			done := make(chan struct{})
			go func() {
				wg.Wait()
				close(done)
			}()
			select {
			case <-done:
				r.deps.Log.Info("runner: all tasks drained, exiting")
			case <-time.After(2 * time.Minute):
				r.deps.Log.Warn("runner: drain timeout exceeded, forcing exit")
			}
			return nil
		case <-catalogueTick.C:
			// Periodic full catalogue walk; runs concurrently with polling.
			go r.runCatalogueRefresh(ctx)
		case <-tick.C:
			// Fall through to the next poll iteration.
		}
	}
}
|
|
|
|
// poll claims all available pending tasks and dispatches them to goroutines.
|
|
func (r *Runner) poll(ctx context.Context, scrapeSem, audioSem, translationSem, importSem chan struct{}, wg *sync.WaitGroup) {
|
|
// ── Heartbeat file ────────────────────────────────────────────────────
|
|
// Touch /tmp/runner.alive so the Docker health check can confirm the
|
|
// runner is actively polling. Failure is non-fatal — just log it.
|
|
if f, err := os.Create("/tmp/runner.alive"); err != nil {
|
|
r.deps.Log.Warn("runner: could not write heartbeat file", "err", err)
|
|
} else {
|
|
f.Close()
|
|
}
|
|
|
|
// ── Reap orphaned tasks ───────────────────────────────────────────────
|
|
if n, err := r.deps.Consumer.ReapStaleTasks(ctx, r.cfg.StaleTaskThreshold); err != nil {
|
|
r.deps.Log.Warn("runner: reap stale tasks failed", "err", err)
|
|
} else if n > 0 {
|
|
r.deps.Log.Info("runner: reaped stale tasks", "count", n)
|
|
}
|
|
|
|
// ── Scrape tasks ──────────────────────────────────────────────────────
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
task, ok, err := r.deps.Consumer.ClaimNextScrapeTask(ctx, r.cfg.WorkerID)
|
|
if err != nil {
|
|
r.deps.Log.Error("runner: ClaimNextScrapeTask failed", "err", err)
|
|
break
|
|
}
|
|
if !ok {
|
|
break
|
|
}
|
|
select {
|
|
case scrapeSem <- struct{}{}:
|
|
default:
|
|
r.deps.Log.Warn("runner: scrape semaphore full, will retry next tick",
|
|
"task_id", task.ID)
|
|
break
|
|
}
|
|
r.tasksRunning.Add(1)
|
|
wg.Add(1)
|
|
go func(t domain.ScrapeTask) {
|
|
defer wg.Done()
|
|
defer func() { <-scrapeSem }()
|
|
defer r.tasksRunning.Add(-1)
|
|
r.runScrapeTask(ctx, t)
|
|
}(task)
|
|
}
|
|
|
|
// ── Audio tasks ───────────────────────────────────────────────────────
|
|
// Only claim tasks when there is a free slot in the semaphore.
|
|
// This avoids the old bug where we claimed (status→running) a task and
|
|
// then couldn't dispatch it, leaving it orphaned until the reaper fired.
|
|
audioLoop:
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
// Check capacity before claiming to avoid orphaning tasks.
|
|
select {
|
|
case audioSem <- struct{}{}:
|
|
// Slot acquired — proceed to claim a task.
|
|
default:
|
|
// All slots busy; leave remaining pending tasks for next tick.
|
|
break audioLoop
|
|
}
|
|
task, ok, err := r.deps.Consumer.ClaimNextAudioTask(ctx, r.cfg.WorkerID)
|
|
if err != nil {
|
|
<-audioSem // release the pre-acquired slot
|
|
r.deps.Log.Error("runner: ClaimNextAudioTask failed", "err", err)
|
|
break
|
|
}
|
|
if !ok {
|
|
<-audioSem // release the pre-acquired slot; queue empty
|
|
break
|
|
}
|
|
r.tasksRunning.Add(1)
|
|
wg.Add(1)
|
|
go func(t domain.AudioTask) {
|
|
defer wg.Done()
|
|
defer func() { <-audioSem }()
|
|
defer r.tasksRunning.Add(-1)
|
|
r.runAudioTask(ctx, t)
|
|
}(task)
|
|
}
|
|
|
|
// ── Translation tasks ─────────────────────────────────────────────────
|
|
translationLoop:
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
select {
|
|
case translationSem <- struct{}{}:
|
|
// Slot acquired — proceed to claim a task.
|
|
default:
|
|
// All slots busy; leave remaining pending tasks for next tick.
|
|
break translationLoop
|
|
}
|
|
task, ok, err := r.deps.Consumer.ClaimNextTranslationTask(ctx, r.cfg.WorkerID)
|
|
if err != nil {
|
|
<-translationSem
|
|
r.deps.Log.Error("runner: ClaimNextTranslationTask failed", "err", err)
|
|
break
|
|
}
|
|
if !ok {
|
|
<-translationSem
|
|
break
|
|
}
|
|
r.tasksRunning.Add(1)
|
|
wg.Add(1)
|
|
go func(t domain.TranslationTask) {
|
|
defer wg.Done()
|
|
defer func() { <-translationSem }()
|
|
defer r.tasksRunning.Add(-1)
|
|
r.runTranslationTask(ctx, t)
|
|
}(task)
|
|
}
|
|
|
|
// ── Import tasks ─────────────────────────────────────────────────────
|
|
importLoop:
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
select {
|
|
case importSem <- struct{}{}:
|
|
// Slot acquired — proceed to claim a task.
|
|
default:
|
|
// All slots busy; leave remaining pending tasks for next tick.
|
|
break importLoop
|
|
}
|
|
task, ok, err := r.deps.Consumer.ClaimNextImportTask(ctx, r.cfg.WorkerID)
|
|
if err != nil {
|
|
<-importSem
|
|
r.deps.Log.Error("runner: ClaimNextImportTask failed", "err", err)
|
|
break
|
|
}
|
|
if !ok {
|
|
<-importSem
|
|
break
|
|
}
|
|
r.tasksRunning.Add(1)
|
|
wg.Add(1)
|
|
go func(t domain.ImportTask) {
|
|
defer wg.Done()
|
|
defer func() { <-importSem }()
|
|
defer r.tasksRunning.Add(-1)
|
|
r.runImportTask(ctx, t, t.ObjectKey)
|
|
}(task)
|
|
}
|
|
}
|
|
|
|
// newOrchestrator builds an orchestrator with the Meilisearch post-hook wired in.
|
|
func (r *Runner) newOrchestrator() *orchestrator.Orchestrator {
|
|
oCfg := orchestrator.Config{
|
|
Workers: r.cfg.OrchestratorWorkers,
|
|
PostMetadata: func(ctx context.Context, meta domain.BookMeta) {
|
|
if err := r.deps.SearchIndex.UpsertBook(ctx, meta); err != nil {
|
|
r.deps.Log.Warn("runner: meilisearch upsert failed",
|
|
"slug", meta.Slug, "err", err)
|
|
}
|
|
},
|
|
}
|
|
return orchestrator.New(oCfg, r.deps.Novel, r.deps.BookWriter, r.deps.Log)
|
|
}
|
|
|
|
// runScrapeTask executes one scrape task end-to-end and reports the result.
//
// Flow: start a heartbeat goroutine, dispatch by task.Kind ("catalogue" walks
// the whole catalogue; "book"/"book_range" scrape one book), persist the
// result via FinishScrapeTask, bump counters, then fan out notifications:
// an admin summary, per-user in-app new-chapter notifications, and Web Push.
func (r *Runner) runScrapeTask(ctx context.Context, task domain.ScrapeTask) {
	ctx, span := otel.Tracer("runner").Start(ctx, "runner.scrape_task")
	defer span.End()
	span.SetAttributes(
		attribute.String("task.id", task.ID),
		attribute.String("task.kind", task.Kind),
		attribute.String("task.url", task.TargetURL),
	)

	log := r.deps.Log.With("task_id", task.ID, "kind", task.Kind, "url", task.TargetURL)
	log.Info("runner: scrape task starting")

	// Heartbeat goroutine: periodically refreshes the task's liveness record
	// so the stale-task reaper doesn't reclaim it. hbCancel (deferred) stops
	// it when this function returns.
	hbCtx, hbCancel := context.WithCancel(ctx)
	defer hbCancel()
	go func() {
		tick := time.NewTicker(r.cfg.HeartbeatInterval)
		defer tick.Stop()
		for {
			select {
			case <-hbCtx.Done():
				return
			case <-tick.C:
				if err := r.deps.Consumer.HeartbeatTask(ctx, task.ID); err != nil {
					log.Warn("runner: heartbeat failed", "err", err)
				}
			}
		}
	}()

	o := r.newOrchestrator()
	var result domain.ScrapeResult

	// Dispatch on the task kind; unknown kinds fail with a descriptive message.
	switch task.Kind {
	case "catalogue":
		result = r.runCatalogueTask(ctx, task, o, log)
	case "book", "book_range":
		result = o.RunBook(ctx, task)
	default:
		result.ErrorMessage = fmt.Sprintf("unknown task kind: %q", task.Kind)
		log.Warn("runner: unknown task kind")
	}

	// Use a fresh context for the final write so a cancelled task context doesn't
	// prevent the result counters from being persisted to PocketBase.
	finishCtx, finishCancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer finishCancel()
	if err := r.deps.Consumer.FinishScrapeTask(finishCtx, task.ID, result); err != nil {
		log.Error("runner: FinishScrapeTask failed", "err", err)
	}

	if result.ErrorMessage != "" {
		r.tasksFailed.Add(1)
		span.SetStatus(codes.Error, result.ErrorMessage)
		// NOTE(review): notifications below use the task ctx, which may
		// already be cancelled at this point (unlike FinishScrapeTask above,
		// which uses a fresh context) — confirm this is intended.
		if r.deps.Notifier != nil {
			_ = r.deps.Notifier.CreateNotification(ctx, "admin",
				"Scrape Failed",
				fmt.Sprintf("Scrape task (%s) failed: %s", task.Kind, result.ErrorMessage),
				"/admin/tasks")
		}
	} else {
		r.tasksCompleted.Add(1)
		span.SetStatus(codes.Ok, "")
		if r.deps.Notifier != nil {
			_ = r.deps.Notifier.CreateNotification(ctx, "admin",
				"Scrape Complete",
				fmt.Sprintf("Scraped %d chapters, skipped %d (%s)", result.ChaptersScraped, result.ChaptersSkipped, task.Kind),
				"/admin/tasks")
		}
		// Fan-out in-app new-chapter notification to all users who have this book
		// in their library. Runs in background so it doesn't block the task loop.
		if r.deps.Store != nil && result.ChaptersScraped > 0 &&
			result.Slug != "" && task.Kind != "catalogue" {
			go func() {
				notifyCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
				defer cancel()
				title := result.Slug
				_ = r.deps.Store.NotifyUsersWithBook(notifyCtx, result.Slug,
					"New chapters available",
					fmt.Sprintf("%d new chapter(s) added to %s", result.ChaptersScraped, title),
					"/books/"+result.Slug)
			}()
		}
		// Send Web Push notifications to subscribed browsers.
		if r.deps.WebPush != nil && r.deps.Store != nil &&
			result.ChaptersScraped > 0 && result.Slug != "" && task.Kind != "catalogue" {
			go r.deps.WebPush.SendToBook(context.Background(), r.deps.Store, result.Slug, webpush.Payload{
				Title: "New chapter available",
				Body:  fmt.Sprintf("%d new chapter(s) added", result.ChaptersScraped),
				URL:   "/books/" + result.Slug,
				Icon:  "/icon-192.png",
			})
		}
	}

	log.Info("runner: scrape task finished",
		"scraped", result.ChaptersScraped,
		"skipped", result.ChaptersSkipped,
		"errors", result.Errors,
	)
}
|
|
|
|
// runCatalogueTask runs a full catalogue scrape.
|
|
func (r *Runner) runCatalogueTask(ctx context.Context, task domain.ScrapeTask, o *orchestrator.Orchestrator, log *slog.Logger) domain.ScrapeResult {
|
|
entries, errCh := r.deps.Novel.ScrapeCatalogue(ctx)
|
|
var result domain.ScrapeResult
|
|
|
|
for entry := range entries {
|
|
if ctx.Err() != nil {
|
|
break
|
|
}
|
|
bookTask := domain.ScrapeTask{
|
|
ID: task.ID,
|
|
Kind: "book",
|
|
TargetURL: entry.URL,
|
|
}
|
|
bookResult := o.RunBook(ctx, bookTask)
|
|
result.BooksFound += bookResult.BooksFound
|
|
result.ChaptersScraped += bookResult.ChaptersScraped
|
|
result.ChaptersSkipped += bookResult.ChaptersSkipped
|
|
result.Errors += bookResult.Errors
|
|
}
|
|
|
|
if err := <-errCh; err != nil {
|
|
log.Warn("runner: catalogue scrape finished with error", "err", err)
|
|
result.Errors++
|
|
if result.ErrorMessage == "" {
|
|
result.ErrorMessage = err.Error()
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// runAudioTask executes one audio-generation task.
//
// Flow: read the chapter markdown, strip it to plain text, dispatch to one of
// three TTS backends based on the voice name (pocket-tts, Cloudflare AI, or
// Kokoro with chunked generation), upload the resulting audio to object
// storage, report the result, and notify the admin.
func (r *Runner) runAudioTask(ctx context.Context, task domain.AudioTask) {
	ctx, span := otel.Tracer("runner").Start(ctx, "runner.audio_task")
	defer span.End()
	span.SetAttributes(
		attribute.String("task.id", task.ID),
		attribute.String("book.slug", task.Slug),
		attribute.Int("chapter.number", task.Chapter),
		attribute.String("audio.voice", task.Voice),
	)

	log := r.deps.Log.With("task_id", task.ID, "slug", task.Slug, "chapter", task.Chapter, "voice", task.Voice)
	log.Info("runner: audio task starting")

	// Heartbeat goroutine: keeps the task's liveness record fresh so the
	// stale-task reaper doesn't reclaim it; stopped by the deferred hbCancel.
	hbCtx, hbCancel := context.WithCancel(ctx)
	defer hbCancel()
	go func() {
		tick := time.NewTicker(r.cfg.HeartbeatInterval)
		defer tick.Stop()
		for {
			select {
			case <-hbCtx.Done():
				return
			case <-tick.C:
				if err := r.deps.Consumer.HeartbeatTask(ctx, task.ID); err != nil {
					log.Warn("runner: heartbeat failed", "err", err)
				}
			}
		}
	}()

	// fail centralizes the failure path: log, bump counters, mark the span,
	// persist the error result, and notify the admin.
	fail := func(msg string) {
		log.Error("runner: audio task failed", "reason", msg)
		r.tasksFailed.Add(1)
		span.SetStatus(codes.Error, msg)
		result := domain.AudioResult{ErrorMessage: msg}
		if err := r.deps.Consumer.FinishAudioTask(ctx, task.ID, result); err != nil {
			log.Error("runner: FinishAudioTask failed", "err", err)
		}
		if r.deps.Notifier != nil {
			_ = r.deps.Notifier.CreateNotification(ctx, "admin",
				"Audio Failed",
				fmt.Sprintf("Ch.%d of %s (%s): %s", task.Chapter, task.Slug, task.Voice, msg),
				fmt.Sprintf("/books/%s", task.Slug))
		}
	}

	raw, err := r.deps.BookReader.ReadChapter(ctx, task.Slug, task.Chapter)
	if err != nil {
		fail(fmt.Sprintf("read chapter: %v", err))
		return
	}
	// TTS engines need plain text, not markdown.
	text := stripMarkdown(raw)
	if text == "" {
		fail("chapter text is empty after stripping markdown")
		return
	}

	// Select the TTS backend from the voice name. Each branch fails fast with
	// a clear message when its client dependency was not configured.
	var audioData []byte
	if pockettts.IsPocketTTSVoice(task.Voice) {
		if r.deps.PocketTTS == nil {
			fail("pocket-tts client not configured (POCKET_TTS_URL is empty)")
			return
		}
		var genErr error
		audioData, genErr = r.deps.PocketTTS.GenerateAudio(ctx, text, task.Voice)
		if genErr != nil {
			fail(fmt.Sprintf("pocket-tts generate: %v", genErr))
			return
		}
		log.Info("runner: audio generated via pocket-tts", "voice", task.Voice)
	} else if cfai.IsCFAIVoice(task.Voice) {
		if r.deps.CFAI == nil {
			fail("cloudflare AI client not configured (CFAI_ACCOUNT_ID/CFAI_API_TOKEN empty)")
			return
		}
		var genErr error
		audioData, genErr = r.deps.CFAI.GenerateAudio(ctx, text, task.Voice)
		if genErr != nil {
			fail(fmt.Sprintf("cfai generate: %v", genErr))
			return
		}
		log.Info("runner: audio generated via cloudflare AI", "voice", task.Voice)
	} else {
		// Default backend: Kokoro, with chunked generation to avoid timeouts
		// on very large inputs (see kokoroGenerateChunked).
		if r.deps.Kokoro == nil {
			fail("kokoro client not configured (KOKORO_URL is empty)")
			return
		}
		var genErr error
		audioData, genErr = kokoroGenerateChunked(ctx, r.deps.Kokoro, text, task.Voice, log)
		if genErr != nil {
			fail(fmt.Sprintf("kokoro generate: %v", genErr))
			return
		}
		log.Info("runner: audio generated via kokoro-fastapi", "voice", task.Voice)
	}

	// Upload under the canonical (slug, chapter, voice) key.
	key := r.deps.AudioStore.AudioObjectKey(task.Slug, task.Chapter, task.Voice)
	if err := r.deps.AudioStore.PutAudio(ctx, key, audioData); err != nil {
		fail(fmt.Sprintf("put audio: %v", err))
		return
	}

	r.tasksCompleted.Add(1)
	span.SetStatus(codes.Ok, "")
	result := domain.AudioResult{ObjectKey: key}
	if err := r.deps.Consumer.FinishAudioTask(ctx, task.ID, result); err != nil {
		log.Error("runner: FinishAudioTask failed", "err", err)
	}
	if r.deps.Notifier != nil {
		_ = r.deps.Notifier.CreateNotification(ctx, "admin",
			"Audio Ready",
			fmt.Sprintf("Ch.%d of %s (%s) is ready", task.Chapter, task.Slug, task.Voice),
			fmt.Sprintf("/books/%s", task.Slug))
	}
	log.Info("runner: audio task finished", "key", key)
}
|
|
|
|
// kokoroGenerateChunked splits text into ~1 000-character sentence-boundary
|
|
// chunks, calls Kokoro.GenerateAudio for each, and concatenates the raw MP3
|
|
// bytes. This avoids EOF / timeout failures that occur when the Kokoro
|
|
// FastAPI server receives very large inputs (e.g. a full imported PDF chapter).
|
|
//
|
|
// Concatenating raw MP3 frames is valid — MP3 is a frame-based format and
|
|
// standard players handle multi-segment files correctly.
|
|
func kokoroGenerateChunked(ctx context.Context, k kokoro.Client, text, voice string, log *slog.Logger) ([]byte, error) {
|
|
const chunkSize = 1000
|
|
|
|
chunks := chunkText(text, chunkSize)
|
|
log.Info("runner: kokoro chunked generation", "chunks", len(chunks), "total_chars", len(text))
|
|
|
|
var combined []byte
|
|
for i, chunk := range chunks {
|
|
data, err := k.GenerateAudio(ctx, chunk, voice)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("chunk %d/%d: %w", i+1, len(chunks), err)
|
|
}
|
|
combined = append(combined, data...)
|
|
log.Info("runner: kokoro chunk done", "chunk", i+1, "of", len(chunks), "bytes", len(data))
|
|
}
|
|
return combined, nil
|
|
}
|
|
|
|
// runImportTask executes one PDF/EPUB import task.
// Preferred path: when task.ChaptersKey is set, it reads pre-parsed chapters
// JSON from MinIO (written by the backend at upload time) and ingests them.
// Fallback path: when ChaptersKey is empty, calls BookImport.Import() to
// parse the raw file on the runner (legacy behaviour, not used for new tasks).
//
// After ingestion it writes book metadata (non-fatal on error), indexes the
// book in Meilisearch, reports the result, and notifies the initiating user
// (falling back to "admin" when the initiator is unknown).
func (r *Runner) runImportTask(ctx context.Context, task domain.ImportTask, objectKey string) {
	ctx, span := otel.Tracer("runner").Start(ctx, "runner.import_task")
	defer span.End()
	span.SetAttributes(
		attribute.String("task.id", task.ID),
		attribute.String("book.slug", task.Slug),
		attribute.String("file.type", task.FileType),
		attribute.String("chapters_key", task.ChaptersKey),
	)

	log := r.deps.Log.With("task_id", task.ID, "slug", task.Slug, "file_type", task.FileType)
	log.Info("runner: import task starting", "chapters_key", task.ChaptersKey)

	// Heartbeat goroutine: keeps the task's liveness record fresh so the
	// stale-task reaper doesn't reclaim it; stopped by the deferred hbCancel.
	hbCtx, hbCancel := context.WithCancel(ctx)
	defer hbCancel()
	go func() {
		tick := time.NewTicker(r.cfg.HeartbeatInterval)
		defer tick.Stop()
		for {
			select {
			case <-hbCtx.Done():
				return
			case <-tick.C:
				if err := r.deps.Consumer.HeartbeatTask(ctx, task.ID); err != nil {
					log.Warn("runner: heartbeat failed", "err", err)
				}
			}
		}
	}()

	// fail centralizes the failure path: log, bump counters, mark the span,
	// and persist the error result.
	fail := func(msg string) {
		log.Error("runner: import task failed", "reason", msg)
		r.tasksFailed.Add(1)
		span.SetStatus(codes.Error, msg)
		result := domain.ImportResult{ErrorMessage: msg}
		if err := r.deps.Consumer.FinishImportTask(ctx, task.ID, result); err != nil {
			log.Error("runner: FinishImportTask failed", "err", err)
		}
	}

	var chapters []bookstore.Chapter

	if task.ChaptersKey != "" && r.deps.ImportChapterStore != nil {
		// New path: read pre-parsed chapters JSON uploaded by the backend.
		raw, err := r.deps.ImportChapterStore.GetImportChapters(ctx, task.ChaptersKey)
		if err != nil {
			fail(fmt.Sprintf("get chapters JSON: %v", err))
			return
		}
		if err := json.Unmarshal(raw, &chapters); err != nil {
			fail(fmt.Sprintf("unmarshal chapters JSON: %v", err))
			return
		}
		log.Info("runner: loaded pre-parsed chapters", "count", len(chapters))
	} else {
		// Legacy path: parse the raw file on the runner.
		if r.deps.BookImport == nil {
			fail("book import not configured (BookImport dependency missing)")
			return
		}
		var err error
		chapters, err = r.deps.BookImport.Import(ctx, objectKey, task.FileType)
		if err != nil {
			fail(fmt.Sprintf("import file: %v", err))
			return
		}
		log.Info("runner: parsed chapters from file (legacy path)", "count", len(chapters))
	}

	if len(chapters) == 0 {
		fail("no chapters extracted from file")
		return
	}

	// Persist chapters via ChapterIngester.
	if r.deps.ChapterIngester == nil {
		fail("chapter ingester not configured")
		return
	}
	if err := r.deps.ChapterIngester.IngestChapters(ctx, task.Slug, chapters); err != nil {
		fail(fmt.Sprintf("store chapters: %v", err))
		return
	}

	// Write book metadata so the book appears in PocketBase catalogue.
	// Failures here are non-fatal: chapters are already persisted.
	if r.deps.BookWriter != nil {
		meta := domain.BookMeta{
			Slug:          task.Slug,
			Title:         task.Title,
			Author:        task.Author,
			Cover:         task.CoverURL,
			Status:        task.BookStatus,
			Genres:        task.Genres,
			Summary:       task.Summary,
			TotalChapters: len(chapters),
		}
		// Imported books default to "completed" when no status was supplied.
		if meta.Status == "" {
			meta.Status = "completed"
		}
		if err := r.deps.BookWriter.WriteMetadata(ctx, meta); err != nil {
			log.Warn("runner: import task WriteMetadata failed (non-fatal)", "err", err)
		} else {
			// Index in Meilisearch so the book is searchable.
			if err := r.deps.SearchIndex.UpsertBook(ctx, meta); err != nil {
				log.Warn("runner: import task meilisearch upsert failed (non-fatal)", "err", err)
			}
		}
	}

	r.tasksCompleted.Add(1)
	span.SetStatus(codes.Ok, "")
	result := domain.ImportResult{
		Slug:             task.Slug,
		ChaptersImported: len(chapters),
	}
	if err := r.deps.Consumer.FinishImportTask(ctx, task.ID, result); err != nil {
		log.Error("runner: FinishImportTask failed", "err", err)
	}

	// Notify the user who initiated the import.
	if r.deps.Notifier != nil {
		msg := fmt.Sprintf("Import completed: %d chapters from %s", len(chapters), task.Title)
		targetUser := task.InitiatorUserID
		if targetUser == "" {
			targetUser = "admin"
		}
		_ = r.deps.Notifier.CreateNotification(ctx, targetUser, "Import Complete", msg, "/admin/import")
	}

	log.Info("runner: import task finished", "chapters", len(chapters))
}
|