Pouvez-vous diviser un flux de données en deux volets?
J'ai un ensemble de données représentée par un Java 8 stream:
Stream<T> stream = ...;
Je peux voir comment filtrer pour obtenir un sous-ensemble aléatoire - par exemple
Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));
Je peux aussi voir comment je pourrais réduire ce flux pour obtenir, par exemple, deux listes représentant les deux aléatoire moitiés de l'ensemble de données, puis activez-les dans le cours d'eau.
Mais, est-il un moyen direct pour générer deux flux à partir de la première? Quelque chose comme
(heads, tails) = stream.[some kind of split based on filter]
Merci pour toute la perspicacité.
Vous devez vous connecter pour publier un commentaire.
Pas exactement. Vous ne pouvez pas obtenir deux
Stream
s à l'extérieur de celle-ci; ce n'est pas logique -- comment voulez-vous effectuer une itération sur un sans avoir besoin de générer de l'autre en même temps? Un flux peut uniquement être exploité à la fois.Cependant, si vous souhaitez vider dans une liste ou quelque chose, vous pourriez faire
stream.collect(...)
pour prédéfinis thread-safeCollectors
, qui fonctionnent bien, même sur les non thread-safe Collections (sans synchronisé conflit de verrouillage). Meilleure réponse par @MarkJeronimus.Un collecteur peut être utilisé pour cela.
Collectors.partitioningBy()
usine.Cela va créer une
Map
deBoolean
àList
, et de mettre des articles dans l'une ou l'autre liste basée sur unPredicate
.Remarque: Depuis le flux doit être consommé entier, cela ne peut pas travailler sur les flux infinis. Parce que le flux est consommé de toute façon, cette méthode met tout simplement dans les Listes au lieu de faire un nouveau flux de données avec la mémoire.
Aussi, pas besoin de l'itérateur, pas même dans les chefs-seul exemple que vous avez fournis.
Collectors.groupingBy()
usine.Dans le cas où les flux ne sont
Stream
, mais l'une des primitives des cours d'eau,IntStream
, alors ce.collect(Collectors)
méthode n'est pas disponible. Vous aurez à le faire de la manière manuelle sans un collecteur d'usine. C'est la mise en œuvre ressemble à ceci:Modifier
Comme l'a fait remarquer, au dessus de "solution" n'est pas thread-safe. La conversion à une vie normale
Stream
avant la collecte est le chemin à parcourir:stream.boxed().collect(...);
! Il va faire comme annoncé: convertir la primitiveIntStream
à la boîteStream<Integer>
version.itertools.tee()
, qui, un peu comme un carrefour en T, dans un pipeline, divise le flux de séquence. La magie, c'est que tous les éléments consommés par un flux et non les autres sont mis en cache dans un tampon. Cela signifie que, si un flux consomme tout avant les autres flux consomme, vous pourriez aussi bien vider le tout dans un récipient. Mais si ils avancent plus-moins en parallèle, puis, petit état est conservé dans la mémoire. Cela dit, je soupçonne le filetage des implications de l'empêchent d'être mis en œuvre en Java.Malheureusement, ce que vous demandez est directement désapprouvées dans la JavaDoc de Flux:
Vous pouvez contourner cela en utilisant
peek
ou d'autres méthodes si vous désirez vraiment, ce type de comportement. Dans ce cas, ce que vous devez faire, c'est plutôt d'essayer de prendre deux cours d'eau à partir de la même Flux d'origine de la source avec un fork de filtre, vous dupliquez votre flux et le filtre de chacun des doublons de manière appropriée.Cependant, vous pouvez vouloir reconsidérer si un
Stream
est la structure appropriée à votre cas d'utilisation.List<Stream> forkStream(Stream s)
mais mon conséquence, les flux seront, au moins partiellement, être soutenus par des collections, et non pas directement par le flux sous-jacent, par opposition à-direfilter
qui n'est pas un terminal de flux de l'opération.Je suis tombé sur cette question à mon auto et j'ai l'impression que fourchue flux a certains cas d'utilisation, qui pourrait s'avérer valide. J'ai écrit le code ci-dessous en tant que consommateur, de sorte qu'il ne fait rien mais on pourrait l'appliquer à des fonctions et à rien d'autre, vous pourriez trouver.
Maintenant votre code de mise en œuvre pourrait être quelque chose comme ceci:
C'est contre le mécanisme général de Flux. Dire que vous pouvez diviser les Flux de S0 à Sa et Sb comme tu le voulais. L'exécution de toute exploitation du terminal, dire
count()
, sur Sa va nécessairement de "consommer" de tous les éléments dans S0. Par conséquent Sb perdu sa source de données.Précédemment, le Flux a une
tee()
méthode, je pense, qui double un flux à deux. Il est retiré maintenant.Stream peek() la méthode, vous pourriez être en mesure de l'utiliser pour atteindre vos exigences.
peek
est exactement ce que l'habitude d'êtretee
.pas exactement, mais vous pouvez être en mesure d'accomplir ce que vous avez besoin en invoquant
Collectors.groupingBy()
. vous créez une nouvelle Collection, et peut ensuite instancier streams sur cette nouvelle collection.C'était la moins mauvaise réponse que je pouvais venir.
Cela prend un flux de nombres entiers et les sépare à 5. Pour ceux de plus de 5 filtrer uniquement les numéros et les met dans une liste. Pour le reste, il se joint à eux avec |.
sorties:
Ses pas idéal, car il rassemble tout dans l'intermédiaire des collections de la rupture du flux (et a trop d'arguments!)
Je suis tombé sur cette question tout en cherchant un moyen de filtrer certains éléments d'un flux de données et les enregistre comme des erreurs. Je n'ai donc pas vraiment besoin de diviser le flux de façon plus joindre un prématuré de résiliation de l'action d'un prédicat avec discrète de la syntaxe. C'est ce que je suis venu avec:
Comment sur: