summaryrefslogtreecommitdiff
path: root/FireBase/Streaming
diff options
context:
space:
mode:
Diffstat (limited to 'FireBase/Streaming')
-rw-r--r--FireBase/Streaming/FirebaseCache.cs58
-rw-r--r--FireBase/Streaming/FirebaseEvent.cs19
-rw-r--r--FireBase/Streaming/FirebaseEventSource.cs2
-rw-r--r--FireBase/Streaming/FirebaseEventType.cs2
-rw-r--r--FireBase/Streaming/FirebaseServerEventType.cs2
-rw-r--r--FireBase/Streaming/FirebaseSubscription.cs76
-rw-r--r--FireBase/Streaming/NonBlockingStreamReader.cs22
7 files changed, 83 insertions, 98 deletions
diff --git a/FireBase/Streaming/FirebaseCache.cs b/FireBase/Streaming/FirebaseCache.cs
index ba7990b..77fc622 100644
--- a/FireBase/Streaming/FirebaseCache.cs
+++ b/FireBase/Streaming/FirebaseCache.cs
@@ -5,9 +5,7 @@ namespace Firebase.Database.Streaming
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
-
- using Firebase.Database.Http;
-
+ using Http;
using Newtonsoft.Json;
/// <summary>
@@ -18,6 +16,7 @@ namespace Firebase.Database.Streaming
{
private readonly IDictionary<string, T> dictionary;
private readonly bool isDictionaryType;
+
private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings()
{
ObjectCreationHandling = ObjectCreationHandling.Replace
@@ -26,7 +25,7 @@ namespace Firebase.Database.Streaming
/// <summary>
/// Initializes a new instance of the <see cref="FirebaseCache{T}"/> class.
/// </summary>
- public FirebaseCache()
+ public FirebaseCache()
: this(new Dictionary<string, T>())
{
}
@@ -37,8 +36,8 @@ namespace Firebase.Database.Streaming
/// <param name="existingItems"> The existing items. </param>
public FirebaseCache(IDictionary<string, T> existingItems)
{
- this.dictionary = existingItems;
- this.isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
+ dictionary = existingItems;
+ isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo());
}
/// <summary>
@@ -53,11 +52,11 @@ namespace Firebase.Database.Streaming
Action<object> primitiveObjSetter = null;
Action objDeleter = null;
- var pathElements = path.Split(new[] { "/" }, removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None);
+ var pathElements = path.Split(new[] {"/"},
+ removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None);
// first find where we should insert the data to
foreach (var element in pathElements)
- {
if (obj is IDictionary)
{
// if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key
@@ -73,7 +72,7 @@ namespace Firebase.Database.Streaming
}
else
{
- dictionary[element] = this.CreateInstance(valueType);
+ dictionary[element] = CreateInstance(valueType);
obj = dictionary[element];
}
}
@@ -84,24 +83,24 @@ namespace Firebase.Database.Streaming
var property = objParent
.GetType()
.GetRuntimeProperties()
- .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) || element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName);
+ .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) ||
+ element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName);
objDeleter = () => property.SetValue(objParent, null);
primitiveObjSetter = (d) => property.SetValue(objParent, d);
obj = property.GetValue(obj);
if (obj == null)
{
- obj = this.CreateInstance(property.PropertyType);
+ obj = CreateInstance(property.PropertyType);
property.SetValue(objParent, obj);
}
}
- }
// if data is null (=empty string) delete it
if (string.IsNullOrWhiteSpace(data) || data == "null")
{
var key = pathElements[0];
- var target = this.dictionary[key];
+ var target = dictionary[key];
objDeleter();
@@ -110,7 +109,7 @@ namespace Firebase.Database.Streaming
}
// now insert the data
- if (obj is IDictionary && !this.isDictionaryType)
+ if (obj is IDictionary && !isDictionaryType)
{
// insert data into dictionary and return it as a collection of FirebaseObject
var dictionary = obj as IDictionary;
@@ -122,10 +121,7 @@ namespace Firebase.Database.Streaming
dictionary[item.Key] = item.Object;
// top level dictionary changed
- if (!pathElements.Any())
- {
- yield return new FirebaseObject<T>(item.Key, (T)item.Object);
- }
+ if (!pathElements.Any()) yield return new FirebaseObject<T>(item.Key, (T) item.Object);
}
// nested dictionary changed
@@ -141,52 +137,46 @@ namespace Firebase.Database.Streaming
var valueType = obj.GetType();
// firebase sends strings without double quotes
- var targetObject = valueType == typeof(string) ? data.ToString() : JsonConvert.DeserializeObject(data, valueType);
+ var targetObject = valueType == typeof(string)
+ ? data.ToString()
+ : JsonConvert.DeserializeObject(data, valueType);
if ((valueType.GetTypeInfo().IsPrimitive || valueType == typeof(string)) && primitiveObjSetter != null)
- {
// handle primitive (value) types separately
primitiveObjSetter(targetObject);
- }
else
- {
- JsonConvert.PopulateObject(data, obj, this.serializerSettings);
- }
+ JsonConvert.PopulateObject(data, obj, serializerSettings);
- this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]];
- yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]);
+ dictionary[pathElements[0]] = dictionary[pathElements[0]];
+ yield return new FirebaseObject<T>(pathElements[0], dictionary[pathElements[0]]);
}
}
public bool Contains(string key)
{
- return this.dictionary.Keys.Contains(key);
+ return dictionary.Keys.Contains(key);
}
private object CreateInstance(Type type)
{
if (type == typeof(string))
- {
return string.Empty;
- }
else
- {
return Activator.CreateInstance(type);
- }
}
#region IEnumerable
IEnumerator IEnumerable.GetEnumerator()
{
- return this.GetEnumerator();
+ return GetEnumerator();
}
public IEnumerator<FirebaseObject<T>> GetEnumerator()
{
- return this.dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
+ return dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator();
}
#endregion
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEvent.cs b/FireBase/Streaming/FirebaseEvent.cs
index c2338ca..e4fd238 100644
--- a/FireBase/Streaming/FirebaseEvent.cs
+++ b/FireBase/Streaming/FirebaseEvent.cs
@@ -15,26 +15,23 @@ namespace Firebase.Database.Streaming
public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource)
: base(key, obj)
{
- this.EventType = eventType;
- this.EventSource = eventSource;
+ EventType = eventType;
+ EventSource = eventSource;
}
/// <summary>
/// Gets the source of the event.
/// </summary>
- public FirebaseEventSource EventSource
- {
- get;
- }
+ public FirebaseEventSource EventSource { get; }
/// <summary>
/// Gets the event type.
/// </summary>
- public FirebaseEventType EventType
+ public FirebaseEventType EventType { get; }
+
+ public static FirebaseEvent<T> Empty(FirebaseEventSource source)
{
- get;
+ return new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
}
-
- public static FirebaseEvent<T> Empty(FirebaseEventSource source) => new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source);
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEventSource.cs b/FireBase/Streaming/FirebaseEventSource.cs
index 98df977..0a397ad 100644
--- a/FireBase/Streaming/FirebaseEventSource.cs
+++ b/FireBase/Streaming/FirebaseEventSource.cs
@@ -35,4 +35,4 @@
/// </summary>
Online = OnlineInitial | OnlinePull | OnlinePush | OnlineStream
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseEventType.cs b/FireBase/Streaming/FirebaseEventType.cs
index 5fb21ef..d8c65b3 100644
--- a/FireBase/Streaming/FirebaseEventType.cs
+++ b/FireBase/Streaming/FirebaseEventType.cs
@@ -15,4 +15,4 @@ namespace Firebase.Database.Streaming
/// </summary>
Delete
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseServerEventType.cs b/FireBase/Streaming/FirebaseServerEventType.cs
index 1f10bc8..79c816d 100644
--- a/FireBase/Streaming/FirebaseServerEventType.cs
+++ b/FireBase/Streaming/FirebaseServerEventType.cs
@@ -12,4 +12,4 @@ namespace Firebase.Database.Streaming
AuthRevoked
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs
index 4b5e643..acdc76c 100644
--- a/FireBase/Streaming/FirebaseSubscription.cs
+++ b/FireBase/Streaming/FirebaseSubscription.cs
@@ -7,9 +7,7 @@ namespace Firebase.Database.Streaming
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
-
- using Firebase.Database.Query;
-
+ using Query;
using Newtonsoft.Json.Linq;
using System.Net;
@@ -50,26 +48,27 @@ namespace Firebase.Database.Streaming
/// <param name="observer"> The observer. </param>
/// <param name="query"> The query. </param>
/// <param name="cache"> The cache. </param>
- public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, FirebaseCache<T> cache)
+ public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot,
+ FirebaseCache<T> cache)
{
this.observer = observer;
this.query = query;
this.elementRoot = elementRoot;
- this.cancel = new CancellationTokenSource();
+ cancel = new CancellationTokenSource();
this.cache = cache;
- this.client = query.Client;
+ client = query.Client;
}
public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
public void Dispose()
{
- this.cancel.Cancel();
+ cancel.Cancel();
}
public IDisposable Run()
{
- Task.Run(() => this.ReceiveThread());
+ Task.Run(() => ReceiveThread());
return this;
}
@@ -84,15 +83,17 @@ namespace Firebase.Database.Streaming
try
{
- this.cancel.Token.ThrowIfCancellationRequested();
+ cancel.Token.ThrowIfCancellationRequested();
// initialize network connection
- url = await this.query.BuildUrlAsync().ConfigureAwait(false);
+ url = await query.BuildUrlAsync().ConfigureAwait(false);
var request = new HttpRequestMessage(HttpMethod.Get, url);
var serverEvent = FirebaseServerEventType.KeepAlive;
- var client = this.GetHttpClient();
- var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, this.cancel.Token).ConfigureAwait(false);
+ var client = GetHttpClient();
+ var response = await client
+ .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token)
+ .ConfigureAwait(false);
statusCode = response.StatusCode;
response.EnsureSuccessStatusCode();
@@ -102,32 +103,28 @@ namespace Firebase.Database.Streaming
{
while (true)
{
- this.cancel.Token.ThrowIfCancellationRequested();
+ cancel.Token.ThrowIfCancellationRequested();
line = reader.ReadLine()?.Trim();
- if (string.IsNullOrWhiteSpace(line))
- {
- continue;
- }
+ if (string.IsNullOrWhiteSpace(line)) continue;
+
+ var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
+ .Select(s => s.Trim()).ToArray();
- var tuple = line.Split(new[] { ':' }, 2, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Trim()).ToArray();
-
switch (tuple[0].ToLower())
{
case "event":
- serverEvent = this.ParseServerEvent(serverEvent, tuple[1]);
+ serverEvent = ParseServerEvent(serverEvent, tuple[1]);
break;
case "data":
- this.ProcessServerData(url, serverEvent, tuple[1]);
+ ProcessServerData(url, serverEvent, tuple[1]);
break;
}
if (serverEvent == FirebaseServerEventType.AuthRevoked)
- {
// auth token no longer valid, reconnect
break;
- }
}
}
}
@@ -137,13 +134,15 @@ namespace Firebase.Database.Streaming
}
catch (Exception ex) when (statusCode != HttpStatusCode.OK)
{
- this.observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
- this.Dispose();
+ observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
+ Dispose();
break;
}
catch (Exception ex)
{
- this.ExceptionThrown?.Invoke(this, new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, statusCode, ex)));
+ ExceptionThrown?.Invoke(this,
+ new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line,
+ statusCode, ex)));
await Task.Delay(2000).ConfigureAwait(false);
}
@@ -185,30 +184,29 @@ namespace Firebase.Database.Streaming
var data = result["data"].ToString();
// If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
- if(string.IsNullOrWhiteSpace(this.elementRoot) || !this.cache.Contains(this.elementRoot))
- {
- if(path == "/" && data == string.Empty)
+ if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot))
+ if (path == "/" && data == string.Empty)
{
- this.observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
+ observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
return;
}
- }
- var eventType = string.IsNullOrWhiteSpace(data) ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate;
+ var eventType = string.IsNullOrWhiteSpace(data)
+ ? FirebaseEventType.Delete
+ : FirebaseEventType.InsertOrUpdate;
- var items = this.cache.PushData(this.elementRoot + path, data);
+ var items = cache.PushData(elementRoot + path, data);
foreach (var i in items.ToList())
- {
- this.observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, FirebaseEventSource.OnlineStream));
- }
+ observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType,
+ FirebaseEventSource.OnlineStream));
break;
case FirebaseServerEventType.KeepAlive:
break;
case FirebaseServerEventType.Cancel:
- this.observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
- this.Dispose();
+ observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
+ Dispose();
break;
}
}
@@ -218,4 +216,4 @@ namespace Firebase.Database.Streaming
return http;
}
}
-}
+} \ No newline at end of file
diff --git a/FireBase/Streaming/NonBlockingStreamReader.cs b/FireBase/Streaming/NonBlockingStreamReader.cs
index 2ac83fd..ab01510 100644
--- a/FireBase/Streaming/NonBlockingStreamReader.cs
+++ b/FireBase/Streaming/NonBlockingStreamReader.cs
@@ -17,29 +17,29 @@
private readonly int bufferSize;
private string cachedData;
-
- public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
+
+ public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize)
{
this.stream = stream;
this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
+ buffer = new byte[bufferSize];
- this.cachedData = string.Empty;
+ cachedData = string.Empty;
}
public override string ReadLine()
{
- var currentString = this.TryGetNewLine();
-
+ var currentString = TryGetNewLine();
+
while (currentString == null)
{
- var read = this.stream.Read(this.buffer, 0, this.bufferSize);
+ var read = stream.Read(buffer, 0, bufferSize);
var str = Encoding.UTF8.GetString(buffer, 0, read);
cachedData += str;
- currentString = this.TryGetNewLine();
+ currentString = TryGetNewLine();
}
-
+
return currentString;
}
@@ -50,11 +50,11 @@
if (newLine >= 0)
{
var r = cachedData.Substring(0, newLine + 1);
- this.cachedData = cachedData.Remove(0, r.Length);
+ cachedData = cachedData.Remove(0, r.Length);
return r.Trim();
}
return null;
}
}
-}
+} \ No newline at end of file