diff options
Diffstat (limited to 'dsa/FireBase/Streaming')
-rw-r--r-- | dsa/FireBase/Streaming/FirebaseCache.cs | 181 | ||||
-rw-r--r-- | dsa/FireBase/Streaming/FirebaseEvent.cs | 37 | ||||
-rw-r--r-- | dsa/FireBase/Streaming/FirebaseEventSource.cs | 38 | ||||
-rw-r--r-- | dsa/FireBase/Streaming/FirebaseEventType.cs | 18 | ||||
-rw-r--r-- | dsa/FireBase/Streaming/FirebaseServerEventType.cs | 15 | ||||
-rw-r--r-- | dsa/FireBase/Streaming/FirebaseSubscription.cs | 217 | ||||
-rw-r--r-- | dsa/FireBase/Streaming/NonBlockingStreamReader.cs | 63 |
7 files changed, 569 insertions, 0 deletions
diff --git a/dsa/FireBase/Streaming/FirebaseCache.cs b/dsa/FireBase/Streaming/FirebaseCache.cs new file mode 100644 index 0000000..66241e0 --- /dev/null +++ b/dsa/FireBase/Streaming/FirebaseCache.cs @@ -0,0 +1,181 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using Firebase.Database.Http; +using Newtonsoft.Json; + +namespace Firebase.Database.Streaming +{ + /// <summary> + /// The firebase cache. + /// </summary> + /// <typeparam name="T"> Type of top-level entities in the cache. </typeparam> + public class FirebaseCache<T> : IEnumerable<FirebaseObject<T>> + { + private readonly IDictionary<string, T> dictionary; + private readonly bool isDictionaryType; + + private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings + { + ObjectCreationHandling = ObjectCreationHandling.Replace + }; + + /// <summary> + /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class. + /// </summary> + public FirebaseCache() + : this(new Dictionary<string, T>()) + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="FirebaseCache{T}" /> class and populates it with existing data. + /// </summary> + /// <param name="existingItems"> The existing items. </param> + public FirebaseCache(IDictionary<string, T> existingItems) + { + dictionary = existingItems; + isDictionaryType = typeof(IDictionary).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()); + } + + /// <summary> + /// The push data. + /// </summary> + /// <param name="path"> The path of incoming data, separated by slash. </param> + /// <param name="data"> The data in json format as returned by firebase. </param> + /// <returns> Collection of top-level entities which were affected by the push. </returns> + public IEnumerable<FirebaseObject<T>> PushData(string path, string data, bool removeEmptyEntries = true) + { + object obj = this.dictionary; + Action<object> primitiveObjSetter = null; + Action objDeleter = null; + + var pathElements = path.Split(new[] {"/"}, + removeEmptyEntries ? StringSplitOptions.RemoveEmptyEntries : StringSplitOptions.None); + + // first find where we should insert the data to + foreach (var element in pathElements) + if (obj is IDictionary) + { + // if it's a dictionary, then it's just a matter of inserting into it / accessing existing object by key + var dictionary = obj as IDictionary; + var valueType = obj.GetType().GenericTypeArguments[1]; + + primitiveObjSetter = d => dictionary[element] = d; + objDeleter = () => dictionary.Remove(element); + + if (dictionary.Contains(element)) + { + obj = dictionary[element]; + } + else + { + dictionary[element] = CreateInstance(valueType); + obj = dictionary[element]; + } + } + else + { + // if it's not a dictionary, try to find the property of current object with the matching name + var objParent = obj; + var property = objParent + .GetType() + .GetRuntimeProperties() + .First(p => p.Name.Equals(element, StringComparison.OrdinalIgnoreCase) || + element == p.GetCustomAttribute<JsonPropertyAttribute>()?.PropertyName); + + objDeleter = () => property.SetValue(objParent, null); + primitiveObjSetter = d => property.SetValue(objParent, d); + obj = property.GetValue(obj); + if (obj == null) + { + obj = CreateInstance(property.PropertyType); + property.SetValue(objParent, obj); + } + } + + // if data is null (=empty string) delete it + if (string.IsNullOrWhiteSpace(data) || data == "null") + { + var key = pathElements[0]; + var target = dictionary[key]; + + objDeleter(); + + yield return new FirebaseObject<T>(key, target); + yield break; + } + + // now insert the data + if (obj is IDictionary && !isDictionaryType) + { + // insert data into dictionary and return it as a collection of FirebaseObject + var dictionary = obj as IDictionary; + var valueType = obj.GetType().GenericTypeArguments[1]; + var objectCollection = data.GetObjectCollection(valueType); + + foreach (var item in objectCollection) + { + dictionary[item.Key] = item.Object; + + // top level dictionary changed + if (!pathElements.Any()) yield return new FirebaseObject<T>(item.Key, (T) item.Object); + } + + // nested dictionary changed + if (pathElements.Any()) + { + this.dictionary[pathElements[0]] = this.dictionary[pathElements[0]]; + yield return new FirebaseObject<T>(pathElements[0], this.dictionary[pathElements[0]]); + } + } + else + { + // set the data on a property of the given object + var valueType = obj.GetType(); + + // firebase sends strings without double quotes + var targetObject = valueType == typeof(string) + ? data + : JsonConvert.DeserializeObject(data, valueType); + + if ((valueType.GetTypeInfo().IsPrimitive || valueType == typeof(string)) && primitiveObjSetter != null) + // handle primitive (value) types separately + primitiveObjSetter(targetObject); + else + JsonConvert.PopulateObject(data, obj, serializerSettings); + + dictionary[pathElements[0]] = dictionary[pathElements[0]]; + yield return new FirebaseObject<T>(pathElements[0], dictionary[pathElements[0]]); + } + } + + public bool Contains(string key) + { + return dictionary.Keys.Contains(key); + } + + private object CreateInstance(Type type) + { + if (type == typeof(string)) + return string.Empty; + return Activator.CreateInstance(type); + } + + #region IEnumerable + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public IEnumerator<FirebaseObject<T>> GetEnumerator() + { + return dictionary.Select(p => new FirebaseObject<T>(p.Key, p.Value)).GetEnumerator(); + } + + #endregion + } +}
\ No newline at end of file diff --git a/dsa/FireBase/Streaming/FirebaseEvent.cs b/dsa/FireBase/Streaming/FirebaseEvent.cs new file mode 100644 index 0000000..1761a72 --- /dev/null +++ b/dsa/FireBase/Streaming/FirebaseEvent.cs @@ -0,0 +1,37 @@ +namespace Firebase.Database.Streaming +{ + /// <summary> + /// Firebase event which hold <see cref="EventType" /> and the object affected by the event. + /// </summary> + /// <typeparam name="T"> Type of object affected by the event. </typeparam> + public class FirebaseEvent<T> : FirebaseObject<T> + { + /// <summary> + /// Initializes a new instance of the <see cref="FirebaseEvent{T}" /> class. + /// </summary> + /// <param name="key"> The key of the object. </param> + /// <param name="obj"> The object. </param> + /// <param name="eventType"> The event type. </param> + public FirebaseEvent(string key, T obj, FirebaseEventType eventType, FirebaseEventSource eventSource) + : base(key, obj) + { + EventType = eventType; + EventSource = eventSource; + } + + /// <summary> + /// Gets the source of the event. + /// </summary> + public FirebaseEventSource EventSource { get; } + + /// <summary> + /// Gets the event type. + /// </summary> + public FirebaseEventType EventType { get; } + + public static FirebaseEvent<T> Empty(FirebaseEventSource source) + { + return new FirebaseEvent<T>(string.Empty, default(T), FirebaseEventType.InsertOrUpdate, source); + } + } +}
\ No newline at end of file diff --git a/dsa/FireBase/Streaming/FirebaseEventSource.cs b/dsa/FireBase/Streaming/FirebaseEventSource.cs new file mode 100644 index 0000000..b1385ca --- /dev/null +++ b/dsa/FireBase/Streaming/FirebaseEventSource.cs @@ -0,0 +1,38 @@ +namespace Firebase.Database.Streaming +{ + /// <summary> + /// Specifies the origin of given <see cref="FirebaseEvent{T}" /> + /// </summary> + public enum FirebaseEventSource + { + /// <summary> + /// Event comes from an offline source. + /// </summary> + Offline, + + /// <summary> + /// Event comes from online source fetched during initial pull (valid only for RealtimeDatabase). + /// </summary> + OnlineInitial, + + /// <summary> + /// Event comes from online source received thru active stream. + /// </summary> + OnlineStream, + + /// <summary> + /// Event comes from online source being fetched manually. + /// </summary> + OnlinePull, + + /// <summary> + /// Event raised after successful online push (valid only for RealtimeDatabase which isn't streaming). + /// </summary> + OnlinePush, + + /// <summary> + /// Event comes from an online source. + /// </summary> + Online = OnlineInitial | OnlinePull | OnlinePush | OnlineStream + } +}
\ No newline at end of file diff --git a/dsa/FireBase/Streaming/FirebaseEventType.cs b/dsa/FireBase/Streaming/FirebaseEventType.cs new file mode 100644 index 0000000..7606331 --- /dev/null +++ b/dsa/FireBase/Streaming/FirebaseEventType.cs @@ -0,0 +1,18 @@ +namespace Firebase.Database.Streaming +{ + /// <summary> + /// The type of event. + /// </summary> + public enum FirebaseEventType + { + /// <summary> + /// Item was inserted or updated. + /// </summary> + InsertOrUpdate, + + /// <summary> + /// Item was deleted. + /// </summary> + Delete + } +}
\ No newline at end of file diff --git a/dsa/FireBase/Streaming/FirebaseServerEventType.cs b/dsa/FireBase/Streaming/FirebaseServerEventType.cs new file mode 100644 index 0000000..79c816d --- /dev/null +++ b/dsa/FireBase/Streaming/FirebaseServerEventType.cs @@ -0,0 +1,15 @@ +namespace Firebase.Database.Streaming +{ + internal enum FirebaseServerEventType + { + Put, + + Patch, + + KeepAlive, + + Cancel, + + AuthRevoked + } +}
\ No newline at end of file diff --git a/dsa/FireBase/Streaming/FirebaseSubscription.cs b/dsa/FireBase/Streaming/FirebaseSubscription.cs new file mode 100644 index 0000000..fb0f403 --- /dev/null +++ b/dsa/FireBase/Streaming/FirebaseSubscription.cs @@ -0,0 +1,217 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading; +using System.Threading.Tasks; +using Firebase.Database.Query; +using Newtonsoft.Json.Linq; + +namespace Firebase.Database.Streaming +{ + /// <summary> + /// The firebase subscription. + /// </summary> + /// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam> + internal class FirebaseSubscription<T> : IDisposable + { + private static readonly HttpClient http; + private readonly FirebaseCache<T> cache; + private readonly CancellationTokenSource cancel; + private readonly FirebaseClient client; + private readonly string elementRoot; + private readonly IObserver<FirebaseEvent<T>> observer; + private readonly IFirebaseQuery query; + + static FirebaseSubscription() + { + var handler = new HttpClientHandler + { + AllowAutoRedirect = true, + MaxAutomaticRedirections = 10, + CookieContainer = new CookieContainer() + }; + + var httpClient = new HttpClient(handler, true); + + httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream")); + + http = httpClient; + } + + /// <summary> + /// Initializes a new instance of the <see cref="FirebaseSubscription{T}" /> class. + /// </summary> + /// <param name="observer"> The observer. </param> + /// <param name="query"> The query. </param> + /// <param name="cache"> The cache. </param> + public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot, + FirebaseCache<T> cache) + { + this.observer = observer; + this.query = query; + this.elementRoot = elementRoot; + cancel = new CancellationTokenSource(); + this.cache = cache; + client = query.Client; + } + + public void Dispose() + { + cancel.Cancel(); + } + + public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown; + + public IDisposable Run() + { + Task.Run(() => ReceiveThread()); + + return this; + } + + private async void ReceiveThread() + { + while (true) + { + var url = string.Empty; + var line = string.Empty; + var statusCode = HttpStatusCode.OK; + + try + { + cancel.Token.ThrowIfCancellationRequested(); + + // initialize network connection + url = await query.BuildUrlAsync().ConfigureAwait(false); + var request = new HttpRequestMessage(HttpMethod.Get, url); + var serverEvent = FirebaseServerEventType.KeepAlive; + + var client = GetHttpClient(); + var response = await client + .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token) + .ConfigureAwait(false); + + statusCode = response.StatusCode; + response.EnsureSuccessStatusCode(); + + using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) + using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream)) + { + while (true) + { + cancel.Token.ThrowIfCancellationRequested(); + + line = reader.ReadLine()?.Trim(); + + if (string.IsNullOrWhiteSpace(line)) continue; + + var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries) + .Select(s => s.Trim()).ToArray(); + + switch (tuple[0].ToLower()) + { + case "event": + serverEvent = ParseServerEvent(serverEvent, tuple[1]); + break; + case "data": + ProcessServerData(url, serverEvent, tuple[1]); + break; + } + + if (serverEvent == FirebaseServerEventType.AuthRevoked) + // auth token no longer valid, reconnect + break; + } + } + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) when (statusCode != HttpStatusCode.OK) + { + observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); + Dispose(); + break; + } + catch (Exception ex) + { + ExceptionThrown?.Invoke(this, + new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line, + statusCode, ex))); + + await Task.Delay(2000).ConfigureAwait(false); + } + } + } + + private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName) + { + switch (eventName) + { + case "put": + serverEvent = FirebaseServerEventType.Put; + break; + case "patch": + serverEvent = FirebaseServerEventType.Patch; + break; + case "keep-alive": + serverEvent = FirebaseServerEventType.KeepAlive; + break; + case "cancel": + serverEvent = FirebaseServerEventType.Cancel; + break; + case "auth_revoked": + serverEvent = FirebaseServerEventType.AuthRevoked; + break; + } + + return serverEvent; + } + + private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData) + { + switch (serverEvent) + { + case FirebaseServerEventType.Put: + case FirebaseServerEventType.Patch: + var result = JObject.Parse(serverData); + var path = result["path"].ToString(); + var data = result["data"].ToString(); + + // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object. + if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot)) + if (path == "/" && data == string.Empty) + { + observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream)); + return; + } + + var eventType = string.IsNullOrWhiteSpace(data) + ? FirebaseEventType.Delete + : FirebaseEventType.InsertOrUpdate; + + var items = cache.PushData(elementRoot + path, data); + + foreach (var i in items.ToList()) + observer.OnNext(new FirebaseEvent<T>(i.Key, i.Object, eventType, + FirebaseEventSource.OnlineStream)); + + break; + case FirebaseServerEventType.KeepAlive: + break; + case FirebaseServerEventType.Cancel: + observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); + Dispose(); + break; + } + } + + private HttpClient GetHttpClient() + { + return http; + } + } +}
\ No newline at end of file diff --git a/dsa/FireBase/Streaming/NonBlockingStreamReader.cs b/dsa/FireBase/Streaming/NonBlockingStreamReader.cs new file mode 100644 index 0000000..8228e32 --- /dev/null +++ b/dsa/FireBase/Streaming/NonBlockingStreamReader.cs @@ -0,0 +1,63 @@ +using System.IO; +using System.Text; + +namespace Firebase.Database.Streaming +{ + /// <summary> + /// When a regular <see cref="StreamReader" /> is used in a UWP app its <see cref="StreamReader.ReadLine" /> method + /// tends to take a long + /// time for data larger then 2 KB. This extremly simple implementation of <see cref="TextReader" /> can be used + /// instead to boost performance + /// in your UWP app. Use <see cref="FirebaseOptions" /> to inject an instance of this class into your + /// <see cref="FirebaseClient" />. + /// </summary> + public class NonBlockingStreamReader : TextReader + { + private const int DefaultBufferSize = 16000; + private readonly byte[] buffer; + private readonly int bufferSize; + + private readonly Stream stream; + + private string cachedData; + + public NonBlockingStreamReader(Stream stream, int bufferSize = DefaultBufferSize) + { + this.stream = stream; + this.bufferSize = bufferSize; + buffer = new byte[bufferSize]; + + cachedData = string.Empty; + } + + public override string ReadLine() + { + var currentString = TryGetNewLine(); + + while (currentString == null) + { + var read = stream.Read(buffer, 0, bufferSize); + var str = Encoding.UTF8.GetString(buffer, 0, read); + + cachedData += str; + currentString = TryGetNewLine(); + } + + return currentString; + } + + private string TryGetNewLine() + { + var newLine = cachedData.IndexOf('\n'); + + if (newLine >= 0) + { + var r = cachedData.Substring(0, newLine + 1); + cachedData = cachedData.Remove(0, r.Length); + return r.Trim(); + } + + return null; + } + } +}
\ No newline at end of file |