using System;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Firebase.Database.Query;
using Newtonsoft.Json.Linq;
namespace Firebase.Database.Streaming
{
/// <summary>
/// The firebase subscription.
/// </summary>
/// Type of object to be streamed back to the caller.
internal class FirebaseSubscription : IDisposable
{
// HTTP client shared by every subscription; created once in the static constructor.
private static readonly HttpClient http;
// Cache that incoming put/patch payloads are pushed into.
private readonly FirebaseCache cache;
// Cancelled by Dispose(); the receive loop checks its token to know when to stop.
private readonly CancellationTokenSource cancel;
// Taken from query.Client; its Options supply the stream reader factory used by the receive loop.
private readonly FirebaseClient client;
// Prefix combined with each event's path before the data is pushed into the cache.
private readonly string elementRoot;
// Receives OnNext for data events and OnError for terminal failures.
private readonly IObserver> observer;
// Builds the URL that the receive loop connects to.
private readonly IFirebaseQuery query;
/// <summary>
/// Creates the shared HTTP client used by all subscriptions. It follows up to ten redirects,
/// keeps its own cookie container and asks for "text/event-stream" responses.
/// </summary>
static FirebaseSubscription()
{
    var streamingHandler = new HttpClientHandler();
    streamingHandler.AllowAutoRedirect = true;
    streamingHandler.MaxAutomaticRedirections = 10;
    streamingHandler.CookieContainer = new CookieContainer();

    // The client owns the handler and disposes it together with itself.
    var streamingClient = new HttpClient(streamingHandler, disposeHandler: true);

    var eventStream = new MediaTypeWithQualityHeaderValue("text/event-stream");
    streamingClient.DefaultRequestHeaders.Accept.Add(eventStream);

    http = streamingClient;
}
/// <summary>
/// Initializes a new instance of the FirebaseSubscription class.
/// </summary>
/// <param name="observer">The observer.</param>
/// <param name="query">The query.</param>
/// <param name="elementRoot">Root path that is combined with each incoming event path before the data is pushed into the cache.</param>
/// <param name="cache">The cache.</param>
public FirebaseSubscription(IObserver> observer, IFirebaseQuery query, string elementRoot,
    FirebaseCache cache)
{
    // Collaborators handed in by the caller.
    this.cache = cache;
    this.elementRoot = elementRoot;
    this.observer = observer;
    this.query = query;

    // State derived at construction time.
    this.client = query.Client;
    this.cancel = new CancellationTokenSource();
}
/// <summary>
/// Stops the subscription by signalling cancellation to the receive loop.
/// </summary>
public void Dispose() => cancel.Cancel();
/// <summary>
/// Raised by the receive loop when an exception occurs while the recorded HTTP status code is OK;
/// after raising it the loop pauses and then tries to connect again.
/// </summary>
public event EventHandler> ExceptionThrown;
/// <summary>
/// Starts the receive loop via <c>Task.Run</c> without waiting for it.
/// </summary>
/// <returns>This instance; disposing it cancels the receive loop.</returns>
public IDisposable Run()
{
    // Fire-and-forget: the returned task is intentionally not observed.
    Action receiveLoop = ReceiveThread;
    Task.Run(receiveLoop);

    return this;
}
/// <summary>
/// Receive loop: opens the streaming request for the query URL, reads the response line by line
/// and dispatches its "event" / "data" fields until the subscription is cancelled.
/// </summary>
/// <remarks>
/// Cancellation ends the loop silently. An exception raised while the recorded status code is not
/// OK is reported through the observer's OnError and ends the subscription. Any other exception
/// raises ExceptionThrown and the loop reconnects after a two second pause. When the server
/// closes the stream, or sends an auth_revoked event, the loop reconnects.
/// </remarks>
private async void ReceiveThread()
{
    while (true)
    {
        var url = string.Empty;
        var line = string.Empty;
        var statusCode = HttpStatusCode.OK;

        try
        {
            cancel.Token.ThrowIfCancellationRequested();

            // initialize network connection
            url = await query.BuildUrlAsync().ConfigureAwait(false);
            var serverEvent = FirebaseServerEventType.KeepAlive;
            var httpClient = GetHttpClient();

            // Request and response are disposed on every exit path so that a failed or finished
            // attempt does not keep its resources alive while the loop reconnects.
            using (var request = new HttpRequestMessage(HttpMethod.Get, url))
            using (var response = await httpClient
                .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token)
                .ConfigureAwait(false))
            {
                statusCode = response.StatusCode;
                response.EnsureSuccessStatusCode();

                using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false))
                using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream))
                {
                    while (true)
                    {
                        cancel.Token.ThrowIfCancellationRequested();

                        var rawLine = reader.ReadLine();
                        if (rawLine == null)
                        {
                            // End of stream: the connection was closed. Leave the read loop and
                            // reconnect instead of spinning on a reader that has nothing left.
                            break;
                        }

                        line = rawLine.Trim();
                        if (string.IsNullOrWhiteSpace(line))
                        {
                            continue;
                        }

                        var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries)
                            .Select(s => s.Trim())
                            .ToArray();

                        // A field without a value ("data:") or a bare ":" line yields fewer than
                        // two parts; there is nothing to dispatch, so skip it rather than index
                        // past the end of the array.
                        if (tuple.Length < 2)
                        {
                            continue;
                        }

                        switch (tuple[0].ToLowerInvariant())
                        {
                            case "event":
                                serverEvent = ParseServerEvent(serverEvent, tuple[1]);
                                break;
                            case "data":
                                ProcessServerData(url, serverEvent, tuple[1]);
                                break;
                        }

                        if (serverEvent == FirebaseServerEventType.AuthRevoked)
                        {
                            // auth token no longer valid, reconnect
                            break;
                        }
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            break;
        }
        catch (Exception ex) when (statusCode != HttpStatusCode.OK)
        {
            observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex));
            Dispose();
            break;
        }
        catch (Exception ex)
        {
            ExceptionThrown?.Invoke(this,
                new ExceptionEventArgs(new FirebaseException(url, string.Empty, line,
                    statusCode, ex)));

            // Pause before the next connection attempt.
            await Task.Delay(2000).ConfigureAwait(false);
        }
    }
}
/// <summary>
/// Maps the value of an "event" field to the matching server event type.
/// </summary>
/// <param name="serverEvent">The event type currently in effect; returned unchanged when <paramref name="eventName"/> is not recognised.</param>
/// <param name="eventName">The event name as received from the stream.</param>
/// <returns>The event type for <paramref name="eventName"/>, or <paramref name="serverEvent"/> for an unknown name.</returns>
private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName)
{
    switch (eventName)
    {
        case "put":
            return FirebaseServerEventType.Put;
        case "patch":
            return FirebaseServerEventType.Patch;
        case "keep-alive":
            return FirebaseServerEventType.KeepAlive;
        case "cancel":
            return FirebaseServerEventType.Cancel;
        case "auth_revoked":
            return FirebaseServerEventType.AuthRevoked;
        default:
            // Unknown name: keep whatever event type was active before.
            return serverEvent;
    }
}
/// <summary>
/// Handles the payload of a "data" field according to the event type currently in effect.
/// </summary>
/// <param name="url">The URL of the streaming request; used when reporting an error.</param>
/// <param name="serverEvent">The event type the payload belongs to.</param>
/// <param name="serverData">The raw payload text.</param>
/// <remarks>
/// Put and patch payloads are parsed as JSON, pushed into the cache and forwarded to the observer.
/// A cancel event is reported to the observer as an Unauthorized error and the subscription is
/// disposed. Every other event type is ignored.
/// </remarks>
private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData)
{
    if (serverEvent == FirebaseServerEventType.Cancel)
    {
        observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized));
        Dispose();
        return;
    }

    var carriesData = serverEvent == FirebaseServerEventType.Put
                      || serverEvent == FirebaseServerEventType.Patch;
    if (!carriesData)
    {
        // Keep-alive and any other event types need no processing here.
        return;
    }

    var payload = JObject.Parse(serverData);
    var path = payload["path"].ToString();
    var data = payload["data"].ToString();

    // When an elementRoot is given but is not in the cache, it was already deleted,
    // so an empty event can be emitted for an empty root payload.
    var rootNotCached = string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot);
    if (rootNotCached && path == "/" && data == string.Empty)
    {
        observer.OnNext(FirebaseEvent.Empty(FirebaseEventSource.OnlineStream));
        return;
    }

    // Blank data means the element was removed; anything else is an insert or update.
    var eventType = FirebaseEventType.InsertOrUpdate;
    if (string.IsNullOrWhiteSpace(data))
    {
        eventType = FirebaseEventType.Delete;
    }

    var changedItems = cache.PushData(elementRoot + path, data);
    foreach (var item in changedItems.ToList())
    {
        observer.OnNext(new FirebaseEvent(item.Key, item.Object, eventType,
            FirebaseEventSource.OnlineStream));
    }
}
/// <summary>
/// Returns the HTTP client shared by all subscriptions.
/// </summary>
/// <returns>The static client instance.</returns>
private HttpClient GetHttpClient() => http;
}
}