Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy attacher syntax/logic a bit #436

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 51 additions & 71 deletions Rdmp.Dicom/Attachers/Routing/AutoRoutingAttacher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@

readonly Dictionary<string, bool> _columnNamesRoutedSuccesfully = new(StringComparer.CurrentCultureIgnoreCase);

readonly Stopwatch _sw = new();
Dictionary<DataTable, string> _modalityMap;
private Dictionary<DataTable, string> _modalityMap;

protected AutoRoutingAttacher(bool requestsExternalDatabaseCreation) : base(requestsExternalDatabaseCreation) //Derived classes can change mind about RAW creation
{
Expand All @@ -62,11 +61,9 @@
Job = job;

//if we have an explicit payload to run instead (this is how you inject explicit files/archives/directories to be loaded without touching the disk)
if (job.Payload != null)
if (job.Payload is IDicomWorklist worklist)
{
var useCase = new AutoRoutingAttacherPipelineUseCase(this, (IDicomWorklist)job.Payload);
var engine = useCase.GetEngine(LoadPipeline, Job);
engine.ExecutePipeline(token);
new AutoRoutingAttacherPipelineUseCase(this, worklist).GetEngine(LoadPipeline, Job).ExecutePipeline(token);
}
else
{
Expand All @@ -78,29 +75,26 @@

//no explicit injected payload, so use the ForLoading directory to generate the list of dicom/zip files to process
foreach (var filesToLoad in job.LoadDirectory.ForLoading.GetFiles(ListPattern))
{
var useCase = new AutoRoutingAttacherPipelineUseCase(this, new FlatFileToLoadDicomFileWorklist(new FlatFileToLoad(filesToLoad)));
var engine = useCase.GetEngine(LoadPipeline, job);
engine.ExecutePipeline(token);
}
new AutoRoutingAttacherPipelineUseCase(this,
new FlatFileToLoadDicomFileWorklist(new FlatFileToLoad(filesToLoad)))
.GetEngine(LoadPipeline, job)
.ExecutePipeline(token);
}

var unmatchedColumns = string.Join($",{Environment.NewLine}", _columnNamesRoutedSuccesfully.Where(kvp => kvp.Value == false).Select(k => k.Key));
var unmatchedColumns = string.Join($",{Environment.NewLine}",
_columnNamesRoutedSuccesfully.Where(static kvp => kvp.Value == false).Select(static k => k.Key));

if (!string.IsNullOrWhiteSpace(unmatchedColumns))
{
//for each column seen in an input table that was not successfully routed somewhere
job.OnNotify(this,new NotifyEventArgs(ProgressEventType.Warning,
$"Ignored input columns:{unmatchedColumns}"));
}


return ExitCodeType.Success;
}

