Rx Valeurs d'émission observables périodiquement

J'ai à interroger certains Reposante extrémité régulièrement pour actualiser mon application android de données. J'ai aussi pour mettre en pause et reprendre la lecture basé sur la connectivité (si le téléphone est en mode hors connexion, il n'y a pas besoin de l'essayer, même). Ma solution actuelle est de travailler, mais il utilise le standard de Java ScheduledExecutorService pour effectuer des tâches périodiques, mais je tiens à rester en Rx paradigme.

Voici mon code actuel, dont les parties sont ignorés pour des raisons de concision.

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                //making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });

        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;

            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }

            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();

networkStatusObservable est fondamentalement un récepteur de radiodiffusion enveloppé dans Observable<Boolean>qui indique que le téléphone est connecté au réseau.

Comme je l'ai dit, cette solution fonctionne, mais je veux utiliser Rx approche périodique de l'interrogation et l'émission de nouvelles UserProfiles, parce qu'il y a de nombreux problèmes avec la planification choses manuellement, que je veux éviter. Je sais que sur Observable.timer et Observable.intervalmais ne peut pas comprendre comment l'appliquer à cette tâche (et je ne suis pas sûr si j'ai besoin d'utiliser celles à tous).

source d'informationauteur Haspemulator