From f89f308c525e9deebc6d2cf6416e27dfe1a299dc Mon Sep 17 00:00:00 2001 From: Dennis Kobert Date: Sun, 19 May 2019 16:03:38 +0200 Subject: Cleanup DiscoBot Project --- FireBase/Streaming/FirebaseSubscription.cs | 76 +++++++++++++++--------------- 1 file changed, 37 insertions(+), 39 deletions(-) (limited to 'FireBase/Streaming/FirebaseSubscription.cs') diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs index 4b5e643..acdc76c 100644 --- a/FireBase/Streaming/FirebaseSubscription.cs +++ b/FireBase/Streaming/FirebaseSubscription.cs @@ -7,9 +7,7 @@ namespace Firebase.Database.Streaming using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; - - using Firebase.Database.Query; - + using Query; using Newtonsoft.Json.Linq; using System.Net; @@ -50,26 +48,27 @@ namespace Firebase.Database.Streaming /// The observer. /// The query. /// The cache. - public FirebaseSubscription(IObserver> observer, IFirebaseQuery query, string elementRoot, FirebaseCache cache) + public FirebaseSubscription(IObserver> observer, IFirebaseQuery query, string elementRoot, + FirebaseCache cache) { this.observer = observer; this.query = query; this.elementRoot = elementRoot; - this.cancel = new CancellationTokenSource(); + cancel = new CancellationTokenSource(); this.cache = cache; - this.client = query.Client; + client = query.Client; } public event EventHandler> ExceptionThrown; public void Dispose() { - this.cancel.Cancel(); + cancel.Cancel(); } public IDisposable Run() { - Task.Run(() => this.ReceiveThread()); + Task.Run(() => ReceiveThread()); return this; } @@ -84,15 +83,17 @@ namespace Firebase.Database.Streaming try { - this.cancel.Token.ThrowIfCancellationRequested(); + cancel.Token.ThrowIfCancellationRequested(); // initialize network connection - url = await this.query.BuildUrlAsync().ConfigureAwait(false); + url = await query.BuildUrlAsync().ConfigureAwait(false); var request = new HttpRequestMessage(HttpMethod.Get, url); var serverEvent = FirebaseServerEventType.KeepAlive; - var client = this.GetHttpClient(); - var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, this.cancel.Token).ConfigureAwait(false); + var client = GetHttpClient(); + var response = await client + .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token) + .ConfigureAwait(false); statusCode = response.StatusCode; response.EnsureSuccessStatusCode(); @@ -102,32 +103,28 @@ namespace Firebase.Database.Streaming { while (true) { - this.cancel.Token.ThrowIfCancellationRequested(); + cancel.Token.ThrowIfCancellationRequested(); line = reader.ReadLine()?.Trim(); - if (string.IsNullOrWhiteSpace(line)) - { - continue; - } + if (string.IsNullOrWhiteSpace(line)) continue; + + var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries) + .Select(s => s.Trim()).ToArray(); - var tuple = line.Split(new[] { ':' }, 2, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Trim()).ToArray(); - switch (tuple[0].ToLower()) { case "event": - serverEvent = this.ParseServerEvent(serverEvent, tuple[1]); + serverEvent = ParseServerEvent(serverEvent, tuple[1]); break; case "data": - this.ProcessServerData(url, serverEvent, tuple[1]); + ProcessServerData(url, serverEvent, tuple[1]); break; } if (serverEvent == FirebaseServerEventType.AuthRevoked) - { // auth token no longer valid, reconnect break; - } } } } @@ -137,13 +134,15 @@ namespace Firebase.Database.Streaming } catch (Exception ex) when (statusCode != HttpStatusCode.OK) { - this.observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); - this.Dispose(); + observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); + Dispose(); break; } catch (Exception ex) { - this.ExceptionThrown?.Invoke(this, new ExceptionEventArgs(new FirebaseException(url, string.Empty, line, statusCode, ex))); + ExceptionThrown?.Invoke(this, + new ExceptionEventArgs(new FirebaseException(url, string.Empty, line, + statusCode, ex))); await Task.Delay(2000).ConfigureAwait(false); } @@ -185,30 +184,29 @@ namespace Firebase.Database.Streaming var data = result["data"].ToString(); // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object. - if(string.IsNullOrWhiteSpace(this.elementRoot) || !this.cache.Contains(this.elementRoot)) - { - if(path == "/" && data == string.Empty) + if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot)) + if (path == "/" && data == string.Empty) { - this.observer.OnNext(FirebaseEvent.Empty(FirebaseEventSource.OnlineStream)); + observer.OnNext(FirebaseEvent.Empty(FirebaseEventSource.OnlineStream)); return; } - } - var eventType = string.IsNullOrWhiteSpace(data) ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate; + var eventType = string.IsNullOrWhiteSpace(data) + ? FirebaseEventType.Delete + : FirebaseEventType.InsertOrUpdate; - var items = this.cache.PushData(this.elementRoot + path, data); + var items = cache.PushData(elementRoot + path, data); foreach (var i in items.ToList()) - { - this.observer.OnNext(new FirebaseEvent(i.Key, i.Object, eventType, FirebaseEventSource.OnlineStream)); - } + observer.OnNext(new FirebaseEvent(i.Key, i.Object, eventType, + FirebaseEventSource.OnlineStream)); break; case FirebaseServerEventType.KeepAlive: break; case FirebaseServerEventType.Cancel: - this.observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); - this.Dispose(); + observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); + Dispose(); break; } } @@ -218,4 +216,4 @@ namespace Firebase.Database.Streaming return http; } } -} +} \ No newline at end of file -- cgit v1.2.3-54-g00ecf