Updated DBSyncWatcher and Syncer

Updated to remove reading of Datbase File from DBSyncWatcher, and move
it into Syncer.
This commit is contained in:
Mario Steele 2025-08-05 11:39:56 -05:00
parent 22d7bfb410
commit 7dbb71d921
2 changed files with 47 additions and 59 deletions

View file

@ -12,7 +12,7 @@ public class DBSyncWatcher
private FileSystemWatcher _watcher; private FileSystemWatcher _watcher;
public Dictionary<string, Type> WatchFiles { get; set; } = []; public Dictionary<string, Type> WatchFiles { get; set; } = [];
public delegate void DatabaseChange(string db, string value); public delegate void DatabaseChange(string db);
public event DatabaseChange OnDatabaseChange; public event DatabaseChange OnDatabaseChange;
public bool Locked = false; public bool Locked = false;
@ -43,19 +43,7 @@ public class DBSyncWatcher
if (!WatchFiles.Keys.Contains(dbName)) return; if (!WatchFiles.Keys.Contains(dbName)) return;
Log.Information("Database File Changed: {DbName}", dbName); Log.Information("Database File Changed: {DbName}", dbName);
OnDatabaseChange?.Invoke(dbName);
var data = new List<string>();
using var fh = File.OpenText(e.FullPath);
while (!fh.EndOfStream)
data.Add(fh.ReadLine() ?? string.Empty);
foreach (var line in data)
{
if (line == "") continue;
var type = WatchFiles[dbName];
OnDatabaseChange?.Invoke(dbName, line);
}
} }
private void HandleCreated(object sender, FileSystemEventArgs e) private void HandleCreated(object sender, FileSystemEventArgs e)
@ -72,19 +60,7 @@ public class DBSyncWatcher
if (!WatchFiles.Keys.Contains(dbName)) return; if (!WatchFiles.Keys.Contains(dbName)) return;
Log.Information("Database File Created: {DbName}", dbName); Log.Information("Database File Created: {DbName}", dbName);
OnDatabaseChange?.Invoke(dbName);
var data = new List<string>();
using var fh = File.OpenText(e.FullPath);
while (!fh.EndOfStream)
data.Add(fh.ReadLine() ?? string.Empty);
foreach (var line in data)
{
if (line == "") continue;
var type = WatchFiles[dbName];
OnDatabaseChange?.Invoke(dbName, line);
}
} }
private void HandleError(object sender, ErrorEventArgs e) private void HandleError(object sender, ErrorEventArgs e)

View file

@ -19,7 +19,7 @@ public interface ISyncer
{ {
Task ReadDatabase(); Task ReadDatabase();
Task FetchDatabase(); Task FetchDatabase();
void HandleDatabaseChange(string dbName, string entryObject); void HandleDatabaseChange(string dbName);
void Sync(); void Sync();
bool IsDirty(); bool IsDirty();
void Enable(); void Enable();
@ -45,6 +45,8 @@ public class Syncer<T> : ISyncer where T : class, IDataModel, new()
private bool _syncing = false; private bool _syncing = false;
private bool _enabled = true; private bool _enabled = true;
private int _lastLineCount = 0;
public Syncer(DBSyncWatcher watcher, string dbPath, string dbName, string restBaseUrl, string restEndpoint) public Syncer(DBSyncWatcher watcher, string dbPath, string dbName, string restBaseUrl, string restEndpoint)
{ {
_watcher = watcher; _watcher = watcher;
@ -80,7 +82,7 @@ public class Syncer<T> : ISyncer where T : class, IDataModel, new()
return false; return false;
} }
if (res.AppVersion == "0.1.3") if (res.AppVersion == "0.1.4")
{ {
Log.Information("Server Online! {AppVersion}", res.AppVersion); Log.Information("Server Online! {AppVersion}", res.AppVersion);
return true; return true;
@ -154,7 +156,7 @@ public class Syncer<T> : ISyncer where T : class, IDataModel, new()
} }
} }
public async void HandleDatabaseChange(string dbName, string entryObject) public async void HandleDatabaseChange(string dbName)
{ {
if (!_enabled) return; if (!_enabled) return;
if (dbName != _dbName) if (dbName != _dbName)
@ -163,42 +165,52 @@ public class Syncer<T> : ISyncer where T : class, IDataModel, new()
if (_syncing) if (_syncing)
return; return;
T? entry; var lines = await File.ReadAllLinesAsync(_dbPath);
try if (lines.Length < _lastLineCount)
{ // We are compressing the file, so we need to rescan everything.
entry = JsonSerializer.Deserialize<T>(entryObject, GlobalJsonOptions.Options); _lastLineCount = 0;
}
catch (Exception ex) for (var i = _lastLineCount; i < lines.Length; i++)
{ {
var entryObject = lines[i];
T? entry;
try try
{ {
var jobj = JsonSerializer.Deserialize<JsonObject>(entryObject, GlobalJsonOptions.Options); entry = JsonSerializer.Deserialize<T>(entryObject, GlobalJsonOptions.Options);
entry = new T();
entry.MarshalData(jobj["_id"].GetValue<string>(), entryObject);
} }
catch (Exception iex) catch (Exception ex)
{ {
Log.Error("Failed to parse line: {EntryLine}", entryObject); try
Log.Error("Error Message: {Messsage}", iex.Message); {
entry = null; var jobj = JsonSerializer.Deserialize<JsonObject>(entryObject, GlobalJsonOptions.Options);
entry = new T();
entry.MarshalData(jobj["_id"].GetValue<string>(), entryObject);
}
catch (Exception iex)
{
Log.Error("Failed to parse line: {EntryLine}", entryObject);
Log.Error("Error Message: {Messsage}", iex.Message);
entry = null;
}
} }
if (entry == null) continue;
entry.MarshalData(entry.Id(), entryObject);
if (_entries.Any(x => x.EqualId(entry.Id())))
{
var data = _entries.First(x => x.EqualId(entry.Id()));
if (data.Equals(entry)) continue;
Log.Information("Updated File Entry {EntryId} updated for {DbName}", entry.Id(), _dbName);
_entries.RemoveAll(x => x.EqualId(entry.Id()));
}
else
Log.Information("New File Entry {EntryId} for {DbName}", entry.Id(), _dbName);
_entries.Add(entry);
await _client.PostJsonAsync<T>(_restEndpoint, entry);
_lastLineCount = i;
} }
if (entry == null) return;
entry.MarshalData(entry.Id(), entryObject);
if (_entries.Any(x => x.EqualId(entry.Id())))
{
var data = _entries.First(x => x.EqualId(entry.Id()));
if (data.Equals(entry)) return;
Log.Information("Updated File Entry {EntryId} updated for {DbName}", entry.Id(), _dbName);
_entries.RemoveAll(x => x.EqualId(entry.Id()));
}
else
Log.Information("New File Entry {EntryId} for {DbName}", entry.Id(), _dbName);
_entries.Add(entry);
await _client.PostJsonAsync<T>(_restEndpoint, entry);
} }
public void Sync() public void Sync()