Copier un cours d'eau pour éviter les “flux " a déjà été opéré ou fermée”
Je voudrais dupliquer un Java 8 flux de sorte que je peux faire face à deux fois. Je peux collect
comme une liste et d'obtenir un nouveau flux de ce;
//doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... //do stuff
thing.stream()... //do other stuff
Mais je pense qu'il devrait y avoir une plus efficace/de manière élégante.
Est-il un moyen de copier le flux sans le transformer en une collection?
Je suis en train de travailler avec un flux de Either
s, de sorte que veux processus de la gauche de la projection d'une façon avant le déplacement sur la droite de projection et de traiter avec celui d'une autre manière. Un peu comme cela (qui, jusqu'à présent, je suis obligé d'utiliser le toList
truc avec).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
- Pourriez-vous donner plus de détails sur le processus "d'une manière"... êtes-vous de consommer des objets? Cartographie eux? partitionBy() et groupingBy (), vous pouvez obtenir directement à 2+ de listes, mais vous pourriez bénéficier de la cartographie d'abord ou tout simplement d'avoir une décision de la fourche dans votre forEach().
Vous devez vous connecter pour publier un commentaire.
Je pense que votre hypothèse sur l'efficacité de l'est du genre à reculer. Vous obtenez cette énorme efficacité de récupération si vous allez seulement d'utiliser les données une seule fois, parce que vous n'avez pas à stocker, et les ruisseaux de vous donner puissante boucle "fusion" optimisations qui vous permettent de flux de l'ensemble des données de manière efficace à travers le pipeline.
Si vous voulez ré-utiliser les mêmes données, alors, par définition, vous devez générer deux fois (de façon déterministe) ou de le ranger. Si c'est déjà arrivé à être dans une collection, grand; puis en itérant deux fois il est bon marché.
Nous avons fait l'expérience dans la conception de la "fourche flux". Ce que nous avons trouvé était que l'appui de cette a les coûts réels; il accablés le cas le plus courant (à utiliser une fois), au détriment de la rare cas. Le gros problème a été de traiter avec "ce qui se passe lorsque les deux pipelines de ne pas consommer des données à la même vitesse." Maintenant vous êtes de retour à la mémoire tampon de toute façon. C'est une fonctionnalité qui n'a pas clairement porter son poids.
Si vous souhaitez utiliser les mêmes données à plusieurs reprises, soit de les stocker, ou de la structure de vos activités en tant que Consommateurs et de faire ce qui suit:
Vous pouvez également regarder dans le RxJava de la bibliothèque, comme son modèle de traitement se prête mieux à ce genre de "flux " bifurquer".
toList
) pour être en mesure de traiter (leEither
cas à être l'exemple)?Utilisation java.util.fonction.Fournisseur.
De http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:
Nous avons mis en place un
duplicate()
méthode de ruisseaux dans les jOOλ, une bibliothèque Open Source que nous avons créé pour améliorer les tests d'intégration pour jOOQ. Essentiellement, vous pouvez simplement écrire:En interne, il y a un tampon de stockage de toutes les valeurs qui ont été utilisés à partir d'un flux, mais pas de l'autre. C'est probablement aussi efficace qu'il arrive si vos deux flux sont consommés au même taux, et si vous pouvez vivre avec l'absence de fil-sécurité.
Voici comment fonctionne l'algorithme:
De plus le code source ici
Tuple2
est probablement comme votrePair
type, tandis queSeq
estStream
avec quelques améliorations.Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());
, il est préférable deTuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));
. À l'aide deCollectors.mapping/reducing
on peut exprimer d'autres flux d'opérations que les collectionneurs et les éléments des processus tout à fait différente de la manière de créer seul tuple. Donc, en général, vous pouvez faire beaucoup de choses en consommant le flux une fois, sans double emploi et à être parallèle de l'environnement.Collector
.LinkedList
? Vraiment? J'aimerais que vous savez ce que vous faites, mais dans certains cas, quand il surpasseArrayList
sont extrêmement rares et celui-ci ne ressemble pas à elle. Avez-vous comparé il?offer()
/poll()
API, mais unArrayDeque
pourrait faire de même.Vous pouvez créer un flux de runnables (par exemple):
Où
failure
etsuccess
sont les opérations à appliquer. Ce sera toutefois créer quelques objets temporaires et ne peut pas être plus efficace que de commencer à partir d'une collection et en streaming/itération deux fois.Utilisation fournisseur pour produire le flux pour chaque terminaison de l'opération.
Chaque fois que vous avez besoin d'un flux de cette collection,
utilisation
streamSupplier.get()
pour obtenir un nouveau flux de données.Exemples:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
Une autre façon de manipuler les éléments à plusieurs reprises est d'utiliser Flux de données.peek(de Consommation):
peek(Consumer)
peut être enchaîné autant de fois que nécessaire.cyclope-réagir, une bibliothèque que je contribue, possède une méthode statique qui vous permettra de dupliquer un Flux (et renvoie un jOOλ Tuple de cours d'eau).
Voir les commentaires, il y a des performances qui seront engagés lors de l'utilisation de dupliquer sur un Flux de données existant. Un plus performant alternative serait d'utiliser Diffusables :-
Il y a aussi un (paresseux) Flux de classe qui peut être construit à partir d'un Flux, Itératif ou un Tableau et relus plusieurs fois.
AsStreamable.synchronizedFromStream(flux) - peut être utilisé pour créer un Flux qui va paresseusement remplir c'est la sauvegarde de la collecte, de manière à ce que peuvent être partagées entre les threads. Diffusables.fromStream(flux) n'entraîneront pas de la surcharge de la synchronisation.
List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())
(comme OP suggère). Veuillez également divulguer de manière explicite dans la réponse que vous êtes l'auteur de cyclope-flux. Lire ceci.Pour ce problème particulier, vous pouvez aussi utiliser le partitionnement. Quelque chose comme
Nous pouvons faire usage de Flux Constructeur au moment de la lecture ou de l'itération d'un ruisseau.
Voici le document de Flux Générateur de.
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Cas d'utilisation
Disons que nous avons employé flux et nous avons besoin d'utiliser ce flux d'écrire les données de l'employé dans le fichier excel, puis mise à jour de l'employé collection/table
[C'est juste en cas d'utilisation pour montrer l'utilisation de Flux Builder]:
J'ai eu un problème similaire, et peut penser à trois différentes structures intermédiaires à partir de laquelle pour créer une copie du flux: un
List
, un tableau et uneStream.Builder
. J'ai écrit un petit programme de test, qui a suggéré que, d'un point de vue des performances de laList
était d'environ 30% plus lent que les deux autres qui étaient assez similaires.Le seul inconvénient de la conversion d'un tableau, c'est qu'il est difficile si votre type d'élément est un type générique (qui dans mon cas), c'est pourquoi je préfère utiliser un
Stream.Builder
.J'ai fini par écrire une petite fonction qui crée un
Collector
:Je peux alors faire une copie de n'importe quel flux
str
en faisantstr.collect(copyCollector())
qui se sent tout à fait en accord avec les expressions idiomatiques des cours d'eau.