summaryrefslogtreecommitdiff
path: root/FireBase/Streaming
diff options
context:
space:
mode:
Diffstat (limited to 'FireBase/Streaming')
-rw-r--r--FireBase/Streaming/FirebaseCache.cs192
-rw-r--r--FireBase/Streaming/FirebaseEvent.cs40
-rw-r--r--FireBase/Streaming/FirebaseEventSource.cs38
-rw-r--r--FireBase/Streaming/FirebaseEventType.cs18
-rw-r--r--FireBase/Streaming/FirebaseServerEventType.cs15
-rw-r--r--FireBase/Streaming/FirebaseSubscription.cs221
-rw-r--r--FireBase/Streaming/NonBlockingStreamReader.cs60
7 files changed, 584 insertions, 0 deletions
diff --git a/FireBase/Streaming/FirebaseCache.cs b/FireBase/Streaming/FirebaseCache.cs
new file mode 100644
index 0000000..ba7990b
--- /dev/null
+++ b/FireBase/Streaming/FirebaseCache.cs
@@ -0,0 +1,192 @@
+namespace Firebase.Database.Streaming
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Reflection;
+
+ using Firebase.Database.Http;
+
+ using Newtonsoft.Json;
+
+ /// <summary>
+ /// The firebase cache.
+ /// </summary>
+ /// <typeparam name="T"> Type of top-level entities in the cache. </typeparam>
+ public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>>
+ {
+ private readonly IDictionary<string, T> dictionary;
+ private readonly bool isDictionaryType;
+ private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
+ {
+ ObjectCreationHandling = ObjectCreationHandling.Replace
+ };
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseCache{T}"/> class.
+ /// </summary>
+ public FirebaseCache()
+ : this(new Dictionary<string, T>())
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseCache{T}"/> class and populates it with existing data.
+ /// </summary>
+ /// <param name="existingItems"> The existing items. </param>
+ public FirebaseCache(IDictionary<string, T> existingItems)
+ {
+ this.dictionary = existingItems;
+ this.isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
+ }
+
+ /// <summary>
+ /// The push data.
+ /// </summary>
+ /// <param name="path"> The path of incoming data, separated by slash. </param>
+ /// <param name="data"> The data in json format as returned by firebase. </param>
+ /// <returns> Collection of top-level entities which were affected by the push. </returns>
+ public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true)
+ {
+ object obj = this.dictionary;
+ Action<object> primitiveObjSetter = null;
+ Action objDeleter = null;
+
+ var pathElements = path.Split(new[] { "/" }, removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None);
+
+ // first find where we should insert the data to
+ foreach (var element in pathElements)
+ {
+ if (obj is IDictionary)
+ {
+ // if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key
+ var dictionary = obj as IDictionary;
+ var valueType = obj.GetType().GenericTypeArguments[1];
+
+ primitiveObjSetter = (d) => dictionary[element] = d;
+ objDeleter = () => dictionary.Remove(element);
+
+ if (dictionary.Contains(element))
+ {
+ obj = dictionary[element];
+ }
+ else
+ {
+ dictionary[element] = this.CreateInstance(valueType);
+ obj = dictionary[element];
+ }
+ }
+ else
+ {
+ // if it's not a dictionary, try to find the property of current object with the matching name
+ var objParent = obj;
+ var property = objParent
+ .GetType()
+ .GetRuntimeProperties()
+ .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) || element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName);
+
+ objDeleter = () => property.SetValue(objParent, null);
+ primitiveObjSetter = (d) => property.SetValue(objParent, d);
+ obj = property.GetValue(obj);
+ if (obj == null)
+ {
+ obj = this.CreateInstance(property.PropertyType);
+ property.SetValue(objParent, obj);
+ }
+ }
+ }
+
+ // if data is null (=empty string) delete it
+ if (string.IsNullOrWhiteSpace(data) || data == "null")
+ {
+ var key = pathElements[0];
+ var target = this.dictionary[key];
+
+ objDeleter();
+
+ yield return new FirebaseObject<T>(key, target);
+ yield break;
+ }
+
+ // now insert the data
+ if (obj is IDictionary && !this.isDictionaryType)
+ {
+ // insert data into dictionary and return it as a collection of FirebaseObject
+ var dictionary = obj as IDictionary;
+ var valueType = obj.GetType().GenericTypeArguments[1];
+ var objectCollection = data.GetObjectCollection(valueType);
+
+ foreach (var item in objectCollection)
+ {
+ dictionary[item.Key] = item.Object;
+
+ // top level dictionary changed
+ if (!pathElements.Any())
+ {
+ yield return new FirebaseObject<T>(item.Key, (T)item.Object);
+ }
+ }
+
+ // nested dictionary changed
+ if (pathElements.Any())
+ {
+ this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]];
+ yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]);
+ }
+ }
+ else
+ {
+ // set the data on a property of the given object
+ var valueType = obj.GetType();
+
+ // firebase sends strings without double quotes
+ var targetObject = valueType == typeof(string) ? data.ToString() : JsonConvert.DeserializeObject(data, valueType);
+
+ if ((valueType.GetTypeInfo().IsPrimitive || valueType == typeof(string)) && primitiveObjSetter != null)
+ {
+ // handle primitive (value) types separately
+ primitiveObjSetter(targetObject);
+ }
+ else
+ {
+ JsonConvert.PopulateObject(data, obj, this.serializerSettings);
+ }
+
+ this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]];
+ yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]);
+ }
+ }
+
+ public bool Contains(string key)
+ {
+ return this.dictionary.Keys.Contains(key);
+ }
+
+ private object CreateInstance(Type type)
+ {
+ if (type == typeof(string))
+ {
+ return string.Empty;
+ }
+ else
+ {
+ return Activator.CreateInstance(type);
+ }
+ }
+
+ #region IEnumerable
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return this.GetEnumerator();
+ }
+
+ public IEnumerator<FirebaseObject<T>> GetEnumerator()
+ {
+ return this.dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
+ }
+
+ #endregion
+ }
+}
diff --git a/FireBase/Streaming/FirebaseEvent.cs b/FireBase/Streaming/FirebaseEvent.cs
new file mode 100644
index 0000000..c2338ca
--- /dev/null
+++ b/FireBase/Streaming/FirebaseEvent.cs
@@ -0,0 +1,40 @@
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// Firebase event which hold <see cref="EventType"/> and the object affected by the event.
+ /// </summary>
+ /// <typeparam name="T"> Type of object affected by the event. </typeparam>
+ public class FirebaseEvent<T> : FirebaseObject<T>
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseEvent{T}"/> class.
+ /// </summary>
+ /// <param name="key"> The key of the object. </param>
+ /// <param name="obj"> The object. </param>
+ /// <param name="eventType"> The event type. </param>
+ public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource)
+ : base(key, obj)
+ {
+ this.EventType = eventType;
+ this.EventSource = eventSource;
+ }
+
+ /// <summary>
+ /// Gets the source of the event.
+ /// </summary>
+ public FirebaseEventSource EventSource
+ {
+ get;
+ }
+
+ /// <summary>
+ /// Gets the event type.
+ /// </summary>
+ public FirebaseEventType EventType
+ {
+ get;
+ }
+
+ public static FirebaseEvent<T> Empty(FirebaseEventSource source) => new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
+ }
+}
diff --git a/FireBase/Streaming/FirebaseEventSource.cs b/FireBase/Streaming/FirebaseEventSource.cs
new file mode 100644
index 0000000..98df977
--- /dev/null
+++ b/FireBase/Streaming/FirebaseEventSource.cs
@@ -0,0 +1,38 @@
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// Specifies the origin of given <see cref="FirebaseEvent{T}"/>
+ /// </summary>
+ public enum FirebaseEventSource
+ {
+ /// <summary>
+ /// Event comes from an offline source.
+ /// </summary>
+ Offline,
+
+ /// <summary>
+ /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase).
+ /// </summary>
+ OnlineInitial,
+
+ /// <summary>
+ /// Event comes from online source received thru active stream.
+ /// </summary>
+ OnlineStream,
+
+ /// <summary>
+ /// Event comes from online source being fetched manually.
+ /// </summary>
+ OnlinePull,
+
+ /// <summary>
+ /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming).
+ /// </summary>
+ OnlinePush,
+
+ /// <summary>
+ /// Event comes from an online source.
+ /// </summary>
+ Online = OnlineInitial | OnlinePull | OnlinePush | OnlineStream
+ }
+}
diff --git a/FireBase/Streaming/FirebaseEventType.cs b/FireBase/Streaming/FirebaseEventType.cs
new file mode 100644
index 0000000..5fb21ef
--- /dev/null
+++ b/FireBase/Streaming/FirebaseEventType.cs
@@ -0,0 +1,18 @@
+namespace Firebase.Database.Streaming
+{
+ /// <summary>
+ /// The type of event.
+ /// </summary>
+ public enum FirebaseEventType
+ {
+ /// <summary>
+ /// Item was inserted or updated.
+ /// </summary>
+ InsertOrUpdate,
+
+ /// <summary>
+ /// Item was deleted.
+ /// </summary>
+ Delete
+ }
+}
diff --git a/FireBase/Streaming/FirebaseServerEventType.cs b/FireBase/Streaming/FirebaseServerEventType.cs
new file mode 100644
index 0000000..1f10bc8
--- /dev/null
+++ b/FireBase/Streaming/FirebaseServerEventType.cs
@@ -0,0 +1,15 @@
+namespace Firebase.Database.Streaming
+{
+ internal enum FirebaseServerEventType
+ {
+ Put,
+
+ Patch,
+
+ KeepAlive,
+
+ Cancel,
+
+ AuthRevoked
+ }
+}
diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs
new file mode 100644
index 0000000..4b5e643
--- /dev/null
+++ b/FireBase/Streaming/FirebaseSubscription.cs
@@ -0,0 +1,221 @@
+namespace Firebase.Database.Streaming
+{
+ using System;
+ using System.Diagnostics;
+ using System.Linq;
+ using System.Net.Http;
+ using System.Net.Http.Headers;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ using Firebase.Database.Query;
+
+ using Newtonsoft.Json.Linq;
+ using System.Net;
+
+ /// <summary>
+ /// The firebase subscription.
+ /// </summary>
+ /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam>
+ internal class FirebaseSubscription<T> : IDisposable
+ {
+ private readonly CancellationTokenSource cancel;
+ private readonly IObserver<FirebaseEvent<T>> observer;
+ private readonly IFirebaseQuery query;
+ private readonly FirebaseCache<T> cache;
+ private readonly string elementRoot;
+ private readonly FirebaseClient client;
+
+ private static HttpClient http;
+
+ static FirebaseSubscription()
+ {
+ var handler = new HttpClientHandler
+ {
+ AllowAutoRedirect = true,
+ MaxAutomaticRedirections = 10,
+ CookieContainer = new CookieContainer()
+ };
+
+ var httpClient = new HttpClient(handler, true);
+
+ httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
+
+ http = httpClient;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="FirebaseSubscription{T}"/> class.
+ /// </summary>
+ /// <param name="observer"> The observer. </param>
+ /// <param name="query"> The query. </param>
+ /// <param name="cache"> The cache. </param>
+ public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, FirebaseCache<T> cache)
+ {
+ this.observer = observer;
+ this.query = query;
+ this.elementRoot = elementRoot;
+ this.cancel = new CancellationTokenSource();
+ this.cache = cache;
+ this.client = query.Client;
+ }
+
+ public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
+
+ public void Dispose()
+ {
+ this.cancel.Cancel();
+ }
+
+ public IDisposable Run()
+ {
+ Task.Run(() => this.ReceiveThread());
+
+ return this;
+ }
+
+ private async void ReceiveThread()
+ {
+ while (true)
+ {
+ var url = string.Empty;
+ var line = string.Empty;
+ var statusCode = HttpStatusCode.OK;
+
+ try
+ {
+ this.cancel.Token.ThrowIfCancellationRequested();
+
+ // initialize network connection
+ url = await this.query.BuildUrlAsync().ConfigureAwait(false);
+ var request = new HttpRequestMessage(HttpMethod.Get, url);
+ var serverEvent = FirebaseServerEventType.KeepAlive;
+
+ var client = this.GetHttpClient();
+ var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, this.cancel.Token).ConfigureAwait(false);
+
+ statusCode = response.StatusCode;
+ response.EnsureSuccessStatusCode();
+
+ using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
+ using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream))
+ {
+ while (true)
+ {
+ this.cancel.Token.ThrowIfCancellationRequested();
+
+ line = reader.ReadLine()?.Trim();
+
+ if (string.IsNullOrWhiteSpace(line))
+ {
+ continue;
+ }
+
+ var tuple = line.Split(new[] { ':' }, 2, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Trim()).ToArray();
+
+ switch (tuple[0].ToLower())
+ {
+ case "event":
+ serverEvent = this.ParseServerEvent(serverEvent, tuple[1]);
+ break;
+ case "data":
+ this.ProcessServerData(url, serverEvent, tuple[1]);
+ break;
+ }
+
+ if (serverEvent == FirebaseServerEventType.AuthRevoked)
+ {
+ // auth token no longer valid, reconnect
+ break;
+ }
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception ex) when (statusCode != HttpStatusCode.OK)
+ {
+ this.observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
+ this.Dispose();
+ break;
+ }
+ catch (Exception ex)
+ {
+ this.ExceptionThrown?.Invoke(this, new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, statusCode, ex)));
+
+ await Task.Delay(2000).ConfigureAwait(false);
+ }
+ }
+ }
+
+ private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName)
+ {
+ switch (eventName)
+ {
+ case "put":
+ serverEvent = FirebaseServerEventType.Put;
+ break;
+ case "patch":
+ serverEvent = FirebaseServerEventType.Patch;
+ break;
+ case "keep-alive":
+ serverEvent = FirebaseServerEventType.KeepAlive;
+ break;
+ case "cancel":
+ serverEvent = FirebaseServerEventType.Cancel;
+ break;
+ case "auth_revoked":
+ serverEvent = FirebaseServerEventType.AuthRevoked;
+ break;
+ }
+
+ return serverEvent;
+ }
+
+ private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData)
+ {
+ switch (serverEvent)
+ {
+ case FirebaseServerEventType.Put:
+ case FirebaseServerEventType.Patch:
+ var result = JObject.Parse(serverData);
+ var path = result["path"].ToString();
+ var data = result["data"].ToString();
+
+ // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
+ if(string.IsNullOrWhiteSpace(this.elementRoot) || !this.cache.Contains(this.elementRoot))
+ {
+ if(path == "/" && data == string.Empty)
+ {
+ this.observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
+ return;
+ }
+ }
+
+ var eventType = string.IsNullOrWhiteSpace(data) ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate;
+
+ var items = this.cache.PushData(this.elementRoot + path, data);
+
+ foreach (var i in items.ToList())
+ {
+ this.observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, FirebaseEventSource.OnlineStream));
+ }
+
+ break;
+ case FirebaseServerEventType.KeepAlive:
+ break;
+ case FirebaseServerEventType.Cancel:
+ this.observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
+ this.Dispose();
+ break;
+ }
+ }
+
+ private HttpClient GetHttpClient()
+ {
+ return http;
+ }
+ }
+}
diff --git a/FireBase/Streaming/NonBlockingStreamReader.cs b/FireBase/Streaming/NonBlockingStreamReader.cs
new file mode 100644
index 0000000..2ac83fd
--- /dev/null
+++ b/FireBase/Streaming/NonBlockingStreamReader.cs
@@ -0,0 +1,60 @@
+namespace Firebase.Database.Streaming
+{
+ using System.IO;
+ using System.Text;
+
+ /// <summary>
+ /// When a regular <see cref="StreamReader"/> is used in a UWP app its <see cref="StreamReader.ReadLine"/> method tends to take a long
+ /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader"/> can be used instead to boost performance
+ /// in your UWP app. Use <see cref="FirebaseOptions"/> to inject an instance of this class into your <see cref="FirebaseClient"/>.
+ /// </summary>
+ public class NonBlockingStreamReader : TextReader
+ {
+ private const int DefaultBufferSize = 16000;
+
+ private readonly Stream stream;
+ private readonly byte[] buffer;
+ private readonly int bufferSize;
+
+ private string cachedData;
+
+ public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
+ {
+ this.stream = stream;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[bufferSize];
+
+ this.cachedData = string.Empty;
+ }
+
+ public override string ReadLine()
+ {
+ var currentString = this.TryGetNewLine();
+
+ while (currentString == null)
+ {
+ var read = this.stream.Read(this.buffer, 0, this.bufferSize);
+ var str = Encoding.UTF8.GetString(buffer, 0, read);
+
+ cachedData += str;
+ currentString = this.TryGetNewLine();
+ }
+
+ return currentString;
+ }
+
+ private string TryGetNewLine()
+ {
+ var newLine = cachedData.IndexOf('\n');
+
+ if (newLine >= 0)
+ {
+ var r = cachedData.Substring(0, newLine + 1);
+ this.cachedData = cachedData.Remove(0, r.Length);
+ return r.Trim();
+ }
+
+ return null;
+ }
+ }
+}