Rx Valeurs d'émission observables périodiquement
J'ai à interroger certains Reposante extrémité régulièrement pour actualiser mon application android de données. J'ai aussi pour mettre en pause et reprendre la lecture basé sur la connectivité (si le téléphone est en mode hors connexion, il n'y a pas besoin de l'essayer, même). Ma solution actuelle est de travailler, mais il utilise le standard de Java ScheduledExecutorService
pour effectuer des tâches périodiques, mais je tiens à rester en Rx paradigme.
Voici mon code actuel, dont les parties sont ignorés pour des raisons de concision.
userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
@Override
public void call(final Subscriber<? super UserProfile> subscriber) {
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final Runnable runnable = new Runnable() {
@Override
public void run() {
//making http request here
}
};
final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
networkStatusObservable.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean networkAvailable) {
if (!networkAvailable) {
pause();
} else {
pause();
futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
}
}
private void pause() {
for (ScheduledFuture<?> future : futures) {
future.cancel(true);
}
futures.clear();
}
});
final Subscription subscription = new Subscription() {
private boolean isUnsubscribed = false;
@Override
public void unsubscribe() {
scheduledExecutorService.shutdownNow();
isUnsubscribed = true;
}
@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
};
subscriber.add(subscription);
}
}).multicast(BehaviorSubject.create()).refCount();
networkStatusObservable
est fondamentalement un récepteur de radiodiffusion enveloppé dans Observable<Boolean>
qui indique que le téléphone est connecté au réseau.
Comme je l'ai dit, cette solution fonctionne, mais je veux utiliser Rx approche périodique de l'interrogation et l'émission de nouvelles UserProfile
s, parce qu'il y a de nombreux problèmes avec la planification choses manuellement, que je veux éviter. Je sais que sur Observable.timer
et Observable.interval
mais ne peut pas comprendre comment l'appliquer à cette tâche (et je ne suis pas sûr si j'ai besoin d'utiliser celles à tous).
source d'informationauteur Haspemulator
Vous devez vous connecter pour publier un commentaire.
Il existe quelques approches sur cette GitHub question que vous pourriez trouver utiles.
https://github.com/ReactiveX/RxJava/issues/448
Les trois implémentations sont:
Observables.intervalle de
Le planificateur.schedulePeriodically
Manuel De La Récursivité
Et la conclusion est que le manuel de la récursivité est le chemin à parcourir, car il attend jusqu'à ce que l'opération est terminée avant la planification de la prochaine exécution.
L'une des options est d'utiliser Observables.d'intervalle et de la vérification de l'état de l'utilisateur lorsque les intervalles sont émis:
Vous pouvez faire encore plus simple si votre networkStatusObservable émet des événements au moins aussi souvent que vous avez besoin de vérifier les données de l'utilisateur
Enfin, vous pouvez peut créer observables qui utilise le planificateur d'émettre l'utilisateur périodiquement les états - reportez-vous à Les planificateurs de la documentation pour apprendre l'algorithme adapter à vos besoins le meilleur:
Il y a un moyen plus simple de le faire en utilisant l'intervalle(). J'ai testé ce code et il fonctionne.
Mais d'abord, vous devez vous encapsuler le travail que vous souhaitez exécuter périodiquement dans une sous-classe de l'Action1.
(J'ai gardé les domaines public pour des raisons de concision, mais ce n'est pas conseillé). Maintenant, vous pouvez planifier la façon suivante:
Cela ne bloquera pas votre fils n'importe où, contrairement à l'approche adoptée dans l'autre réponse. Cette approche nous permet de passer une variable comme une sorte de mutable stockage à l'intérieur de l'Action qui pourrait être utile dans la suite des invocations. Aussi, de cette façon, vous pouvez vous abonner à votre appel sur votre propre pool de threads.
D'accord, je vais poster ma propre solution, peut-être quelqu'un qui va en tirer profit. Je vais seulement après la partie traitant de la question, en omettant le HTTP et la mise en cache des trucs. Voici comment je le fais:
Comme vous pouvez le voir, les conditions pour mettre en pause ou de reprendre les mises à jour de devenir un peu plus compliqué, avec une nouvelle Observables ajoutés à l'appui de la pause lors de l'application va à fond.
Ensuite à la base de la solution est la
concatMap
opération qui émet de laObservables
de manière séquentielle (d'où concatMap, pas flatMap, voir à cette question: Quelle est la différence entre concatMap et flatMap dans RxJava). Il émet soitinterval
ounever
Observables
selon que les mises à jour devraient être poursuivis ou mis en pause. Ensuite, chaque émisObservable
esttakenUntil
'un en face de'Observable
émet de la nouvelle valeur.ConnectableObservable
est retourné parce que l'crééeObservable
est chaud, et que tous les abonnés de souscrire à elle avant qu'elle ne commence à émettre quelque chose, sinon événements initiaux pourraient être perdues. J'appelleconnect
sur elle plus tard.Je vais accepter ma ou une autre réponse basées sur les votes, le cas échéant.
Réponse courte. RxJava2:
Choisir initialDelay, unitAmount et TimeUnit en fonction de vos besoins.
Exemple -, 0, 1, TimeUnit.MINUTES.