Attendant parallèle RX abonnés à compléter

Je suis à la recherche de la meilleure méthode pour attendre async tâches pour terminer en rx-java.

Comme un exemple commun de dire qu'il est une fonction qui permet d'obtenir une liste d'id à partir d'un magasin local et les requêtes d'un système distant pour ceux de l'Id, le système distant résultats sont ensuite regroupés dans un seul rapport et retourné à l'appelant de la fonction. L'appel de la télécommande du système est lent nous voulons être fait de manière asynchrone et je ne le veux de retour, une fois que tous les appels sont de retour et leurs résultats ont été traitées.

Le seul moyen que j'ai trouvé à faire c'est d'interroger l'abonnement pour vérifier qu'il est désabonné encore. Mais je suis en train de penser ne semble pas être le 'RX' façon de faire les choses!

Titre d'exemple, j'ai pris l'exemple de http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/ et modifié légèrement pour le rendre non-android et pour montrer ce que je veux dire. - Je avoir le code suivant à la fin de la méthode main() pour arrêter quitter immédiatement.

while (!subscription.isUnsubscribed()) {
    Thread.sleep(100);
}

Le code complet de l'exemple est repris ci-dessous (il est tributaire de la http://square.github.io/retrofit/ si vous essayez de compiler)

package org.examples;
import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class AsyncExample {
public static void main(String[] args) throws InterruptedException {
final Subscription subscription = Observable.from("London", "Paris", "Berlin")
.flatMap(new Func1<String, Observable<WeatherData>>() {
@Override
public Observable<WeatherData> call(String s) {
return ApiManager.getWeatherData(s);
}
})
.subscribe(
new Action1<WeatherData>() {
@Override
public void call(WeatherData weatherData) {
System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " - COMPLETED");
}
}
);
//Have to poll subscription to check if its finished - is this the only way to do it?
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
}
}
class ApiManager {
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("http://api.openweathermap.org/data/2.5")
.build();
private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribe<WeatherData>() {
@Override
public void call(Subscriber<? super WeatherData> subscriber) {
try {
System.out.println(Thread.currentThread() + " - Getting " + city);
subscriber.onNext(apiManager.getWeather(city, "metric"));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
  • Il se sent comme vous allez à présent dans le sens inverse. Si vous êtes à la recherche lors de l'attente pour tout, pour finir pourquoi voudriez-vous utiliser le Schedulers.io? Si vous n'utilisez pas toutes les planificateurs puis rx-java est par défaut synchrone. Tout se complète de manière synchrone et alors vous seriez de retour. Je sais que vous avez mentionné que vous vouliez faire cela de façon asynchrone, mais alors vous allez à l'encontre de votre propre désir de vouloir attendre qu'elle se termine.
  • La raison pour Schedulers.io est que je veux que chaque système distant appel à être dans un thread séparé donc je reçois le parallélisme et la fonction globale est plus rapide.
InformationsquelleAutor user1232555 | 2014-08-06