- Import task: persist object_key, author, cover_url, genres, summary, book_status in PocketBase so the runner can fetch the file and write book metadata on completion - Runner poll mode: pass task.ObjectKey instead of empty string - Runner: write BookMeta + UpsertBook in Meilisearch after chapter ingest so imported books appear in catalogue and search - Import UI: add author, cover URL, genres, summary, status fields; add AI tasks panel (chapter names, description, image gen, tagline) after import completes; add AI tasks button on each done task in the list - Admin nav: add Notifications entry to sidebar (all 5 locales) - Logout: delete user_sessions row on sign-out so sessions don't accumulate as phantoms after each login/logout cycle
128 lines
6.1 KiB
Go
128 lines
6.1 KiB
Go
// Package taskqueue defines the interfaces for creating and consuming
|
|
// scrape/audio tasks stored in PocketBase.
|
|
//
|
|
// Interface segregation:
|
|
// - Producer is used only by the backend (creates tasks, cancels tasks).
|
|
// - Consumer is used only by the runner (claims tasks, reports results).
|
|
// - Reader is used by the backend for status/history endpoints.
|
|
//
|
|
// Concrete implementations live in internal/storage.
|
|
package taskqueue
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/libnovel/backend/internal/domain"
|
|
)
|
|
|
|
// Producer is the write side of the task queue used by the backend service.
|
|
// It creates new tasks in PocketBase for the runner to pick up.
|
|
type Producer interface {
|
|
// CreateScrapeTask inserts a new scrape task with status=pending and
|
|
// returns the assigned PocketBase record ID.
|
|
// kind is one of "catalogue", "book", or "book_range".
|
|
// targetURL is the book URL (empty for catalogue-wide tasks).
|
|
CreateScrapeTask(ctx context.Context, kind, targetURL string, fromChapter, toChapter int) (string, error)
|
|
|
|
// CreateAudioTask inserts a new audio task with status=pending and
|
|
// returns the assigned PocketBase record ID.
|
|
CreateAudioTask(ctx context.Context, slug string, chapter int, voice string) (string, error)
|
|
|
|
// CreateTranslationTask inserts a new translation task with status=pending and
|
|
// returns the assigned PocketBase record ID.
|
|
CreateTranslationTask(ctx context.Context, slug string, chapter int, lang string) (string, error)
|
|
|
|
// CreateImportTask inserts a new import task with status=pending and
|
|
// returns the assigned PocketBase record ID.
|
|
// The task struct must have at minimum Slug, Title, FileType, and ObjectKey set.
|
|
CreateImportTask(ctx context.Context, task domain.ImportTask) (string, error)
|
|
|
|
// CancelTask transitions a pending task to status=cancelled.
|
|
// Returns ErrNotFound if the task does not exist.
|
|
CancelTask(ctx context.Context, id string) error
|
|
|
|
// CancelAudioTasksBySlug cancels all pending or running audio tasks for slug.
|
|
// Returns the number of tasks cancelled.
|
|
CancelAudioTasksBySlug(ctx context.Context, slug string) (int, error)
|
|
}
|
|
|
|
// Consumer is the read/claim side of the task queue used by the runner.
|
|
type Consumer interface {
|
|
// ClaimNextScrapeTask atomically finds the oldest pending scrape task,
|
|
// sets its status=running and worker_id=workerID, and returns it.
|
|
// Returns (zero, false, nil) when the queue is empty.
|
|
ClaimNextScrapeTask(ctx context.Context, workerID string) (domain.ScrapeTask, bool, error)
|
|
|
|
// ClaimNextAudioTask atomically finds the oldest pending audio task,
|
|
// sets its status=running and worker_id=workerID, and returns it.
|
|
// Returns (zero, false, nil) when the queue is empty.
|
|
ClaimNextAudioTask(ctx context.Context, workerID string) (domain.AudioTask, bool, error)
|
|
|
|
// ClaimNextTranslationTask atomically finds the oldest pending translation task,
|
|
// sets its status=running and worker_id=workerID, and returns it.
|
|
// Returns (zero, false, nil) when the queue is empty.
|
|
ClaimNextTranslationTask(ctx context.Context, workerID string) (domain.TranslationTask, bool, error)
|
|
|
|
// ClaimNextImportTask atomically finds the oldest pending import task,
|
|
// sets its status=running and worker_id=workerID, and returns it.
|
|
// Returns (zero, false, nil) when the queue is empty.
|
|
ClaimNextImportTask(ctx context.Context, workerID string) (domain.ImportTask, bool, error)
|
|
|
|
// FinishScrapeTask marks a running scrape task as done and records the result.
|
|
FinishScrapeTask(ctx context.Context, id string, result domain.ScrapeResult) error
|
|
|
|
// FinishAudioTask marks a running audio task as done and records the result.
|
|
FinishAudioTask(ctx context.Context, id string, result domain.AudioResult) error
|
|
|
|
// FinishTranslationTask marks a running translation task as done and records the result.
|
|
FinishTranslationTask(ctx context.Context, id string, result domain.TranslationResult) error
|
|
|
|
// FinishImportTask marks a running import task as done and records the result.
|
|
FinishImportTask(ctx context.Context, id string, result domain.ImportResult) error
|
|
|
|
// FailTask marks a task (scrape, audio, or translation) as failed with an error message.
|
|
FailTask(ctx context.Context, id, errMsg string) error
|
|
|
|
// HeartbeatTask updates the heartbeat_at timestamp on a running task.
|
|
// Should be called periodically by the runner while the task is active so
|
|
// the reaper knows the task is still alive.
|
|
HeartbeatTask(ctx context.Context, id string) error
|
|
|
|
// ReapStaleTasks finds all running tasks whose heartbeat_at is older than
|
|
// staleAfter (or was never set) and resets them to pending so they can be
|
|
// re-claimed by a healthy runner. Returns the number of tasks reaped.
|
|
ReapStaleTasks(ctx context.Context, staleAfter time.Duration) (int, error)
|
|
}
|
|
|
|
// Reader is the read-only side used by the backend for status pages.
|
|
type Reader interface {
|
|
// ListScrapeTasks returns all scrape tasks sorted by started descending.
|
|
ListScrapeTasks(ctx context.Context) ([]domain.ScrapeTask, error)
|
|
|
|
// GetScrapeTask returns a single scrape task by ID.
|
|
// Returns (zero, false, nil) if not found.
|
|
GetScrapeTask(ctx context.Context, id string) (domain.ScrapeTask, bool, error)
|
|
|
|
// ListAudioTasks returns all audio tasks sorted by started descending.
|
|
ListAudioTasks(ctx context.Context) ([]domain.AudioTask, error)
|
|
|
|
// GetAudioTask returns the most recent audio task for cacheKey.
|
|
// Returns (zero, false, nil) if not found.
|
|
GetAudioTask(ctx context.Context, cacheKey string) (domain.AudioTask, bool, error)
|
|
|
|
// ListTranslationTasks returns all translation tasks sorted by started descending.
|
|
ListTranslationTasks(ctx context.Context) ([]domain.TranslationTask, error)
|
|
|
|
// GetTranslationTask returns the most recent translation task for cacheKey.
|
|
// Returns (zero, false, nil) if not found.
|
|
GetTranslationTask(ctx context.Context, cacheKey string) (domain.TranslationTask, bool, error)
|
|
|
|
// ListImportTasks returns all import tasks sorted by started descending.
|
|
ListImportTasks(ctx context.Context) ([]domain.ImportTask, error)
|
|
|
|
// GetImportTask returns a single import task by ID.
|
|
// Returns (zero, false, nil) if not found.
|
|
GetImportTask(ctx context.Context, id string) (domain.ImportTask, bool, error)
|
|
}
|