Files
libnovel/backend/internal/taskqueue/taskqueue.go
Admin 5825b859b7
All checks were successful
Release / Scraper / Test (push) Successful in 10s
Release / UI / Build (push) Successful in 26s
Release / v2 / Build ui-v2 (push) Successful in 17s
Release / Scraper / Docker (push) Successful in 47s
Release / UI / Docker (push) Successful in 56s
CI / Scraper / Lint (pull_request) Successful in 7s
CI / Scraper / Test (pull_request) Successful in 8s
CI / UI / Build (pull_request) Successful in 16s
CI / Scraper / Docker Push (pull_request) Has been skipped
CI / UI / Docker Push (pull_request) Has been skipped
Release / v2 / Docker / ui-v2 (push) Successful in 56s
Release / v2 / Test backend (push) Successful in 4m35s
iOS CI / Build (pull_request) Successful in 4m28s
Release / v2 / Docker / backend (push) Successful in 1m29s
Release / v2 / Docker / runner (push) Successful in 1m39s
iOS CI / Test (pull_request) Successful in 9m51s
feat: add v2 stack (backend, runner, ui-v2) with release workflow
- backend/: Go API server and runner binaries with PocketBase + MinIO storage
- ui-v2/: SvelteKit frontend rewrite
- docker-compose-new.yml: compose file for the v2 stack
- .gitea/workflows/release-v2.yaml: CI/CD for backend, runner, and ui-v2 Docker Hub images
- scripts/pb-init.sh: migrate from wget to curl, add superuser bootstrap for fresh installs
- .env.example: document DOCKER_BUILDKIT=1 for Colima users
2026-03-15 19:32:40 +05:00

85 lines
3.8 KiB
Go

// Package taskqueue defines the interfaces for creating and consuming
// scrape/audio tasks stored in PocketBase.
//
// Interface segregation:
// - Producer is used only by the backend (creates tasks, cancels tasks).
// - Consumer is used only by the runner (claims tasks, reports results,
//   sends heartbeats, reaps stale tasks).
// - Reader is used by the backend for status/history endpoints.
//
// Concrete implementations live in internal/storage.
package taskqueue
import (
"context"
"time"
"github.com/libnovel/backend/internal/domain"
)
// Producer is the task-creation half of the queue, used by the backend
// service. Tasks it inserts into PocketBase are later picked up by the runner.
type Producer interface {
	// CreateScrapeTask stores a new scrape task with status=pending and
	// yields the PocketBase record ID assigned to it.
	//
	// kind must be "catalogue", "book", or "book_range". targetURL is the
	// book URL and is left empty for catalogue-wide tasks.
	//
	// NOTE(review): fromChapter/toChapter presumably bound the chapter range
	// for "book_range" tasks — confirm against the implementation.
	CreateScrapeTask(ctx context.Context, kind string, targetURL string, fromChapter int, toChapter int) (id string, err error)

	// CreateAudioTask stores a new audio task with status=pending and
	// yields the PocketBase record ID assigned to it.
	CreateAudioTask(ctx context.Context, slug string, chapter int, voice string) (id string, err error)

	// CancelTask moves a pending task to status=cancelled.
	// ErrNotFound is returned when no task with the given id exists.
	CancelTask(ctx context.Context, id string) error
}
// Consumer is the read/claim half of the queue, used by the runner.
type Consumer interface {
	// ClaimNextScrapeTask atomically picks the oldest pending scrape task,
	// marks it status=running with worker_id=workerID, and hands it back.
	// An empty queue yields (zero, false, nil).
	ClaimNextScrapeTask(ctx context.Context, workerID string) (task domain.ScrapeTask, ok bool, err error)

	// ClaimNextAudioTask atomically picks the oldest pending audio task,
	// marks it status=running with worker_id=workerID, and hands it back.
	// An empty queue yields (zero, false, nil).
	ClaimNextAudioTask(ctx context.Context, workerID string) (task domain.AudioTask, ok bool, err error)

	// FinishScrapeTask flags a running scrape task as done and stores result.
	FinishScrapeTask(ctx context.Context, id string, result domain.ScrapeResult) error

	// FinishAudioTask flags a running audio task as done and stores result.
	FinishAudioTask(ctx context.Context, id string, result domain.AudioResult) error

	// FailTask flags a task of either kind (scrape or audio) as failed and
	// attaches errMsg as the error message.
	FailTask(ctx context.Context, id string, errMsg string) error

	// HeartbeatTask refreshes the heartbeat_at timestamp of a running task.
	// The runner should call it periodically for as long as the task is
	// active, so that the reaper can tell the task is still alive.
	HeartbeatTask(ctx context.Context, id string) error

	// ReapStaleTasks resets to pending every running task whose heartbeat_at
	// is older than staleAfter (or was never set), making it claimable again
	// by a healthy runner. The count of reaped tasks is returned.
	ReapStaleTasks(ctx context.Context, staleAfter time.Duration) (reaped int, err error)
}
// Reader is the query-only half of the queue; the backend uses it to serve
// status pages.
type Reader interface {
	// ListScrapeTasks fetches every scrape task, sorted by started in
	// descending order.
	ListScrapeTasks(ctx context.Context) ([]domain.ScrapeTask, error)

	// GetScrapeTask looks up one scrape task by its ID.
	// A missing task yields (zero, false, nil).
	GetScrapeTask(ctx context.Context, id string) (task domain.ScrapeTask, found bool, err error)

	// ListAudioTasks fetches every audio task, sorted by started in
	// descending order.
	ListAudioTasks(ctx context.Context) ([]domain.AudioTask, error)

	// GetAudioTask looks up the most recent audio task recorded for cacheKey.
	// A missing task yields (zero, false, nil).
	GetAudioTask(ctx context.Context, cacheKey string) (task domain.AudioTask, found bool, err error)
}