namespace Firebase.Database.Offline { using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Firebase.Database.Extensions; using Firebase.Database.Query; using Firebase.Database.Streaming; using System.Reactive.Threading.Tasks; using System.Linq.Expressions; using Internals; using Newtonsoft.Json; using System.Reflection; using System.Reactive.Disposables; /// /// The real-time Database which synchronizes online and offline data. /// /// Type of entities. public partial class RealtimeDatabase : IDisposable where T : class { private readonly ChildQuery childQuery; private readonly string elementRoot; private readonly StreamingOptions streamingOptions; private readonly Subject> subject; private readonly InitialPullStrategy initialPullStrategy; private readonly bool pushChanges; private readonly FirebaseCache firebaseCache; private bool isSyncRunning; private IObservable> observable; private FirebaseSubscription firebaseSubscription; /// /// Initializes a new instance of the class. /// /// The child query. /// The element Root. /// The offline database factory. /// Custom string which will get appended to the file name. /// Specifies whether changes should be streamed from the server. /// Specifies if everything should be pull from the online storage on start. It only makes sense when is set to true. /// Specifies whether changed items should actually be pushed to the server. If this is false, then Put / Post / Delete will not affect server data. public RealtimeDatabase(ChildQuery childQuery, string elementRoot, Func> offlineDatabaseFactory, string filenameModifier, StreamingOptions streamingOptions, InitialPullStrategy initialPullStrategy, bool pushChanges, ISetHandler setHandler = null) { this.childQuery = childQuery; this.elementRoot = elementRoot; this.streamingOptions = streamingOptions; this.initialPullStrategy = initialPullStrategy; this.pushChanges = pushChanges; this.Database = offlineDatabaseFactory(typeof(T), filenameModifier); this.firebaseCache = new FirebaseCache(new OfflineCacheAdapter(this.Database)); this.subject = new Subject>(); this.PutHandler = setHandler ?? new SetHandler(); this.isSyncRunning = true; Task.Factory.StartNew(this.SynchronizeThread, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); } /// /// Event raised whenever an exception is thrown in the synchronization thread. Exception thrown in there are swallowed, so this event is the only way to get to them. /// public event EventHandler SyncExceptionThrown; /// /// Gets the backing Database. /// public IDictionary Database { get; private set; } public ISetHandler PutHandler { private get; set; } /// /// Overwrites existing object with given key. /// /// The key. /// The object to set. /// Indicates whether the item should be synced online. /// The priority. Objects with higher priority will be synced first. Higher number indicates higher priority. public void Set(string key, T obj, SyncOptions syncOptions, int priority = 1) { this.SetAndRaise(key, new OfflineEntry(key, obj, priority, syncOptions)); } public void Set(string key, Expression> propertyExpression, object value, SyncOptions syncOptions, int priority = 1) { var fullKey = this.GenerateFullKey(key, propertyExpression, syncOptions); var serializedObject = JsonConvert.SerializeObject(value).Trim('"', '\\'); if (fullKey.Item3) { if (typeof(TProperty) != typeof(string) || value == null) { // don't escape non-string primitives and null; serializedObject = $"{{ \"{fullKey.Item2}\" : {serializedObject} }}"; } else { serializedObject = $"{{ \"{fullKey.Item2}\" : \"{serializedObject}\" }}"; } } var setObject = this.firebaseCache.PushData("/" + fullKey.Item1, serializedObject).First(); if (!this.Database.ContainsKey(key) || this.Database[key].SyncOptions != SyncOptions.Patch && this.Database[key].SyncOptions != SyncOptions.Put) { this.Database[fullKey.Item1] = new OfflineEntry(fullKey.Item1, value, serializedObject, priority, syncOptions, true); } this.subject.OnNext(new FirebaseEvent(key, setObject.Object, setObject == null ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate, FirebaseEventSource.Offline)); } /// /// Fetches an object with the given key and adds it to the Database. /// /// The key. /// The priority. Objects with higher priority will be synced first. Higher number indicates higher priority. public void Pull(string key, int priority = 1) { if (!this.Database.ContainsKey(key)) { this.Database[key] = new OfflineEntry(key, null, priority, SyncOptions.Pull); } else if (this.Database[key].SyncOptions == SyncOptions.None) { // pull only if push isn't pending this.Database[key].SyncOptions = SyncOptions.Pull; } } /// /// Fetches everything from the remote database. /// public async Task PullAsync() { var existingEntries = await this.childQuery .OnceAsync() .ToObservable() .RetryAfterDelay>, FirebaseException>( this.childQuery.Client.Options.SyncPeriod, ex => ex.StatusCode == System.Net.HttpStatusCode.OK) // OK implies the request couldn't complete due to network error. .Select(e => this.ResetDatabaseFromInitial(e, false)) .SelectMany(e => e) .Do(e => { this.Database[e.Key] = new OfflineEntry(e.Key, e.Object, 1, SyncOptions.None); this.subject.OnNext(new FirebaseEvent(e.Key, e.Object, FirebaseEventType.InsertOrUpdate, FirebaseEventSource.OnlinePull)); }) .ToList(); // Remove items not stored online foreach (var item in this.Database.Keys.Except(existingEntries.Select(f => f.Key)).ToList()) { this.Database.Remove(item); this.subject.OnNext(new FirebaseEvent(item, null, FirebaseEventType.Delete, FirebaseEventSource.OnlinePull)); } } /// /// Retrieves all offline items currently stored in local database. /// public IEnumerable> Once() { return this.Database .Where(kvp => !string.IsNullOrEmpty(kvp.Value.Data) && kvp.Value.Data != "null" && !kvp.Value.IsPartial) .Select(kvp => new FirebaseObject(kvp.Key, kvp.Value.Deserialize())) .ToList(); } /// /// Starts observing the real-time Database. Events will be fired both when change is done locally and remotely. /// /// Stream of . public IObservable> AsObservable() { if (!this.isSyncRunning) { this.isSyncRunning = true; Task.Factory.StartNew(this.SynchronizeThread, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); } if (this.observable == null) { var initialData = Observable.Return(FirebaseEvent.Empty(FirebaseEventSource.Offline)); if(this.Database.TryGetValue(this.elementRoot, out OfflineEntry oe)) { initialData = Observable.Return(oe) .Where(offlineEntry => !string.IsNullOrEmpty(offlineEntry.Data) && offlineEntry.Data != "null" && !offlineEntry.IsPartial) .Select(offlineEntry => new FirebaseEvent(offlineEntry.Key, offlineEntry.Deserialize(), FirebaseEventType.InsertOrUpdate, FirebaseEventSource.Offline)); } else if(this.Database.Count > 0) { initialData = this.Database .Where(kvp => !string.IsNullOrEmpty(kvp.Value.Data) && kvp.Value.Data != "null" && !kvp.Value.IsPartial) .Select(kvp => new FirebaseEvent(kvp.Key, kvp.Value.Deserialize(), FirebaseEventType.InsertOrUpdate, FirebaseEventSource.Offline)) .ToList() .ToObservable(); } this.observable = initialData .Merge(this.subject) .Merge(this.GetInitialPullObservable() .RetryAfterDelay>, FirebaseException>( this.childQuery.Client.Options.SyncPeriod, ex => ex.StatusCode == System.Net.HttpStatusCode.OK) // OK implies the request couldn't complete due to network error. .Select(e => this.ResetDatabaseFromInitial(e)) .SelectMany(e => e) .Do(this.SetObjectFromInitialPull) .Select(e => new FirebaseEvent(e.Key, e.Object, e.Object == null ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate, FirebaseEventSource.OnlineInitial)) .Concat(Observable.Create>(observer => this.InitializeStreamingSubscription(observer)))) .Do(next => { }, e => this.observable = null, () => this.observable = null) .Replay() .RefCount(); } return this.observable; } public void Dispose() { this.subject.OnCompleted(); this.firebaseSubscription?.Dispose(); } private IReadOnlyCollection> ResetDatabaseFromInitial(IReadOnlyCollection> collection, bool onlyWhenInitialEverything = true) { if (onlyWhenInitialEverything && this.initialPullStrategy != InitialPullStrategy.Everything) { return collection; } // items which are in local db, but not in the online collection var extra = this.Once() .Select(f => f.Key) .Except(collection.Select(c => c.Key)) .Select(k => new FirebaseObject(k, null)); return collection.Concat(extra).ToList(); } private void SetObjectFromInitialPull(FirebaseObject e) { // set object with no sync only if it doesn't exist yet // and the InitialPullStrategy != Everything // this attempts to deal with scenario when you are offline, have local changes and go online // in this case having the InitialPullStrategy set to everything would basically purge all local changes if (!this.Database.ContainsKey(e.Key) || this.Database[e.Key].SyncOptions == SyncOptions.None || this.Database[e.Key].SyncOptions == SyncOptions.Pull || this.initialPullStrategy != InitialPullStrategy.Everything) { this.Database[e.Key] = new OfflineEntry(e.Key, e.Object, 1, SyncOptions.None); } } private IObservable>> GetInitialPullObservable() { FirebaseQuery query; switch (this.initialPullStrategy) { case InitialPullStrategy.MissingOnly: query = this.childQuery.OrderByKey().StartAt(() => this.GetLatestKey()); break; case InitialPullStrategy.Everything: query = this.childQuery; break; case InitialPullStrategy.None: default: return Observable.Empty>>(); } if (string.IsNullOrWhiteSpace(this.elementRoot)) { return Observable.Defer(() => query.OnceAsync().ToObservable()); } // there is an element root, which indicates the target location is not a collection but a single element return Observable.Defer(async () => Observable.Return(await query.OnceSingleAsync()).Select(e => new[] { new FirebaseObject(this.elementRoot, e) })); } private IDisposable InitializeStreamingSubscription(IObserver> observer) { var completeDisposable = Disposable.Create(() => this.isSyncRunning = false); switch (this.streamingOptions) { case StreamingOptions.LatestOnly: // stream since the latest key var queryLatest = this.childQuery.OrderByKey().StartAt(() => this.GetLatestKey()); this.firebaseSubscription = new FirebaseSubscription(observer, queryLatest, this.elementRoot, this.firebaseCache); this.firebaseSubscription.ExceptionThrown += this.StreamingExceptionThrown; return new CompositeDisposable(this.firebaseSubscription.Run(), completeDisposable); case StreamingOptions.Everything: // stream everything var queryAll = this.childQuery; this.firebaseSubscription = new FirebaseSubscription(observer, queryAll, this.elementRoot, this.firebaseCache); this.firebaseSubscription.ExceptionThrown += this.StreamingExceptionThrown; return new CompositeDisposable(this.firebaseSubscription.Run(), completeDisposable); default: break; } return completeDisposable; } private void SetAndRaise(string key, OfflineEntry obj, FirebaseEventSource eventSource = FirebaseEventSource.Offline) { this.Database[key] = obj; this.subject.OnNext(new FirebaseEvent(key, obj?.Deserialize(), string.IsNullOrEmpty(obj?.Data) || obj?.Data == "null" ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate, eventSource)); } private async void SynchronizeThread() { while (this.isSyncRunning) { try { var validEntries = this.Database.Where(e => e.Value != null); await this.PullEntriesAsync(validEntries.Where(kvp => kvp.Value.SyncOptions == SyncOptions.Pull)); if (this.pushChanges) { await this.PushEntriesAsync(validEntries.Where(kvp => kvp.Value.SyncOptions == SyncOptions.Put || kvp.Value.SyncOptions == SyncOptions.Patch)); } } catch (Exception ex) { this.SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex)); } await Task.Delay(this.childQuery.Client.Options.SyncPeriod); } } private string GetLatestKey() { var key = this.Database.OrderBy(o => o.Key, StringComparer.Ordinal).LastOrDefault().Key ?? string.Empty; if (!string.IsNullOrWhiteSpace(key)) { key = key.Substring(0, key.Length - 1) + (char)(key[key.Length - 1] + 1); } return key; } private async Task PushEntriesAsync(IEnumerable> pushEntries) { var groups = pushEntries.GroupBy(pair => pair.Value.Priority).OrderByDescending(kvp => kvp.Key).ToList(); foreach (var group in groups) { var tasks = group.OrderBy(kvp => kvp.Value.IsPartial).Select(kvp => kvp.Value.IsPartial ? this.ResetSyncAfterPush(this.PutHandler.SetAsync(this.childQuery, kvp.Key, kvp.Value), kvp.Key) : this.ResetSyncAfterPush(this.PutHandler.SetAsync(this.childQuery, kvp.Key, kvp.Value), kvp.Key, kvp.Value.Deserialize())); try { await Task.WhenAll(tasks).WithAggregateException(); } catch (Exception ex) { this.SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex)); } } } private async Task PullEntriesAsync(IEnumerable> pullEntries) { var taskGroups = pullEntries.GroupBy(pair => pair.Value.Priority).OrderByDescending(kvp => kvp.Key); foreach (var group in taskGroups) { var tasks = group.Select(pair => this.ResetAfterPull(this.childQuery.Child(pair.Key == this.elementRoot ? string.Empty : pair.Key).OnceSingleAsync(), pair.Key, pair.Value)); try { await Task.WhenAll(tasks).WithAggregateException(); } catch (Exception ex) { this.SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex)); } } } private async Task ResetAfterPull(Task task, string key, OfflineEntry entry) { await task; this.SetAndRaise(key, new OfflineEntry(key, task.Result, entry.Priority, SyncOptions.None), FirebaseEventSource.OnlinePull); } private async Task ResetSyncAfterPush(Task task, string key, T obj) { await this.ResetSyncAfterPush(task, key); if (this.streamingOptions == StreamingOptions.None) { this.subject.OnNext(new FirebaseEvent(key, obj, obj == null ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate, FirebaseEventSource.OnlinePush)); } } private async Task ResetSyncAfterPush(Task task, string key) { await task; this.ResetSyncOptions(key); } private void ResetSyncOptions(string key) { var item = this.Database[key]; if (item.IsPartial) { this.Database.Remove(key); } else { item.SyncOptions = SyncOptions.None; this.Database[key] = item; } } private void StreamingExceptionThrown(object sender, ExceptionEventArgs e) { this.SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(e.Exception)); } private Tuple GenerateFullKey(string key, Expression> propertyGetter, SyncOptions syncOptions) { var visitor = new MemberAccessVisitor(); visitor.Visit(propertyGetter); var propertyType = typeof(TProperty).GetTypeInfo(); var prefix = key == string.Empty ? string.Empty : key + "/"; // primitive types if (syncOptions == SyncOptions.Patch && (propertyType.IsPrimitive || Nullable.GetUnderlyingType(typeof(TProperty)) != null || typeof(TProperty) == typeof(string))) { return Tuple.Create(prefix + string.Join("/", visitor.PropertyNames.Skip(1).Reverse()), visitor.PropertyNames.First(), true); } return Tuple.Create(prefix + string.Join("/", visitor.PropertyNames.Reverse()), visitor.PropertyNames.First(), false); } } }