private void CreateTableUploaders()
{
_uploaders = new Dictionary<string, Tuple<SqlBulkInsertDestination, ITableLoadInfo>>(StringComparer.CurrentCultureIgnoreCase);
_uploaders = new Dictionary<string, Tuple<SqlBulkInsertDestination, ITableLoadInfo>>(StringComparer.OrdinalIgnoreCase);
foreach (var t in Job.RegularTablesToLoad)
{
var tblName = t.GetRuntimeName(LoadBubble.Raw,Job.Configuration.DatabaseNamer);
Expand All @@ -117,42 +111,32 @@
private void RefreshUploadDictionary()
{
//Each column name can exist in multiple TableInfos (e.g. foreign keys with the same name) so we can route a column to multiple destination tables
_columnNameToTargetTablesDictionary = new Dictionary<string, HashSet<DataTable>>(StringComparer.CurrentCultureIgnoreCase);
_columnNameToTargetTablesDictionary = new Dictionary<string, HashSet<DataTable>>(StringComparer.OrdinalIgnoreCase);

foreach (var tableInfo in Job.RegularTablesToLoad)
{
var dt = new DataTable(tableInfo.GetRuntimeName(LoadBubble.Raw, Job.Configuration.DatabaseNamer));
dt.BeginLoadData();

foreach (var columnInfo in tableInfo.GetColumnsAtStage(LoadStage.AdjustRaw))
foreach (var colName in tableInfo.GetColumnsAtStage(LoadStage.AdjustRaw)
.Select(static columnInfo => columnInfo.GetRuntimeName(LoadStage.AdjustRaw)))
{
var colName = columnInfo.GetRuntimeName(LoadStage.AdjustRaw);

//add the column to the DataTable that will be uploaded
dt.Columns.Add(colName);

//add it to the routing dictionary
if (!_columnNameToTargetTablesDictionary.ContainsKey(colName))
_columnNameToTargetTablesDictionary.Add(colName, new HashSet<DataTable>());

if (!_columnNameToTargetTablesDictionary[colName].Contains(dt))
_columnNameToTargetTablesDictionary[colName].Add(dt);
if (!_columnNameToTargetTablesDictionary.TryGetValue(colName, out var targets))
_columnNameToTargetTablesDictionary.Add(colName, targets = new HashSet<DataTable>());
targets.Add(dt);
}
}
}

public override void Check(ICheckNotifier notifier)
{
if(LoadPipeline != null)
{
new PipelineChecker(LoadPipeline).Check(notifier);

//don't check this since we are our own Fixed source for the engine, so we would just end up in a loop! But do instantiate it in case there are construction/context errors
if (LoadPipeline != null) new PipelineChecker(LoadPipeline).Check(notifier);

PipelineChecker c = new(LoadPipeline);
c.Check(notifier);
}

if (ModalityMatchingRegex != null && !ModalityMatchingRegex.ToString().Contains('('))
if (ModalityMatchingRegex?.ToString().Contains('(') == false)
notifier.OnCheckPerformed(
new CheckEventArgs(
$"Expected ModalityMatchingRegex '{ModalityMatchingRegex}' to contain a group matching for extracting modality e.g. '^(.*)_.*$'", CheckResult.Fail));
Expand All @@ -172,7 +156,7 @@
{
MySqlBulkCopy.BulkInsertBatchTimeoutInSeconds = int.MaxValue; //forever

_sw.Start();
var sw = Stopwatch.StartNew();

RefreshUploadDictionary();

Expand All @@ -182,26 +166,22 @@

AddRows(toProcess);

Exception ex = null;
try
{
BulkInsert(cancellationToken);

}
catch(Exception exception)
catch (Exception exception)
{
ex = exception;
DisposeUploaders(exception);
throw new Exception("Error occurred during upload",exception);
}

Check notice

Code scanning / CodeQL

Generic catch clause Note

Generic catch clause.

DisposeUploaders(ex);
DisposeUploaders(null);

if (ex != null)
throw new Exception("Error occurred during upload",ex);

_sw.Stop();
sw.Stop();

listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
$"ProcessPipelineData (Upload) cumulative total time is {_sw.ElapsedMilliseconds}ms"));
$"ProcessPipelineData (Upload) cumulative total time is {sw.ElapsedMilliseconds}ms"));

return null;
}
Expand All @@ -214,7 +194,7 @@

_modalityMap = new Dictionary<DataTable, string>();

foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(v=>v).Distinct())
foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(static v=>v).Distinct())
{
var m = ModalityMatchingRegex.Match(dt.TableName);

Expand All @@ -229,9 +209,11 @@

private void BulkInsert(GracefulCancellationToken token)
{
foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(hs => hs).Distinct())
if (dt.Rows.Count > 0)
_uploaders[dt.TableName].Item1.ProcessPipelineData(dt, Job, token);
foreach (var dt in _columnNameToTargetTablesDictionary.Values.SelectMany(static hs => hs).Distinct().Where(static dt => dt.Rows.Count > 0))
{
dt.EndLoadData();
_uploaders[dt.TableName].Item1.ProcessPipelineData(dt, Job, token);
}
}

