2020import java .util .concurrent .CountDownLatch ;
2121import java .util .concurrent .TimeUnit ;
2222import java .util .concurrent .atomic .AtomicInteger ;
23+ import java .util .stream .Collectors ;
2324import org .apache .curator .framework .CuratorFramework ;
2425import org .apache .curator .framework .api .BackgroundCallback ;
2526import org .apache .curator .framework .api .CuratorEvent ;
@@ -57,22 +58,16 @@ private <T> List<T> getAsyncChildrenThrows(
5758 final Transcoder <T > transcoder
5859 )
5960 throws Exception {
60- try {
61- List <String > children = getChildren (parent );
62- final List <String > paths = Lists .newArrayListWithCapacity (children .size ());
63-
64- for (String child : children ) {
65- paths .add (ZKPaths .makePath (parent , child ));
66- }
67-
68- List <T > result = new ArrayList <>(
69- getAsyncThrows (parent , paths , transcoder , Optional .empty ()).values ()
70- );
61+ List <String > children = getChildren (parent );
62+ final List <String > paths = Lists .newArrayListWithCapacity (children .size ());
7163
72- return result ;
73- } catch (Throwable t ) {
74- throw t ;
64+ for (String child : children ) {
65+ paths .add (ZKPaths .makePath (parent , child ));
7566 }
67+
68+ return new ArrayList <>(
69+ getAsyncThrows (parent , paths , transcoder , Optional .empty ()).values ()
70+ );
7671 }
7772
7873 private <T > Map <String , T > getAsyncThrows (
@@ -104,29 +99,22 @@ private <T> Map<String, T> getAsyncThrows(
10499 final CountDownLatch latch = new CountDownLatch (paths .size ());
105100 final AtomicInteger bytes = new AtomicInteger ();
106101
107- final BackgroundCallback callback = new BackgroundCallback () {
108-
109- @ Override
110- public void processResult (CuratorFramework client , CuratorEvent event )
111- throws Exception {
112- try {
113- if (event .getData () == null || event .getData ().length == 0 ) {
114- LOG .trace ("Expected active node {} but it wasn't there" , event .getPath ());
115- return ;
116- }
117-
118- bytes .getAndAdd (event .getData ().length );
119- final T object = transcoder .fromBytes (event .getData ());
120- synchronizedObjects .put (event .getPath (), object );
121-
122- if (cache .isPresent ()) {
123- cache .get ().set (event .getPath (), object );
124- }
125- } catch (Exception e ) {
126- LOG .error ("Exception processing curator result" , e );
127- } finally {
128- latch .countDown ();
102+ final BackgroundCallback callback = (client , event ) -> {
103+ try {
104+ if (event .getData () == null || event .getData ().length == 0 ) {
105+ LOG .trace ("Expected active node {} but it wasn't there" , event .getPath ());
106+ return ;
129107 }
108+
109+ bytes .getAndAdd (event .getData ().length );
110+ final T object = transcoder .fromBytes (event .getData ());
111+ synchronizedObjects .put (event .getPath (), object );
112+
113+ cache .ifPresent (tZkCache -> tZkCache .set (event .getPath (), object ));
114+ } catch (Exception e ) {
115+ LOG .error ("Exception processing curator result" , e );
116+ } finally {
117+ latch .countDown ();
130118 }
131119 };
132120
@@ -169,25 +157,21 @@ private <T extends SingularityId> List<T> getChildrenAsIdsForParentsThrows(
169157
170158 final CountDownLatch latch = new CountDownLatch (parents .size ());
171159
172- final BackgroundCallback callback = new BackgroundCallback () {
173-
174- @ Override
175- public void processResult (CuratorFramework client , CuratorEvent event )
176- throws Exception {
177- try {
178- if (event .getChildren () == null || event .getChildren ().size () == 0 ) {
179- LOG .trace ("Expected children for node {} - but found none" , event .getPath ());
180- return ;
181- }
182- synchronizedObjects .addAll (
183- Lists .transform (
184- event .getChildren (),
185- Transcoders .getFromStringFunction (idTranscoder )
186- )
187- );
188- } finally {
189- latch .countDown ();
160+ final BackgroundCallback callback = (client , event ) -> {
161+ try {
162+ if (event .getChildren () == null || event .getChildren ().size () == 0 ) {
163+ LOG .trace ("Expected children for node {} - but found none" , event .getPath ());
164+ return ;
190165 }
166+ synchronizedObjects .addAll (
167+ event
168+ .getChildren ()
169+ .stream ()
170+ .map (Transcoders .getFromStringFunction (idTranscoder ))
171+ .collect (Collectors .toList ())
172+ );
173+ } finally {
174+ latch .countDown ();
191175 }
192176 };
193177
@@ -218,10 +202,10 @@ protected <T extends SingularityId> List<T> getChildrenAsIds(
218202 final String rootPath ,
219203 final IdTranscoder <T > idTranscoder
220204 ) {
221- return Lists . transform (
222- getChildren ( rootPath ),
223- Transcoders .getFromStringFunction (idTranscoder )
224- );
205+ return getChildren ( rootPath )
206+ . stream ()
207+ . map ( Transcoders .getFromStringFunction (idTranscoder ) )
208+ . collect ( Collectors . toList () );
225209 }
226210
227211 private <T extends SingularityId > List <T > existsThrows (
@@ -238,22 +222,17 @@ private <T extends SingularityId> List<T> existsThrows(
238222
239223 final CountDownLatch latch = new CountDownLatch (paths .size ());
240224
241- final BackgroundCallback callback = new BackgroundCallback () {
242-
243- @ Override
244- public void processResult (CuratorFramework client , CuratorEvent event )
245- throws Exception {
246- try {
247- if (event .getStat () != null ) {
248- objects .add (
249- Transcoders
250- .getFromStringFunction (idTranscoder )
251- .apply (ZKPaths .getNodeFromPath (event .getPath ()))
252- );
253- }
254- } finally {
255- latch .countDown ();
225+ final BackgroundCallback callback = (client , event ) -> {
226+ try {
227+ if (event .getStat () != null ) {
228+ objects .add (
229+ Transcoders
230+ .getFromStringFunction (idTranscoder )
231+ .apply (ZKPaths .getNodeFromPath (event .getPath ()))
232+ );
256233 }
234+ } finally {
235+ latch .countDown ();
257236 }
258237 };
259238
@@ -293,18 +272,13 @@ private <T extends SingularityId> List<T> notExistsThrows(
293272
294273 final CountDownLatch latch = new CountDownLatch (pathsMap .size ());
295274
296- final BackgroundCallback callback = new BackgroundCallback () {
297-
298- @ Override
299- public void processResult (CuratorFramework client , CuratorEvent event )
300- throws Exception {
301- try {
302- if (event .getStat () == null ) {
303- objects .add (pathsMap .get (event .getPath ()));
304- }
305- } finally {
306- latch .countDown ();
275+ final BackgroundCallback callback = (client , event ) -> {
276+ try {
277+ if (event .getStat () == null ) {
278+ objects .add (pathsMap .get (event .getPath ()));
307279 }
280+ } finally {
281+ latch .countDown ();
308282 }
309283 };
310284
@@ -352,25 +326,7 @@ protected <T> List<T> getAsync(
352326 ) {
353327 try {
354328 return new ArrayList <>(
355- getAsyncThrows (pathNameForLogs , paths , transcoder , Optional .<ZkCache <T >>empty ())
356- .values ()
357- );
358- } catch (Throwable t ) {
359- throw new RuntimeException (t );
360- }
361- }
362-
363- protected <T > Map <String , T > getAsyncWithPath (
364- final String pathNameForLogs ,
365- final Collection <String > paths ,
366- final Transcoder <T > transcoder
367- ) {
368- try {
369- return getAsyncThrows (
370- pathNameForLogs ,
371- paths ,
372- transcoder ,
373- Optional .<ZkCache <T >>empty ()
329+ getAsyncThrows (pathNameForLogs , paths , transcoder , Optional .empty ()).values ()
374330 );
375331 } catch (Throwable t ) {
376332 throw new RuntimeException (t );
@@ -404,24 +360,19 @@ protected <T> List<T> getAsyncNestedChildrenAsListThrows(
404360 final List <T > results = new ArrayList <>();
405361 final CountDownLatch latch = new CountDownLatch (allPaths .size ());
406362 final AtomicInteger bytes = new AtomicInteger ();
407- final BackgroundCallback callback = new BackgroundCallback () {
408-
409- @ Override
410- public void processResult (CuratorFramework client , CuratorEvent event )
411- throws Exception {
412- try {
413- if (event .getData () == null || event .getData ().length == 0 ) {
414- LOG .trace ("Expected active node {} but it wasn't there" , event .getPath ());
415- return ;
416- }
417- bytes .getAndAdd (event .getData ().length );
418-
419- final T object = transcoder .fromBytes (event .getData ());
420-
421- results .add (object );
422- } finally {
423- latch .countDown ();
363+ final BackgroundCallback callback = (client , event ) -> {
364+ try {
365+ if (event .getData () == null || event .getData ().length == 0 ) {
366+ LOG .trace ("Expected active node {} but it wasn't there" , event .getPath ());
367+ return ;
424368 }
369+ bytes .getAndAdd (event .getData ().length );
370+
371+ final T object = transcoder .fromBytes (event .getData ());
372+
373+ results .add (object );
374+ } finally {
375+ latch .countDown ();
425376 }
426377 };
427378
@@ -468,27 +419,22 @@ protected <T, Q> Map<T, List<Q>> getAsyncNestedChildDataAsMapThrows(
468419 final ConcurrentHashMap <T , List <Q >> resultsMap = new ConcurrentHashMap <>();
469420 final CountDownLatch latch = new CountDownLatch (allPathsMap .size ());
470421 final AtomicInteger bytes = new AtomicInteger ();
471- final BackgroundCallback callback = new BackgroundCallback () {
472-
473- @ Override
474- public void processResult (CuratorFramework client , CuratorEvent event )
475- throws Exception {
476- try {
477- if (event .getData () == null || event .getData ().length == 0 ) {
478- LOG .trace ("Expected active node {} but it wasn't there" , event .getPath ());
479- return ;
480- }
481- bytes .getAndAdd (event .getData ().length );
482-
483- final Q object = transcoder .fromBytes (event .getData ());
484-
485- if (allPathsMap .get (event .getPath ()) != null ) {
486- resultsMap .putIfAbsent (allPathsMap .get (event .getPath ()), new ArrayList <Q >());
487- resultsMap .get (allPathsMap .get (event .getPath ())).add (object );
488- }
489- } finally {
490- latch .countDown ();
422+ final BackgroundCallback callback = (client , event ) -> {
423+ try {
424+ if (event .getData () == null || event .getData ().length == 0 ) {
425+ LOG .trace ("Expected active node {} but it wasn't there" , event .getPath ());
426+ return ;
491427 }
428+ bytes .getAndAdd (event .getData ().length );
429+
430+ final Q object = transcoder .fromBytes (event .getData ());
431+
432+ if (allPathsMap .get (event .getPath ()) != null ) {
433+ resultsMap .putIfAbsent (allPathsMap .get (event .getPath ()), new ArrayList <>());
434+ resultsMap .get (allPathsMap .get (event .getPath ())).add (object );
435+ }
436+ } finally {
437+ latch .countDown ();
492438 }
493439 };
494440
@@ -535,24 +481,19 @@ protected <T extends SingularityId> List<T> getAsyncNestedChildIdsAsListThrows(
535481 final List <T > results = new ArrayList <>();
536482 final CountDownLatch latch = new CountDownLatch (allPaths .size ());
537483 final AtomicInteger bytes = new AtomicInteger ();
538- final BackgroundCallback callback = new BackgroundCallback () {
539-
540- @ Override
541- public void processResult (CuratorFramework client , CuratorEvent event )
542- throws Exception {
543- try {
544- event
545- .getChildren ()
546- .forEach (
547- child -> {
548- final T object = transcoder .fromString (child );
549- bytes .getAndAdd (child .getBytes ().length );
550- results .add (object );
551- }
552- );
553- } finally {
554- latch .countDown ();
555- }
484+ final BackgroundCallback callback = (client , event ) -> {
485+ try {
486+ event
487+ .getChildren ()
488+ .forEach (
489+ child -> {
490+ final T object = transcoder .fromString (child );
491+ bytes .getAndAdd (child .getBytes ().length );
492+ results .add (object );
493+ }
494+ );
495+ } finally {
496+ latch .countDown ();
556497 }
557498 };
558499
@@ -612,7 +553,7 @@ private <T> T queryAndReturnResultsThrows(
612553 log (
613554 method .operationType ,
614555 Optional .of (paths .size ()),
615- bytes .get () > 0 ? Optional .of (bytes .get ()) : Optional .< Integer > empty (),
556+ bytes .get () > 0 ? Optional .of (bytes .get ()) : Optional .empty (),
616557 start ,
617558 pathNameForLogs
618559 );
0 commit comments