From e6181c24124d97f2fbc932b8a68311e625463156 Mon Sep 17 00:00:00 2001 From: uzvkl Date: Tue, 11 Jun 2019 23:05:52 +0200 Subject: Move dsa related stuff to subfolder --- dsa/FireBase/Streaming/FirebaseSubscription.cs | 217 +++++++++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 dsa/FireBase/Streaming/FirebaseSubscription.cs (limited to 'dsa/FireBase/Streaming/FirebaseSubscription.cs') diff --git a/dsa/FireBase/Streaming/FirebaseSubscription.cs b/dsa/FireBase/Streaming/FirebaseSubscription.cs new file mode 100644 index 0000000..fb0f403 --- /dev/null +++ b/dsa/FireBase/Streaming/FirebaseSubscription.cs @@ -0,0 +1,217 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; +using Firebase.Database.Query; +using Newtonsoft.Json.Linq; + +namespace Firebase.Database.Streaming +{ + /// + /// The firebase subscription. + /// + /// Type of object to be streaming back to the called. + internal class FirebaseSubscription : IDisposable + { + private static readonly HttpClient http; + private readonly FirebaseCache cache; + private readonly CancellationTokenSource cancel; + private readonly FirebaseClient client; + private readonly string elementRoot; + private readonly IObserver> observer; + private readonly IFirebaseQuery query; + + static FirebaseSubscription() + { + var handler = new HttpClientHandler + { + AllowAutoRedirect = true, + MaxAutomaticRedirections = 10, + CookieContainer = new CookieContainer() + }; + + var httpClient = new HttpClient(handler, true); + + httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream")); + + http = httpClient; + } + + /// + /// Initializes a new instance of the class. + /// + /// The observer. + /// The query. + /// The cache. + public FirebaseSubscription(IObserver> observer, IFirebaseQuery query, string elementRoot, + FirebaseCache cache) + { + this.observer = observer; + this.query = query; + this.elementRoot = elementRoot; + cancel = new CancellationTokenSource(); + this.cache = cache; + client = query.Client; + } + + public void Dispose() + { + cancel.Cancel(); + } + + public event EventHandler> ExceptionThrown; + + public IDisposable Run() + { + Task.Run(() => ReceiveThread()); + + return this; + } + + private async void ReceiveThread() + { + while (true) + { + var url = string.Empty; + var line = string.Empty; + var statusCode = HttpStatusCode.OK; + + try + { + cancel.Token.ThrowIfCancellationRequested(); + + // initialize network connection + url = await query.BuildUrlAsync().ConfigureAwait(false); + var request = new HttpRequestMessage(HttpMethod.Get, url); + var serverEvent = FirebaseServerEventType.KeepAlive; + + var client = GetHttpClient(); + var response = await client + .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token) + .ConfigureAwait(false); + + statusCode = response.StatusCode; + response.EnsureSuccessStatusCode(); + + using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) + using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream)) + { + while (true) + { + cancel.Token.ThrowIfCancellationRequested(); + + line = reader.ReadLine()?.Trim(); + + if (string.IsNullOrWhiteSpace(line)) continue; + + var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries) + .Select(s => s.Trim()).ToArray(); + + switch (tuple[0].ToLower()) + { + case "event": + serverEvent = ParseServerEvent(serverEvent, tuple[1]); + break; + case "data": + ProcessServerData(url, serverEvent, tuple[1]); + break; + } + + if (serverEvent == FirebaseServerEventType.AuthRevoked) + // auth token no longer valid, reconnect + break; + } + } + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) when (statusCode != HttpStatusCode.OK) + { + observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); + Dispose(); + break; + } + catch (Exception ex) + { + ExceptionThrown?.Invoke(this, + new ExceptionEventArgs(new FirebaseException(url, string.Empty, line, + statusCode, ex))); + + await Task.Delay(2000).ConfigureAwait(false); + } + } + } + + private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName) + { + switch (eventName) + { + case "put": + serverEvent = FirebaseServerEventType.Put; + break; + case "patch": + serverEvent = FirebaseServerEventType.Patch; + break; + case "keep-alive": + serverEvent = FirebaseServerEventType.KeepAlive; + break; + case "cancel": + serverEvent = FirebaseServerEventType.Cancel; + break; + case "auth_revoked": + serverEvent = FirebaseServerEventType.AuthRevoked; + break; + } + + return serverEvent; + } + + private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData) + { + switch (serverEvent) + { + case FirebaseServerEventType.Put: + case FirebaseServerEventType.Patch: + var result = JObject.Parse(serverData); + var path = result["path"].ToString(); + var data = result["data"].ToString(); + + // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object. + if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot)) + if (path == "/" && data == string.Empty) + { + observer.OnNext(FirebaseEvent.Empty(FirebaseEventSource.OnlineStream)); + return; + } + + var eventType = string.IsNullOrWhiteSpace(data) + ? FirebaseEventType.Delete + : FirebaseEventType.InsertOrUpdate; + + var items = cache.PushData(elementRoot + path, data); + + foreach (var i in items.ToList()) + observer.OnNext(new FirebaseEvent(i.Key, i.Object, eventType, + FirebaseEventSource.OnlineStream)); + + break; + case FirebaseServerEventType.KeepAlive: + break; + case FirebaseServerEventType.Cancel: + observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); + Dispose(); + break; + } + } + + private HttpClient GetHttpClient() + { + return http; + } + } +} \ No newline at end of file -- cgit v1.2.3-54-g00ecf