17.04.2018 К знатокам Reactive Extensions
 
Привет всем!

Есть асинхронная задача, предназначенная для опроса внешнего ресурса и возвращающая от него результат.
Хотелось бы завернуть последовательность результатов в Observable. Но при этом добиться такого эффекта, чтобы
1. Опрос происходил по таймеру через равные промежутки времени
2. Если Task падает с исключением, выждать некоторое время, по прошествии которого весь процесс опросов перезапускается

Пока что делаю так:
    private static readonly Random random__ = new Random( 123 );
    private static async Task<int> ArbeitAsync()
    {
      await Task.Delay( 100 ); // Jesus is coming, look busy
      const int size = 5;
      var result = random__.Next( size );
      if ( result % size == 0 )
      {
        throw new Exception( "Boom!" );
      }
      return result;
    }

    private static void Main( string[] args )
    {
        const int interval = 1000;

        var sequence = Observable
          .Timer( TimeSpan.Zero, TimeSpan.FromMilliseconds( interval ) ) // Первый опрос идет сразу, дальше через паузу
          .Select( tick => Observable.FromAsync( async () => await ArbeitAsync() ) )
          .Concat()
          .Catch<int, Exception>(
            exception =>
            {
              Console.WriteLine( "{0:hh:mm:ss.fff} Error: {1}", DateTime.Now, exception.Message );
              return Observable
                .Timer( TimeSpan.FromMilliseconds( 3 * interval ), TimeSpan.FromMilliseconds( interval ) ) // Перед первым опросом после восстановления выжидаем подольше, дальше через обычную паузу
                .Select( tick => Observable.FromAsync( async () => await ArbeitAsync() ) )
                .Concat();
            } );
          

        sequence.Subscribe( number =>
                            {
                              Console.WriteLine( "{0:hh:mm:ss.fff} {1}", DateTime.Now, number );
                            },
                            error =>
                            {
                              Console.WriteLine( "{0:hh:mm:ss.fff} Error leaked to subscription: {1}", DateTime.Now, error.Message );
                            } );

        Console.ReadKey();
    }


Но результат получается неожиданный, показан ниже.
Управление попадает в Catch, но ошибка все равно просачивается в подписку, то есть Catch не восстанавливает обычную работу последовательности. Как так?

01:30:31.021 4
01:30:32.018 4
01:30:33.030 3
01:30:34.020 4
01:30:35.052 3
01:30:36.060 Error: Boom!
01:30:37.183 Error leaked to subscription: Boom!

Может кто подскажет где я напортачил?

 
 
 
 
10.12  .NET Reactor
15.11  n
15.11  C# ClickOnce
 
17.04  Wpf binding