summaryrefslogtreecommitdiff
path: root/FireBase/Streaming
diff options
context:
space:
mode:
Diffstat (limited to 'FireBase/Streaming')
-rw-r--r--FireBase/Streaming/FirebaseCache.cs93
-rw-r--r--FireBase/Streaming/FirebaseEvent.cs27
-rw-r--r--FireBase/Streaming/FirebaseEventSource.cs16
-rw-r--r--FireBase/Streaming/FirebaseEventType.cs8
-rw-r--r--FireBase/Streaming/FirebaseServerEventType.cs2
-rw-r--r--FireBase/Streaming/FirebaseSubscription.cs112
-rw-r--r--FireBase/Streaming/NonBlockingStreamReader.cs43
7 files changed, 143 insertions, 158 deletions
diff --git a/FireBase/Streaming/FirebaseCache.cs b/FireBase/Streaming/FirebaseCache.cs
index ba7990b..66241e0 100644
--- a/FireBase/Streaming/FirebaseCache.cs
+++ b/FireBase/Streaming/FirebaseCache.cs
@@ -1,51 +1,50 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using Firebase.Database.Http;
+using Newtonsoft.Json;
+
namespace Firebase.Database.Streaming
{
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reflection;
-
- using Firebase.Database.Http;
-
- using Newtonsoft.Json;
-
/// <summary>
- /// The firebase cache.
+ /// The firebase cache.
/// </summary>
/// <typeparam name="T"> Type of top-level entities in the cache. </typeparam>
public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>>
{
private readonly IDictionary<string, T> dictionary;
private readonly bool isDictionaryType;
- private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
+
+ private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings
{
ObjectCreationHandling = ObjectCreationHandling.Replace
};
/// <summary>
- /// Initializes a new instance of the <see cref="FirebaseCache{T}"/> class.
+ /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class.
/// </summary>
- public FirebaseCache()
+ public FirebaseCache()
: this(new Dictionary<string, T>())
{
}
/// <summary>
- /// Initializes a new instance of the <see cref="FirebaseCache{T}"/> class and populates it with existing data.
+ /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class and populates it with existing data.
/// </summary>
/// <param name="existingItems"> The existing items. </param>
public FirebaseCache(IDictionary<string, T> existingItems)
{
- this.dictionary = existingItems;
- this.isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
+ dictionary = existingItems;
+ isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
}
/// <summary>
- /// The push data.
+ /// The push data.
/// </summary>
- /// <param name="path"> The path of incoming data, separated by slash. </param>
- /// <param name="data"> The data in json format as returned by firebase. </param>
+ /// <param name="path"> The path of incoming data, separated by slash. </param>
+ /// <param name="data"> The data in json format as returned by firebase. </param>
/// <returns> Collection of top-level entities which were affected by the push. </returns>
public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true)
{
@@ -53,18 +52,18 @@ namespace Firebase.Database.Streaming
Action<object> primitiveObjSetter = null;
Action objDeleter = null;
- var pathElements = path.Split(new[] { "/" }, removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None);
+ var pathElements = path.Split(new[] {"/"},
+ removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None);
// first find where we should insert the data to
foreach (var element in pathElements)
- {
if (obj is IDictionary)
{
// if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key
var dictionary = obj as IDictionary;
var valueType = obj.GetType().GenericTypeArguments[1];
- primitiveObjSetter = (d) => dictionary[element] = d;
+ primitiveObjSetter = d => dictionary[element] = d;
objDeleter = () => dictionary.Remove(element);
if (dictionary.Contains(element))
@@ -73,7 +72,7 @@ namespace Firebase.Database.Streaming
}
else
{
- dictionary[element] = this.CreateInstance(valueType);
+ dictionary[element] = CreateInstance(valueType);
obj = dictionary[element];
}
}
@@ -84,24 +83,24 @@ namespace Firebase.Database.Streaming
var property = objParent
.GetType()
.GetRuntimeProperties()
- .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) || element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName);
+ .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) ||
+ element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName);
objDeleter = () => property.SetValue(objParent, null);
- primitiveObjSetter = (d) => property.SetValue(objParent, d);
+ primitiveObjSetter = d => property.SetValue(objParent, d);
obj = property.GetValue(obj);
if (obj == null)
{
- obj = this.CreateInstance(property.PropertyType);
+ obj = CreateInstance(property.PropertyType);
property.SetValue(objParent, obj);
}
}
- }
// if data is null (=empty string) delete it
if (string.IsNullOrWhiteSpace(data) || data == "null")
{
var key = pathElements[0];
- var target = this.dictionary[key];
+ var target = dictionary[key];
objDeleter();
@@ -110,7 +109,7 @@ namespace Firebase.Database.Streaming
}
// now insert the data
- if (obj is IDictionary && !this.isDictionaryType)
+ if (obj is IDictionary && !isDictionaryType)
{
// insert data into dictionary and return it as a collection of FirebaseObject
var dictionary = obj as IDictionary;
@@ -122,10 +121,7 @@ namespace Firebase.Database.Streaming
dictionary[item.Key] = item.Object;
// top level dictionary changed
- if (!pathElements.Any())
- {
- yield return new FirebaseObject<T>(item.Key, (T)item.Object);
- }
+ if (!pathElements.Any()) yield return new FirebaseObject<T>(item.Key, (T) item.Object);
}
// nested dictionary changed
@@ -141,52 +137,45 @@ namespace Firebase.Database.Streaming
var valueType = obj.GetType();
// firebase sends strings without double quotes
- var targetObject = valueType == typeof(string) ? data.ToString() : JsonConvert.DeserializeObject(data, valueType);
+ var targetObject = valueType == typeof(string)
+ ? data
+ : JsonConvert.DeserializeObject(data, valueType);
if ((valueType.GetTypeInfo().IsPrimitive || valueType == typeof(string)) && primitiveObjSetter != null)
- {
// handle primitive (value) types separately
primitiveObjSetter(targetObject);
- }
else
- {
- JsonConvert.PopulateObject(data, obj, this.serializerSettings);
- }
+ JsonConvert.PopulateObject(data, obj, serializerSettings);
- this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]];
- yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]);
+ dictionary[pathElements[0]] = dictionary[pathElements[0]];
+ yield return new FirebaseObject<T>(pathElements[0], dictionary[pathElements[0]]);
}
}
public bool Contains(string key)
{
- return this.dictionary.Keys.Contains(key);
+ return dictionary.Keys.Contains(key);
}
private object CreateInstance(Type type)
{
if (type == typeof(string))
- {
return string.Empty;
- }
- else
- {
- return Activator.CreateInstance(type);
- }
+ return Activator.CreateInstance(type);
}
#region IEnumerable
IEnumerator IEnumerable.GetEnumerator()
{
- return this.GetEnumerator();
+ return GetEnumerator();
}
public IEnumerator<FirebaseObject<T>> GetEnumerator()
{
- return this.dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
+ return dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
}
#endregion
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEvent.cs b/FireBase/Streaming/FirebaseEvent.cs
index c2338ca..1761a72 100644
--- a/FireBase/Streaming/FirebaseEvent.cs
+++ b/FireBase/Streaming/FirebaseEvent.cs
@@ -1,13 +1,13 @@
namespace Firebase.Database.Streaming
{
/// <summary>
- /// Firebase event which hold <see cref="EventType"/> and the object affected by the event.
+ /// Firebase event which hold <see cref="EventType" /> and the object affected by the event.
/// </summary>
/// <typeparam name="T"> Type of object affected by the event. </typeparam>
public class FirebaseEvent<T> : FirebaseObject<T>
{
/// <summary>
- /// Initializes a new instance of the <see cref="FirebaseEvent{T}"/> class.
+ /// Initializes a new instance of the <see cref="FirebaseEvent{T}" /> class.
/// </summary>
/// <param name="key"> The key of the object. </param>
/// <param name="obj"> The object. </param>
@@ -15,26 +15,23 @@ namespace Firebase.Database.Streaming
public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource)
: base(key, obj)
{
- this.EventType = eventType;
- this.EventSource = eventSource;
+ EventType = eventType;
+ EventSource = eventSource;
}
/// <summary>
- /// Gets the source of the event.
+ /// Gets the source of the event.
/// </summary>
- public FirebaseEventSource EventSource
- {
- get;
- }
+ public FirebaseEventSource EventSource { get; }
/// <summary>
- /// Gets the event type.
+ /// Gets the event type.
/// </summary>
- public FirebaseEventType EventType
+ public FirebaseEventType EventType { get; }
+
+ public static FirebaseEvent<T> Empty(FirebaseEventSource source)
{
- get;
+ return new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
}
-
- public static FirebaseEvent<T> Empty(FirebaseEventSource source) => new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEventSource.cs b/FireBase/Streaming/FirebaseEventSource.cs
index 98df977..b1385ca 100644
--- a/FireBase/Streaming/FirebaseEventSource.cs
+++ b/FireBase/Streaming/FirebaseEventSource.cs
@@ -1,38 +1,38 @@
namespace Firebase.Database.Streaming
{
/// <summary>
- /// Specifies the origin of given <see cref="FirebaseEvent{T}"/>
+ /// Specifies the origin of given <see cref="FirebaseEvent{T}" />
/// </summary>
public enum FirebaseEventSource
{
/// <summary>
- /// Event comes from an offline source.
+ /// Event comes from an offline source.
/// </summary>
Offline,
/// <summary>
- /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase).
+ /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase).
/// </summary>
OnlineInitial,
/// <summary>
- /// Event comes from online source received thru active stream.
+ /// Event comes from online source received thru active stream.
/// </summary>
OnlineStream,
/// <summary>
- /// Event comes from online source being fetched manually.
+ /// Event comes from online source being fetched manually.
/// </summary>
OnlinePull,
/// <summary>
- /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming).
+ /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming).
/// </summary>
OnlinePush,
/// <summary>
- /// Event comes from an online source.
+ /// Event comes from an online source.
/// </summary>
Online = OnlineInitial | OnlinePull | OnlinePush | OnlineStream
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEventType.cs b/FireBase/Streaming/FirebaseEventType.cs
index 5fb21ef..7606331 100644
--- a/FireBase/Streaming/FirebaseEventType.cs
+++ b/FireBase/Streaming/FirebaseEventType.cs
@@ -1,18 +1,18 @@
namespace Firebase.Database.Streaming
{
/// <summary>
- /// The type of event.
+ /// The type of event.
/// </summary>
public enum FirebaseEventType
{
/// <summary>
- /// Item was inserted or updated.
+ /// Item was inserted or updated.
/// </summary>
InsertOrUpdate,
/// <summary>
- /// Item was deleted.
+ /// Item was deleted.
/// </summary>
Delete
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseServerEventType.cs b/FireBase/Streaming/FirebaseServerEventType.cs
index 1f10bc8..79c816d 100644
--- a/FireBase/Streaming/FirebaseServerEventType.cs
+++ b/FireBase/Streaming/FirebaseServerEventType.cs
@@ -12,4 +12,4 @@ namespace Firebase.Database.Streaming
AuthRevoked
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs
index 4b5e643..fb0f403 100644
--- a/FireBase/Streaming/FirebaseSubscription.cs
+++ b/FireBase/Streaming/FirebaseSubscription.cs
@@ -1,32 +1,28 @@
+using System;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Threading;
+using System.Threading.Tasks;
+using Firebase.Database.Query;
+using Newtonsoft.Json.Linq;
+
namespace Firebase.Database.Streaming
{
- using System;
- using System.Diagnostics;
- using System.Linq;
- using System.Net.Http;
- using System.Net.Http.Headers;
- using System.Threading;
- using System.Threading.Tasks;
-
- using Firebase.Database.Query;
-
- using Newtonsoft.Json.Linq;
- using System.Net;
-
/// <summary>
- /// The firebase subscription.
+ /// The firebase subscription.
/// </summary>
/// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam>
internal class FirebaseSubscription<T> : IDisposable
{
+ private static readonly HttpClient http;
+ private readonly FirebaseCache<T> cache;
private readonly CancellationTokenSource cancel;
+ private readonly FirebaseClient client;
+ private readonly string elementRoot;
private readonly IObserver<FirebaseEvent<T>> observer;
private readonly IFirebaseQuery query;
- private readonly FirebaseCache<T> cache;
- private readonly string elementRoot;
- private readonly FirebaseClient client;
-
- private static HttpClient http;
static FirebaseSubscription()
{
@@ -45,31 +41,32 @@ namespace Firebase.Database.Streaming
}
/// <summary>
- /// Initializes a new instance of the <see cref="FirebaseSubscription{T}"/> class.
+ /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class.
/// </summary>
/// <param name="observer"> The observer. </param>
/// <param name="query"> The query. </param>
/// <param name="cache"> The cache. </param>
- public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, FirebaseCache<T> cache)
+ public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot,
+ FirebaseCache<T> cache)
{
this.observer = observer;
this.query = query;
this.elementRoot = elementRoot;
- this.cancel = new CancellationTokenSource();
+ cancel = new CancellationTokenSource();
this.cache = cache;
- this.client = query.Client;
+ client = query.Client;
}
- public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
-
public void Dispose()
{
- this.cancel.Cancel();
+ cancel.Cancel();
}
+ public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
+
public IDisposable Run()
{
- Task.Run(() => this.ReceiveThread());
+ Task.Run(() => ReceiveThread());
return this;
}
@@ -84,15 +81,17 @@ namespace Firebase.Database.Streaming
try
{
- this.cancel.Token.ThrowIfCancellationRequested();
+ cancel.Token.ThrowIfCancellationRequested();
// initialize network connection
- url = await this.query.BuildUrlAsync().ConfigureAwait(false);
+ url = await query.BuildUrlAsync().ConfigureAwait(false);
var request = new HttpRequestMessage(HttpMethod.Get, url);
var serverEvent = FirebaseServerEventType.KeepAlive;
- var client = this.GetHttpClient();
- var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, this.cancel.Token).ConfigureAwait(false);
+ var client = GetHttpClient();
+ var response = await client
+ .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token)
+ .ConfigureAwait(false);
statusCode = response.StatusCode;
response.EnsureSuccessStatusCode();
@@ -102,32 +101,28 @@ namespace Firebase.Database.Streaming
{
while (true)
{
- this.cancel.Token.ThrowIfCancellationRequested();
+ cancel.Token.ThrowIfCancellationRequested();
line = reader.ReadLine()?.Trim();
- if (string.IsNullOrWhiteSpace(line))
- {
- continue;
- }
+ if (string.IsNullOrWhiteSpace(line)) continue;
+
+ var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
+ .Select(s => s.Trim()).ToArray();
- var tuple = line.Split(new[] { ':' }, 2, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Trim()).ToArray();
-
switch (tuple[0].ToLower())
{
case "event":
- serverEvent = this.ParseServerEvent(serverEvent, tuple[1]);
+ serverEvent = ParseServerEvent(serverEvent, tuple[1]);
break;
case "data":
- this.ProcessServerData(url, serverEvent, tuple[1]);
+ ProcessServerData(url, serverEvent, tuple[1]);
break;
}
if (serverEvent == FirebaseServerEventType.AuthRevoked)
- {
// auth token no longer valid, reconnect
break;
- }
}
}
}
@@ -137,13 +132,15 @@ namespace Firebase.Database.Streaming
}
catch (Exception ex) when (statusCode != HttpStatusCode.OK)
{
- this.observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
- this.Dispose();
+ observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
+ Dispose();
break;
}
catch (Exception ex)
{
- this.ExceptionThrown?.Invoke(this, new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, statusCode, ex)));
+ ExceptionThrown?.Invoke(this,
+ new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line,
+ statusCode, ex)));
await Task.Delay(2000).ConfigureAwait(false);
}
@@ -185,30 +182,29 @@ namespace Firebase.Database.Streaming
var data = result["data"].ToString();
// If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
- if(string.IsNullOrWhiteSpace(this.elementRoot) || !this.cache.Contains(this.elementRoot))
- {
- if(path == "/" && data == string.Empty)
+ if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot))
+ if (path == "/" && data == string.Empty)
{
- this.observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
+ observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
return;
}
- }
- var eventType = string.IsNullOrWhiteSpace(data) ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate;
+ var eventType = string.IsNullOrWhiteSpace(data)
+ ? FirebaseEventType.Delete
+ : FirebaseEventType.InsertOrUpdate;
- var items = this.cache.PushData(this.elementRoot + path, data);
+ var items = cache.PushData(elementRoot + path, data);
foreach (var i in items.ToList())
- {
- this.observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, FirebaseEventSource.OnlineStream));
- }
+ observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType,
+ FirebaseEventSource.OnlineStream));
break;
case FirebaseServerEventType.KeepAlive:
break;
case FirebaseServerEventType.Cancel:
- this.observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
- this.Dispose();
+ observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
+ Dispose();
break;
}
}
@@ -218,4 +214,4 @@ namespace Firebase.Database.Streaming
return http;
}
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/NonBlockingStreamReader.cs b/FireBase/Streaming/NonBlockingStreamReader.cs
index 2ac83fd..8228e32 100644
--- a/FireBase/Streaming/NonBlockingStreamReader.cs
+++ b/FireBase/Streaming/NonBlockingStreamReader.cs
@@ -1,45 +1,48 @@
-namespace Firebase.Database.Streaming
-{
- using System.IO;
- using System.Text;
+using System.IO;
+using System.Text;
+namespace Firebase.Database.Streaming
+{
/// <summary>
- /// When a regular <see cref="StreamReader"/> is used in a UWP app its <see cref="StreamReader.ReadLine"/> method tends to take a long
- /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader"/> can be used instead to boost performance
- /// in your UWP app. Use <see cref="FirebaseOptions"/> to inject an instance of this class into your <see cref="FirebaseClient"/>.
+ /// When a regular <see cref="StreamReader" /> is used in a UWP app its <see cref="StreamReader.ReadLine" /> method
+ /// tends to take a long
+ /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader" /> can be used
+ /// instead to boost performance
+ /// in your UWP app. Use <see cref="FirebaseOptions" /> to inject an instance of this class into your
+ /// <see cref="FirebaseClient" />.
/// </summary>
public class NonBlockingStreamReader : TextReader
{
private const int DefaultBufferSize = 16000;
-
- private readonly Stream stream;
private readonly byte[] buffer;
private readonly int bufferSize;
+ private readonly Stream stream;
+
private string cachedData;
-
- public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
+
+ public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
{
this.stream = stream;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
+ buffer = new byte[bufferSize];
- this.cachedData = string.Empty;
+ cachedData = string.Empty;
}
public override string ReadLine()
{
- var currentString = this.TryGetNewLine();
-
+ var currentString = TryGetNewLine();
+
while (currentString == null)
{
- var read = this.stream.Read(this.buffer, 0, this.bufferSize);
+ var read = stream.Read(buffer, 0, bufferSize);
var str = Encoding.UTF8.GetString(buffer, 0, read);
cachedData += str;
- currentString = this.TryGetNewLine();
+ currentString = TryGetNewLine();
}
-
+
return currentString;
}
@@ -50,11 +53,11 @@
if (newLine >= 0)
{
var r = cachedData.Substring(0, newLine + 1);
- this.cachedData = cachedData.Remove(0, r.Length);
+ cachedData = cachedData.Remove(0, r.Length);
return r.Trim();
}
return null;
}
}
-}
+} \ No newline at end of file