@@ -9,10 +9,12 @@ import (
99 "time"
1010
1111 "github.com/stretchr/testify/assert"
12+ "go.uber.org/goleak"
1213 "golang.org/x/sync/errgroup"
1314)
1415
1516func Test_Connectable_IterableChannel_Single (t * testing.T ) {
17+ defer goleak .VerifyNone (t )
1618 ch := make (chan Item , 10 )
1719 go func () {
1820 ch <- Of (1 )
@@ -27,6 +29,7 @@ func Test_Connectable_IterableChannel_Single(t *testing.T) {
2729}
2830
2931func Test_Connectable_IterableChannel_Composed (t * testing.T ) {
32+ defer goleak .VerifyNone (t )
3033 ch := make (chan Item , 10 )
3134 go func () {
3235 ch <- Of (1 )
@@ -41,6 +44,7 @@ func Test_Connectable_IterableChannel_Composed(t *testing.T) {
4144}
4245
4346func Test_Connectable_IterableChannel_Disposed (t * testing.T ) {
47+ defer goleak .VerifyNone (t )
4448 ch := make (chan Item , 10 )
4549 go func () {
4650 ch <- Of (1 )
@@ -51,15 +55,16 @@ func Test_Connectable_IterableChannel_Disposed(t *testing.T) {
5155 obs := & ObservableImpl {
5256 iterable : newChannelIterable (ch , WithPublishStrategy ()),
5357 }
54- _ , disposable := obs .Connect ()
55- disposable ()
5658 ctx , cancel := context .WithTimeout (context .Background (), 50 * time .Millisecond )
5759 defer cancel ()
60+ _ , disposable := obs .Connect (ctx )
61+ disposable ()
5862 time .Sleep (50 * time .Millisecond )
5963 Assert (ctx , t , obs , IsEmpty ())
6064}
6165
6266func Test_Connectable_IterableChannel_WithoutConnect (t * testing.T ) {
67+ defer goleak .VerifyNone (t )
6368 ch := make (chan Item , 10 )
6469 go func () {
6570 ch <- Of (1 )
@@ -74,6 +79,7 @@ func Test_Connectable_IterableChannel_WithoutConnect(t *testing.T) {
7479}
7580
7681func Test_Connectable_IterableCreate_Single (t * testing.T ) {
82+ defer goleak .VerifyNone (t )
7783 ctx , cancel := context .WithCancel (context .Background ())
7884 defer cancel ()
7985 obs := & ObservableImpl {
@@ -88,6 +94,7 @@ func Test_Connectable_IterableCreate_Single(t *testing.T) {
8894}
8995
9096func Test_Connectable_IterableCreate_Composed (t * testing.T ) {
97+ defer goleak .VerifyNone (t )
9198 ctx , cancel := context .WithCancel (context .Background ())
9299 defer cancel ()
93100 obs := & ObservableImpl {
@@ -102,6 +109,7 @@ func Test_Connectable_IterableCreate_Composed(t *testing.T) {
102109}
103110
104111func Test_Connectable_IterableCreate_Disposed (t * testing.T ) {
112+ defer goleak .VerifyNone (t )
105113 ctx , cancel := context .WithCancel (context .Background ())
106114 defer cancel ()
107115 obs := & ObservableImpl {
@@ -112,14 +120,15 @@ func Test_Connectable_IterableCreate_Disposed(t *testing.T) {
112120 cancel ()
113121 }}, WithPublishStrategy (), WithContext (ctx )),
114122 }
115- obs .Connect ()
123+ obs .Connect (ctx )
116124 _ , cancel2 := context .WithTimeout (context .Background (), 550 * time .Millisecond )
117125 defer cancel2 ()
118126 time .Sleep (50 * time .Millisecond )
119127 Assert (ctx , t , obs , IsEmpty ())
120128}
121129
122130func Test_Connectable_IterableCreate_WithoutConnect (t * testing.T ) {
131+ defer goleak .VerifyNone (t )
123132 ctx , cancel := context .WithCancel (context .Background ())
124133 defer cancel ()
125134 obs := & ObservableImpl {
@@ -128,12 +137,13 @@ func Test_Connectable_IterableCreate_WithoutConnect(t *testing.T) {
128137 ch <- Of (2 )
129138 ch <- Of (3 )
130139 cancel ()
131- }}, WithPublishStrategy (), WithContext (ctx )),
140+ }}, WithBufferedChannel ( 3 ), WithPublishStrategy (), WithContext (ctx )),
132141 }
133142 testConnectableWithoutConnect (t , obs )
134143}
135144
136145func Test_Connectable_IterableDefer_Single (t * testing.T ) {
146+ defer goleak .VerifyNone (t )
137147 ctx , cancel := context .WithCancel (context .Background ())
138148 defer cancel ()
139149 obs := & ObservableImpl {
@@ -142,12 +152,13 @@ func Test_Connectable_IterableDefer_Single(t *testing.T) {
142152 ch <- Of (2 )
143153 ch <- Of (3 )
144154 cancel ()
145- }}, WithPublishStrategy (), WithContext (ctx )),
155+ }}, WithBufferedChannel ( 3 ), WithPublishStrategy (), WithContext (ctx )),
146156 }
147157 testConnectableSingle (t , obs )
148158}
149159
150160func Test_Connectable_IterableDefer_Composed (t * testing.T ) {
161+ defer goleak .VerifyNone (t )
151162 ctx , cancel := context .WithCancel (context .Background ())
152163 defer cancel ()
153164 obs := & ObservableImpl {
@@ -156,12 +167,13 @@ func Test_Connectable_IterableDefer_Composed(t *testing.T) {
156167 ch <- Of (2 )
157168 ch <- Of (3 )
158169 cancel ()
159- }}, WithPublishStrategy (), WithContext (ctx )),
170+ }}, WithBufferedChannel ( 3 ), WithPublishStrategy (), WithContext (ctx )),
160171 }
161172 testConnectableComposed (t , obs )
162173}
163174
164175func Test_Connectable_IterableJust_Single (t * testing.T ) {
176+ defer goleak .VerifyNone (t )
165177 ctx , cancel := context .WithCancel (context .Background ())
166178 defer cancel ()
167179 obs := & ObservableImpl {
@@ -171,6 +183,7 @@ func Test_Connectable_IterableJust_Single(t *testing.T) {
171183}
172184
173185func Test_Connectable_IterableJust_Composed (t * testing.T ) {
186+ defer goleak .VerifyNone (t )
174187 ctx , cancel := context .WithCancel (context .Background ())
175188 defer cancel ()
176189 obs := & ObservableImpl {
@@ -180,6 +193,7 @@ func Test_Connectable_IterableJust_Composed(t *testing.T) {
180193}
181194
182195func Test_Connectable_IterableRange_Single (t * testing.T ) {
196+ defer goleak .VerifyNone (t )
183197 ctx , cancel := context .WithCancel (context .Background ())
184198 defer cancel ()
185199 obs := & ObservableImpl {
@@ -189,6 +203,7 @@ func Test_Connectable_IterableRange_Single(t *testing.T) {
189203}
190204
191205func Test_Connectable_IterableRange_Composed (t * testing.T ) {
206+ defer goleak .VerifyNone (t )
192207 ctx , cancel := context .WithCancel (context .Background ())
193208 defer cancel ()
194209 obs := & ObservableImpl {
@@ -198,6 +213,7 @@ func Test_Connectable_IterableRange_Composed(t *testing.T) {
198213}
199214
200215func Test_Connectable_IterableSlice_Single (t * testing.T ) {
216+ defer goleak .VerifyNone (t )
201217 ctx , cancel := context .WithCancel (context .Background ())
202218 defer cancel ()
203219 obs := & ObservableImpl {iterable : newSliceIterable ([]Item {Of (1 ), Of (2 ), Of (3 )},
@@ -206,6 +222,7 @@ func Test_Connectable_IterableSlice_Single(t *testing.T) {
206222}
207223
208224func Test_Connectable_IterableSlice_Composed (t * testing.T ) {
225+ defer goleak .VerifyNone (t )
209226 ctx , cancel := context .WithCancel (context .Background ())
210227 defer cancel ()
211228 obs := & ObservableImpl {iterable : newSliceIterable ([]Item {Of (1 ), Of (2 ), Of (3 )},
@@ -241,7 +258,7 @@ func testConnectableSingle(t *testing.T, obs Observable) {
241258 }
242259
243260 wg .Wait ()
244- obs .Connect ()
261+ obs .Connect (ctx )
245262 assert .NoError (t , eg .Wait ())
246263}
247264
@@ -278,7 +295,7 @@ func testConnectableComposed(t *testing.T, obs Observable) {
278295 }
279296
280297 wg .Wait ()
281- obs .Connect ()
298+ obs .Connect (ctx )
282299 assert .NoError (t , eg .Wait ())
283300}
284301
0 commit comments