Files
libnovel/backend/internal/asynqqueue/consumer.go
Admin 809dc8d898
All checks were successful
Release / Check ui (push) Successful in 27s
Release / Test backend (push) Successful in 43s
Release / Docker / caddy (push) Successful in 1m3s
Release / Docker / ui (push) Successful in 1m58s
Release / Docker / runner (push) Successful in 3m23s
Release / Docker / backend (push) Successful in 4m26s
Release / Gitea Release (push) Successful in 13s
fix: make asynq consumer actually claim and heartbeat translation tasks
ClaimNextTranslationTask and HeartbeatTask were no-ops in the asynq
Consumer, so translation tasks created in PocketBase were never picked
up by the runner. Translation tasks live in PocketBase (not Redis),
so they must be claimed/heartbeated via the underlying pb consumer.
ReapStaleTasks is also delegated so stale translation tasks get reset.

Also removes the LibreTranslate healthcheck from homelab/runner
docker-compose.yml and relaxes depends_on to service_started — the
healthcheck was blocking runner startup until models loaded (~2 min)
and the models are already pre-downloaded in the volume.
2026-04-02 21:16:48 +05:00

72 lines
2.7 KiB
Go

package asynqqueue
import (
"context"
"time"
"github.com/libnovel/backend/internal/domain"
"github.com/libnovel/backend/internal/taskqueue"
)
// Consumer wraps the PocketBase-backed Consumer for result write-back only.
//
// When using Asynq, the runner no longer polls for scrape/audio work — Asynq
// delivers those tasks via the ServeMux handlers. However translation tasks
// live in PocketBase (not Redis), so ClaimNextTranslationTask and HeartbeatTask
// still delegate to the underlying PocketBase consumer.
//
// ClaimNextAudioTask, ClaimNextScrapeTask are no-ops here because Asynq owns
// those responsibilities.
type Consumer struct {
pb taskqueue.Consumer // underlying PocketBase consumer (for write-back)
}
// NewConsumer wraps an existing PocketBase Consumer.
func NewConsumer(pb taskqueue.Consumer) *Consumer {
return &Consumer{pb: pb}
}
// ── Write-back (delegated to PocketBase) ──────────────────────────────────────
func (c *Consumer) FinishScrapeTask(ctx context.Context, id string, result domain.ScrapeResult) error {
return c.pb.FinishScrapeTask(ctx, id, result)
}
func (c *Consumer) FinishAudioTask(ctx context.Context, id string, result domain.AudioResult) error {
return c.pb.FinishAudioTask(ctx, id, result)
}
func (c *Consumer) FinishTranslationTask(ctx context.Context, id string, result domain.TranslationResult) error {
return c.pb.FinishTranslationTask(ctx, id, result)
}
func (c *Consumer) FailTask(ctx context.Context, id, errMsg string) error {
return c.pb.FailTask(ctx, id, errMsg)
}
// ── No-ops (Asynq owns claiming / heartbeating / reaping) ───────────────────
func (c *Consumer) ClaimNextScrapeTask(_ context.Context, _ string) (domain.ScrapeTask, bool, error) {
return domain.ScrapeTask{}, false, nil
}
func (c *Consumer) ClaimNextAudioTask(_ context.Context, _ string) (domain.AudioTask, bool, error) {
return domain.AudioTask{}, false, nil
}
// ClaimNextTranslationTask delegates to PocketBase because translation tasks
// are stored in PocketBase (not Redis/Asynq) and must still be polled directly.
func (c *Consumer) ClaimNextTranslationTask(ctx context.Context, workerID string) (domain.TranslationTask, bool, error) {
return c.pb.ClaimNextTranslationTask(ctx, workerID)
}
func (c *Consumer) HeartbeatTask(ctx context.Context, id string) error {
return c.pb.HeartbeatTask(ctx, id)
}
// ReapStaleTasks delegates to PocketBase so stale translation tasks are reset
// to pending and can be reclaimed.
func (c *Consumer) ReapStaleTasks(ctx context.Context, staleAfter time.Duration) (int, error) {
return c.pb.ReapStaleTasks(ctx, staleAfter)
}