summaryrefslogtreecommitdiff
path: root/FireBase/Streaming
diff options
context:
space:
mode:
authoruzvkl <dennis.kobert@student.kit.edu>2019-06-11 23:05:52 +0200
committeruzvkl <dennis.kobert@student.kit.edu>2019-06-11 23:05:52 +0200
commite6181c24124d97f2fbc932b8a68311e625463156 (patch)
treec1f097c344ca266b7941c9668590b0fd35c7870a /FireBase/Streaming
parent2490ad5d31fe2ac778ff9303776f0e91f47a2862 (diff)
Move dsa related stuff to subfolder
Diffstat (limited to 'FireBase/Streaming')
-rw-r--r--FireBase/Streaming/FirebaseCache.cs181
-rw-r--r--FireBase/Streaming/FirebaseEvent.cs37
-rw-r--r--FireBase/Streaming/FirebaseEventSource.cs38
-rw-r--r--FireBase/Streaming/FirebaseEventType.cs18
-rw-r--r--FireBase/Streaming/FirebaseServerEventType.cs15
-rw-r--r--FireBase/Streaming/FirebaseSubscription.cs217
-rw-r--r--FireBase/Streaming/NonBlockingStreamReader.cs63
7 files changed, 0 insertions, 569 deletions
diff --git a/FireBase/Streaming/FirebaseCache.cs b/FireBase/Streaming/FirebaseCache.cs
deleted file mode 100644
index 66241e0..0000000
--- a/FireBase/Streaming/FirebaseCache.cs
+++ /dev/null
@@ -1,181 +0,0 @@
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reflection;
-using Firebase.Database.Http;
-using Newtonsoft.Json;
-
-namespace Firebase.Database.Streaming
-{
- /// <summary>
- /// The firebase cache.
- /// </summary>
- /// <typeparam name="T"> Type of top-level entities in the cache. </typeparam>
- public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>>
- {
- private readonly IDictionary<string, T> dictionary;
- private readonly bool isDictionaryType;
-
- private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings
- {
- ObjectCreationHandling = ObjectCreationHandling.Replace
- };
-
- /// <summary>
- /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class.
- /// </summary>
- public FirebaseCache()
- : this(new Dictionary<string, T>())
- {
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class and populates it with existing data.
- /// </summary>
- /// <param name="existingItems"> The existing items. </param>
- public FirebaseCache(IDictionary<string, T> existingItems)
- {
- dictionary = existingItems;
- isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
- }
-
- /// <summary>
- /// The push data.
- /// </summary>
- /// <param name="path"> The path of incoming data, separated by slash. </param>
- /// <param name="data"> The data in json format as returned by firebase. </param>
- /// <returns> Collection of top-level entities which were affected by the push. </returns>
- public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true)
- {
- object obj = this.dictionary;
- Action<object> primitiveObjSetter = null;
- Action objDeleter = null;
-
- var pathElements = path.Split(new[] {"/"},
- removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None);
-
- // first find where we should insert the data to
- foreach (var element in pathElements)
- if (obj is IDictionary)
- {
- // if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key
- var dictionary = obj as IDictionary;
- var valueType = obj.GetType().GenericTypeArguments[1];
-
- primitiveObjSetter = d => dictionary[element] = d;
- objDeleter = () => dictionary.Remove(element);
-
- if (dictionary.Contains(element))
- {
- obj = dictionary[element];
- }
- else
- {
- dictionary[element] = CreateInstance(valueType);
- obj = dictionary[element];
- }
- }
- else
- {
- // if it's not a dictionary, try to find the property of current object with the matching name
- var objParent = obj;
- var property = objParent
- .GetType()
- .GetRuntimeProperties()
- .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) ||
- element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName);
-
- objDeleter = () => property.SetValue(objParent, null);
- primitiveObjSetter = d => property.SetValue(objParent, d);
- obj = property.GetValue(obj);
- if (obj == null)
- {
- obj = CreateInstance(property.PropertyType);
- property.SetValue(objParent, obj);
- }
- }
-
- // if data is null (=empty string) delete it
- if (string.IsNullOrWhiteSpace(data) || data == "null")
- {
- var key = pathElements[0];
- var target = dictionary[key];
-
- objDeleter();
-
- yield return new FirebaseObject<T>(key, target);
- yield break;
- }
-
- // now insert the data
- if (obj is IDictionary && !isDictionaryType)
- {
- // insert data into dictionary and return it as a collection of FirebaseObject
- var dictionary = obj as IDictionary;
- var valueType = obj.GetType().GenericTypeArguments[1];
- var objectCollection = data.GetObjectCollection(valueType);
-
- foreach (var item in objectCollection)
- {
- dictionary[item.Key] = item.Object;
-
- // top level dictionary changed
- if (!pathElements.Any()) yield return new FirebaseObject<T>(item.Key, (T) item.Object);
- }
-
- // nested dictionary changed
- if (pathElements.Any())
- {
- this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]];
- yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]);
- }
- }
- else
- {
- // set the data on a property of the given object
- var valueType = obj.GetType();
-
- // firebase sends strings without double quotes
- var targetObject = valueType == typeof(string)
- ? data
- : JsonConvert.DeserializeObject(data, valueType);
-
- if ((valueType.GetTypeInfo().IsPrimitive || valueType == typeof(string)) && primitiveObjSetter != null)
- // handle primitive (value) types separately
- primitiveObjSetter(targetObject);
- else
- JsonConvert.PopulateObject(data, obj, serializerSettings);
-
- dictionary[pathElements[0]] = dictionary[pathElements[0]];
- yield return new FirebaseObject<T>(pathElements[0], dictionary[pathElements[0]]);
- }
- }
-
- public bool Contains(string key)
- {
- return dictionary.Keys.Contains(key);
- }
-
- private object CreateInstance(Type type)
- {
- if (type == typeof(string))
- return string.Empty;
- return Activator.CreateInstance(type);
- }
-
- #region IEnumerable
-
- IEnumerator IEnumerable.GetEnumerator()
- {
- return GetEnumerator();
- }
-
- public IEnumerator<FirebaseObject<T>> GetEnumerator()
- {
- return dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
- }
-
- #endregion
- }
-} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEvent.cs b/FireBase/Streaming/FirebaseEvent.cs
deleted file mode 100644
index 1761a72..0000000
--- a/FireBase/Streaming/FirebaseEvent.cs
+++ /dev/null
@@ -1,37 +0,0 @@
-namespace Firebase.Database.Streaming
-{
- /// <summary>
- /// Firebase event which hold <see cref="EventType" /> and the object affected by the event.
- /// </summary>
- /// <typeparam name="T"> Type of object affected by the event. </typeparam>
- public class FirebaseEvent<T> : FirebaseObject<T>
- {
- /// <summary>
- /// Initializes a new instance of the <see cref="FirebaseEvent{T}" /> class.
- /// </summary>
- /// <param name="key"> The key of the object. </param>
- /// <param name="obj"> The object. </param>
- /// <param name="eventType"> The event type. </param>
- public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource)
- : base(key, obj)
- {
- EventType = eventType;
- EventSource = eventSource;
- }
-
- /// <summary>
- /// Gets the source of the event.
- /// </summary>
- public FirebaseEventSource EventSource { get; }
-
- /// <summary>
- /// Gets the event type.
- /// </summary>
- public FirebaseEventType EventType { get; }
-
- public static FirebaseEvent<T> Empty(FirebaseEventSource source)
- {
- return new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
- }
- }
-} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEventSource.cs b/FireBase/Streaming/FirebaseEventSource.cs
deleted file mode 100644
index b1385ca..0000000
--- a/FireBase/Streaming/FirebaseEventSource.cs
+++ /dev/null
@@ -1,38 +0,0 @@
-namespace Firebase.Database.Streaming
-{
- /// <summary>
- /// Specifies the origin of given <see cref="FirebaseEvent{T}" />
- /// </summary>
- public enum FirebaseEventSource
- {
- /// <summary>
- /// Event comes from an offline source.
- /// </summary>
- Offline,
-
- /// <summary>
- /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase).
- /// </summary>
- OnlineInitial,
-
- /// <summary>
- /// Event comes from online source received thru active stream.
- /// </summary>
- OnlineStream,
-
- /// <summary>
- /// Event comes from online source being fetched manually.
- /// </summary>
- OnlinePull,
-
- /// <summary>
- /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming).
- /// </summary>
- OnlinePush,
-
- /// <summary>
- /// Event comes from an online source.
- /// </summary>
- Online = OnlineInitial | OnlinePull | OnlinePush | OnlineStream
- }
-} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEventType.cs b/FireBase/Streaming/FirebaseEventType.cs
deleted file mode 100644
index 7606331..0000000
--- a/FireBase/Streaming/FirebaseEventType.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-namespace Firebase.Database.Streaming
-{
- /// <summary>
- /// The type of event.
- /// </summary>
- public enum FirebaseEventType
- {
- /// <summary>
- /// Item was inserted or updated.
- /// </summary>
- InsertOrUpdate,
-
- /// <summary>
- /// Item was deleted.
- /// </summary>
- Delete
- }
-} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseServerEventType.cs b/FireBase/Streaming/FirebaseServerEventType.cs
deleted file mode 100644
index 79c816d..0000000
--- a/FireBase/Streaming/FirebaseServerEventType.cs
+++ /dev/null
@@ -1,15 +0,0 @@
-namespace Firebase.Database.Streaming
-{
- internal enum FirebaseServerEventType
- {
- Put,
-
- Patch,
-
- KeepAlive,
-
- Cancel,
-
- AuthRevoked
- }
-} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs
deleted file mode 100644
index fb0f403..0000000
--- a/FireBase/Streaming/FirebaseSubscription.cs
+++ /dev/null
@@ -1,217 +0,0 @@
-using System;
-using System.Linq;
-using System.Net;
-using System.Net.Http;
-using System.Net.Http.Headers;
-using System.Threading;
-using System.Threading.Tasks;
-using Firebase.Database.Query;
-using Newtonsoft.Json.Linq;
-
-namespace Firebase.Database.Streaming
-{
- /// <summary>
- /// The firebase subscription.
- /// </summary>
- /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam>
- internal class FirebaseSubscription<T> : IDisposable
- {
- private static readonly HttpClient http;
- private readonly FirebaseCache<T> cache;
- private readonly CancellationTokenSource cancel;
- private readonly FirebaseClient client;
- private readonly string elementRoot;
- private readonly IObserver<FirebaseEvent<T>> observer;
- private readonly IFirebaseQuery query;
-
- static FirebaseSubscription()
- {
- var handler = new HttpClientHandler
- {
- AllowAutoRedirect = true,
- MaxAutomaticRedirections = 10,
- CookieContainer = new CookieContainer()
- };
-
- var httpClient = new HttpClient(handler, true);
-
- httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
-
- http = httpClient;
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class.
- /// </summary>
- /// <param name="observer"> The observer. </param>
- /// <param name="query"> The query. </param>
- /// <param name="cache"> The cache. </param>
- public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot,
- FirebaseCache<T> cache)
- {
- this.observer = observer;
- this.query = query;
- this.elementRoot = elementRoot;
- cancel = new CancellationTokenSource();
- this.cache = cache;
- client = query.Client;
- }
-
- public void Dispose()
- {
- cancel.Cancel();
- }
-
- public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
-
- public IDisposable Run()
- {
- Task.Run(() => ReceiveThread());
-
- return this;
- }
-
- private async void ReceiveThread()
- {
- while (true)
- {
- var url = string.Empty;
- var line = string.Empty;
- var statusCode = HttpStatusCode.OK;
-
- try
- {
- cancel.Token.ThrowIfCancellationRequested();
-
- // initialize network connection
- url = await query.BuildUrlAsync().ConfigureAwait(false);
- var request = new HttpRequestMessage(HttpMethod.Get, url);
- var serverEvent = FirebaseServerEventType.KeepAlive;
-
- var client = GetHttpClient();
- var response = await client
- .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token)
- .ConfigureAwait(false);
-
- statusCode = response.StatusCode;
- response.EnsureSuccessStatusCode();
-
- using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
- using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream))
- {
- while (true)
- {
- cancel.Token.ThrowIfCancellationRequested();
-
- line = reader.ReadLine()?.Trim();
-
- if (string.IsNullOrWhiteSpace(line)) continue;
-
- var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
- .Select(s => s.Trim()).ToArray();
-
- switch (tuple[0].ToLower())
- {
- case "event":
- serverEvent = ParseServerEvent(serverEvent, tuple[1]);
- break;
- case "data":
- ProcessServerData(url, serverEvent, tuple[1]);
- break;
- }
-
- if (serverEvent == FirebaseServerEventType.AuthRevoked)
- // auth token no longer valid, reconnect
- break;
- }
- }
- }
- catch (OperationCanceledException)
- {
- break;
- }
- catch (Exception ex) when (statusCode != HttpStatusCode.OK)
- {
- observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
- Dispose();
- break;
- }
- catch (Exception ex)
- {
- ExceptionThrown?.Invoke(this,
- new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line,
- statusCode, ex)));
-
- await Task.Delay(2000).ConfigureAwait(false);
- }
- }
- }
-
- private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName)
- {
- switch (eventName)
- {
- case "put":
- serverEvent = FirebaseServerEventType.Put;
- break;
- case "patch":
- serverEvent = FirebaseServerEventType.Patch;
- break;
- case "keep-alive":
- serverEvent = FirebaseServerEventType.KeepAlive;
- break;
- case "cancel":
- serverEvent = FirebaseServerEventType.Cancel;
- break;
- case "auth_revoked":
- serverEvent = FirebaseServerEventType.AuthRevoked;
- break;
- }
-
- return serverEvent;
- }
-
- private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData)
- {
- switch (serverEvent)
- {
- case FirebaseServerEventType.Put:
- case FirebaseServerEventType.Patch:
- var result = JObject.Parse(serverData);
- var path = result["path"].ToString();
- var data = result["data"].ToString();
-
- // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
- if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot))
- if (path == "/" && data == string.Empty)
- {
- observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
- return;
- }
-
- var eventType = string.IsNullOrWhiteSpace(data)
- ? FirebaseEventType.Delete
- : FirebaseEventType.InsertOrUpdate;
-
- var items = cache.PushData(elementRoot + path, data);
-
- foreach (var i in items.ToList())
- observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType,
- FirebaseEventSource.OnlineStream));
-
- break;
- case FirebaseServerEventType.KeepAlive:
- break;
- case FirebaseServerEventType.Cancel:
- observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
- Dispose();
- break;
- }
- }
-
- private HttpClient GetHttpClient()
- {
- return http;
- }
- }
-} \ No newline at end of file
diff --git a/FireBase/Streaming/NonBlockingStreamReader.cs b/FireBase/Streaming/NonBlockingStreamReader.cs
deleted file mode 100644
index 8228e32..0000000
--- a/FireBase/Streaming/NonBlockingStreamReader.cs
+++ /dev/null
@@ -1,63 +0,0 @@
-using System.IO;
-using System.Text;
-
-namespace Firebase.Database.Streaming
-{
- /// <summary>
- /// When a regular <see cref="StreamReader" /> is used in a UWP app its <see cref="StreamReader.ReadLine" /> method
- /// tends to take a long
- /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader" /> can be used
- /// instead to boost performance
- /// in your UWP app. Use <see cref="FirebaseOptions" /> to inject an instance of this class into your
- /// <see cref="FirebaseClient" />.
- /// </summary>
- public class NonBlockingStreamReader : TextReader
- {
- private const int DefaultBufferSize = 16000;
- private readonly byte[] buffer;
- private readonly int bufferSize;
-
- private readonly Stream stream;
-
- private string cachedData;
-
- public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
- {
- this.stream = stream;
- this.bufferSize = bufferSize;
- buffer = new byte[bufferSize];
-
- cachedData = string.Empty;
- }
-
- public override string ReadLine()
- {
- var currentString = TryGetNewLine();
-
- while (currentString == null)
- {
- var read = stream.Read(buffer, 0, bufferSize);
- var str = Encoding.UTF8.GetString(buffer, 0, read);
-
- cachedData += str;
- currentString = TryGetNewLine();
- }
-
- return currentString;
- }
-
- private string TryGetNewLine()
- {
- var newLine = cachedData.IndexOf('\n');
-
- if (newLine >= 0)
- {
- var r = cachedData.Substring(0, newLine + 1);
- cachedData = cachedData.Remove(0, r.Length);
- return r.Trim();
- }
-
- return null;
- }
- }
-} \ No newline at end of file