summaryrefslogtreecommitdiff
path: root/dsa/FireBase/Streaming/FirebaseSubscription.cs
diff options
context:
space:
mode:
authoruzvkl <dennis.kobert@student.kit.edu>2019-06-11 23:05:52 +0200
committeruzvkl <dennis.kobert@student.kit.edu>2019-06-11 23:05:52 +0200
commite6181c24124d97f2fbc932b8a68311e625463156 (patch)
treec1f097c344ca266b7941c9668590b0fd35c7870a /dsa/FireBase/Streaming/FirebaseSubscription.cs
parent2490ad5d31fe2ac778ff9303776f0e91f47a2862 (diff)
Move dsa related stuff to subfolder
Diffstat (limited to 'dsa/FireBase/Streaming/FirebaseSubscription.cs')
-rw-r--r--dsa/FireBase/Streaming/FirebaseSubscription.cs217
1 files changed, 217 insertions, 0 deletions
diff --git a/dsa/FireBase/Streaming/FirebaseSubscription.cs b/dsa/FireBase/Streaming/FirebaseSubscription.cs
new file mode 100644
index 0000000..fb0f403
--- /dev/null
+++ b/dsa/FireBase/Streaming/FirebaseSubscription.cs
@@ -0,0 +1,217 @@
+using System;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading;
+using System.Threading.Tasks;
+using Firebase.Database.Query;
+using Newtonsoft.Json.Linq;
+
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// The firebase subscription.
+ /// </summary>
+ /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam>
+ internal class FirebaseSubscription<T> : IDisposable
+ {
+ private static readonly HttpClient http;
+ private readonly FirebaseCache<T> cache;
+ private readonly CancellationTokenSource cancel;
+ private readonly FirebaseClient client;
+ private readonly string elementRoot;
+ private readonly IObserver<FirebaseEvent<T>> observer;
+ private readonly IFirebaseQuery query;
+
+ static FirebaseSubscription()
+ {
+ var handler = new HttpClientHandler
+ {
+ AllowAutoRedirect = true,
+ MaxAutomaticRedirections = 10,
+ CookieContainer = new CookieContainer()
+ };
+
+ var httpClient = new HttpClient(handler, true);
+
+ httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
+
+ http = httpClient;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class.
+ /// </summary>
+ /// <param name="observer"> The observer. </param>
+ /// <param name="query"> The query. </param>
+ /// <param name="cache"> The cache. </param>
+ public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot,
+ FirebaseCache<T> cache)
+ {
+ this.observer = observer;
+ this.query = query;
+ this.elementRoot = elementRoot;
+ cancel = new CancellationTokenSource();
+ this.cache = cache;
+ client = query.Client;
+ }
+
+ public void Dispose()
+ {
+ cancel.Cancel();
+ }
+
+ public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
+
+ public IDisposable Run()
+ {
+ Task.Run(() => ReceiveThread());
+
+ return this;
+ }
+
+ private async void ReceiveThread()
+ {
+ while (true)
+ {
+ var url = string.Empty;
+ var line = string.Empty;
+ var statusCode = HttpStatusCode.OK;
+
+ try
+ {
+ cancel.Token.ThrowIfCancellationRequested();
+
+ // initialize network connection
+ url = await query.BuildUrlAsync().ConfigureAwait(false);
+ var request = new HttpRequestMessage(HttpMethod.Get, url);
+ var serverEvent = FirebaseServerEventType.KeepAlive;
+
+ var client = GetHttpClient();
+ var response = await client
+ .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token)
+ .ConfigureAwait(false);
+
+ statusCode = response.StatusCode;
+ response.EnsureSuccessStatusCode();
+
+ using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
+ using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream))
+ {
+ while (true)
+ {
+ cancel.Token.ThrowIfCancellationRequested();
+
+ line = reader.ReadLine()?.Trim();
+
+ if (string.IsNullOrWhiteSpace(line)) continue;
+
+ var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
+ .Select(s => s.Trim()).ToArray();
+
+ switch (tuple[0].ToLower())
+ {
+ case "event":
+ serverEvent = ParseServerEvent(serverEvent, tuple[1]);
+ break;
+ case "data":
+ ProcessServerData(url, serverEvent, tuple[1]);
+ break;
+ }
+
+ if (serverEvent == FirebaseServerEventType.AuthRevoked)
+ // auth token no longer valid, reconnect
+ break;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception ex) when (statusCode != HttpStatusCode.OK)
+ {
+ observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
+ Dispose();
+ break;
+ }
+ catch (Exception ex)
+ {
+ ExceptionThrown?.Invoke(this,
+ new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line,
+ statusCode, ex)));
+
+ await Task.Delay(2000).ConfigureAwait(false);
+ }
+ }
+ }
+
+ private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName)
+ {
+ switch (eventName)
+ {
+ case "put":
+ serverEvent = FirebaseServerEventType.Put;
+ break;
+ case "patch":
+ serverEvent = FirebaseServerEventType.Patch;
+ break;
+ case "keep-alive":
+ serverEvent = FirebaseServerEventType.KeepAlive;
+ break;
+ case "cancel":
+ serverEvent = FirebaseServerEventType.Cancel;
+ break;
+ case "auth_revoked":
+ serverEvent = FirebaseServerEventType.AuthRevoked;
+ break;
+ }
+
+ return serverEvent;
+ }
+
+ private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData)
+ {
+ switch (serverEvent)
+ {
+ case FirebaseServerEventType.Put:
+ case FirebaseServerEventType.Patch:
+ var result = JObject.Parse(serverData);
+ var path = result["path"].ToString();
+ var data = result["data"].ToString();
+
+ // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
+ if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot))
+ if (path == "/" && data == string.Empty)
+ {
+ observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
+ return;
+ }
+
+ var eventType = string.IsNullOrWhiteSpace(data)
+ ? FirebaseEventType.Delete
+ : FirebaseEventType.InsertOrUpdate;
+
+ var items = cache.PushData(elementRoot + path, data);
+
+ foreach (var i in items.ToList())
+ observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType,
+ FirebaseEventSource.OnlineStream));
+
+ break;
+ case FirebaseServerEventType.KeepAlive:
+ break;
+ case FirebaseServerEventType.Cancel:
+ observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
+ Dispose();
+ break;
+ }
+ }
+
+ private HttpClient GetHttpClient()
+ {
+ return http;
+ }
+ }
+} \ No newline at end of file