Skip to content

Commit 387e84d

Browse files
committed
Fix for mapcat, issue #1556
Corrected mapcat to actually concat its results rather than merging. Added warning to docstring thta this might not be ideal. Added flatmap to cover map+merge case.
1 parent 478ea19 commit 387e84d

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

src/main/clojure/rx/lang/clojure/core.clj

+31-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
(:refer-clojure :exclude [concat cons count cycle
33
distinct do drop drop-while
44
empty every?
5-
filter first future
5+
filter first future flatmap
66
group-by
77
interleave interpose into iterate
88
keep keep-indexed
@@ -616,17 +616,46 @@
616616
return an Observable. The resulting observables are concatentated together
617617
into one observable.
618618
619+
WARNING: This operator, like clojure.core/mapcat, preserves ordering of the
620+
generated Observables. In an asynchronous context, this may cause unintended
621+
blocking. Try flatmap instead.
622+
619623
If multiple Observables are given, the arguments to f are the first item from
620624
each observable, then the second item, etc.
621625
622626
See:
623627
clojure.core/mapcat
624-
rx.Observable/flatMap
628+
flatmap
629+
rx.Observable/concatMap
625630
"
626631
[f & xs]
627632
(if (clojure.core/next xs)
628633
(mapcat* f (seq->o xs))
629634
; use built-in flatMap for single-arg case
635+
(.concatMap ^Observable (clojure.core/first xs) (iop/fn* f))))
636+
637+
(defn ^Observable flatmap*
638+
"Same as multi-arg flatmap, but input is an Observable of Observables.
639+
640+
See:
641+
flatmap
642+
"
643+
[f ^Observable xs]
644+
(->> xs
645+
(map* f)
646+
(merge*)))
647+
648+
(defn ^Observable flatmap
649+
"Like mapcat, but the Observables produced by f are merged rather than concatenated.
650+
This behavior is preferable in asynchronous contexts where order is not important.
651+
652+
See:
653+
mapcat
654+
rx.Observable/flatMap
655+
"
656+
[f & xs]
657+
(if (clojure.core/next xs)
658+
(flatmap* f (seq->o xs))
630659
(.flatMap ^Observable (clojure.core/first xs) (iop/fn* f))))
631660

632661
(defn map-indexed

src/test/clojure/rx/lang/clojure/core_test.clj

+39-1
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@
277277
(let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}]]
278278
(testing "with just a key-fn"
279279
(is (= [[:a {:k :a :v 1}]
280-
[:b {:k :b :v 2}]
281280
[:a {:k :a :v 3}]
281+
[:b {:k :b :v 2}]
282282
[:c {:k :c :v 4}]]
283283
(->> xs
284284
(rx/seq->o)
@@ -452,6 +452,44 @@
452452
(rx/seq->o as)
453453
(rx/seq->o bs)))))))
454454

455+
(deftest test-flatmap
456+
(let [f (fn [v] [v (* v v)])
457+
xs (range 10)]
458+
(is (= (mapcat f xs)
459+
(b/into [] (rx/flatmap (comp rx/seq->o f) (rx/seq->o xs))))))
460+
461+
; group-by is a good way to test merge behavior without truly async code
462+
; here the :a and :b observables are interleaved when merged
463+
(let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}]]
464+
(is (= [[:a {:k :a :v 1}]
465+
[:b {:k :b :v 2}]
466+
[:a {:k :a :v 3}]
467+
[:c {:k :c :v 4}]]
468+
(->> xs
469+
(rx/seq->o)
470+
(rx/group-by :k)
471+
(rx/flatmap (fn [[k vo :as me]]
472+
(is (instance? clojure.lang.MapEntry me))
473+
(rx/map #(vector k %) vo)))
474+
(b/into [])))))
475+
476+
; still looking for a simple demo of merging for the multi-arg case
477+
; Here, because ys is "inline", the interleaving is removed. sigh.
478+
(let [xs [{:k :a :v 1} {:k :b :v 2} {:k :a :v 3} {:k :c :v 4}]
479+
ys [:ay :by :cy]]
480+
(is (= [[:a {:k :a :v 1} :ay]
481+
[:a {:k :a :v 3} :ay]
482+
[:b {:k :b :v 2} :by]
483+
[:c {:k :c :v 4} :cy]]
484+
(->> (rx/flatmap (fn [[k vo :as me] y]
485+
(is (instance? clojure.lang.MapEntry me))
486+
(rx/map #(vector k % y) vo))
487+
(->> xs
488+
rx/seq->o
489+
(rx/group-by :k))
490+
(rx/seq->o ys))
491+
(b/into []))))))
492+
455493
(deftest test-next
456494
(let [in [:q :r :s :t :u]]
457495
(is (= (next in) (b/into [] (rx/next (rx/seq->o in)))))))

0 commit comments

Comments
 (0)