Compare commits

..

1 Commits

Author SHA1 Message Date
Admin
7e99fc6d70 fix(runner): fix audio task infinite loop and semaphore race
Some checks failed
Release / Check ui (push) Successful in 22s
Release / Test backend (push) Successful in 33s
Release / Docker / backend (push) Failing after 30s
Release / Docker / caddy (push) Successful in 1m9s
Release / Docker / ui (push) Successful in 1m34s
Release / Docker / runner (push) Failing after 1m15s
Release / Gitea Release (push) Has been skipped
Two bugs caused audio tasks to loop endlessly:

1. claimRecord never set heartbeat_at — newly claimed tasks had
   heartbeat_at=null, which matched the reaper's stale filter
   (heartbeat_at=null || heartbeat_at<threshold). Tasks were reaped
   and reset to pending within seconds of being claimed, before the
   30s heartbeat goroutine had a chance to write a timestamp.
   Fix: set heartbeat_at=now() in claimRecord alongside status=running.

2. Audio semaphore was checked AFTER claiming the task. When the
   semaphore was full the select/break only broke the inner select,
   not the for loop — the code fell through and launched an uncapped
   goroutine that blocked forever on <-audioSem drain. The task also
   stayed status=running with no heartbeat, feeding bug #1.
   Fix: pre-acquire a semaphore slot BEFORE claiming the task; release
   it immediately if the queue is empty or claim fails.
2026-03-25 15:09:52 +05:00
2 changed files with 17 additions and 9 deletions

View File

@@ -248,23 +248,30 @@ func (r *Runner) poll(ctx context.Context, scrapeSem, audioSem chan struct{}, wg
}
// ── Audio tasks ───────────────────────────────────────────────────────
// Only claim tasks when there is a free slot in the semaphore.
// This avoids the old bug where we claimed (status→running) a task and
// then couldn't dispatch it, leaving it orphaned until the reaper fired.
audioLoop:
for {
if ctx.Err() != nil {
return
}
// Check capacity before claiming to avoid orphaning tasks.
select {
case audioSem <- struct{}{}:
// Slot acquired — proceed to claim a task.
default:
// All slots busy; leave remaining pending tasks for next tick.
break audioLoop
}
task, ok, err := r.deps.Consumer.ClaimNextAudioTask(ctx, r.cfg.WorkerID)
if err != nil {
<-audioSem // release the pre-acquired slot
r.deps.Log.Error("runner: ClaimNextAudioTask failed", "err", err)
break
}
if !ok {
break
}
select {
case audioSem <- struct{}{}:
default:
r.deps.Log.Warn("runner: audio semaphore full, will retry next tick",
"task_id", task.ID)
<-audioSem // release the pre-acquired slot; queue empty
break
}
r.tasksRunning.Add(1)

View File

@@ -247,8 +247,9 @@ func (c *pbClient) claimRecord(ctx context.Context, collection, workerID string,
}
claim := map[string]any{
"status": string(domain.TaskStatusRunning),
"worker_id": workerID,
"status": string(domain.TaskStatusRunning),
"worker_id": workerID,
"heartbeat_at": time.Now().UTC().Format(time.RFC3339),
}
for k, v := range extraClaim {
claim[k] = v