summaryrefslogtreecommitdiff
path: root/dsa/FireBase/Offline/RealtimeDatabase.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dsa/FireBase/Offline/RealtimeDatabase.cs')
-rw-r--r--dsa/FireBase/Offline/RealtimeDatabase.cs129
1 files changed, 43 insertions, 86 deletions
diff --git a/dsa/FireBase/Offline/RealtimeDatabase.cs b/dsa/FireBase/Offline/RealtimeDatabase.cs
index 973db46..e66a1f1 100644
--- a/dsa/FireBase/Offline/RealtimeDatabase.cs
+++ b/dsa/FireBase/Offline/RealtimeDatabase.cs
@@ -16,14 +16,12 @@ using Firebase.Database.Query;
using Firebase.Database.Streaming;
using Newtonsoft.Json;
-namespace Firebase.Database.Offline
-{
+namespace Firebase.Database.Offline {
/// <summary>
/// The real-time Database which synchronizes online and offline data.
/// </summary>
/// <typeparam name="T"> Type of entities. </typeparam>
- public class RealtimeDatabase<T> : IDisposable where T : class
- {
+ public class RealtimeDatabase<T> : IDisposable where T : class {
private readonly ChildQuery childQuery;
private readonly string elementRoot;
private readonly FirebaseCache<T> firebaseCache;
@@ -55,8 +53,7 @@ namespace Firebase.Database.Offline
public RealtimeDatabase(ChildQuery childQuery, string elementRoot,
Func<Type, string, IDictionary<string, OfflineEntry>> offlineDatabaseFactory, string filenameModifier,
StreamingOptions streamingOptions, InitialPullStrategy initialPullStrategy, bool pushChanges,
- ISetHandler<T> setHandler = null)
- {
+ ISetHandler<T> setHandler = null) {
this.childQuery = childQuery;
this.elementRoot = elementRoot;
this.streamingOptions = streamingOptions;
@@ -80,8 +77,7 @@ namespace Firebase.Database.Offline
public ISetHandler<T> PutHandler { private get; set; }
- public void Dispose()
- {
+ public void Dispose() {
subject.OnCompleted();
firebaseSubscription?.Dispose();
}
@@ -102,19 +98,16 @@ namespace Firebase.Database.Offline
/// The priority. Objects with higher priority will be synced first. Higher number indicates higher
/// priority.
/// </param>
- public void Set(string key, T obj, SyncOptions syncOptions, int priority = 1)
- {
+ public void Set(string key, T obj, SyncOptions syncOptions, int priority = 1) {
SetAndRaise(key, new OfflineEntry(key, obj, priority, syncOptions));
}
public void Set<TProperty>(string key, Expression<Func<T, TProperty>> propertyExpression, object value,
- SyncOptions syncOptions, int priority = 1)
- {
+ SyncOptions syncOptions, int priority = 1) {
var fullKey = GenerateFullKey(key, propertyExpression, syncOptions);
var serializedObject = JsonConvert.SerializeObject(value).Trim('"', '\\');
- if (fullKey.Item3)
- {
+ if (fullKey.Item3) {
if (typeof(TProperty) != typeof(string) || value == null)
// don't escape non-string primitives and null;
serializedObject = $"{{ \"{fullKey.Item2}\" : {serializedObject} }}";
@@ -142,8 +135,7 @@ namespace Firebase.Database.Offline
/// The priority. Objects with higher priority will be synced first. Higher number indicates higher
/// priority.
/// </param>
- public void Pull(string key, int priority = 1)
- {
+ public void Pull(string key, int priority = 1) {
if (!Database.ContainsKey(key))
Database[key] = new OfflineEntry(key, null, priority, SyncOptions.Pull);
else if (Database[key].SyncOptions == SyncOptions.None)
@@ -154,8 +146,7 @@ namespace Firebase.Database.Offline
/// <summary>
/// Fetches everything from the remote database.
/// </summary>
- public async Task PullAsync()
- {
+ public async Task PullAsync() {
var existingEntries = await childQuery
.OnceAsync<T>()
.ToObservable()
@@ -166,8 +157,7 @@ namespace Firebase.Database.Offline
.OK) // OK implies the request couldn't complete due to network error.
.Select(e => ResetDatabaseFromInitial(e, false))
.SelectMany(e => e)
- .Do(e =>
- {
+ .Do(e => {
Database[e.Key] = new OfflineEntry(e.Key, e.Object, 1, SyncOptions.None);
subject.OnNext(new FirebaseEvent<T>(e.Key, e.Object, FirebaseEventType.InsertOrUpdate,
FirebaseEventSource.OnlinePull));
@@ -175,8 +165,7 @@ namespace Firebase.Database.Offline
.ToList();
// Remove items not stored online
- foreach (var item in Database.Keys.Except(existingEntries.Select(f => f.Key)).ToList())
- {
+ foreach (var item in Database.Keys.Except(existingEntries.Select(f => f.Key)).ToList()) {
Database.Remove(item);
subject.OnNext(new FirebaseEvent<T>(item, null, FirebaseEventType.Delete,
FirebaseEventSource.OnlinePull));
@@ -186,8 +175,7 @@ namespace Firebase.Database.Offline
/// <summary>
/// Retrieves all offline items currently stored in local database.
/// </summary>
- public IEnumerable<FirebaseObject<T>> Once()
- {
+ public IEnumerable<FirebaseObject<T>> Once() {
return Database
.Where(kvp => !string.IsNullOrEmpty(kvp.Value.Data) && kvp.Value.Data != "null" && !kvp.Value.IsPartial)
.Select(kvp => new FirebaseObject<T>(kvp.Key, kvp.Value.Deserialize<T>()))
@@ -198,17 +186,14 @@ namespace Firebase.Database.Offline
/// Starts observing the real-time Database. Events will be fired both when change is done locally and remotely.
/// </summary>
/// <returns> Stream of <see cref="FirebaseEvent{T}" />. </returns>
- public IObservable<FirebaseEvent<T>> AsObservable()
- {
- if (!isSyncRunning)
- {
+ public IObservable<FirebaseEvent<T>> AsObservable() {
+ if (!isSyncRunning) {
isSyncRunning = true;
Task.Factory.StartNew(SynchronizeThread, CancellationToken.None, TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
- if (observable == null)
- {
+ if (observable == null) {
var initialData = Observable.Return(FirebaseEvent<T>.Empty(FirebaseEventSource.Offline));
if (Database.TryGetValue(elementRoot, out var oe))
initialData = Observable.Return(oe)
@@ -251,8 +236,7 @@ namespace Firebase.Database.Offline
}
private IReadOnlyCollection<FirebaseObject<T>> ResetDatabaseFromInitial(
- IReadOnlyCollection<FirebaseObject<T>> collection, bool onlyWhenInitialEverything = true)
- {
+ IReadOnlyCollection<FirebaseObject<T>> collection, bool onlyWhenInitialEverything = true) {
if (onlyWhenInitialEverything && initialPullStrategy != InitialPullStrategy.Everything) return collection;
// items which are in local db, but not in the online collection
@@ -264,8 +248,7 @@ namespace Firebase.Database.Offline
return collection.Concat(extra).ToList();
}
- private void SetObjectFromInitialPull(FirebaseObject<T> e)
- {
+ private void SetObjectFromInitialPull(FirebaseObject<T> e) {
// set object with no sync only if it doesn't exist yet
// and the InitialPullStrategy != Everything
// this attempts to deal with scenario when you are offline, have local changes and go online
@@ -276,11 +259,9 @@ namespace Firebase.Database.Offline
Database[e.Key] = new OfflineEntry(e.Key, e.Object, 1, SyncOptions.None);
}
- private IObservable<IReadOnlyCollection<FirebaseObject<T>>> GetInitialPullObservable()
- {
+ private IObservable<IReadOnlyCollection<FirebaseObject<T>>> GetInitialPullObservable() {
FirebaseQuery query;
- switch (initialPullStrategy)
- {
+ switch (initialPullStrategy) {
case InitialPullStrategy.MissingOnly:
query = childQuery.OrderByKey().StartAt(() => GetLatestKey());
break;
@@ -301,12 +282,10 @@ namespace Firebase.Database.Offline
.Select(e => new[] {new FirebaseObject<T>(elementRoot, e)}));
}
- private IDisposable InitializeStreamingSubscription(IObserver<FirebaseEvent<T>> observer)
- {
+ private IDisposable InitializeStreamingSubscription(IObserver<FirebaseEvent<T>> observer) {
var completeDisposable = Disposable.Create(() => isSyncRunning = false);
- switch (streamingOptions)
- {
+ switch (streamingOptions) {
case StreamingOptions.LatestOnly:
// stream since the latest key
var queryLatest = childQuery.OrderByKey().StartAt(() => GetLatestKey());
@@ -328,8 +307,7 @@ namespace Firebase.Database.Offline
}
private void SetAndRaise(string key, OfflineEntry obj,
- FirebaseEventSource eventSource = FirebaseEventSource.Offline)
- {
+ FirebaseEventSource eventSource = FirebaseEventSource.Offline) {
Database[key] = obj;
subject.OnNext(new FirebaseEvent<T>(key, obj?.Deserialize<T>(),
string.IsNullOrEmpty(obj?.Data) || obj?.Data == "null"
@@ -337,12 +315,9 @@ namespace Firebase.Database.Offline
: FirebaseEventType.InsertOrUpdate, eventSource));
}
- private async void SynchronizeThread()
- {
- while (isSyncRunning)
- {
- try
- {
+ private async void SynchronizeThread() {
+ while (isSyncRunning) {
+ try {
var validEntries = Database.Where(e => e.Value != null);
await PullEntriesAsync(validEntries.Where(kvp => kvp.Value.SyncOptions == SyncOptions.Pull));
@@ -350,8 +325,7 @@ namespace Firebase.Database.Offline
await PushEntriesAsync(validEntries.Where(kvp =>
kvp.Value.SyncOptions == SyncOptions.Put || kvp.Value.SyncOptions == SyncOptions.Patch));
}
- catch (Exception ex)
- {
+ catch (Exception ex) {
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex));
}
@@ -359,8 +333,7 @@ namespace Firebase.Database.Offline
}
}
- private string GetLatestKey()
- {
+ private string GetLatestKey() {
var key = Database.OrderBy(o => o.Key, StringComparer.Ordinal).LastOrDefault().Key ?? string.Empty;
if (!string.IsNullOrWhiteSpace(key))
@@ -369,60 +342,50 @@ namespace Firebase.Database.Offline
return key;
}
- private async Task PushEntriesAsync(IEnumerable<KeyValuePair<string, OfflineEntry>> pushEntries)
- {
+ private async Task PushEntriesAsync(IEnumerable<KeyValuePair<string, OfflineEntry>> pushEntries) {
var groups = pushEntries.GroupBy(pair => pair.Value.Priority).OrderByDescending(kvp => kvp.Key).ToList();
- foreach (var group in groups)
- {
+ foreach (var group in groups) {
var tasks = group.OrderBy(kvp => kvp.Value.IsPartial).Select(kvp =>
kvp.Value.IsPartial
? ResetSyncAfterPush(PutHandler.SetAsync(childQuery, kvp.Key, kvp.Value), kvp.Key)
: ResetSyncAfterPush(PutHandler.SetAsync(childQuery, kvp.Key, kvp.Value), kvp.Key,
kvp.Value.Deserialize<T>()));
- try
- {
+ try {
await Task.WhenAll(tasks).WithAggregateException();
}
- catch (Exception ex)
- {
+ catch (Exception ex) {
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex));
}
}
}
- private async Task PullEntriesAsync(IEnumerable<KeyValuePair<string, OfflineEntry>> pullEntries)
- {
+ private async Task PullEntriesAsync(IEnumerable<KeyValuePair<string, OfflineEntry>> pullEntries) {
var taskGroups = pullEntries.GroupBy(pair => pair.Value.Priority).OrderByDescending(kvp => kvp.Key);
- foreach (var group in taskGroups)
- {
+ foreach (var group in taskGroups) {
var tasks = group.Select(pair =>
ResetAfterPull(
childQuery.Child(pair.Key == elementRoot ? string.Empty : pair.Key).OnceSingleAsync<T>(),
pair.Key, pair.Value));
- try
- {
+ try {
await Task.WhenAll(tasks).WithAggregateException();
}
- catch (Exception ex)
- {
+ catch (Exception ex) {
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(ex));
}
}
}
- private async Task ResetAfterPull(Task<T> task, string key, OfflineEntry entry)
- {
+ private async Task ResetAfterPull(Task<T> task, string key, OfflineEntry entry) {
await task;
SetAndRaise(key, new OfflineEntry(key, task.Result, entry.Priority, SyncOptions.None),
FirebaseEventSource.OnlinePull);
}
- private async Task ResetSyncAfterPush(Task task, string key, T obj)
- {
+ private async Task ResetSyncAfterPush(Task task, string key, T obj) {
await ResetSyncAfterPush(task, key);
if (streamingOptions == StreamingOptions.None)
@@ -431,35 +394,29 @@ namespace Firebase.Database.Offline
FirebaseEventSource.OnlinePush));
}
- private async Task ResetSyncAfterPush(Task task, string key)
- {
+ private async Task ResetSyncAfterPush(Task task, string key) {
await task;
ResetSyncOptions(key);
}
- private void ResetSyncOptions(string key)
- {
+ private void ResetSyncOptions(string key) {
var item = Database[key];
- if (item.IsPartial)
- {
+ if (item.IsPartial) {
Database.Remove(key);
}
- else
- {
+ else {
item.SyncOptions = SyncOptions.None;
Database[key] = item;
}
}
- private void StreamingExceptionThrown(object sender, ExceptionEventArgs<FirebaseException> e)
- {
+ private void StreamingExceptionThrown(object sender, ExceptionEventArgs<FirebaseException> e) {
SyncExceptionThrown?.Invoke(this, new ExceptionEventArgs(e.Exception));
}
private Tuple<string, string, bool> GenerateFullKey<TProperty>(string key,
- Expression<Func<T, TProperty>> propertyGetter, SyncOptions syncOptions)
- {
+ Expression<Func<T, TProperty>> propertyGetter, SyncOptions syncOptions) {
var visitor = new MemberAccessVisitor();
visitor.Visit(propertyGetter);
var propertyType = typeof(TProperty).GetTypeInfo();