Выросший TestScheduler RxJava для блокирующего вызова к задержке ()

голоса
0

Использование RxJava2 У меня есть класс , который соединяет реактивный неактивный мир. В этом классе есть переопределяется метод, который возвращает некоторое значение после потенциально долгой обработки . Эта обработка внедренный в реактивном мире с использованием Single.create(emitter -> ...).

Время обработки имеет решающее значение для меня, так как есть разница в отношении бизнес-логики, появляется ли второй абонент в то время как обработка работает или после его завершения (или отмены).

Теперь я в тестировании этого класса - поэтому мне нужно много времени обработки , чтобы быть только виртуальным . RxJava - й TestSchedulerна помощь!

Чтобы создать тест-реализацию для этого шунтирующего базового класса, который эмулирует задержку, мне нужна блокировка (!) Слово TestScheduler, где опережение во время запускаются в отдельном потоке. (В противном случае опережения во времени никогда не будет срабатывать.)

Моя проблема в том, что мне нужно опережение во времени , чтобы быть отложено по крайней мере до тех пор , «дорогой вычисления» не блокирует - в противном случае время получить изменилось , прежде чем задержка-оператор получает активный и поэтому она ждет , пока другой опережения , который никогда не будет.

Я могу решить эту проблему путем вызова Thread.sleep(100)перед вызовом заранее во время - но это кажется немного некрасиво мне ... Как долго ждать? Для нескольких тестов, что время добавляет, но , не из - за временные проблемы ... тьфу.

Любая идея , как проверить эту ситуацию в надеюсь , более чистым способом? Спасибо!

import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

public class TestSchedulerBlocking {

    public static void main(String[] args) throws Exception {

        TestScheduler testScheduler = new TestScheduler();

        // the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
        Single<String> single = Single.create(emitter -> {

            System.out.println(emitting on  + Thread.currentThread() +  ...);

            // This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
            // (e.g. method returns, or, in another case, an exception is thrown).
            // Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
            // Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
            // Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
            // I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
            // Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
            Completable.complete()
                       .delay(3, TimeUnit.SECONDS, testScheduler)
                       .doOnSubscribe(__ -> System.out.println(onSubcribe: completable))
                       .blockingAwait();

            System.out.println(<< blocking released >>);

            // this has to be delayed! Might be, for example, a return or an exception in real world.
            emitter.onSuccess(FooBar!);

        });


        System.out.println(running the task ...);
        TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println(onSubcribe: single))
                                                  .subscribeOn(Schedulers.newThread())
                                                  .test();

        // Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
        // and therefore the blockingAwait() blocks until infinity...
        System.out.println(---SLEEPING---);
        Thread.sleep(100);

        // virtually advance the blocking delay 
        System.out.println(---advancing time---);
        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

        // wait for the task to complete
        System.out.println(await...);
        testObserver.await();

        // assertions on task results, etc.
        System.out.println(finished. now do some testing... values (expected [FooBar!]):  + testObserver.values());

    }

}

[Примечание: Я задал этот вопрос в более сложной и многословной, как вчера, - но, как это было слишком жарко, в моем кабинете, и он имел ряд недостатков и ошибок. Поэтому я удалил его, и теперь он более конденсируется к реальной проблеме.]

Задан 19/09/2018 в 13:31
источник пользователем
На других языках...                            


1 ответов

голоса
0

Если я правильно понимаю вашу проблему правильно, у вас есть затянувшееся обслуживание и вы хотите, чтобы иметь возможность проверить подписки на услуги, которые будут затронуты вашей длительной службой. Я подошел к этому следующим образом.

Определите услугу , как у вас есть, используя TestSchedulerдля контроля его продвижения во время. Затем определяют тестовые стимулы , используя один и тот же тест планировщика.

Это эмулирует блокирующий объект. Не используйте «блок».

Single<String> single = Observable.timer( 3, TimeUnit.SECONDS, scheduler )
                          .doOnNext( _l -> {} )
                          .map( _l -> "FooBar!" )
                          .take( 1 )
                          .toSingle();

// set up asynchronous operations
Observable.timer( 1_500, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction1() );
Observable.timer( 1_900, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction2() );
Observable.timer( 3_100, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction3() );
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
                                            .subscribeOn(Schedulers.newThread())
                                            .test();

// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);

Вы должны видеть , что performTestAction1()происходит, то performTestAction2()тогда на singleотделку, затем performTestAction3(). Все , что движет тест планировщика.

Ответил 20/09/2018 d 15:00
источник пользователем

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more