RxJava - groupBy operator, items from different groups interleaved
The following code:
    Observable
        .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
        .doOnNext(item -> System.out.println("source emitting " + item))
        .groupBy(item -> {
            System.out.println("groupby called " + item);
            return item % 3;
        })
        .subscribe(observable -> {
            System.out.println("got observable " + observable + " key " + observable.getKey());
            observable.subscribe(item -> {
                System.out.println("key " + observable.getKey() + ", item " + item);
            });
        });
leaves me perplexed. The output is:
    source emitting 0
    groupby called 0
    got observable rx.observables.GroupedObservable@42110406 key 0
    key 0, item 0
    source emitting 1
    groupby called 1
    got observable rx.observables.GroupedObservable@1698c449 key 1
    key 1, item 1
    source emitting 2
    groupby called 2
    got observable rx.observables.GroupedObservable@5ef04b5 key 2
    key 2, item 2
    source emitting 3
    groupby called 3
    key 0, item 3
    source emitting 4
    groupby called 4
    key 1, item 4
    source emitting 5
    groupby called 5
    key 2, item 5
    source emitting 6
    groupby called 6
    key 0, item 6
    source emitting 7
    groupby called 7
    key 1, item 7
    source emitting 8
    groupby called 8
    key 2, item 8
    source emitting 9
    groupby called 9
    key 0, item 9
So, in the top-level subscribe method, I get 3 observables of type GroupedObservable, as expected. Then, one by one, I subscribe to the grouped observables, and here is the thing I don't understand:
Why are the original items still emitted in the original sequence (i.e. 0, 1, 2, 3, ...), and not 0, 3, 6, 9 ... for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?
I think I understand how the groups are created:
1. 0 is emitted, the key function is called and it gets 0
2. It is checked whether an observable for key 0 exists; it doesn't, so a new one is created and emitted, and then it emits 0
3. The same happens for source items 1 and 2, as they both create new groups, and observables with keys 1 and 2 are emitted, and they emit 1 and 2 correspondingly
4. Source item 3 is emitted, the key function is called and it gets 0
5. It is checked whether an observable for key 0 exists; it does, so no new grouped observable is created or emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained
It seems that although I get the grouped observables one by one, their emissions are somehow interleaved. How does this happen?
"Why are the original items still emitted in the original sequence (i.e. 0, 1, 2, 3, ...), and not 0, 3, 6, 9 ... for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?"
You've answered your own question. You're operating on a stream of items, in the order they're emitted. As each one is emitted, it gets passed down the operator chain, and you see the output you've shown here.
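To make the interleaving concrete, here is a minimal plain-Java sketch of push-based grouping. It is only a simplified model and not RxJava's actual implementation; the class name GroupByModel and the method dispatch are hypothetical names made up for illustration. Each item is routed to its group as soon as it arrives, so the log follows the source order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified, hypothetical model of push-based grouping; NOT RxJava's implementation.
class GroupByModel {

    // Pushes each source item through the key function and straight on to its
    // group, recording every event in the order it happens.
    static <T, K> List<String> dispatch(List<T> source, Function<T, K> keySelector) {
        List<String> log = new ArrayList<>();
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : source) {
            K key = keySelector.apply(item);
            if (!groups.containsKey(key)) {
                // First item with this key: the group is created and announced.
                groups.put(key, new ArrayList<>());
                log.add("new group " + key);
            }
            // The item is delivered to its group immediately, before the
            // next source item is even looked at.
            groups.get(key).add(item);
            log.add("key " + key + ", item " + item);
        }
        return log;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(0, 1, 2, 3, 4, 5);
        List<String> log = dispatch(source, item -> item % 3);
        log.forEach(System.out::println);
    }
}
```

Nothing in the loop ever waits for a group to be "finished", which is why items for keys 0, 1 and 2 alternate in the log.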
The alternative output you're expecting requires the chain to wait until the source has stopped emitting items for all groups. Say you had Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0). You'd expect (0, 3, 0), (1, 4, 4, 4, 4, 4, 4), (2) as the output groups. What if you had an infinite stream of 4's? Your subscriber would never receive the 0, 3.. from the first group.
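As a rough illustration of what "waiting for the source to finish" means, the sketch below groups that same finite sequence with plain java.util.stream rather than RxJava (BufferedGroups and groupAll are hypothetical names). The whole input has to be consumed before any group can be printed, which is exactly what cannot work for an infinite source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper using java.util.stream, not RxJava.
class BufferedGroups {

    // Collects the complete source into per-key lists, sorted by key.
    // Nothing is available to the caller until every item has been seen.
    static Map<Integer, List<Integer>> groupAll(List<Integer> source) {
        return source.stream()
                .collect(Collectors.groupingBy(i -> i % 3, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0);
        // prints {0=[0, 3, 0], 1=[1, 4, 4, 4, 4, 4, 4], 2=[2]}
        System.out.println(groupAll(source));
    }
}
```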
You can create the behaviour you're looking for. The toList operator will cache the output until the source completes, and then pass a List<R> to the subscriber:
    .subscribe(observable -> {
        System.out.println("got observable " + observable + " key " + observable.getKey());
        observable.toList().subscribe(items -> {
            // items is a List<Integer>
            System.out.println("key " + observable.getKey() + ", items " + items);
        });
    });