summaryrefslogtreecommitdiff
path: root/dsa/FireBase/Streaming/FirebaseSubscription.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dsa/FireBase/Streaming/FirebaseSubscription.cs')
-rw-r--r--dsa/FireBase/Streaming/FirebaseSubscription.cs66
1 files changed, 22 insertions, 44 deletions
diff --git a/dsa/FireBase/Streaming/FirebaseSubscription.cs b/dsa/FireBase/Streaming/FirebaseSubscription.cs
index fb0f403..6488815 100644
--- a/dsa/FireBase/Streaming/FirebaseSubscription.cs
+++ b/dsa/FireBase/Streaming/FirebaseSubscription.cs
@@ -8,14 +8,12 @@ using System.Threading.Tasks;
using Firebase.Database.Query;
using Newtonsoft.Json.Linq;
-namespace Firebase.Database.Streaming
-{
+namespace Firebase.Database.Streaming {
/// <summary>
/// The firebase subscription.
/// </summary>
/// <typeparam name="T"> Type of object to be streaming back to the called. </typeparam>
- internal class FirebaseSubscription<T> : IDisposable
- {
+ internal class FirebaseSubscription<T> : IDisposable {
private static readonly HttpClient http;
private readonly FirebaseCache<T> cache;
private readonly CancellationTokenSource cancel;
@@ -24,10 +22,8 @@ namespace Firebase.Database.Streaming
private readonly IObserver<FirebaseEvent<T>> observer;
private readonly IFirebaseQuery query;
- static FirebaseSubscription()
- {
- var handler = new HttpClientHandler
- {
+ static FirebaseSubscription() {
+ var handler = new HttpClientHandler {
AllowAutoRedirect = true,
MaxAutomaticRedirections = 10,
CookieContainer = new CookieContainer()
@@ -47,8 +43,7 @@ namespace Firebase.Database.Streaming
/// <param name="query"> The query. </param>
/// <param name="cache"> The cache. </param>
public FirebaseSubscription(IObserver<FirebaseEvent<T>> observer, IFirebaseQuery query, string elementRoot,
- FirebaseCache<T> cache)
- {
+ FirebaseCache<T> cache) {
this.observer = observer;
this.query = query;
this.elementRoot = elementRoot;
@@ -57,30 +52,25 @@ namespace Firebase.Database.Streaming
client = query.Client;
}
- public void Dispose()
- {
+ public void Dispose() {
cancel.Cancel();
}
public event EventHandler<ExceptionEventArgs<FirebaseException>> ExceptionThrown;
- public IDisposable Run()
- {
+ public IDisposable Run() {
Task.Run(() => ReceiveThread());
return this;
}
- private async void ReceiveThread()
- {
- while (true)
- {
+ private async void ReceiveThread() {
+ while (true) {
var url = string.Empty;
var line = string.Empty;
var statusCode = HttpStatusCode.OK;
- try
- {
+ try {
cancel.Token.ThrowIfCancellationRequested();
// initialize network connection
@@ -97,10 +87,8 @@ namespace Firebase.Database.Streaming
response.EnsureSuccessStatusCode();
using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
- using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream))
- {
- while (true)
- {
+ using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream)) {
+ while (true) {
cancel.Token.ThrowIfCancellationRequested();
line = reader.ReadLine()?.Trim();
@@ -110,8 +98,7 @@ namespace Firebase.Database.Streaming
var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
.Select(s => s.Trim()).ToArray();
- switch (tuple[0].ToLower())
- {
+ switch (tuple[0].ToLower()) {
case "event":
serverEvent = ParseServerEvent(serverEvent, tuple[1]);
break;
@@ -126,18 +113,15 @@ namespace Firebase.Database.Streaming
}
}
}
- catch (OperationCanceledException)
- {
+ catch (OperationCanceledException) {
break;
}
- catch (Exception ex) when (statusCode != HttpStatusCode.OK)
- {
+ catch (Exception ex) when (statusCode != HttpStatusCode.OK) {
observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
Dispose();
break;
}
- catch (Exception ex)
- {
+ catch (Exception ex) {
ExceptionThrown?.Invoke(this,
new ExceptionEventArgs<FirebaseException>(new FirebaseException(url, string.Empty, line,
statusCode, ex)));
@@ -147,10 +131,8 @@ namespace Firebase.Database.Streaming
}
}
- private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName)
- {
- switch (eventName)
- {
+ private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName) {
+ switch (eventName) {
case "put":
serverEvent = FirebaseServerEventType.Put;
break;
@@ -171,10 +153,8 @@ namespace Firebase.Database.Streaming
return serverEvent;
}
- private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData)
- {
- switch (serverEvent)
- {
+ private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData) {
+ switch (serverEvent) {
case FirebaseServerEventType.Put:
case FirebaseServerEventType.Patch:
var result = JObject.Parse(serverData);
@@ -183,8 +163,7 @@ namespace Firebase.Database.Streaming
// If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object.
if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot))
- if (path == "/" && data == string.Empty)
- {
+ if (path == "/" && data == string.Empty) {
observer.OnNext(FirebaseEvent<T>.Empty(FirebaseEventSource.OnlineStream));
return;
}
@@ -209,8 +188,7 @@ namespace Firebase.Database.Streaming
}
}
- private HttpClient GetHttpClient()
- {
+ private HttpClient GetHttpClient() {
return http;
}
}