using System;
using System.Reactive.Linq;

namespace Firebase.Database.Extensions
{
    /// <summary>
    /// Extension methods for <see cref="IObservable{T}"/> sequences.
    /// </summary>
    public static class ObservableExtensions
    {
        /// <summary>
        /// Returns a cold observable which retries (re-subscribes to) the source observable on error until it
        /// terminates successfully or fails with an error that <paramref name="retryOnError"/> declines to retry.
        /// </summary>
        /// <remarks>
        /// <para>
        /// The first subscription attempt is made immediately; every later attempt is made through
        /// <c>DelaySubscription(dueTime)</c>. There is no upper bound on the number of attempts.
        /// </para>
        /// <para>
        /// Only errors of type <typeparamref name="TException"/> are passed to <paramref name="retryOnError"/>.
        /// Errors of any other type are not intercepted by the typed <c>Catch</c> and therefore reach
        /// <c>Retry()</c> directly, which re-subscribes.
        /// </para>
        /// </remarks>
        /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
        /// <typeparam name="TException">The exception type inspected by <paramref name="retryOnError"/>.</typeparam>
        /// <param name="source">The source observable.</param>
        /// <param name="dueTime">How long to wait between attempts.</param>
        /// <param name="retryOnError">
        /// A predicate determining for which exceptions to retry. When <c>null</c>, every
        /// <typeparamref name="TException"/> is retried.
        /// </param>
        /// <returns>
        /// A cold observable which retries (re-subscribes to) the source observable on error until it
        /// successfully terminates or a non-retryable error occurs, in which case that error is propagated.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
        public static IObservable<T> RetryAfterDelay<T, TException>(
            this IObservable<T> source,
            TimeSpan dueTime,
            Func<TException, bool> retryOnError)
            where TException : Exception
        {
            // Fail fast: a null source would otherwise throw inside the deferred factory on every
            // attempt and be re-subscribed by Retry() without end.
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            // The outer Defer gives every subscription its own attempt counter, so the returned
            // observable is cold: each subscriber starts with an immediate first attempt.
            return Observable.Defer(() =>
            {
                var attempt = 0;

                return Observable.Defer(() =>
                    {
                        // Only re-subscriptions are delayed; the very first attempt starts right away.
                        var attemptSource = ++attempt == 1
                            ? source
                            : source.DelaySubscription(dueTime);

                        // Values and non-retryable errors are wrapped in a tuple (isValue, value, error)
                        // so that non-retryable errors travel past Retry() as ordinary elements.
                        return attemptSource
                            .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                            .Catch<Tuple<bool, T, Exception>, TException>(e =>
                                (retryOnError == null || retryOnError(e))
                                    // Retryable: surface as an error so Retry() re-subscribes.
                                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                                    // Not retryable: smuggle the error through as a value.
                                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
                    })
                    .Retry()
                    // Unwrap: real values are emitted, smuggled errors are rethrown to the subscriber.
                    .SelectMany(t => t.Item1
                        ? Observable.Return(t.Item2)
                        : Observable.Throw<T>(t.Item3));
            });
        }
    }
}