namespace Firebase.Database.Extensions
{
using System;
using System.Reactive.Linq;
public static class ObservableExtensions
{
    /// <summary>
    /// Returns a cold observable which retries (re-subscribes to) the source observable on error
    /// until it terminates successfully or fails with an exception that must not be retried.
    /// </summary>
    /// <typeparam name="T">The type of the elements produced by the source observable.</typeparam>
    /// <typeparam name="TException">The exception type inspected by <paramref name="retryOnError"/>.</typeparam>
    /// <param name="source">The source observable.</param>
    /// <param name="dueTime">How long to wait before each re-subscription. The first attempt is not delayed.</param>
    /// <param name="retryOnError">
    /// A predicate deciding, for an error of type <typeparamref name="TException"/>, whether to retry
    /// (<c>true</c>) or to propagate the error to the subscriber (<c>false</c>).
    /// Errors that are not of type <typeparamref name="TException"/> never reach the predicate and are always retried.
    /// </param>
    /// <returns>
    /// A cold observable which forwards the elements of the source and re-subscribes to it, after
    /// <paramref name="dueTime"/>, whenever it fails with a retryable error. There is no upper bound
    /// on the number of attempts.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="retryOnError"/> is <c>null</c>.
    /// </exception>
    public static IObservable<T> RetryAfterDelay<T, TException>(
        this IObservable<T> source,
        TimeSpan dueTime,
        Func<TException, bool> retryOnError)
        where TException : Exception
    {
        // Fail fast: inside the pipeline below these errors would be swallowed by Retry()
        // and turn into an endless re-subscription loop.
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (retryOnError == null)
        {
            throw new ArgumentNullException(nameof(retryOnError));
        }

        // The outer Defer gives every subscription its own attempt counter, so a second
        // subscriber also starts with an immediate (non-delayed) first attempt.
        return Observable.Defer(() =>
        {
            int attempt = 0;

            return Observable.Defer(() =>
                {
                    // Only re-subscriptions are delayed; the very first attempt runs immediately.
                    IObservable<T> attemptSource = (++attempt == 1)
                        ? source
                        : source.DelaySubscription(dueTime);

                    // Wrap every notification in a (success, value, error) tuple so that a
                    // non-retryable error can travel through Retry() as a regular element.
                    return attemptSource
                        .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                        .Catch<Tuple<bool, T, Exception>, TException>(e => retryOnError(e)
                            // Retryable: keep it an error so Retry() re-subscribes.
                            ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                            // Not retryable: smuggle the error past Retry() as a failure tuple.
                            : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
                })
                .Retry()
                // Unwrap: success tuples become elements, a failure tuple becomes the terminal error.
                .SelectMany(t => t.Item1
                    ? Observable.Return(t.Item2)
                    : Observable.Throw<T>(t.Item3));
        });
    }
}
}