Attendant parallèle RX abonnés à compléter
Je suis à la recherche de la meilleure méthode pour attendre async tâches pour terminer en rx-java.
Comme un exemple commun de dire qu'il est une fonction qui permet d'obtenir une liste d'id à partir d'un magasin local et les requêtes d'un système distant pour ceux de l'Id, le système distant résultats sont ensuite regroupés dans un seul rapport et retourné à l'appelant de la fonction. L'appel de la télécommande du système est lent nous voulons être fait de manière asynchrone et je ne le veux de retour, une fois que tous les appels sont de retour et leurs résultats ont été traitées.
Le seul moyen que j'ai trouvé à faire c'est d'interroger l'abonnement pour vérifier qu'il est désabonné encore. Mais je suis en train de penser ne semble pas être le 'RX' façon de faire les choses!
Titre d'exemple, j'ai pris l'exemple de http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/ et modifié légèrement pour le rendre non-android et pour montrer ce que je veux dire. - Je avoir le code suivant à la fin de la méthode main() pour arrêter quitter immédiatement.
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
Le code complet de l'exemple est repris ci-dessous (il est tributaire de la http://square.github.io/retrofit/ si vous essayez de compiler)
package org.examples;
import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class AsyncExample {
public static void main(String[] args) throws InterruptedException {
final Subscription subscription = Observable.from("London", "Paris", "Berlin")
.flatMap(new Func1<String, Observable<WeatherData>>() {
@Override
public Observable<WeatherData> call(String s) {
return ApiManager.getWeatherData(s);
}
})
.subscribe(
new Action1<WeatherData>() {
@Override
public void call(WeatherData weatherData) {
System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " - COMPLETED");
}
}
);
//Have to poll subscription to check if its finished - is this the only way to do it?
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
}
}
class ApiManager {
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("http://api.openweathermap.org/data/2.5")
.build();
private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribe<WeatherData>() {
@Override
public void call(Subscriber<? super WeatherData> subscriber) {
try {
System.out.println(Thread.currentThread() + " - Getting " + city);
subscriber.onNext(apiManager.getWeather(city, "metric"));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
- Il se sent comme vous allez à présent dans le sens inverse. Si vous êtes à la recherche lors de l'attente pour tout, pour finir pourquoi voudriez-vous utiliser le
Schedulers.io
? Si vous n'utilisez pas toutes les planificateurs puis rx-java est par défaut synchrone. Tout se complète de manière synchrone et alors vous seriez de retour. Je sais que vous avez mentionné que vous vouliez faire cela de façon asynchrone, mais alors vous allez à l'encontre de votre propre désir de vouloir attendre qu'elle se termine. - La raison pour
Schedulers.io
est que je veux que chaque système distant appel à être dans un thread séparé donc je reçois le parallélisme et la fonction globale est plus rapide.
Vous devez vous connecter pour publier un commentaire.
Si vous vous demandez comment activer asynchrone Observables dans synchrone celui que vous pouvez utiliser .toBlocking() et ensuite l'utiliser .last() pour attendre le Observables pour terminer.. Par exemple, la suite ne va pas continuer jusqu'à ce que la minuterie est terminée, même si le minuteur s'exécute sur le calcul fil.
Vous ne devriez pas utiliser cette méthode, sauf si absolument nécessaire. Normalement, l'arrêt de l'application serait contrôlée par un autre événement (clés de la presse, à proximité de l'élément de menu clic, etc.) Votre demande de réseau Observables complète de manière asynchrone et vous permettrait de prendre des mesures dans le onNext(), onCompleted() et onError() appelle à la mise à jour de l'INTERFACE utilisateur ou l'affichage d'une erreur.
Aussi, la beauté de la Rénovation, c'est qu'il a un support intégré pour RxJava et exécuter les demandes de réseau en arrière-plan. Pour utilisation que le soutien déclarer votre interface comme au retour d'une Observable.
Qui vous permet de simplifier votre getWeatherData() la méthode pour cela:
Dans le but de synchrone d'attente pour asynchrones observables, j'ai écrit une fonction simple:
Vous pouvez l'utiliser de cette façon:
Dans la plupart des cas, il est préférable de ne pas bloquer une méthode. Essayez d'être aussi non-blocage possible. Si vous avez besoin de faire quelque chose après une Observable a terminé tout simplement appeler votre logique à partir de la
onCompleted
de rappel, ou d'utiliser ledoOnCompleted
opérateur.Si vous avez vraiment, vraiment besoin de bloquer ensuite, vous pouvez utiliser le
Blocage Observables Opérateurs
.La plupart de ces opérateurs de bloquer jusqu'à ce que les Observables complète.
Cependant, l'arrêt de votre méthode principale de quitter prématurément peut être réalisé par un simple
System.in.read()
à la fin.