using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Firebase.Database.Extensions;
using Firebase.Database.Offline.Internals;
using Firebase.Database.Query;
using Firebase.Database.Streaming;
using Newtonsoft.Json;
namespace Firebase.Database.Offline
{
///
/// The real-time Database which synchronizes online and offline data.
///
/// Type of entities.
public class RealtimeDatabase : IDisposable where T : class
{
private readonly ChildQuery childQuery;
private readonly string elementRoot;
private readonly FirebaseCache firebaseCache;
private readonly InitialPullStrategy initialPullStrategy;
private readonly bool pushChanges;
private readonly StreamingOptions streamingOptions;
private readonly Subject> subject;
private FirebaseSubscription firebaseSubscription;
private bool isSyncRunning;
private IObservable> observable;
///
/// Initializes a new instance of the class.
///
/// The child query.
/// The element Root.
/// The offline database factory.
/// Custom string which will get appended to the file name.
/// Specifies whether changes should be streamed from the server.
///
/// Specifies if everything should be pull from the online storage on start. It only
/// makes sense when is set to true.
///
///
/// Specifies whether changed items should actually be pushed to the server. If this is false,
/// then Put / Post / Delete will not affect server data.
///
public RealtimeDatabase(ChildQuery childQuery, string elementRoot,
Func> offlineDatabaseFactory, string filenameModifier,
StreamingOptions streamingOptions, InitialPullStrategy initialPullStrategy, bool pushChanges,
ISetHandler setHandler = null)
{
this.childQuery = childQuery;
this.elementRoot = elementRoot;
this.streamingOptions = streamingOptions;
this.initialPullStrategy = initialPullStrategy;
this.pushChanges = pushChanges;
Database = offlineDatabaseFactory(typeof(T), filenameModifier);
firebaseCache = new FirebaseCache(new OfflineCacheAdapter(Database));
subject = new Subject>();
PutHandler = setHandler ?? new SetHandler();
isSyncRunning = true;
Task.Factory.StartNew(SynchronizeThread, CancellationToken.None, TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
///
/// Gets the backing Database.
///
public IDictionary Database { get; }
public ISetHandler PutHandler { private get; set; }
public void Dispose()
{
subject.OnCompleted();
firebaseSubscription?.Dispose();
}
///
/// Event raised whenever an exception is thrown in the synchronization thread. Exception thrown in there are
/// swallowed, so this event is the only way to get to them.
///
public event EventHandler SyncExceptionThrown;
///
/// Overwrites existing object with given key.
///
/// The key.
/// The object to set.
/// Indicates whether the item should be synced online.
///
/// The priority. Objects with higher priority will be synced first. Higher number indicates higher
/// priority.
///
public void Set(string key, T obj, SyncOptions syncOptions, int priority = 1)
{
SetAndRaise(key, new OfflineEntry(key, obj, priority, syncOptions));
}
public void Set(string key, Expression> propertyExpression, object value,
SyncOptions syncOptions, int priority = 1)
{
var fullKey = GenerateFullKey(key, propertyExpression, syncOptions);
var serializedObject = JsonConvert.SerializeObject(value).Trim('"', '\\');
if (fullKey.Item3)
{
if (typeof(TProperty) != typeof(string) || value == null)
// don't escape non-string primitives and null;
serializedObject = $"{{ \"{fullKey.Item2}\" : {serializedObject} }}";
else
serializedObject = $"{{ \"{fullKey.Item2}\" : \"{serializedObject}\" }}";
}
var setObject = firebaseCache.PushData("/" + fullKey.Item1, serializedObject).First();
if (!Database.ContainsKey(key) || Database[key].SyncOptions != SyncOptions.Patch &&
Database[key].SyncOptions != SyncOptions.Put)
Database[fullKey.Item1] =
new OfflineEntry(fullKey.Item1, value, serializedObject, priority, syncOptions, true);
subject.OnNext(new FirebaseEvent(key, setObject.Object,
setObject == null ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate,
FirebaseEventSource.Offline));
}
///
/// Fetches an object with the given key and adds it to the Database.
///
/// The key.
///
/// The priority. Objects with higher priority will be synced first. Higher number indicates higher
/// priority.
///
public void Pull(string key, int priority = 1)
{
if (!Database.ContainsKey(key))
Database[key] = new OfflineEntry(key, null, priority, SyncOptions.Pull);
else if (Database[key].SyncOptions == SyncOptions.None)
// pull only if push isn't pending
Database[key].SyncOptions = SyncOptions.Pull;
}
///
/// Fetches everything from the remote database.
///
public async Task PullAsync()
{
var existingEntries = await childQuery
.OnceAsync()
.ToObservable()
.RetryAfterDelay>, FirebaseException>(
childQuery.Client.Options.SyncPeriod,
ex => ex.StatusCode ==
HttpStatusCode
.OK) // OK implies the request couldn't complete due to network error.
.Select(e => ResetDatabaseFromInitial(e, false))
.SelectMany(e => e)
.Do(e =>
{
Database[e.Key] = new OfflineEntry(e.Key, e.Object, 1, SyncOptions.None);
subject.OnNext(new FirebaseEvent(e.Key, e.Object, FirebaseEventType.InsertOrUpdate,
FirebaseEventSource.OnlinePull));
})
.ToList();
// Remove items not stored online
foreach (var item in Database.Keys.Except(existingEntries.Select(f => f.Key)).ToList())
{
Database.Remove(item);
subject.OnNext(new FirebaseEvent(item, null, FirebaseEventType.Delete,
FirebaseEventSource.OnlinePull));
}
}
///
/// Retrieves all offline items currently stored in local database.
///
public IEnumerable> Once()
{
return Database
.Where(kvp => !string.IsNullOrEmpty(kvp.Value.Data) && kvp.Value.Data != "null" && !kvp.Value.IsPartial)
.Select(kvp => new FirebaseObject(kvp.Key, kvp.Value.Deserialize()))
.ToList();
}
///
/// Starts observing the real-time Database. Events will be fired both when change is done locally and remotely.
///
/// Stream of .
public IObservable> AsObservable()
{
if (!isSyncRunning)
{
isSyncRunning = true;
Task.Factory.StartNew(SynchronizeThread, CancellationToken.None, TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
if (observable == null)
{
var initialData = Observable.Return(FirebaseEvent.Empty(FirebaseEventSource.Offline));
if (Database.TryGetValue(elementRoot, out var oe))
initialData = Observable.Return(oe)
.Where(offlineEntry =>
!string.IsNullOrEmpty(offlineEntry.Data) && offlineEntry.Data != "null" &&
!offlineEntry.IsPartial)
.Select(offlineEntry => new FirebaseEvent(offlineEntry.Key, offlineEntry.Deserialize(),
FirebaseEventType.InsertOrUpdate, FirebaseEventSource.Offline));
else if (Database.Count > 0)
initialData = Database
.Where(kvp =>
!string.IsNullOrEmpty(kvp.Value.Data) && kvp.Value.Data != "null" && !kvp.Value.IsPartial)
.Select(kvp => new FirebaseEvent(kvp.Key, kvp.Value.Deserialize(),
FirebaseEventType.InsertOrUpdate, FirebaseEventSource.Offline))
.ToList()
.ToObservable();
observable = initialData
.Merge(subject)
.Merge(GetInitialPullObservable()
.RetryAfterDelay>, FirebaseException>(
childQuery.Client.Options.SyncPeriod,
ex => ex.StatusCode ==
HttpStatusCode
.OK) // OK implies the request couldn't complete due to network error.
.Select(e => ResetDatabaseFromInitial(e))
.SelectMany(e => e)
.Do(SetObjectFromInitialPull)
.Select(e => new FirebaseEvent(e.Key, e.Object,
e.Object == null ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate,
FirebaseEventSource.OnlineInitial))
.Concat(Observable.Create>(observer =>
InitializeStreamingSubscription(observer))))
.Do(next => { }, e => observable = null, () => observable = null)
.Replay()
.RefCount();
}
return observable;
}
private IReadOnlyCollection> ResetDatabaseFromInitial(
IReadOnlyCollection> collection, bool onlyWhenInitialEverything = true)
{
if (onlyWhenInitialEverything && initialPullStrategy != InitialPullStrategy.Everything) return collection;
// items which are in local db, but not in the online collection
var extra = Once()
.Select(f => f.Key)
.Except(collection.Select(c => c.Key))
.Select(k => new FirebaseObject(k, null));
return collection.Concat(extra).ToList();
}
private void SetObjectFromInitialPull(FirebaseObject e)
{
// set object with no sync only if it doesn't exist yet
// and the InitialPullStrategy != Everything
// this attempts to deal with scenario when you are offline, have local changes and go online
// in this case having the InitialPullStrategy set to everything would basically purge all local changes
if (!Database.ContainsKey(e.Key) || Database[e.Key].SyncOptions == SyncOptions.None ||
Database[e.Key].SyncOptions == SyncOptions.Pull ||
initialPullStrategy != InitialPullStrategy.Everything)
Database[e.Key] = new OfflineEntry(e.Key, e.Object, 1, SyncOptions.None);
}
private IObservable>> GetInitialPullObservable()
{
FirebaseQuery query;
switch (initialPullStrategy)
{
case InitialPullStrategy.MissingOnly:
query = childQuery.OrderByKey().StartAt(() => GetLatestKey());
break;
case InitialPullStrategy.Everything:
query = childQuery;
break;
case InitialPullStrategy.None:
default:
return Observable.Empty>>();
}
if (string.IsNullOrWhiteSpace(elementRoot))
return Observable.Defer(() => query.OnceAsync().ToObservable());
// there is an element root, which indicates the target location is not a collection but a single element
return Observable.Defer(async () =>
Observable.Return(await query.OnceSingleAsync())
.Select(e => new[] {new FirebaseObject(elementRoot, e)}));
}
private IDisposable InitializeStreamingSubscription(IObserver> observer)
{
var completeDisposable = Disposable.Create(() => isSyncRunning = false);
switch (streamingOptions)
{
case StreamingOptions.LatestOnly:
// stream since the latest key
var queryLatest = childQuery.OrderByKey().StartAt(() => GetLatestKey());
firebaseSubscription =
new FirebaseSubscription(observer, queryLatest, elementRoot, firebaseCache);
firebaseSubscription.ExceptionThrown += StreamingExceptionThrown;
return new CompositeDisposable(firebaseSubscription.Run(), completeDisposable);
case StreamingOptions.Everything:
// stream everything
var queryAll = childQuery;
firebaseSubscription = new FirebaseSubscription(observer, queryAll, elementRoot, firebaseCache);
firebaseSubscription.ExceptionThrown += StreamingExceptionThrown;
return new CompositeDisposable(firebaseSubscription.Run(), completeDisposable);
}
return completeDisposable;
}
private void SetAndRaise(string key, OfflineEntry obj,
FirebaseEventSource eventSource = FirebaseEventSource.Offline)
{
Database[key] = obj;
subject.OnNext(new FirebaseEvent(key, obj?.Deserialize(),
string.IsNullOrEmpty(obj?.Data) || obj?.Data == "null"
? FirebaseEventType.Delete
: FirebaseEventType.InsertOrUpdate, eventSource));
}
private async void SynchronizeThread()
{
while (isSyncRunning)
{
try
{
var validEntries = Database.Where(e => e.Value != null);
await PullEntriesAsync(validEntries.Where(kvp => kvp.Value.SyncOptions == SyncOptions.Pull));
if (pushChanges)
await PushEntriesAsync(validEntries.Where(kvp =>
kvp.Value.SyncOptions == SyncOptions.Put || kvp.Value.SyncOptions == SyncOptions.Patch));
}
catch (Exception ex)
{
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex));
}
await Task.Delay(childQuery.Client.Options.SyncPeriod);
}
}
private string GetLatestKey()
{
var key = Database.OrderBy(o => o.Key, StringComparer.Ordinal).LastOrDefault().Key ?? string.Empty;
if (!string.IsNullOrWhiteSpace(key))
key = key.Substring(0, key.Length - 1) + (char) (key[key.Length - 1] + 1);
return key;
}
private async Task PushEntriesAsync(IEnumerable> pushEntries)
{
var groups = pushEntries.GroupBy(pair => pair.Value.Priority).OrderByDescending(kvp => kvp.Key).ToList();
foreach (var group in groups)
{
var tasks = group.OrderBy(kvp => kvp.Value.IsPartial).Select(kvp =>
kvp.Value.IsPartial
? ResetSyncAfterPush(PutHandler.SetAsync(childQuery, kvp.Key, kvp.Value), kvp.Key)
: ResetSyncAfterPush(PutHandler.SetAsync(childQuery, kvp.Key, kvp.Value), kvp.Key,
kvp.Value.Deserialize()));
try
{
await Task.WhenAll(tasks).WithAggregateException();
}
catch (Exception ex)
{
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex));
}
}
}
private async Task PullEntriesAsync(IEnumerable> pullEntries)
{
var taskGroups = pullEntries.GroupBy(pair => pair.Value.Priority).OrderByDescending(kvp => kvp.Key);
foreach (var group in taskGroups)
{
var tasks = group.Select(pair =>
ResetAfterPull(
childQuery.Child(pair.Key == elementRoot ? string.Empty : pair.Key).OnceSingleAsync(),
pair.Key, pair.Value));
try
{
await Task.WhenAll(tasks).WithAggregateException();
}
catch (Exception ex)
{
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex));
}
}
}
private async Task ResetAfterPull(Task task, string key, OfflineEntry entry)
{
await task;
SetAndRaise(key, new OfflineEntry(key, task.Result, entry.Priority, SyncOptions.None),
FirebaseEventSource.OnlinePull);
}
private async Task ResetSyncAfterPush(Task task, string key, T obj)
{
await ResetSyncAfterPush(task, key);
if (streamingOptions == StreamingOptions.None)
subject.OnNext(new FirebaseEvent(key, obj,
obj == null ? FirebaseEventType.Delete : FirebaseEventType.InsertOrUpdate,
FirebaseEventSource.OnlinePush));
}
private async Task ResetSyncAfterPush(Task task, string key)
{
await task;
ResetSyncOptions(key);
}
private void ResetSyncOptions(string key)
{
var item = Database[key];
if (item.IsPartial)
{
Database.Remove(key);
}
else
{
item.SyncOptions = SyncOptions.None;
Database[key] = item;
}
}
private void StreamingExceptionThrown(object sender, ExceptionEventArgs e)
{
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(e.Exception));
}
private Tuple GenerateFullKey(string key,
Expression> propertyGetter, SyncOptions syncOptions)
{
var visitor = new MemberAccessVisitor();
visitor.Visit(propertyGetter);
var propertyType = typeof(TProperty).GetTypeInfo();
var prefix = key == string.Empty ? string.Empty : key + "/";
// primitive types
if (syncOptions == SyncOptions.Patch && (propertyType.IsPrimitive ||
Nullable.GetUnderlyingType(typeof(TProperty)) != null ||
typeof(TProperty) == typeof(string)))
return Tuple.Create(prefix + string.Join("/", visitor.PropertyNames.Skip(1).Reverse()),
visitor.PropertyNames.First(), true);
return Tuple.Create(prefix + string.Join("/", visitor.PropertyNames.Reverse()),
visitor.PropertyNames.First(), false);
}
}
}