1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
using System;
using System.Reactive.Linq;
namespace Firebase.Database.Extensions
{
public static class ObservableExtensions
{
/// <summary>
/// Returns a cold observable which retries (re-subscribes to) the source observable on error until it successfully
/// terminates.
/// </summary>
/// <param name="source">The source observable.</param>
/// <param name="dueTime">How long to wait between attempts.</param>
/// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
/// <returns>
/// A cold observable which retries (re-subscribes to) the source observable on error up to the
/// specified number of times or until it successfully terminates.
/// </returns>
public static IObservable<T> RetryAfterDelay<T, TException>(
this IObservable<T> source,
TimeSpan dueTime,
Func<TException, bool> retryOnError)
where TException : Exception
{
var attempt = 0;
return Observable.Defer(() =>
{
return (++attempt == 1 ? source : source.DelaySubscription(dueTime))
.Select(item => new Tuple<bool, T, Exception>(true, item, null))
.Catch<Tuple<bool, T, Exception>, TException>(e => retryOnError(e)
? Observable.Throw<Tuple<bool, T, Exception>>(e)
: Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
})
.Retry()
.SelectMany(t => t.Item1
? Observable.Return(t.Item2)
: Observable.Throw<T>(t.Item3));
}
}
}
|