Introduce a Redis-backed Asynq task queue so the runner consumes TTS
jobs pushed by the backend instead of polling PocketBase.
- backend/internal/asynqqueue: Producer and Consumer wrappers
- backend/internal/runner: AsynqRunner mux, per-instance Prometheus
registry (fixes duplicate-collector panic in tests), redisConnOpt
- backend/internal/config: REDIS_ADDR / REDIS_PASSWORD env vars
- backend/cmd/{backend,runner}/main.go: wire Redis when env set; fall
  back to legacy poll mode when unset (see the sketch after this list)
- Caddyfile: caddy-l4 TCP proxy for redis.libnovel.cc:6380 → homelab
- caddy/Dockerfile: add --with github.com/mholt/caddy-l4
- docker-compose.yml: Caddy exposes 6380, backend/runner get Redis env
- homelab/runner/docker-compose.yml: Redis sidecar, runner depends_on
- homelab/otel/grafana: Grafana dashboards (backend, catalogue, runner)
and alerting rules / contact-points provisioning
package runner

// metrics.go — Prometheus metrics HTTP endpoint for the runner.
//
// GET /metrics returns a Prometheus text/plain scrape response.
// Exposes:
//   - Standard Go runtime and process metrics (registered in registerCollectors)
//   - Runner task metrics (runner_tasks_running, runner_tasks_completed_total,
//     runner_tasks_failed_total, runner_uptime_seconds)
//   - Asynq queue metrics (registered in asynq_runner.go when Redis is enabled)
//
// GET /health — simple liveness probe.

import (
    "context"
    "fmt"
    "log/slog"
    "net"
    "net/http"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/collectors"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// metricsServer serves GET /metrics and GET /health for the runner process.
type metricsServer struct {
    addr string
    r    *Runner
    log  *slog.Logger
}

func newMetricsServer(addr string, r *Runner, log *slog.Logger) *metricsServer {
    ms := &metricsServer{addr: addr, r: r, log: log}
    ms.registerCollectors()
    return ms
}

// registerCollectors registers runner-specific Prometheus collectors.
// Called once at construction; the Asynq queue collector is registered
// separately in asynq_runner.go after the Redis connection is established.
func (ms *metricsServer) registerCollectors() {
    // Standard Go runtime and process metrics for this instance's registry;
    // promhttp.HandlerFor serves only what is registered here, so these are
    // not included automatically.
    ms.r.metricsRegistry.MustRegister(
        collectors.NewGoCollector(),
        collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
    )

    // Runner task gauges / counters backed by the atomic fields on Runner.
    ms.r.metricsRegistry.MustRegister(prometheus.NewGaugeFunc(
        prometheus.GaugeOpts{
            Namespace: "runner",
            Name:      "tasks_running",
            Help:      "Number of tasks currently being processed.",
        },
        func() float64 { return float64(ms.r.tasksRunning.Load()) },
    ))
    ms.r.metricsRegistry.MustRegister(prometheus.NewCounterFunc(
        prometheus.CounterOpts{
            Namespace: "runner",
            Name:      "tasks_completed_total",
            Help:      "Total number of tasks completed successfully since startup.",
        },
        func() float64 { return float64(ms.r.tasksCompleted.Load()) },
    ))
    ms.r.metricsRegistry.MustRegister(prometheus.NewCounterFunc(
        prometheus.CounterOpts{
            Namespace: "runner",
            Name:      "tasks_failed_total",
            Help:      "Total number of tasks that ended in failure since startup.",
        },
        func() float64 { return float64(ms.r.tasksFailed.Load()) },
    ))
    ms.r.metricsRegistry.MustRegister(prometheus.NewGaugeFunc(
        prometheus.GaugeOpts{
            Namespace: "runner",
            Name:      "uptime_seconds",
            Help:      "Seconds since the runner process started.",
        },
        func() float64 { return time.Since(ms.r.startedAt).Seconds() },
    ))
}

// ListenAndServe starts the HTTP server and blocks until ctx is cancelled or
// a fatal listen error occurs.
func (ms *metricsServer) ListenAndServe(ctx context.Context) error {
    mux := http.NewServeMux()
    mux.Handle("GET /metrics", promhttp.HandlerFor(ms.r.metricsRegistry, promhttp.HandlerOpts{}))
    mux.HandleFunc("GET /health", ms.handleHealth)

    srv := &http.Server{
        Addr:         ms.addr,
        Handler:      mux,
        ReadTimeout:  5 * time.Second,
        WriteTimeout: 10 * time.Second,
        BaseContext:  func(_ net.Listener) context.Context { return ctx },
    }

    errCh := make(chan error, 1)
    go func() {
        ms.log.Info("runner: metrics server listening", "addr", ms.addr)
        errCh <- srv.ListenAndServe()
    }()

    select {
    case <-ctx.Done():
        shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        _ = srv.Shutdown(shutCtx)
        return nil
    case err := <-errCh:
        return fmt.Errorf("runner: metrics server: %w", err)
    }
}

// handleHealth handles GET /health — simple liveness probe.
func (ms *metricsServer) handleHealth(w http.ResponseWriter, _ *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    _, _ = w.Write([]byte(`{"status":"ok"}`))
}