Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e99fc6d70 |
@@ -248,23 +248,30 @@ func (r *Runner) poll(ctx context.Context, scrapeSem, audioSem chan struct{}, wg
|
||||
}
|
||||
|
||||
// ── Audio tasks ───────────────────────────────────────────────────────
|
||||
// Only claim tasks when there is a free slot in the semaphore.
|
||||
// This avoids the old bug where we claimed (status→running) a task and
|
||||
// then couldn't dispatch it, leaving it orphaned until the reaper fired.
|
||||
audioLoop:
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
// Check capacity before claiming to avoid orphaning tasks.
|
||||
select {
|
||||
case audioSem <- struct{}{}:
|
||||
// Slot acquired — proceed to claim a task.
|
||||
default:
|
||||
// All slots busy; leave remaining pending tasks for next tick.
|
||||
break audioLoop
|
||||
}
|
||||
task, ok, err := r.deps.Consumer.ClaimNextAudioTask(ctx, r.cfg.WorkerID)
|
||||
if err != nil {
|
||||
<-audioSem // release the pre-acquired slot
|
||||
r.deps.Log.Error("runner: ClaimNextAudioTask failed", "err", err)
|
||||
break
|
||||
}
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case audioSem <- struct{}{}:
|
||||
default:
|
||||
r.deps.Log.Warn("runner: audio semaphore full, will retry next tick",
|
||||
"task_id", task.ID)
|
||||
<-audioSem // release the pre-acquired slot; queue empty
|
||||
break
|
||||
}
|
||||
r.tasksRunning.Add(1)
|
||||
|
||||
@@ -247,8 +247,9 @@ func (c *pbClient) claimRecord(ctx context.Context, collection, workerID string,
|
||||
}
|
||||
|
||||
claim := map[string]any{
|
||||
"status": string(domain.TaskStatusRunning),
|
||||
"worker_id": workerID,
|
||||
"status": string(domain.TaskStatusRunning),
|
||||
"worker_id": workerID,
|
||||
"heartbeat_at": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
for k, v := range extraClaim {
|
||||
claim[k] = v
|
||||
|
||||
Reference in New Issue
Block a user