using System; using System.Linq; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; using Firebase.Database.Query; using Newtonsoft.Json.Linq; namespace Firebase.Database.Streaming { /// /// The firebase subscription. /// /// Type of object to be streaming back to the called. internal class FirebaseSubscription : IDisposable { private static readonly HttpClient http; private readonly FirebaseCache cache; private readonly CancellationTokenSource cancel; private readonly FirebaseClient client; private readonly string elementRoot; private readonly IObserver> observer; private readonly IFirebaseQuery query; static FirebaseSubscription() { var handler = new HttpClientHandler { AllowAutoRedirect = true, MaxAutomaticRedirections = 10, CookieContainer = new CookieContainer() }; var httpClient = new HttpClient(handler, true); httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream")); http = httpClient; } /// /// Initializes a new instance of the class. /// /// The observer. /// The query. /// The cache. public FirebaseSubscription(IObserver> observer, IFirebaseQuery query, string elementRoot, FirebaseCache cache) { this.observer = observer; this.query = query; this.elementRoot = elementRoot; cancel = new CancellationTokenSource(); this.cache = cache; client = query.Client; } public void Dispose() { cancel.Cancel(); } public event EventHandler> ExceptionThrown; public IDisposable Run() { Task.Run(() => ReceiveThread()); return this; } private async void ReceiveThread() { while (true) { var url = string.Empty; var line = string.Empty; var statusCode = HttpStatusCode.OK; try { cancel.Token.ThrowIfCancellationRequested(); // initialize network connection url = await query.BuildUrlAsync().ConfigureAwait(false); var request = new HttpRequestMessage(HttpMethod.Get, url); var serverEvent = FirebaseServerEventType.KeepAlive; var client = GetHttpClient(); var response = await client .SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel.Token) .ConfigureAwait(false); statusCode = response.StatusCode; response.EnsureSuccessStatusCode(); using (var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false)) using (var reader = this.client.Options.SubscriptionStreamReaderFactory(stream)) { while (true) { cancel.Token.ThrowIfCancellationRequested(); line = reader.ReadLine()?.Trim(); if (string.IsNullOrWhiteSpace(line)) continue; var tuple = line.Split(new[] {':'}, 2, StringSplitOptions.RemoveEmptyEntries) .Select(s => s.Trim()).ToArray(); switch (tuple[0].ToLower()) { case "event": serverEvent = ParseServerEvent(serverEvent, tuple[1]); break; case "data": ProcessServerData(url, serverEvent, tuple[1]); break; } if (serverEvent == FirebaseServerEventType.AuthRevoked) // auth token no longer valid, reconnect break; } } } catch (OperationCanceledException) { break; } catch (Exception ex) when (statusCode != HttpStatusCode.OK) { observer.OnError(new FirebaseException(url, string.Empty, line, statusCode, ex)); Dispose(); break; } catch (Exception ex) { ExceptionThrown?.Invoke(this, new ExceptionEventArgs(new FirebaseException(url, string.Empty, line, statusCode, ex))); await Task.Delay(2000).ConfigureAwait(false); } } } private FirebaseServerEventType ParseServerEvent(FirebaseServerEventType serverEvent, string eventName) { switch (eventName) { case "put": serverEvent = FirebaseServerEventType.Put; break; case "patch": serverEvent = FirebaseServerEventType.Patch; break; case "keep-alive": serverEvent = FirebaseServerEventType.KeepAlive; break; case "cancel": serverEvent = FirebaseServerEventType.Cancel; break; case "auth_revoked": serverEvent = FirebaseServerEventType.AuthRevoked; break; } return serverEvent; } private void ProcessServerData(string url, FirebaseServerEventType serverEvent, string serverData) { switch (serverEvent) { case FirebaseServerEventType.Put: case FirebaseServerEventType.Patch: var result = JObject.Parse(serverData); var path = result["path"].ToString(); var data = result["data"].ToString(); // If an elementRoot parameter is provided, but it's not in the cache, it was already deleted. So we can return an empty object. if (string.IsNullOrWhiteSpace(elementRoot) || !cache.Contains(elementRoot)) if (path == "/" && data == string.Empty) { observer.OnNext(FirebaseEvent.Empty(FirebaseEventSource.OnlineStream)); return; } var eventType = string.IsNullOrWhiteSpace(data) ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate; var items = cache.PushData(elementRoot + path, data); foreach (var i in items.ToList()) observer.OnNext(new FirebaseEvent(i.Key, i.Object, eventType, FirebaseEventSource.OnlineStream)); break; case FirebaseServerEventType.KeepAlive: break; case FirebaseServerEventType.Cancel: observer.OnError(new FirebaseException(url, string.Empty, serverData, HttpStatusCode.Unauthorized)); Dispose(); break; } } private HttpClient GetHttpClient() { return http; } } }