summaryrefslogtreecommitdiff
path: root/dsa/FireBase/Streaming
diff options
context:
space:
mode:
authorDennis Kobert <d-kobert@web.de>2019-06-11 23:38:13 +0200
committerDennis Kobert <d-kobert@web.de>2019-06-11 23:38:13 +0200
commit2fa4a0e50ebfc97059c8b84dbd17e79f9afc8a8d (patch)
treec3b34ccb2737e347a73768536895cbbaab13cc01 /dsa/FireBase/Streaming
parentec991104f56e90d7bb2878da2fe6ed4e585dfc46 (diff)
parentaf74efccf8d21e6151022b71f3cacd3fa83024ee (diff)
Merge branch 'rework-backend'
Diffstat (limited to 'dsa/FireBase/Streaming')
-rw-r--r--dsa/FireBase/Streaming/FirebaseCache.cs181
-rw-r--r--dsa/FireBase/Streaming/FirebaseEvent.cs37
-rw-r--r--dsa/FireBase/Streaming/FirebaseEventSource.cs38
-rw-r--r--dsa/FireBase/Streaming/FirebaseEventType.cs18
-rw-r--r--dsa/FireBase/Streaming/FirebaseServerEventType.cs15
-rw-r--r--dsa/FireBase/Streaming/FirebaseSubscription.cs217
-rw-r--r--dsa/FireBase/Streaming/NonBlockingStreamReader.cs63
7 files changed, 569 insertions, 0 deletions
diff --git a/dsa/FireBase/Streaming/FirebaseCache.cs b/dsa/FireBase/Streaming/FirebaseCache.cs
new file mode 100644
index 0000000..66241e0
--- /dev/null
+++ b/dsa/FireBase/Streaming/FirebaseCache.cs
@@ -0,0 +1,181 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using Firebase.Database.Http;
+using Newtonsoft.Json;
+
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// The firebase cache.
+ /// </summary>
+ /// <typeparam name="T"> Type of top-level entities in the cache. </typeparam>
+ public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>>
+ {
+ private readonly IDictionary<string, T> dictionary;
+ private readonly bool isDictionaryType;
+
+ private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings
+ {
+ ObjectCreationHandling = ObjectCreationHandling.Replace
+ };
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class.
+ /// </summary>
+ public FirebaseCache()
+ : this(new Dictionary<string, T>())
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class and populates it with existing data.
+ /// </summary>
+ /// <param name="existingItems"> The existing items. </param>
+ public FirebaseCache(IDictionary<string, T> existingItems)
+ {
+ dictionary = existingItems;
+ isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
+ }
+
+ /// <summary>
+ /// The push data.
+ /// </summary>
+ /// <param name="path"> The path of incoming data, separated by slash. </param>
+ /// <param name="data"> The data in json format as returned by firebase. </param>
+ /// <returns> Collection of top-level entities which were affected by the push. </returns>
+ public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true)
+ {
+ object obj = this.dictionary;
+ Action<object> primitiveObjSetter = null;
+ Action objDeleter = null;
+
+ var pathElements = path.Split(new[] {"/"},
+ removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None);
+
+ // first find where we should insert the data to
+ foreach (var element in pathElements)
+ if (obj is IDictionary)
+ {
+ // if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key
+ var dictionary = obj as IDictionary;
+ var valueType = obj.GetType().GenericTypeArguments[1];
+
+ primitiveObjSetter = d => dictionary[element] = d;
+ objDeleter = () => dictionary.Remove(element);
+
+ if (dictionary.Contains(element))
+ {
+ obj = dictionary[element];
+ }
+ else
+ {
+ dictionary[element] = CreateInstance(valueType);
+ obj = dictionary[element];
+ }
+ }
+ else
+ {
+ // if it's not a dictionary, try to find the property of current object with the matching name
+ var objParent = obj;
+ var property = objParent
+ .GetType()
+ .GetRuntimeProperties()
+ .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) ||
+ element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName);
+
+ objDeleter = () => property.SetValue(objParent, null);
+ primitiveObjSetter = d => property.SetValue(objParent, d);
+ obj = property.GetValue(obj);
+ if (obj == null)
+ {
+ obj = CreateInstance(property.PropertyType);
+ property.SetValue(objParent, obj);
+ }
+ }
+
+ // if data is null (=empty string) delete it
+ if (string.IsNullOrWhiteSpace(data) || data == "null")
+ {
+ var key = pathElements[0];
+ var target = dictionary[key];
+
+ objDeleter();
+
+ yield return new FirebaseObject<T>(key, target);
+ yield break;
+ }
+
+ // now insert the data
+ if (obj is IDictionary && !isDictionaryType)
+ {
+ // insert data into dictionary and return it as a collection of FirebaseObject
+ var dictionary = obj as IDictionary;
+ var valueType = obj.GetType().GenericTypeArguments[1];
+ var objectCollection = data.GetObjectCollection(valueType);
+
+ foreach (var item in objectCollection)
+ {
+ dictionary[item.Key] = item.Object;
+
+ // top level dictionary changed
+ if (!pathElements.Any()) yield return new FirebaseObject<T>(item.Key, (T) item.Object);
+ }
+
+ // nested dictionary changed
+ if (pathElements.Any())
+ {
+ this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]];
+ yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]);
+ }
+ }
+ else
+ {
+ // set the data on a property of the given object
+ var valueType = obj.GetType();
+
+ // firebase sends strings without double quotes
+ var targetObject = valueType == typeof(string)
+ ? data
+ : JsonConvert.DeserializeObject(data, valueType);
+
+ if ((valueType.GetTypeInfo().IsPrimitive || valueType == typeof(string)) && primitiveObjSetter != null)
+ // handle primitive (value) types separately
+ primitiveObjSetter(targetObject);
+ else
+ JsonConvert.PopulateObject(data, obj, serializerSettings);
+
+ dictionary[pathElements[0]] = dictionary[pathElements[0]];
+ yield return new FirebaseObject<T>(pathElements[0], dictionary[pathElements[0]]);
+ }
+ }
+
+ public bool Contains(string key)
+ {
+ return dictionary.Keys.Contains(key);
+ }
+
+ private object CreateInstance(Type type)
+ {
+ if (type == typeof(string))
+ return string.Empty;
+ return Activator.CreateInstance(type);
+ }
+
+ #region IEnumerable
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return GetEnumerator();
+ }
+
+ public IEnumerator<FirebaseObject<T>> GetEnumerator()
+ {
+ return dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
+ }
+
+ #endregion
+ }
+} \ No newline at end of file
diff --git a/dsa/FireBase/Streaming/FirebaseEvent.cs b/dsa/FireBase/Streaming/FirebaseEvent.cs
new file mode 100644
index 0000000..1761a72
--- /dev/null
+++ b/dsa/FireBase/Streaming/FirebaseEvent.cs
@@ -0,0 +1,37 @@
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// Firebase event which hold <see cref="EventType" /> and the object affected by the event.
+ /// </summary>
+ /// <typeparam name="T"> Type of object affected by the event. </typeparam>
+ public class FirebaseEvent<T> : FirebaseObject<T>
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseEvent{T}" /> class.
+ /// </summary>
+ /// <param name="key"> The key of the object. </param>
+ /// <param name="obj"> The object. </param>
+ /// <param name="eventType"> The event type. </param>
+ public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource)
+ : base(key, obj)
+ {
+ EventType = eventType;
+ EventSource = eventSource;
+ }
+
+ /// <summary>
+ /// Gets the source of the event.
+ /// </summary>
+ public FirebaseEventSource EventSource { get; }
+
+ /// <summary>
+ /// Gets the event type.
+ /// </summary>
+ public FirebaseEventType EventType { get; }
+
+ public static FirebaseEvent<T> Empty(FirebaseEventSource source)
+ {
+ return new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
+ }
+ }
+} \ No newline at end of file
diff --git a/dsa/FireBase/Streaming/FirebaseEventSource.cs b/dsa/FireBase/Streaming/FirebaseEventSource.cs
new file mode 100644
index 0000000..b1385ca
--- /dev/null
+++ b/dsa/FireBase/Streaming/FirebaseEventSource.cs
@@ -0,0 +1,38 @@
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// Specifies the origin of given <see cref="FirebaseEvent{T}" />
+ /// </summary>
+ public enum FirebaseEventSource
+ {
+ /// <summary>
+ /// Event comes from an offline source.
+ /// </summary>
+ Offline,
+
+ /// <summary>
+ /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase).
+ /// </summary>
+ OnlineInitial,
+
+ /// <summary>
+ /// Event comes from online source received thru active stream.
+ /// </summary>
+ OnlineStream,
+
+ /// <summary>
+ /// Event comes from online source being fetched manually.
+ /// </summary>
+ OnlinePull,
+
+ /// <summary>
+ /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming).
+ /// </summary>
+ OnlinePush,
+
+ /// <summary>
+ /// Event comes from an online source.
+ /// </summary>
+ Online = OnlineInitial | OnlinePull | OnlinePush | OnlineStream
+ }
+} \ No newline at end of file
diff --git a/dsa/FireBase/Streaming/FirebaseEventType.cs b/dsa/FireBase/Streaming/FirebaseEventType.cs
new file mode 100644
index 0000000..7606331
--- /dev/null
+++ b/dsa/FireBase/Streaming/FirebaseEventType.cs
@@ -0,0 +1,18 @@
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// The type of event.
+ /// </summary>
+ public enum FirebaseEventType
+ {
+ /// <summary>
+ /// Item was inserted or updated.
+ /// </summary>
+ InsertOrUpdate,
+
+ /// <summary>
+ /// Item was deleted.
+ /// </summary>
+ Delete
+ }
+} \ No newline at end of file
diff --git a/dsa/FireBase/Streaming/FirebaseServerEventType.cs b/dsa/FireBase/Streaming/FirebaseServerEventType.cs
new file mode 100644
index 0000000..79c816d
--- /dev/null
+++ b/dsa/FireBase/Streaming/FirebaseServerEventType.cs
@@ -0,0 +1,15 @@
+namespace Firebase.Database.Streaming
+{
+ internal enum FirebaseServerEventType
+ {
+ Put,
+
+ Patch,
+
+ KeepAlive,
+
+ Cancel,
+
+ AuthRevoked
+ }
+} \ No newline at end of file
diff --git a/dsa/FireBase/Streaming/FirebaseSubscription.cs b/dsa/FireBase/Streaming/FirebaseSubscription.cs
new file mode 100644
index 0000000..fb0f403
--- /dev/null
+++ b/dsa/FireBase/Streaming/FirebaseSubscription.cs
@@ -0,0 +1,217 @@
+using System;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading;
+using System.Threading.Tasks;
+using Firebase.Database.Query;
+using Newtonsoft.Json.Linq;
+
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// The firebase subscription.
+ /// </summary>
+ /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam>
+ internal class FirebaseSubscription<T> : IDisposable
+ {
+ private static readonly HttpClient http;
+ private readonly FirebaseCache<T> cache;
+ private readonly CancellationTokenSource cancel;
+ private readonly FirebaseClient client;
+ private readonly string elementRoot;
+ private readonly IObserver<FirebaseEvent<T>> observer;
+ private readonly IFirebaseQuery query;
+
+ static FirebaseSubscription()
+ {
+ var handler = new HttpClientHandler
+ {
+ AllowAutoRedirect = true,
+ MaxAutomaticRedirections = 10,
+ CookieContainer = new CookieContainer()
+ };
+
+ var httpClient = new HttpClient(handler, true);
+
+ httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
+
+ http = httpClient;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class.
+ /// </summary>
+ /// <param name="observer"> The observer. </param>
+ /// <param name="query"> The query. </param>
+ /// <param name="cache"> The cache. </param>
+ public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot,
+ FirebaseCache<T> cache)
+ {
+ this.observer = observer;
+ this.query = query;
+ this.elementRoot = elementRoot;
+ cancel = new CancellationTokenSource();
+ this.cache = cache;
+ client = query.Client;
+ }
+
+ public void Dispose()
+ {
+ cancel.Cancel();
+ }
+
+ public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
+
+ public IDisposable Run()
+ {
+ Task.Run(() => ReceiveThread());
+
+ return this;
+ }
+
+ private async void ReceiveThread()
+ {
+ while (true)
+ {
+ var url = string.Empty;
+ var line = string.Empty;
+ var statusCode = HttpStatusCode.OK;
+
+ try
+ {
+ cancel.Token.ThrowIfCancellationRequested();
+
+ // initialize network connection
+ url = await query.BuildUrlAsync().ConfigureAwait(false);
+ var request = new HttpRequestMessage(HttpMethod.Get, url);
+ var serverEvent = FirebaseServerEventType.KeepAlive;
+
+ var client = GetHttpClient();
+ var response = await client
+ .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token)
+ .ConfigureAwait(false);
+
+ statusCode = response.StatusCode;
+ response.EnsureSuccessStatusCode();
+
+ using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
+ using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream))
+ {
+ while (true)
+ {
+ cancel.Token.ThrowIfCancellationRequested();
+
+ line = reader.ReadLine()?.Trim();
+
+ if (string.IsNullOrWhiteSpace(line)) continue;
+
+ var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
+ .Select(s => s.Trim()).ToArray();
+
+ switch (tuple[0].ToLower())
+ {
+ case "event":
+ serverEvent = ParseServerEvent(serverEvent, tuple[1]);
+ break;
+ case "data":
+ ProcessServerData(url, serverEvent, tuple[1]);
+ break;
+ }
+
+ if (serverEvent == FirebaseServerEventType.AuthRevoked)
+ // auth token no longer valid, reconnect
+ break;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception ex) when (statusCode != HttpStatusCode.OK)
+ {
+ observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
+ Dispose();
+ break;
+ }
+ catch (Exception ex)
+ {
+ ExceptionThrown?.Invoke(this,
+ new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line,
+ statusCode, ex)));
+
+ await Task.Delay(2000).ConfigureAwait(false);
+ }
+ }
+ }
+
+ private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName)
+ {
+ switch (eventName)
+ {
+ case "put":
+ serverEvent = FirebaseServerEventType.Put;
+ break;
+ case "patch":
+ serverEvent = FirebaseServerEventType.Patch;
+ break;
+ case "keep-alive":
+ serverEvent = FirebaseServerEventType.KeepAlive;
+ break;
+ case "cancel":
+ serverEvent = FirebaseServerEventType.Cancel;
+ break;
+ case "auth_revoked":
+ serverEvent = FirebaseServerEventType.AuthRevoked;
+ break;
+ }
+
+ return serverEvent;
+ }
+
+ private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData)
+ {
+ switch (serverEvent)
+ {
+ case FirebaseServerEventType.Put:
+ case FirebaseServerEventType.Patch:
+ var result = JObject.Parse(serverData);
+ var path = result["path"].ToString();
+ var data = result["data"].ToString();
+
+ // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
+ if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot))
+ if (path == "/" && data == string.Empty)
+ {
+ observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
+ return;
+ }
+
+ var eventType = string.IsNullOrWhiteSpace(data)
+ ? FirebaseEventType.Delete
+ : FirebaseEventType.InsertOrUpdate;
+
+ var items = cache.PushData(elementRoot + path, data);
+
+ foreach (var i in items.ToList())
+ observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType,
+ FirebaseEventSource.OnlineStream));
+
+ break;
+ case FirebaseServerEventType.KeepAlive:
+ break;
+ case FirebaseServerEventType.Cancel:
+ observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
+ Dispose();
+ break;
+ }
+ }
+
+ private HttpClient GetHttpClient()
+ {
+ return http;
+ }
+ }
+} \ No newline at end of file
diff --git a/dsa/FireBase/Streaming/NonBlockingStreamReader.cs b/dsa/FireBase/Streaming/NonBlockingStreamReader.cs
new file mode 100644
index 0000000..8228e32
--- /dev/null
+++ b/dsa/FireBase/Streaming/NonBlockingStreamReader.cs
@@ -0,0 +1,63 @@
+using System.IO;
+using System.Text;
+
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// When a regular <see cref="StreamReader" /> is used in a UWP app its <see cref="StreamReader.ReadLine" /> method
+ /// tends to take a long
+ /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader" /> can be used
+ /// instead to boost performance
+ /// in your UWP app. Use <see cref="FirebaseOptions" /> to inject an instance of this class into your
+ /// <see cref="FirebaseClient" />.
+ /// </summary>
+ public class NonBlockingStreamReader : TextReader
+ {
+ private const int DefaultBufferSize = 16000;
+ private readonly byte[] buffer;
+ private readonly int bufferSize;
+
+ private readonly Stream stream;
+
+ private string cachedData;
+
+ public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
+ {
+ this.stream = stream;
+ this.bufferSize = bufferSize;
+ buffer = new byte[bufferSize];
+
+ cachedData = string.Empty;
+ }
+
+ public override string ReadLine()
+ {
+ var currentString = TryGetNewLine();
+
+ while (currentString == null)
+ {
+ var read = stream.Read(buffer, 0, bufferSize);
+ var str = Encoding.UTF8.GetString(buffer, 0, read);
+
+ cachedData += str;
+ currentString = TryGetNewLine();
+ }
+
+ return currentString;
+ }
+
+ private string TryGetNewLine()
+ {
+ var newLine = cachedData.IndexOf('\n');
+
+ if (newLine >= 0)
+ {
+ var r = cachedData.Substring(0, newLine + 1);
+ cachedData = cachedData.Remove(0, r.Length);
+ return r.Trim();
+ }
+
+ return null;
+ }
+ }
+} \ No newline at end of file