private void AddRows(DataTable toProcess)
Expand All @@ -242,28 +224,27 @@
//for every row in the input table
foreach (DataRow inputRow in toProcess.Rows)
{
Dictionary<DataTable,DataRow> newDestinationRows = new();
Dictionary<DataTable,DataRow> newDestinationRows = new();

//get the modality of the current record (if we care)
string modality = null;

if (_modalityMap != null)
modality = inputRow["Modality"].ToString();
var modality = _modalityMap == null ? null : inputRow["Modality"].ToString();

var addedToAtLeastOneTable = false;

//for every input cell
foreach (DataColumn column in toProcess.Columns)
{
//if there is a destination for that column
if (!_columnNameToTargetTablesDictionary.ContainsKey(column.ColumnName)) continue;
if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName, out var destinations)) continue;
//there is a matching destination column in one or more destination tables in RAW
foreach (var destinationTable in _columnNameToTargetTablesDictionary[column.ColumnName])
foreach (var destinationTable in destinations)
{
//if we are mapping modalities to tables and this table isn't an ALL table
if (_modalityMap != null && !_modalityMap[destinationTable].Equals("ALL",StringComparison.CurrentCultureIgnoreCase))
if(!string.Equals(_modalityMap[destinationTable], modality,StringComparison.CurrentCultureIgnoreCase))
continue; //skip it
var tableModality = _modalityMap?[destinationTable];
if (tableModality?.Equals("ALL", StringComparison.OrdinalIgnoreCase) == false &&
tableModality?.Equals(modality, StringComparison.OrdinalIgnoreCase) == false)
continue; //skip it

AddCellValue(inputRow, column, destinationTable, newDestinationRows);
addedToAtLeastOneTable = true;
Expand All @@ -273,36 +254,35 @@
}

//we didn't add the row to any tables yet
if( _modalityMap != null && !addedToAtLeastOneTable)
{
if (_modalityMap != null && !addedToAtLeastOneTable)
//Try again but put it in OTHER
foreach (DataColumn column in toProcess.Columns)
{
if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName,out var tables)) continue;
if (!_columnNameToTargetTablesDictionary.TryGetValue(column.ColumnName, out var tables)) continue;
//there is a matching destination column in one or more destination tables in RAW
foreach (var destinationTable in tables.Where(destinationTable => _modalityMap[destinationTable].Equals("OTHER",StringComparison.CurrentCultureIgnoreCase)))
foreach (var destinationTable in tables.Where(destinationTable => _modalityMap[destinationTable].Equals("OTHER", StringComparison.OrdinalIgnoreCase)))
{
AddCellValue(inputRow, column, destinationTable, newDestinationRows);
addedToAtLeastOneTable = true;
}

_columnNamesRoutedSuccesfully[column.ColumnName] = true;
}
}

if(!addedToAtLeastOneTable && _modalityMap != null)
if (!addedToAtLeastOneTable && _modalityMap != null)
throw new Exception(
$"Failed to route row with modality:{modality} Mapping was {string.Join(Environment.NewLine, _modalityMap.Select(kvp => $"{kvp.Key.TableName}={kvp.Value}"))}");
$"Failed to route row with modality:{modality} Mapping was {string.Join(Environment.NewLine, _modalityMap.Select(static kvp => $"{kvp.Key.TableName}={kvp.Value}"))}");
}
}

private void AddCellValue(DataRow inputRow, DataColumn column, DataTable destinationTable, Dictionary<DataTable, DataRow> newDestinationRows)
private static void AddCellValue(DataRow inputRow, DataColumn column, DataTable destinationTable, Dictionary<DataTable, DataRow> newDestinationRows)
{
//if destination table doesn't have a new row yet add one
if (!newDestinationRows.ContainsKey(destinationTable))
newDestinationRows.Add(destinationTable, destinationTable.Rows.Add());
if (!newDestinationRows.TryGetValue(destinationTable, out var destRow))
newDestinationRows.Add(destinationTable, destRow = destinationTable.Rows.Add());

//copy the value into the new row
newDestinationRows[destinationTable][column.ColumnName] = inputRow[column.ColumnName];
destRow[column.ColumnName] = inputRow[column.ColumnName];
}

public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
Expand All @@ -318,7 +298,7 @@
item2.CloseAndArchive();
}

foreach (var dt in _columnNameToTargetTablesDictionary.SelectMany(v => v.Value).Distinct())
foreach (var dt in _columnNameToTargetTablesDictionary.SelectMany(static v => v.Value).Distinct())
dt.Dispose();

_columnNameToTargetTablesDictionary = null;
Expand Down