Observable bounded in-process queues for Go services.
chanprobe is a small Go library for instrumenting important async boundaries:
worker pools, event pipelines, webhook delivery, background jobs, rate-limited
integrations, Kafka/NATS consumers, and similar queues.
The goal is not to replace every chan. The goal is to make production queues
visible:
- Is the producer blocked?
- Is the consumer slow?
- How long do items wait in the queue?
- Are we dropping work?
- Which queue introduced latency?
- When did backpressure start?
Use chanprobe when the queue itself is an operational signal.
go get github.com/devflex-pro/chanprobepackage main
import (
"context"
"fmt"
"github.com/devflex-pro/chanprobe"
)
func main() {
ctx := context.Background()
jobs := chanprobe.New[string]("jobs", 1024)
defer jobs.Close()
if err := jobs.Send(ctx, "hello"); err != nil {
panic(err)
}
job, ok := jobs.Recv(ctx)
if !ok {
return
}
fmt.Println(job)
}The default policy is Block: Send waits for capacity and TrySend fails
immediately when the queue is full.
q := chanprobe.New[string](
"webhook_delivery",
10_000,
chanprobe.WithDropPolicy(chanprobe.DropOldest),
)Available policies:
Block: wait for space inSend.DropNewest: reject incoming work when full.DropOldest: evict the oldest queued item and insert the new one.
Every queue exposes a point-in-time snapshot:
snap := jobs.Snapshot()
fmt.Println("queue", snap.Name)
fmt.Println("depth", snap.Len, "of", snap.Cap)
fmt.Println("sent", snap.SentTotal)
fmt.Println("received", snap.ReceivedTotal)
fmt.Println("dropped", snap.DroppedTotal)
fmt.Println("oldest age", snap.OldestItemAge)Snapshot fields:
lencapsent_totalreceived_totaldropped_totalsend_blocked_totalrecv_blocked_totalsend_wait_totalrecv_wait_totalitem_wait_totaloldest_item_ageclosed
dropped_total counts work the queue actually discarded: DropNewest
rejections and DropOldest evictions. A failed TrySend on a full blocking
queue is not counted as dropped work.
Queues register in the default registry unless registration is disabled with
WithRegistry(nil).
chanprobe.PublishExpvar("chanprobe", nil)
http.ListenAndServe(":8080", nil)Then inspect:
curl http://localhost:8080/debug/varsSee examples/http_debug for a runnable example.
chanprobe adds observability and context-aware queue operations. It has
overhead compared to native channels. Use it at important async boundaries where
visibility matters, not as a blanket replacement for every channel.
Run local benchmarks with:
go test -bench=. -benchmem ./...- For tiny internal channels where native channel performance matters most.
- For hot loops that do not need queue metrics.
- When you need a drop-in
chan Treplacement. - When your metrics backend already instruments the queue boundary directly.
- No
unsafein the core package. - No runtime monkey-patching.
- No magical resizing of Go channels.
- No global goroutine scanning.
- No mandatory Prometheus or OpenTelemetry dependency in the core package.
MIT