diff options
Diffstat (limited to 'FireBase/Streaming/FirebaseSubscription.cs')
-rw-r--r-- | FireBase/Streaming/FirebaseSubscription.cs | 217 |
1 files changed, 0 insertions, 217 deletions
diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs deleted file mode 100644 index fb0f403..0000000 --- a/FireBase/Streaming/FirebaseSubscription.cs +++ /dev/null @@ -1,217 +0,0 @@ -using System; -using System.Linq; -using System.Net; -using System.Net.Http; -using System.Net.Http.Headers; -using System.Threading; -using System.Threading.Tasks; -using Firebase.Database.Query; -using Newtonsoft.Json.Linq; - -namespace Firebase.Database.Streaming -{ - /// <summary> - /// The firebase subscription. - /// </summary> - /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam> - internal class FirebaseSubscription<T> : IDisposable - { - private static readonly HttpClient http; - private readonly FirebaseCache<T> cache; - private readonly CancellationTokenSource cancel; - private readonly FirebaseClient client; - private readonly string elementRoot; - private readonly IObserver<FirebaseEvent<T>> observer; - private readonly IFirebaseQuery query; - - static FirebaseSubscription() - { - var handler = new HttpClientHandler - { - AllowAutoRedirect = true, - MaxAutomaticRedirections = 10, - CookieContainer = new CookieContainer() - }; - - var httpClient = new HttpClient(handler, true); - - httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream")); - - http = httpClient; - } - - /// <summary> - /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class. - /// </summary> - /// <param name="observer"> The observer. </param> - /// <param name="query"> The query. </param> - /// <param name="cache"> The cache. </param> - public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, - FirebaseCache<T> cache) - { - this.observer = observer; - this.query = query; - this.elementRoot = elementRoot; - cancel = new CancellationTokenSource(); - this.cache = cache; - client = query.Client; - } - - public void Dispose() - { - cancel.Cancel(); - } - - public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown; - - public IDisposable Run() - { - Task.Run(() => ReceiveThread()); - - return this; - } - - private async void ReceiveThread() - { - while (true) - { - var url = string.Empty; - var line = string.Empty; - var statusCode = HttpStatusCode.OK; - - try - { - cancel.Token.ThrowIfCancellationRequested(); - - // initialize network connection - url = await query.BuildUrlAsync().ConfigureAwait(false); - var request = new HttpRequestMessage(HttpMethod.Get, url); - var serverEvent = FirebaseServerEventType.KeepAlive; - - var client = GetHttpClient(); - var response = await client - .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token) - .ConfigureAwait(false); - - statusCode = response.StatusCode; - response.EnsureSuccessStatusCode(); - - using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) - using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream)) - { - while (true) - { - cancel.Token.ThrowIfCancellationRequested(); - - line = reader.ReadLine()?.Trim(); - - if (string.IsNullOrWhiteSpace(line)) continue; - - var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries) - .Select(s => s.Trim()).ToArray(); - - switch (tuple[0].ToLower()) - { - case "event": - serverEvent = ParseServerEvent(serverEvent, tuple[1]); - break; - case "data": - ProcessServerData(url, serverEvent, tuple[1]); - break; - } - - if (serverEvent == FirebaseServerEventType.AuthRevoked) - // auth token no longer valid, reconnect - break; - } - } - } - catch (OperationCanceledException) - { - break; - } - catch (Exception ex) when (statusCode != HttpStatusCode.OK) - { - observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); - Dispose(); - break; - } - catch (Exception ex) - { - ExceptionThrown?.Invoke(this, - new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, - statusCode, ex))); - - await Task.Delay(2000).ConfigureAwait(false); - } - } - } - - private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName) - { - switch (eventName) - { - case "put": - serverEvent = FirebaseServerEventType.Put; - break; - case "patch": - serverEvent = FirebaseServerEventType.Patch; - break; - case "keep-alive": - serverEvent = FirebaseServerEventType.KeepAlive; - break; - case "cancel": - serverEvent = FirebaseServerEventType.Cancel; - break; - case "auth_revoked": - serverEvent = FirebaseServerEventType.AuthRevoked; - break; - } - - return serverEvent; - } - - private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData) - { - switch (serverEvent) - { - case FirebaseServerEventType.Put: - case FirebaseServerEventType.Patch: - var result = JObject.Parse(serverData); - var path = result["path"].ToString(); - var data = result["data"].ToString(); - - // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object. - if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot)) - if (path == "/" && data == string.Empty) - { - observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream)); - return; - } - - var eventType = string.IsNullOrWhiteSpace(data) - ? FirebaseEventType.Delete - : FirebaseEventType.InsertOrUpdate; - - var items = cache.PushData(elementRoot + path, data); - - foreach (var i in items.ToList()) - observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, - FirebaseEventSource.OnlineStream)); - - break; - case FirebaseServerEventType.KeepAlive: - break; - case FirebaseServerEventType.Cancel: - observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); - Dispose(); - break; - } - } - - private HttpClient GetHttpClient() - { - return http; - } - } -}
\ No newline at end of file |