diff options
Diffstat (limited to 'FireBase/Streaming/FirebaseSubscription.cs')
-rw-r--r-- | FireBase/Streaming/FirebaseSubscription.cs | 112 |
1 files changed, 54 insertions, 58 deletions
diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs index 4b5e643..fb0f403 100644 --- a/FireBase/Streaming/FirebaseSubscription.cs +++ b/FireBase/Streaming/FirebaseSubscription.cs @@ -1,32 +1,28 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; +using Firebase.Database.Query; +using Newtonsoft.Json.Linq; + namespace Firebase.Database.Streaming { - using System; - using System.Diagnostics; - using System.Linq; - using System.Net.Http; - using System.Net.Http.Headers; - using System.Threading; - using System.Threading.Tasks; - - using Firebase.Database.Query; - - using Newtonsoft.Json.Linq; - using System.Net; - /// <summary> - /// The firebase subscription. + /// The firebase subscription. /// </summary> /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam> internal class FirebaseSubscription<T> : IDisposable { + private static readonly HttpClient http; + private readonly FirebaseCache<T> cache; private readonly CancellationTokenSource cancel; + private readonly FirebaseClient client; + private readonly string elementRoot; private readonly IObserver<FirebaseEvent<T>> observer; private readonly IFirebaseQuery query; - private readonly FirebaseCache<T> cache; - private readonly string elementRoot; - private readonly FirebaseClient client; - - private static HttpClient http; static FirebaseSubscription() { @@ -45,31 +41,32 @@ namespace Firebase.Database.Streaming } /// <summary> - /// Initializes a new instance of the <see cref="FirebaseSubscription{T}"/> class. + /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class. /// </summary> /// <param name="observer"> The observer. </param> /// <param name="query"> The query. </param> /// <param name="cache"> The cache. </param> - public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, FirebaseCache<T> cache) + public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, + FirebaseCache<T> cache) { this.observer = observer; this.query = query; this.elementRoot = elementRoot; - this.cancel = new CancellationTokenSource(); + cancel = new CancellationTokenSource(); this.cache = cache; - this.client = query.Client; + client = query.Client; } - public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown; - public void Dispose() { - this.cancel.Cancel(); + cancel.Cancel(); } + public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown; + public IDisposable Run() { - Task.Run(() => this.ReceiveThread()); + Task.Run(() => ReceiveThread()); return this; } @@ -84,15 +81,17 @@ namespace Firebase.Database.Streaming try { - this.cancel.Token.ThrowIfCancellationRequested(); + cancel.Token.ThrowIfCancellationRequested(); // initialize network connection - url = await this.query.BuildUrlAsync().ConfigureAwait(false); + url = await query.BuildUrlAsync().ConfigureAwait(false); var request = new HttpRequestMessage(HttpMethod.Get, url); var serverEvent = FirebaseServerEventType.KeepAlive; - var client = this.GetHttpClient(); - var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, this.cancel.Token).ConfigureAwait(false); + var client = GetHttpClient(); + var response = await client + .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token) + .ConfigureAwait(false); statusCode = response.StatusCode; response.EnsureSuccessStatusCode(); @@ -102,32 +101,28 @@ namespace Firebase.Database.Streaming { while (true) { - this.cancel.Token.ThrowIfCancellationRequested(); + cancel.Token.ThrowIfCancellationRequested(); line = reader.ReadLine()?.Trim(); - if (string.IsNullOrWhiteSpace(line)) - { - continue; - } + if (string.IsNullOrWhiteSpace(line)) continue; + + var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries) + .Select(s => s.Trim()).ToArray(); - var tuple = line.Split(new[] { ':' }, 2, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Trim()).ToArray(); - switch (tuple[0].ToLower()) { case "event": - serverEvent = this.ParseServerEvent(serverEvent, tuple[1]); + serverEvent = ParseServerEvent(serverEvent, tuple[1]); break; case "data": - this.ProcessServerData(url, serverEvent, tuple[1]); + ProcessServerData(url, serverEvent, tuple[1]); break; } if (serverEvent == FirebaseServerEventType.AuthRevoked) - { // auth token no longer valid, reconnect break; - } } } } @@ -137,13 +132,15 @@ namespace Firebase.Database.Streaming } catch (Exception ex) when (statusCode != HttpStatusCode.OK) { - this.observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); - this.Dispose(); + observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); + Dispose(); break; } catch (Exception ex) { - this.ExceptionThrown?.Invoke(this, new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, statusCode, ex))); + ExceptionThrown?.Invoke(this, + new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, + statusCode, ex))); await Task.Delay(2000).ConfigureAwait(false); } @@ -185,30 +182,29 @@ namespace Firebase.Database.Streaming var data = result["data"].ToString(); // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object. - if(string.IsNullOrWhiteSpace(this.elementRoot) || !this.cache.Contains(this.elementRoot)) - { - if(path == "/" && data == string.Empty) + if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot)) + if (path == "/" && data == string.Empty) { - this.observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream)); + observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream)); return; } - } - var eventType = string.IsNullOrWhiteSpace(data) ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate; + var eventType = string.IsNullOrWhiteSpace(data) + ? FirebaseEventType.Delete + : FirebaseEventType.InsertOrUpdate; - var items = this.cache.PushData(this.elementRoot + path, data); + var items = cache.PushData(elementRoot + path, data); foreach (var i in items.ToList()) - { - this.observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, FirebaseEventSource.OnlineStream)); - } + observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, + FirebaseEventSource.OnlineStream)); break; case FirebaseServerEventType.KeepAlive: break; case FirebaseServerEventType.Cancel: - this.observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); - this.Dispose(); + observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); + Dispose(); break; } } @@ -218,4 +214,4 @@ namespace Firebase.Database.Streaming return http; } } -} +}
\ No newline at end of file |