Skip to content

Commit e0938ad

Browse files
committed
Initial commit
0 parents  commit e0938ad

File tree

5 files changed

+323
-0
lines changed

5 files changed

+323
-0
lines changed

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2023 pkg
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Group
2+
3+
A group provides a way manage the lifetime of a group of related goroutines.

example_test.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package group_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net"
8+
"net/http"
9+
"os"
10+
"os/signal"
11+
"time"
12+
13+
"github.com/pkg/group"
14+
)
15+
16+
type Group = group.G
17+
18+
func ExampleGroup_Wait() {
19+
// A Group's zero value is ready to use.
20+
var g group.G
21+
22+
// Add a goroutine to the group.
23+
g.Add(func(c context.Context) error {
24+
select {
25+
case <-c.Done():
26+
return c.Err()
27+
case <-time.After(1 * time.Second):
28+
return errors.New("timed out")
29+
}
30+
})
31+
32+
// Wait for all goroutines to finish.
33+
if err := g.Wait(); err != nil {
34+
fmt.Println(err)
35+
}
36+
37+
// Output: timed out
38+
}
39+
40+
func ExampleGroup_Wait_with_startup_error() {
41+
// A Group's zero value is ready to use.
42+
var g group.G
43+
44+
// Add a goroutine to the group.
45+
g.Add(func(_ context.Context) error {
46+
return errors.New("startup error")
47+
})
48+
49+
// Wait for all goroutines to finish, in this case it will return the startup error.
50+
if err := g.Wait(); err != nil {
51+
fmt.Println(err)
52+
}
53+
54+
// Output: startup error
55+
}
56+
57+
func ExampleGroup_Wait_with_panic() {
58+
// A Group's zero value is ready to use.
59+
var g group.G
60+
61+
// Add a goroutine to the group.
62+
g.Add(func(c context.Context) error {
63+
panic("boom")
64+
})
65+
66+
// Wait for all goroutines to finish.
67+
if err := g.Wait(); err != nil {
68+
fmt.Println(err)
69+
}
70+
71+
// Output: panic: boom
72+
}
73+
74+
func ExampleGroup_Wait_with_shutdown() {
75+
// A Group's zero value is ready to use.
76+
var g group.G
77+
78+
shutdown := make(chan struct{})
79+
80+
// Add a goroutine to the group.
81+
g.Add(func(c context.Context) error {
82+
select {
83+
case <-c.Done():
84+
return errors.New("stopped")
85+
case <-shutdown:
86+
return errors.New("shutdown")
87+
}
88+
})
89+
90+
time.AfterFunc(100*time.Millisecond, func() {
91+
close(shutdown)
92+
})
93+
94+
// Wait for all goroutines to finish.
95+
if err := g.Wait(); err != nil {
96+
fmt.Println(err)
97+
}
98+
99+
// Output: shutdown
100+
}
101+
102+
func ExampleGroup_Wait_with_context_cancel() {
103+
ctx := context.Background()
104+
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
105+
106+
// pass WithContext option to New to use the provided context.
107+
g := group.New(group.WithContext(ctx))
108+
109+
// Add a goroutine to the group.
110+
g.Add(func(c context.Context) error {
111+
select {
112+
case <-c.Done():
113+
return c.Err()
114+
}
115+
})
116+
117+
// Cancel the context.
118+
cancel()
119+
120+
// Wait for all goroutines to finish.
121+
if err := g.Wait(); err != nil {
122+
fmt.Println(err)
123+
}
124+
125+
// Output: context canceled
126+
}
127+
128+
func ExampleGroup_Wait_with_signal() {
129+
ctx := context.Background()
130+
ctx, _ = signal.NotifyContext(ctx, os.Interrupt)
131+
132+
g := group.New(group.WithContext(ctx))
133+
134+
g.Add(MainHTTPServer)
135+
g.Add(DebugHTTPServer)
136+
g.Add(AsyncLogger)
137+
138+
<-time.After(100 * time.Millisecond)
139+
140+
// simulate ^C
141+
proc, _ := os.FindProcess(os.Getpid())
142+
proc.Signal(os.Interrupt)
143+
144+
if err := g.Wait(); err != nil {
145+
fmt.Println(err)
146+
}
147+
148+
// Unordered output:
149+
// async logger started
150+
// debug http server started
151+
// main http server started
152+
// async logger stopped
153+
// main http server stopped
154+
// debug http server stopped
155+
// context canceled
156+
}
157+
158+
func ExampleGroup_Wait_with_http_shutdown() {
159+
ctx := context.Background()
160+
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
161+
defer cancel()
162+
163+
g := group.New(group.WithContext(ctx))
164+
165+
g.Add(func(ctx context.Context) error {
166+
l, err := net.Listen("tcp", "127.0.0.1:0")
167+
if err != nil {
168+
return err
169+
}
170+
svr := http.Server{
171+
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
172+
fmt.Fprintln(w, "hello, world!")
173+
})}
174+
go func() {
175+
svr.Serve(l)
176+
}()
177+
178+
<-ctx.Done() // wait for group to stop
179+
180+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // five seconds graceful timeout
181+
defer cancel()
182+
return svr.Shutdown(shutdownCtx)
183+
})
184+
185+
if err := g.Wait(); err != nil {
186+
fmt.Println(err)
187+
}
188+
189+
// Output:
190+
}
191+
func MainHTTPServer(ctx context.Context) error {
192+
fmt.Println("main http server started")
193+
defer fmt.Println("main http server stopped")
194+
<-ctx.Done()
195+
return ctx.Err()
196+
}
197+
198+
func DebugHTTPServer(ctx context.Context) error {
199+
fmt.Println("debug http server started")
200+
defer fmt.Println("debug http server stopped")
201+
<-ctx.Done()
202+
return ctx.Err()
203+
}
204+
205+
func AsyncLogger(ctx context.Context) error {
206+
fmt.Println("async logger started")
207+
defer fmt.Println("async logger stopped")
208+
<-ctx.Done()
209+
return ctx.Err()
210+
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/pkg/group
2+
3+
go 1.23

group.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// package group provides a way to manage the lifecycle of a group of goroutines.
2+
package group
3+
4+
import (
5+
"context"
6+
"fmt"
7+
"sync"
8+
)
9+
10+
// G manages the lifetime of a set of goroutines from a common context.
11+
// The first goroutine in the group to return will cause the context to be canceled,
12+
// terminating the remaining goroutines.
13+
type G struct {
14+
// ctx is the context passed to all goroutines in the group.
15+
ctx context.Context
16+
cancel context.CancelFunc
17+
done sync.WaitGroup
18+
19+
initOnce sync.Once
20+
21+
errOnce sync.Once
22+
err error
23+
}
24+
25+
type Option func(*G)
26+
27+
// WithContext uses the provided context for the group.
28+
func WithContext(ctx context.Context) Option {
29+
return func(g *G) {
30+
g.ctx = ctx
31+
}
32+
}
33+
34+
// New creates a new group.
35+
func New(opts ...Option) *G {
36+
g := new(G)
37+
for _, opt := range opts {
38+
opt(g)
39+
}
40+
return g
41+
}
42+
43+
// init initializes the group.
44+
func (g *G) init() {
45+
if g.ctx == nil {
46+
g.ctx = context.Background()
47+
}
48+
g.ctx, g.cancel = context.WithCancel(g.ctx)
49+
}
50+
51+
// add adds a new goroutine to the group. The goroutine should exit when the context
52+
// passed to it is canceled.
53+
func (g *G) Add(fn func(context.Context) error) {
54+
g.initOnce.Do(g.init)
55+
g.done.Add(1)
56+
go func() {
57+
defer g.done.Done()
58+
defer g.cancel()
59+
defer func() {
60+
if r := recover(); r != nil {
61+
g.errOnce.Do(func() {
62+
if err, ok := r.(error); ok {
63+
g.err = err
64+
} else {
65+
g.err = fmt.Errorf("panic: %v", r)
66+
}
67+
})
68+
}
69+
}()
70+
if err := fn(g.ctx); err != nil {
71+
g.errOnce.Do(func() { g.err = err })
72+
}
73+
}()
74+
}
75+
76+
// wait waits for all goroutines in the group to exit. If any of the goroutines
77+
// fail with an error, wait will return the first error.
78+
// Wait waits for all goroutines in the group to exit.
79+
// If any of the goroutines fail with an error, Wait will return the first error.
80+
func (g *G) Wait() error {
81+
g.done.Wait()
82+
g.errOnce.Do(func() {
83+
// noop, required to synchronise on the errOnce mutex.
84+
})
85+
return g.err
86+
}

0 commit comments

Comments
 (0)