summaryrefslogtreecommitdiff
path: root/dsa/FireBase/Extensions/ObservableExtensions.cs
blob: 1dae7e9f57b2509cd986e51692a94145e5f8712e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
using System;
using System.Reactive.Linq;

namespace Firebase.Database.Extensions {
    /// <summary>
    ///     Extension methods for <see cref="IObservable{T}" /> sequences.
    /// </summary>
    public static class ObservableExtensions {
        /// <summary>
        ///     Returns a cold observable which retries (re-subscribes to) the source observable on error until it
        ///     successfully terminates or until <paramref name="retryOnError" /> rejects an error.
        /// </summary>
        /// <typeparam name="T">The type of the items produced by the source observable.</typeparam>
        /// <typeparam name="TException">The exception type that is passed to <paramref name="retryOnError" />.</typeparam>
        /// <param name="source">The source observable.</param>
        /// <param name="dueTime">
        ///     How long to wait before re-subscribing to the source. The first attempt of each subscription is
        ///     made without any delay.
        /// </param>
        /// <param name="retryOnError">
        ///     A predicate which is invoked for every <typeparamref name="TException" /> raised by the source.
        ///     Returning <c>true</c> schedules another attempt; returning <c>false</c> forwards the exception to
        ///     the subscriber and ends the sequence.
        /// </param>
        /// <returns>
        ///     A cold observable which forwards the items of the source and re-subscribes to it after
        ///     <paramref name="dueTime" /> whenever it fails with a retryable error. There is no upper bound on the
        ///     number of attempts.
        /// </returns>
        /// <remarks>
        ///     Only errors of type <typeparamref name="TException" /> are routed through
        ///     <paramref name="retryOnError" />. Any other error is not intercepted by the catch handler and therefore
        ///     reaches the unconditional retry, i.e. it always leads to another attempt. Items which were emitted
        ///     before a failed attempt have already been forwarded to the subscriber when the retry happens.
        /// </remarks>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="source" /> or <paramref name="retryOnError" /> is <c>null</c>.
        /// </exception>
        public static IObservable<T> RetryAfterDelay<T, TException>(
            this IObservable<T> source,
            TimeSpan dueTime,
            Func<TException, bool> retryOnError)
            where TException : Exception {
            // Fail fast: a null argument would otherwise only surface as an error inside the deferred pipeline,
            // where Retry() would swallow it and re-subscribe forever.
            if (source == null) {
                throw new ArgumentNullException(nameof(source));
            }

            if (retryOnError == null) {
                throw new ArgumentNullException(nameof(retryOnError));
            }

            // The outer Defer runs once per subscription, so every subscriber gets its own first-attempt flag.
            // Sharing that state between subscriptions would delay the first attempt of every subscriber but
            // the very first one.
            return Observable.Defer<T>(() => {
                var isFirstAttempt = true;

                // The inner Defer runs once per attempt, because Retry() re-subscribes to it after each error.
                return Observable.Defer(() => {
                        var attemptSource = isFirstAttempt ? source : source.DelaySubscription(dueTime);
                        isFirstAttempt = false;

                        // Items and non-retryable errors are wrapped into a tuple (isItem, item, error) so that
                        // a non-retryable error can pass Retry() as a regular value instead of triggering it.
                        return attemptSource
                            .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                            .Catch<Tuple<bool, T, Exception>, TException>(e => retryOnError(e)
                                ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                                : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
                    })
                    .Retry()
                    // Unwrap: forward items, and turn a wrapped non-retryable error back into a real error.
                    .SelectMany(t => t.Item1
                        ? Observable.Return(t.Item2)
                        : Observable.Throw<T>(t.Item3));
            });
        }
    }
}