summaryrefslogtreecommitdiff
path: root/dsa/FireBase/Streaming
diff options
context:
space:
mode:
Diffstat (limited to 'dsa/FireBase/Streaming')
-rw-r--r--dsa/FireBase/Streaming/FirebaseCache.cs60
-rw-r--r--dsa/FireBase/Streaming/FirebaseEvent.cs12
-rw-r--r--dsa/FireBase/Streaming/FirebaseEventSource.cs6
-rw-r--r--dsa/FireBase/Streaming/FirebaseEventType.cs6
-rw-r--r--dsa/FireBase/Streaming/FirebaseServerEventType.cs6
-rw-r--r--dsa/FireBase/Streaming/FirebaseSubscription.cs66
-rw-r--r--dsa/FireBase/Streaming/NonBlockingStreamReader.cs21
7 files changed, 59 insertions, 118 deletions
diff --git a/dsa/FireBase/Streaming/FirebaseCache.cs b/dsa/FireBase/Streaming/FirebaseCache.cs
index 66241e0..be6f2c7 100644
--- a/dsa/FireBase/Streaming/FirebaseCache.cs
+++ b/dsa/FireBase/Streaming/FirebaseCache.cs
@@ -6,19 +6,16 @@ using System.Reflection;
using Firebase.Database.Http;
using Newtonsoft.Json;
-namespace Firebase.Database.Streaming
-{
+namespace Firebase.Database.Streaming {
/// <summary>
/// The firebase cache.
/// </summary>
/// <typeparam name="T"> Type of top-level entities in the cache. </typeparam>
- public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>>
- {
+ public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>> {
private readonly IDictionary<string, T> dictionary;
private readonly bool isDictionaryType;
- private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings
- {
+ private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings {
ObjectCreationHandling = ObjectCreationHandling.Replace
};
@@ -26,16 +23,14 @@ namespace Firebase.Database.Streaming
/// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class.
/// </summary>
public FirebaseCache()
- : this(new Dictionary<string, T>())
- {
+ : this(new Dictionary<string, T>()) {
}
/// <summary>
/// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class and populates it with existing data.
/// </summary>
/// <param name="existingItems"> The existing items. </param>
- public FirebaseCache(IDictionary<string, T> existingItems)
- {
+ public FirebaseCache(IDictionary<string, T> existingItems) {
dictionary = existingItems;
isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
}
@@ -46,8 +41,7 @@ namespace Firebase.Database.Streaming
/// <param name="path"> The path of incoming data, separated by slash. </param>
/// <param name="data"> The data in json format as returned by firebase. </param>
/// <returns> Collection of top-level entities which were affected by the push. </returns>
- public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true)
- {
+ public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true) {
object obj = this.dictionary;
Action<object> primitiveObjSetter = null;
Action objDeleter = null;
@@ -57,8 +51,7 @@ namespace Firebase.Database.Streaming
// first find where we should insert the data to
foreach (var element in pathElements)
- if (obj is IDictionary)
- {
+ if (obj is IDictionary) {
// if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key
var dictionary = obj as IDictionary;
var valueType = obj.GetType().GenericTypeArguments[1];
@@ -66,18 +59,15 @@ namespace Firebase.Database.Streaming
primitiveObjSetter = d => dictionary[element] = d;
objDeleter = () => dictionary.Remove(element);
- if (dictionary.Contains(element))
- {
+ if (dictionary.Contains(element)) {
obj = dictionary[element];
}
- else
- {
+ else {
dictionary[element] = CreateInstance(valueType);
obj = dictionary[element];
}
}
- else
- {
+ else {
// if it's not a dictionary, try to find the property of current object with the matching name
var objParent = obj;
var property = objParent
@@ -89,16 +79,14 @@ namespace Firebase.Database.Streaming
objDeleter = () => property.SetValue(objParent, null);
primitiveObjSetter = d => property.SetValue(objParent, d);
obj = property.GetValue(obj);
- if (obj == null)
- {
+ if (obj == null) {
obj = CreateInstance(property.PropertyType);
property.SetValue(objParent, obj);
}
}
// if data is null (=empty string) delete it
- if (string.IsNullOrWhiteSpace(data) || data == "null")
- {
+ if (string.IsNullOrWhiteSpace(data) || data == "null") {
var key = pathElements[0];
var target = dictionary[key];
@@ -109,15 +97,13 @@ namespace Firebase.Database.Streaming
}
// now insert the data
- if (obj is IDictionary && !isDictionaryType)
- {
+ if (obj is IDictionary && !isDictionaryType) {
// insert data into dictionary and return it as a collection of FirebaseObject
var dictionary = obj as IDictionary;
var valueType = obj.GetType().GenericTypeArguments[1];
var objectCollection = data.GetObjectCollection(valueType);
- foreach (var item in objectCollection)
- {
+ foreach (var item in objectCollection) {
dictionary[item.Key] = item.Object;
// top level dictionary changed
@@ -125,14 +111,12 @@ namespace Firebase.Database.Streaming
}
// nested dictionary changed
- if (pathElements.Any())
- {
+ if (pathElements.Any()) {
this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]];
yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]);
}
}
- else
- {
+ else {
// set the data on a property of the given object
var valueType = obj.GetType();
@@ -152,13 +136,11 @@ namespace Firebase.Database.Streaming
}
}
- public bool Contains(string key)
- {
+ public bool Contains(string key) {
return dictionary.Keys.Contains(key);
}
- private object CreateInstance(Type type)
- {
+ private object CreateInstance(Type type) {
if (type == typeof(string))
return string.Empty;
return Activator.CreateInstance(type);
@@ -166,13 +148,11 @@ namespace Firebase.Database.Streaming
#region IEnumerable
- IEnumerator IEnumerable.GetEnumerator()
- {
+ IEnumerator IEnumerable.GetEnumerator() {
return GetEnumerator();
}
- public IEnumerator<FirebaseObject<T>> GetEnumerator()
- {
+ public IEnumerator<FirebaseObject<T>> GetEnumerator() {
return dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
}
diff --git a/dsa/FireBase/Streaming/FirebaseEvent.cs b/dsa/FireBase/Streaming/FirebaseEvent.cs
index 1761a72..2a1ec6d 100644
--- a/dsa/FireBase/Streaming/FirebaseEvent.cs
+++ b/dsa/FireBase/Streaming/FirebaseEvent.cs
@@ -1,11 +1,9 @@
-namespace Firebase.Database.Streaming
-{
+namespace Firebase.Database.Streaming {
/// <summary>
/// Firebase event which hold <see cref="EventType" /> and the object affected by the event.
/// </summary>
/// <typeparam name="T"> Type of object affected by the event. </typeparam>
- public class FirebaseEvent<T> : FirebaseObject<T>
- {
+ public class FirebaseEvent<T> : FirebaseObject<T> {
/// <summary>
/// Initializes a new instance of the <see cref="FirebaseEvent{T}" /> class.
/// </summary>
@@ -13,8 +11,7 @@ namespace Firebase.Database.Streaming
/// <param name="obj"> The object. </param>
/// <param name="eventType"> The event type. </param>
public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource)
- : base(key, obj)
- {
+ : base(key, obj) {
EventType = eventType;
EventSource = eventSource;
}
@@ -29,8 +26,7 @@ namespace Firebase.Database.Streaming
/// </summary>
public FirebaseEventType EventType { get; }
- public static FirebaseEvent<T> Empty(FirebaseEventSource source)
- {
+ public static FirebaseEvent<T> Empty(FirebaseEventSource source) {
return new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
}
}
diff --git a/dsa/FireBase/Streaming/FirebaseEventSource.cs b/dsa/FireBase/Streaming/FirebaseEventSource.cs
index b1385ca..9c14f83 100644
--- a/dsa/FireBase/Streaming/FirebaseEventSource.cs
+++ b/dsa/FireBase/Streaming/FirebaseEventSource.cs
@@ -1,10 +1,8 @@
-namespace Firebase.Database.Streaming
-{
+namespace Firebase.Database.Streaming {
/// <summary>
/// Specifies the origin of given <see cref="FirebaseEvent{T}" />
/// </summary>
- public enum FirebaseEventSource
- {
+ public enum FirebaseEventSource {
/// <summary>
/// Event comes from an offline source.
/// </summary>
diff --git a/dsa/FireBase/Streaming/FirebaseEventType.cs b/dsa/FireBase/Streaming/FirebaseEventType.cs
index 7606331..e6127bf 100644
--- a/dsa/FireBase/Streaming/FirebaseEventType.cs
+++ b/dsa/FireBase/Streaming/FirebaseEventType.cs
@@ -1,10 +1,8 @@
-namespace Firebase.Database.Streaming
-{
+namespace Firebase.Database.Streaming {
/// <summary>
/// The type of event.
/// </summary>
- public enum FirebaseEventType
- {
+ public enum FirebaseEventType {
/// <summary>
/// Item was inserted or updated.
/// </summary>
diff --git a/dsa/FireBase/Streaming/FirebaseServerEventType.cs b/dsa/FireBase/Streaming/FirebaseServerEventType.cs
index 79c816d..9abdb43 100644
--- a/dsa/FireBase/Streaming/FirebaseServerEventType.cs
+++ b/dsa/FireBase/Streaming/FirebaseServerEventType.cs
@@ -1,7 +1,5 @@
-namespace Firebase.Database.Streaming
-{
- internal enum FirebaseServerEventType
- {
+namespace Firebase.Database.Streaming {
+ internal enum FirebaseServerEventType {
Put,
Patch,
diff --git a/dsa/FireBase/Streaming/FirebaseSubscription.cs b/dsa/FireBase/Streaming/FirebaseSubscription.cs
index fb0f403..6488815 100644
--- a/dsa/FireBase/Streaming/FirebaseSubscription.cs
+++ b/dsa/FireBase/Streaming/FirebaseSubscription.cs
@@ -8,14 +8,12 @@ using System.Threading.Tasks;
using Firebase.Database.Query;
using Newtonsoft.Json.Linq;
-namespace Firebase.Database.Streaming
-{
+namespace Firebase.Database.Streaming {
/// <summary>
/// The firebase subscription.
/// </summary>
/// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam>
- internal class FirebaseSubscription<T> : IDisposable
- {
+ internal class FirebaseSubscription<T> : IDisposable {
private static readonly HttpClient http;
private readonly FirebaseCache<T> cache;
private readonly CancellationTokenSource cancel;
@@ -24,10 +22,8 @@ namespace Firebase.Database.Streaming
private readonly IObserver<FirebaseEvent<T>> observer;
private readonly IFirebaseQuery query;
- static FirebaseSubscription()
- {
- var handler = new HttpClientHandler
- {
+ static FirebaseSubscription() {
+ var handler = new HttpClientHandler {
AllowAutoRedirect = true,
MaxAutomaticRedirections = 10,
CookieContainer = new CookieContainer()
@@ -47,8 +43,7 @@ namespace Firebase.Database.Streaming
/// <param name="query"> The query. </param>
/// <param name="cache"> The cache. </param>
public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot,
- FirebaseCache<T> cache)
- {
+ FirebaseCache<T> cache) {
this.observer = observer;
this.query = query;
this.elementRoot = elementRoot;
@@ -57,30 +52,25 @@ namespace Firebase.Database.Streaming
client = query.Client;
}
- public void Dispose()
- {
+ public void Dispose() {
cancel.Cancel();
}
public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
- public IDisposable Run()
- {
+ public IDisposable Run() {
Task.Run(() => ReceiveThread());
return this;
}
- private async void ReceiveThread()
- {
- while (true)
- {
+ private async void ReceiveThread() {
+ while (true) {
var url = string.Empty;
var line = string.Empty;
var statusCode = HttpStatusCode.OK;
- try
- {
+ try {
cancel.Token.ThrowIfCancellationRequested();
// initialize network connection
@@ -97,10 +87,8 @@ namespace Firebase.Database.Streaming
response.EnsureSuccessStatusCode();
using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
- using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream))
- {
- while (true)
- {
+ using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream)) {
+ while (true) {
cancel.Token.ThrowIfCancellationRequested();
line = reader.ReadLine()?.Trim();
@@ -110,8 +98,7 @@ namespace Firebase.Database.Streaming
var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
.Select(s => s.Trim()).ToArray();
- switch (tuple[0].ToLower())
- {
+ switch (tuple[0].ToLower()) {
case "event":
serverEvent = ParseServerEvent(serverEvent, tuple[1]);
break;
@@ -126,18 +113,15 @@ namespace Firebase.Database.Streaming
}
}
}
- catch (OperationCanceledException)
- {
+ catch (OperationCanceledException) {
break;
}
- catch (Exception ex) when (statusCode != HttpStatusCode.OK)
- {
+ catch (Exception ex) when (statusCode != HttpStatusCode.OK) {
observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
Dispose();
break;
}
- catch (Exception ex)
- {
+ catch (Exception ex) {
ExceptionThrown?.Invoke(this,
new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line,
statusCode, ex)));
@@ -147,10 +131,8 @@ namespace Firebase.Database.Streaming
}
}
- private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName)
- {
- switch (eventName)
- {
+ private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName) {
+ switch (eventName) {
case "put":
serverEvent = FirebaseServerEventType.Put;
break;
@@ -171,10 +153,8 @@ namespace Firebase.Database.Streaming
return serverEvent;
}
- private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData)
- {
- switch (serverEvent)
- {
+ private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData) {
+ switch (serverEvent) {
case FirebaseServerEventType.Put:
case FirebaseServerEventType.Patch:
var result = JObject.Parse(serverData);
@@ -183,8 +163,7 @@ namespace Firebase.Database.Streaming
// If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot))
- if (path == "/" && data == string.Empty)
- {
+ if (path == "/" && data == string.Empty) {
observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
return;
}
@@ -209,8 +188,7 @@ namespace Firebase.Database.Streaming
}
}
- private HttpClient GetHttpClient()
- {
+ private HttpClient GetHttpClient() {
return http;
}
}
diff --git a/dsa/FireBase/Streaming/NonBlockingStreamReader.cs b/dsa/FireBase/Streaming/NonBlockingStreamReader.cs
index 8228e32..805716b 100644
--- a/dsa/FireBase/Streaming/NonBlockingStreamReader.cs
+++ b/dsa/FireBase/Streaming/NonBlockingStreamReader.cs
@@ -1,8 +1,7 @@
using System.IO;
using System.Text;
-namespace Firebase.Database.Streaming
-{
+namespace Firebase.Database.Streaming {
/// <summary>
/// When a regular <see cref="StreamReader" /> is used in a UWP app its <see cref="StreamReader.ReadLine" /> method
/// tends to take a long
@@ -11,8 +10,7 @@ namespace Firebase.Database.Streaming
/// in your UWP app. Use <see cref="FirebaseOptions" /> to inject an instance of this class into your
/// <see cref="FirebaseClient" />.
/// </summary>
- public class NonBlockingStreamReader : TextReader
- {
+ public class NonBlockingStreamReader : TextReader {
private const int DefaultBufferSize = 16000;
private readonly byte[] buffer;
private readonly int bufferSize;
@@ -21,8 +19,7 @@ namespace Firebase.Database.Streaming
private string cachedData;
- public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
- {
+ public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize) {
this.stream = stream;
this.bufferSize = bufferSize;
buffer = new byte[bufferSize];
@@ -30,12 +27,10 @@ namespace Firebase.Database.Streaming
cachedData = string.Empty;
}
- public override string ReadLine()
- {
+ public override string ReadLine() {
var currentString = TryGetNewLine();
- while (currentString == null)
- {
+ while (currentString == null) {
var read = stream.Read(buffer, 0, bufferSize);
var str = Encoding.UTF8.GetString(buffer, 0, read);
@@ -46,12 +41,10 @@ namespace Firebase.Database.Streaming
return currentString;
}
- private string TryGetNewLine()
- {
+ private string TryGetNewLine() {
var newLine = cachedData.IndexOf('\n');
- if (newLine >= 0)
- {
+ if (newLine >= 0) {
var r = cachedData.Substring(0, newLine + 1);
cachedData = cachedData.Remove(0, r.Length);
return r.Trim();