diff options
Diffstat (limited to 'FireBase/Streaming')
-rw-r--r-- | FireBase/Streaming/FirebaseCache.cs | 93 | ||||
-rw-r--r-- | FireBase/Streaming/FirebaseEvent.cs | 27 | ||||
-rw-r--r-- | FireBase/Streaming/FirebaseEventSource.cs | 16 | ||||
-rw-r--r-- | FireBase/Streaming/FirebaseEventType.cs | 8 | ||||
-rw-r--r-- | FireBase/Streaming/FirebaseServerEventType.cs | 2 | ||||
-rw-r--r-- | FireBase/Streaming/FirebaseSubscription.cs | 112 | ||||
-rw-r--r-- | FireBase/Streaming/NonBlockingStreamReader.cs | 43 |
7 files changed, 143 insertions, 158 deletions
diff --git a/FireBase/Streaming/FirebaseCache.cs b/FireBase/Streaming/FirebaseCache.cs index ba7990b..66241e0 100644 --- a/FireBase/Streaming/FirebaseCache.cs +++ b/FireBase/Streaming/FirebaseCache.cs @@ -1,51 +1,50 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using Firebase.Database.Http; +using Newtonsoft.Json; + namespace Firebase.Database.Streaming { - using System; - using System.Collections; - using System.Collections.Generic; - using System.Linq; - using System.Reflection; - - using Firebase.Database.Http; - - using Newtonsoft.Json; - /// <summary> - /// The firebase cache. + /// The firebase cache. /// </summary> /// <typeparam name="T"> Type of top-level entities in the cache. </typeparam> public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>> { private readonly IDictionary<string, T> dictionary; private readonly bool isDictionaryType; - private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings() + + private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { ObjectCreationHandling = ObjectCreationHandling.Replace }; /// <summary> - /// Initializes a new instance of the <see cref="FirebaseCache{T}"/> class. + /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class. /// </summary> - public FirebaseCache() + public FirebaseCache() : this(new Dictionary<string, T>()) { } /// <summary> - /// Initializes a new instance of the <see cref="FirebaseCache{T}"/> class and populates it with existing data. + /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class and populates it with existing data. /// </summary> /// <param name="existingItems"> The existing items. </param> public FirebaseCache(IDictionary<string, T> existingItems) { - this.dictionary = existingItems; - this.isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()); + dictionary = existingItems; + isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()); } /// <summary> - /// The push data. + /// The push data. /// </summary> - /// <param name="path"> The path of incoming data, separated by slash. </param> - /// <param name="data"> The data in json format as returned by firebase. </param> + /// <param name="path"> The path of incoming data, separated by slash. </param> + /// <param name="data"> The data in json format as returned by firebase. </param> /// <returns> Collection of top-level entities which were affected by the push. </returns> public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true) { @@ -53,18 +52,18 @@ namespace Firebase.Database.Streaming Action<object> primitiveObjSetter = null; Action objDeleter = null; - var pathElements = path.Split(new[] { "/" }, removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None); + var pathElements = path.Split(new[] {"/"}, + removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None); // first find where we should insert the data to foreach (var element in pathElements) - { if (obj is IDictionary) { // if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key var dictionary = obj as IDictionary; var valueType = obj.GetType().GenericTypeArguments[1]; - primitiveObjSetter = (d) => dictionary[element] = d; + primitiveObjSetter = d => dictionary[element] = d; objDeleter = () => dictionary.Remove(element); if (dictionary.Contains(element)) @@ -73,7 +72,7 @@ namespace Firebase.Database.Streaming } else { - dictionary[element] = this.CreateInstance(valueType); + dictionary[element] = CreateInstance(valueType); obj = dictionary[element]; } } @@ -84,24 +83,24 @@ namespace Firebase.Database.Streaming var property = objParent .GetType() .GetRuntimeProperties() - .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) || element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName); + .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) || + element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName); objDeleter = () => property.SetValue(objParent, null); - primitiveObjSetter = (d) => property.SetValue(objParent, d); + primitiveObjSetter = d => property.SetValue(objParent, d); obj = property.GetValue(obj); if (obj == null) { - obj = this.CreateInstance(property.PropertyType); + obj = CreateInstance(property.PropertyType); property.SetValue(objParent, obj); } } - } // if data is null (=empty string) delete it if (string.IsNullOrWhiteSpace(data) || data == "null") { var key = pathElements[0]; - var target = this.dictionary[key]; + var target = dictionary[key]; objDeleter(); @@ -110,7 +109,7 @@ namespace Firebase.Database.Streaming } // now insert the data - if (obj is IDictionary && !this.isDictionaryType) + if (obj is IDictionary && !isDictionaryType) { // insert data into dictionary and return it as a collection of FirebaseObject var dictionary = obj as IDictionary; @@ -122,10 +121,7 @@ namespace Firebase.Database.Streaming dictionary[item.Key] = item.Object; // top level dictionary changed - if (!pathElements.Any()) - { - yield return new FirebaseObject<T>(item.Key, (T)item.Object); - } + if (!pathElements.Any()) yield return new FirebaseObject<T>(item.Key, (T) item.Object); } // nested dictionary changed @@ -141,52 +137,45 @@ namespace Firebase.Database.Streaming var valueType = obj.GetType(); // firebase sends strings without double quotes - var targetObject = valueType == typeof(string) ? data.ToString() : JsonConvert.DeserializeObject(data, valueType); + var targetObject = valueType == typeof(string) + ? data + : JsonConvert.DeserializeObject(data, valueType); if ((valueType.GetTypeInfo().IsPrimitive || valueType == typeof(string)) && primitiveObjSetter != null) - { // handle primitive (value) types separately primitiveObjSetter(targetObject); - } else - { - JsonConvert.PopulateObject(data, obj, this.serializerSettings); - } + JsonConvert.PopulateObject(data, obj, serializerSettings); - this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]]; - yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]); + dictionary[pathElements[0]] = dictionary[pathElements[0]]; + yield return new FirebaseObject<T>(pathElements[0], dictionary[pathElements[0]]); } } public bool Contains(string key) { - return this.dictionary.Keys.Contains(key); + return dictionary.Keys.Contains(key); } private object CreateInstance(Type type) { if (type == typeof(string)) - { return string.Empty; - } - else - { - return Activator.CreateInstance(type); - } + return Activator.CreateInstance(type); } #region IEnumerable IEnumerator IEnumerable.GetEnumerator() { - return this.GetEnumerator(); + return GetEnumerator(); } public IEnumerator<FirebaseObject<T>> GetEnumerator() { - return this.dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator(); + return dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator(); } #endregion } -} +}
\ No newline at end of file diff --git a/FireBase/Streaming/FirebaseEvent.cs b/FireBase/Streaming/FirebaseEvent.cs index c2338ca..1761a72 100644 --- a/FireBase/Streaming/FirebaseEvent.cs +++ b/FireBase/Streaming/FirebaseEvent.cs @@ -1,13 +1,13 @@ namespace Firebase.Database.Streaming { /// <summary> - /// Firebase event which hold <see cref="EventType"/> and the object affected by the event. + /// Firebase event which hold <see cref="EventType" /> and the object affected by the event. /// </summary> /// <typeparam name="T"> Type of object affected by the event. </typeparam> public class FirebaseEvent<T> : FirebaseObject<T> { /// <summary> - /// Initializes a new instance of the <see cref="FirebaseEvent{T}"/> class. + /// Initializes a new instance of the <see cref="FirebaseEvent{T}" /> class. /// </summary> /// <param name="key"> The key of the object. </param> /// <param name="obj"> The object. </param> @@ -15,26 +15,23 @@ namespace Firebase.Database.Streaming public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource) : base(key, obj) { - this.EventType = eventType; - this.EventSource = eventSource; + EventType = eventType; + EventSource = eventSource; } /// <summary> - /// Gets the source of the event. + /// Gets the source of the event. /// </summary> - public FirebaseEventSource EventSource - { - get; - } + public FirebaseEventSource EventSource { get; } /// <summary> - /// Gets the event type. + /// Gets the event type. /// </summary> - public FirebaseEventType EventType + public FirebaseEventType EventType { get; } + + public static FirebaseEvent<T> Empty(FirebaseEventSource source) { - get; + return new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source); } - - public static FirebaseEvent<T> Empty(FirebaseEventSource source) => new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source); } -} +}
\ No newline at end of file diff --git a/FireBase/Streaming/FirebaseEventSource.cs b/FireBase/Streaming/FirebaseEventSource.cs index 98df977..b1385ca 100644 --- a/FireBase/Streaming/FirebaseEventSource.cs +++ b/FireBase/Streaming/FirebaseEventSource.cs @@ -1,38 +1,38 @@ namespace Firebase.Database.Streaming { /// <summary> - /// Specifies the origin of given <see cref="FirebaseEvent{T}"/> + /// Specifies the origin of given <see cref="FirebaseEvent{T}" /> /// </summary> public enum FirebaseEventSource { /// <summary> - /// Event comes from an offline source. + /// Event comes from an offline source. /// </summary> Offline, /// <summary> - /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase). + /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase). /// </summary> OnlineInitial, /// <summary> - /// Event comes from online source received thru active stream. + /// Event comes from online source received thru active stream. /// </summary> OnlineStream, /// <summary> - /// Event comes from online source being fetched manually. + /// Event comes from online source being fetched manually. /// </summary> OnlinePull, /// <summary> - /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming). + /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming). /// </summary> OnlinePush, /// <summary> - /// Event comes from an online source. + /// Event comes from an online source. /// </summary> Online = OnlineInitial | OnlinePull | OnlinePush | OnlineStream } -} +}
\ No newline at end of file diff --git a/FireBase/Streaming/FirebaseEventType.cs b/FireBase/Streaming/FirebaseEventType.cs index 5fb21ef..7606331 100644 --- a/FireBase/Streaming/FirebaseEventType.cs +++ b/FireBase/Streaming/FirebaseEventType.cs @@ -1,18 +1,18 @@ namespace Firebase.Database.Streaming { /// <summary> - /// The type of event. + /// The type of event. /// </summary> public enum FirebaseEventType { /// <summary> - /// Item was inserted or updated. + /// Item was inserted or updated. /// </summary> InsertOrUpdate, /// <summary> - /// Item was deleted. + /// Item was deleted. /// </summary> Delete } -} +}
\ No newline at end of file diff --git a/FireBase/Streaming/FirebaseServerEventType.cs b/FireBase/Streaming/FirebaseServerEventType.cs index 1f10bc8..79c816d 100644 --- a/FireBase/Streaming/FirebaseServerEventType.cs +++ b/FireBase/Streaming/FirebaseServerEventType.cs @@ -12,4 +12,4 @@ namespace Firebase.Database.Streaming AuthRevoked } -} +}
\ No newline at end of file diff --git a/FireBase/Streaming/FirebaseSubscription.cs b/FireBase/Streaming/FirebaseSubscription.cs index 4b5e643..fb0f403 100644 --- a/FireBase/Streaming/FirebaseSubscription.cs +++ b/FireBase/Streaming/FirebaseSubscription.cs @@ -1,32 +1,28 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; +using Firebase.Database.Query; +using Newtonsoft.Json.Linq; + namespace Firebase.Database.Streaming { - using System; - using System.Diagnostics; - using System.Linq; - using System.Net.Http; - using System.Net.Http.Headers; - using System.Threading; - using System.Threading.Tasks; - - using Firebase.Database.Query; - - using Newtonsoft.Json.Linq; - using System.Net; - /// <summary> - /// The firebase subscription. + /// The firebase subscription. /// </summary> /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam> internal class FirebaseSubscription<T> : IDisposable { + private static readonly HttpClient http; + private readonly FirebaseCache<T> cache; private readonly CancellationTokenSource cancel; + private readonly FirebaseClient client; + private readonly string elementRoot; private readonly IObserver<FirebaseEvent<T>> observer; private readonly IFirebaseQuery query; - private readonly FirebaseCache<T> cache; - private readonly string elementRoot; - private readonly FirebaseClient client; - - private static HttpClient http; static FirebaseSubscription() { @@ -45,31 +41,32 @@ namespace Firebase.Database.Streaming } /// <summary> - /// Initializes a new instance of the <see cref="FirebaseSubscription{T}"/> class. + /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class. /// </summary> /// <param name="observer"> The observer. </param> /// <param name="query"> The query. </param> /// <param name="cache"> The cache. </param> - public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, FirebaseCache<T> cache) + public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, + FirebaseCache<T> cache) { this.observer = observer; this.query = query; this.elementRoot = elementRoot; - this.cancel = new CancellationTokenSource(); + cancel = new CancellationTokenSource(); this.cache = cache; - this.client = query.Client; + client = query.Client; } - public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown; - public void Dispose() { - this.cancel.Cancel(); + cancel.Cancel(); } + public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown; + public IDisposable Run() { - Task.Run(() => this.ReceiveThread()); + Task.Run(() => ReceiveThread()); return this; } @@ -84,15 +81,17 @@ namespace Firebase.Database.Streaming try { - this.cancel.Token.ThrowIfCancellationRequested(); + cancel.Token.ThrowIfCancellationRequested(); // initialize network connection - url = await this.query.BuildUrlAsync().ConfigureAwait(false); + url = await query.BuildUrlAsync().ConfigureAwait(false); var request = new HttpRequestMessage(HttpMethod.Get, url); var serverEvent = FirebaseServerEventType.KeepAlive; - var client = this.GetHttpClient(); - var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, this.cancel.Token).ConfigureAwait(false); + var client = GetHttpClient(); + var response = await client + .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token) + .ConfigureAwait(false); statusCode = response.StatusCode; response.EnsureSuccessStatusCode(); @@ -102,32 +101,28 @@ namespace Firebase.Database.Streaming { while (true) { - this.cancel.Token.ThrowIfCancellationRequested(); + cancel.Token.ThrowIfCancellationRequested(); line = reader.ReadLine()?.Trim(); - if (string.IsNullOrWhiteSpace(line)) - { - continue; - } + if (string.IsNullOrWhiteSpace(line)) continue; + + var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries) + .Select(s => s.Trim()).ToArray(); - var tuple = line.Split(new[] { ':' }, 2, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Trim()).ToArray(); - switch (tuple[0].ToLower()) { case "event": - serverEvent = this.ParseServerEvent(serverEvent, tuple[1]); + serverEvent = ParseServerEvent(serverEvent, tuple[1]); break; case "data": - this.ProcessServerData(url, serverEvent, tuple[1]); + ProcessServerData(url, serverEvent, tuple[1]); break; } if (serverEvent == FirebaseServerEventType.AuthRevoked) - { // auth token no longer valid, reconnect break; - } } } } @@ -137,13 +132,15 @@ namespace Firebase.Database.Streaming } catch (Exception ex) when (statusCode != HttpStatusCode.OK) { - this.observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); - this.Dispose(); + observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); + Dispose(); break; } catch (Exception ex) { - this.ExceptionThrown?.Invoke(this, new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, statusCode, ex))); + ExceptionThrown?.Invoke(this, + new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, + statusCode, ex))); await Task.Delay(2000).ConfigureAwait(false); } @@ -185,30 +182,29 @@ namespace Firebase.Database.Streaming var data = result["data"].ToString(); // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object. - if(string.IsNullOrWhiteSpace(this.elementRoot) || !this.cache.Contains(this.elementRoot)) - { - if(path == "/" && data == string.Empty) + if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot)) + if (path == "/" && data == string.Empty) { - this.observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream)); + observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream)); return; } - } - var eventType = string.IsNullOrWhiteSpace(data) ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate; + var eventType = string.IsNullOrWhiteSpace(data) + ? FirebaseEventType.Delete + : FirebaseEventType.InsertOrUpdate; - var items = this.cache.PushData(this.elementRoot + path, data); + var items = cache.PushData(elementRoot + path, data); foreach (var i in items.ToList()) - { - this.observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, FirebaseEventSource.OnlineStream)); - } + observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, + FirebaseEventSource.OnlineStream)); break; case FirebaseServerEventType.KeepAlive: break; case FirebaseServerEventType.Cancel: - this.observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); - this.Dispose(); + observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); + Dispose(); break; } } @@ -218,4 +214,4 @@ namespace Firebase.Database.Streaming return http; } } -} +}
\ No newline at end of file diff --git a/FireBase/Streaming/NonBlockingStreamReader.cs b/FireBase/Streaming/NonBlockingStreamReader.cs index 2ac83fd..8228e32 100644 --- a/FireBase/Streaming/NonBlockingStreamReader.cs +++ b/FireBase/Streaming/NonBlockingStreamReader.cs @@ -1,45 +1,48 @@ -namespace Firebase.Database.Streaming -{ - using System.IO; - using System.Text; +using System.IO; +using System.Text; +namespace Firebase.Database.Streaming +{ /// <summary> - /// When a regular <see cref="StreamReader"/> is used in a UWP app its <see cref="StreamReader.ReadLine"/> method tends to take a long - /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader"/> can be used instead to boost performance - /// in your UWP app. Use <see cref="FirebaseOptions"/> to inject an instance of this class into your <see cref="FirebaseClient"/>. + /// When a regular <see cref="StreamReader" /> is used in a UWP app its <see cref="StreamReader.ReadLine" /> method + /// tends to take a long + /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader" /> can be used + /// instead to boost performance + /// in your UWP app. Use <see cref="FirebaseOptions" /> to inject an instance of this class into your + /// <see cref="FirebaseClient" />. /// </summary> public class NonBlockingStreamReader : TextReader { private const int DefaultBufferSize = 16000; - - private readonly Stream stream; private readonly byte[] buffer; private readonly int bufferSize; + private readonly Stream stream; + private string cachedData; - - public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize) + + public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize) { this.stream = stream; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; + buffer = new byte[bufferSize]; - this.cachedData = string.Empty; + cachedData = string.Empty; } public override string ReadLine() { - var currentString = this.TryGetNewLine(); - + var currentString = TryGetNewLine(); + while (currentString == null) { - var read = this.stream.Read(this.buffer, 0, this.bufferSize); + var read = stream.Read(buffer, 0, bufferSize); var str = Encoding.UTF8.GetString(buffer, 0, read); cachedData += str; - currentString = this.TryGetNewLine(); + currentString = TryGetNewLine(); } - + return currentString; } @@ -50,11 +53,11 @@ if (newLine >= 0) { var r = cachedData.Substring(0, newLine + 1); - this.cachedData = cachedData.Remove(0, r.Length); + cachedData = cachedData.Remove(0, r.Length); return r.Trim(); } return null; } } -} +}
\ No newline at end of file |