Files
libnovel/backend/internal/taskqueue/taskqueue.go
Admin a771405db8
Some checks failed
CI / Backend (push) Successful in 48s
CI / UI (push) Successful in 28s
Release / Check ui (push) Successful in 39s
Release / Test backend (push) Successful in 49s
Release / Docker / caddy (push) Successful in 59s
CI / Backend (pull_request) Successful in 41s
CI / UI (pull_request) Successful in 46s
Release / Docker / ui (push) Successful in 1m31s
Release / Docker / backend (push) Successful in 3m27s
Release / Docker / runner (push) Successful in 3m47s
Release / Gitea Release (push) Failing after 32s
feat(audio): WAV streaming, bulk audio generation admin endpoints, cancel/resume
- Add StreamAudioWAV() to pocket-tts and Kokoro clients; pocket-tts streams
  raw WAV directly (no ffmpeg), Kokoro requests response_format:wav with stream:true
- GET /api/audio-stream supports ?format=wav for lower-latency first-byte delivery;
  WAV cached separately in MinIO as {slug}/{n}/{voice}.wav
- Add GET /api/admin/audio/jobs with optional ?slug filter
- Add POST /api/admin/audio/bulk {slug, voice, from, to, skip_existing, force}
  where skip_existing=true (default) resumes interrupted bulk jobs
- Add POST /api/admin/audio/cancel-bulk {slug} to cancel all pending/running tasks
- Add CancelAudioTasksBySlug to taskqueue.Producer + asynqqueue implementation
- Add AudioObjectKeyExt to bookstore.AudioStore for format-aware MinIO keys

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-02 16:19:14 +05:00

108 lines
5.0 KiB
Go

// Package taskqueue defines the interfaces for creating and consuming
// scrape, audio, and translation tasks stored in PocketBase.
//
// Interface segregation:
// - Producer is used only by the backend (creates tasks, cancels tasks).
// - Consumer is used only by the runner (claims tasks, reports results).
// - Reader is used by the backend for status/history endpoints.
//
// Concrete implementations live in internal/storage.
package taskqueue
import (
"context"
"time"
"github.com/libnovel/backend/internal/domain"
)
// Producer is the backend-facing write side of the task queue. It creates
// new tasks in PocketBase for the runner to pick up, and cancels them.
type Producer interface {
	// CreateScrapeTask inserts a new scrape task with status=pending.
	//
	// kind is one of "catalogue", "book", or "book_range". targetURL is the
	// book URL and is empty for catalogue-wide tasks. The returned id is the
	// PocketBase record ID assigned to the new task.
	CreateScrapeTask(ctx context.Context, kind string, targetURL string, fromChapter int, toChapter int) (id string, err error)

	// CreateAudioTask inserts a new audio task with status=pending. The
	// returned id is the PocketBase record ID assigned to the new task.
	CreateAudioTask(ctx context.Context, slug string, chapter int, voice string) (id string, err error)

	// CreateTranslationTask inserts a new translation task with
	// status=pending. The returned id is the PocketBase record ID assigned
	// to the new task.
	CreateTranslationTask(ctx context.Context, slug string, chapter int, lang string) (id string, err error)

	// CancelTask moves a pending task to status=cancelled.
	// It returns ErrNotFound when no task with the given id exists.
	CancelTask(ctx context.Context, id string) error

	// CancelAudioTasksBySlug cancels every pending or running audio task
	// for slug and reports how many tasks were cancelled.
	CancelAudioTasksBySlug(ctx context.Context, slug string) (cancelled int, err error)
}
// Consumer is the runner-facing read/claim side of the task queue. The
// runner uses it to claim pending tasks, report their outcome, send
// heartbeats for running tasks, and reap stale ones.
type Consumer interface {
	// ClaimNextScrapeTask atomically takes the oldest pending scrape task,
	// sets status=running and worker_id=workerID on it, and returns it.
	// When the queue is empty it returns a zero task, ok=false and a nil error.
	ClaimNextScrapeTask(ctx context.Context, workerID string) (task domain.ScrapeTask, ok bool, err error)

	// ClaimNextAudioTask atomically takes the oldest pending audio task,
	// sets status=running and worker_id=workerID on it, and returns it.
	// When the queue is empty it returns a zero task, ok=false and a nil error.
	ClaimNextAudioTask(ctx context.Context, workerID string) (task domain.AudioTask, ok bool, err error)

	// ClaimNextTranslationTask atomically takes the oldest pending
	// translation task, sets status=running and worker_id=workerID on it,
	// and returns it.
	// When the queue is empty it returns a zero task, ok=false and a nil error.
	ClaimNextTranslationTask(ctx context.Context, workerID string) (task domain.TranslationTask, ok bool, err error)

	// FinishScrapeTask marks a running scrape task as done and records result.
	FinishScrapeTask(ctx context.Context, id string, result domain.ScrapeResult) error

	// FinishAudioTask marks a running audio task as done and records result.
	FinishAudioTask(ctx context.Context, id string, result domain.AudioResult) error

	// FinishTranslationTask marks a running translation task as done and
	// records result.
	FinishTranslationTask(ctx context.Context, id string, result domain.TranslationResult) error

	// FailTask marks a task of any kind (scrape, audio, or translation) as
	// failed, with errMsg as its error message.
	FailTask(ctx context.Context, id string, errMsg string) error

	// HeartbeatTask refreshes the heartbeat_at timestamp of a running task.
	// The runner should call it periodically while the task is active so
	// that the reaper knows the task is still alive.
	HeartbeatTask(ctx context.Context, id string) error

	// ReapStaleTasks resets to pending every running task whose heartbeat_at
	// is older than staleAfter (or was never set), so that a healthy runner
	// can claim it again. It reports the number of tasks reaped.
	ReapStaleTasks(ctx context.Context, staleAfter time.Duration) (reaped int, err error)
}
// Reader is the read-only side of the task queue, used by the backend for
// status pages.
type Reader interface {
	// ListScrapeTasks returns all scrape tasks, sorted by started in
	// descending order.
	ListScrapeTasks(ctx context.Context) (tasks []domain.ScrapeTask, err error)

	// GetScrapeTask looks up a single scrape task by its ID.
	// When no such task exists it returns a zero task, ok=false and a nil error.
	GetScrapeTask(ctx context.Context, id string) (task domain.ScrapeTask, ok bool, err error)

	// ListAudioTasks returns all audio tasks, sorted by started in
	// descending order.
	ListAudioTasks(ctx context.Context) (tasks []domain.AudioTask, err error)

	// GetAudioTask returns the most recent audio task for cacheKey.
	// When no such task exists it returns a zero task, ok=false and a nil error.
	GetAudioTask(ctx context.Context, cacheKey string) (task domain.AudioTask, ok bool, err error)

	// ListTranslationTasks returns all translation tasks, sorted by started
	// in descending order.
	ListTranslationTasks(ctx context.Context) (tasks []domain.TranslationTask, err error)

	// GetTranslationTask returns the most recent translation task for cacheKey.
	// When no such task exists it returns a zero task, ok=false and a nil error.
	GetTranslationTask(ctx context.Context, cacheKey string) (task domain.TranslationTask, ok bool, err error)
}