DTDE Development Plan - Update Engine¶
← Back to Query Engine | Next: Configuration & API →
1. Update Engine Overview¶
The Update Engine transforms standard EF Core change tracking operations into temporal versioning operations. When the application calls SaveChanges(), DTDE intercepts temporal entities and applies version-bump semantics across the appropriate shards.
1.1 Update Flow¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ Application Code │
│ var entity = await db.Contracts.FindAsync(id); │
│ entity.Amount = newAmount; │
│ await db.SaveChangesAsync(); │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 1. ChangeTracker Analysis │
│ • Identify temporal entities with changes │
│ • Group by operation type (Add/Modify/Delete) │
│ • Extract current and original values │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 2. Version Manager │
│ • For Added: Validate no overlapping version exists │
│ • For Modified: Create version bump (close old, create new) │
│ • For Deleted: Close validity period (soft delete) │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 3. Shard Write Router │
│ • Determine target shard for new version │
│ • Determine source shard for old version │
│ • Handle cross-shard version bumps │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 4. Distributed Write Executor │
│ • Execute invalidation on source shard │
│ • Execute insertion on target shard │
│ • Handle consistency (best-effort or transactional) │
└─────────────────────────────────────────────────────────────────────────────┘
2. Update Processor Interface¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Processes EF Core change tracking entries for temporal entities.
/// </summary>
public interface IDtdeUpdateProcessor
{
/// <summary>
/// Processes temporal entity updates from the ChangeTracker.
/// </summary>
/// <param name="context">The source DbContext.</param>
/// <param name="entries">The change tracking entries to process.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The number of affected entities.</returns>
Task<int> ProcessUpdatesAsync(
DbContext context,
IReadOnlyList<EntityEntry> entries,
CancellationToken cancellationToken = default);
}
3. Update Processor Implementation¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Processes temporal entity updates with version-bump semantics.
/// </summary>
public sealed class DtdeUpdateProcessor : IDtdeUpdateProcessor
{
private readonly IMetadataRegistry _metadataRegistry;
private readonly IVersionManager _versionManager;
private readonly IShardWriteRouter _shardWriteRouter;
private readonly IDistributedWriteExecutor _writeExecutor;
private readonly IDtdeDiagnostics _diagnostics;
private readonly ILogger<DtdeUpdateProcessor> _logger;
public DtdeUpdateProcessor(
IMetadataRegistry metadataRegistry,
IVersionManager versionManager,
IShardWriteRouter shardWriteRouter,
IDistributedWriteExecutor writeExecutor,
IDtdeDiagnostics diagnostics,
ILogger<DtdeUpdateProcessor> logger)
{
_metadataRegistry = metadataRegistry;
_versionManager = versionManager;
_shardWriteRouter = shardWriteRouter;
_writeExecutor = writeExecutor;
_diagnostics = diagnostics;
_logger = logger;
}
public async Task<int> ProcessUpdatesAsync(
DbContext context,
IReadOnlyList<EntityEntry> entries,
CancellationToken cancellationToken = default)
{
var correlationId = Guid.NewGuid().ToString("N")[..8];
var stopwatch = Stopwatch.StartNew();
var affectedCount = 0;
_logger.LogInformation(
"[{CorrelationId}] Processing {Count} temporal entity updates",
correlationId,
entries.Count);
// Group entries by entity type and operation
var operations = new List<VersionOperation>();
foreach (var entry in entries)
{
var entityType = entry.Entity.GetType();
var metadata = _metadataRegistry.GetEntityMetadata(entityType);
if (metadata?.IsTemporal != true)
{
_logger.LogWarning(
"[{CorrelationId}] Entity {EntityType} is not temporal, skipping",
correlationId,
entityType.Name);
continue;
}
var operation = entry.State switch
{
EntityState.Added => CreateAddOperation(entry, metadata, correlationId),
EntityState.Modified => CreateModifyOperation(entry, metadata, correlationId),
EntityState.Deleted => CreateDeleteOperation(entry, metadata, correlationId),
_ => null
};
if (operation is not null)
{
operations.Add(operation);
}
}
// Process operations through version manager
var processedOperations = await _versionManager.ProcessOperationsAsync(
operations,
cancellationToken);
// Route to shards
var writeCommands = await _shardWriteRouter.RouteWritesAsync(
processedOperations,
cancellationToken);
// Execute writes
affectedCount = await _writeExecutor.ExecuteAsync(
writeCommands,
correlationId,
cancellationToken);
// Detach processed entries (they've been handled by DTDE)
foreach (var entry in entries)
{
entry.State = EntityState.Detached;
}
stopwatch.Stop();
_logger.LogInformation(
"[{CorrelationId}] Completed processing {Count} updates in {Duration}ms",
correlationId,
affectedCount,
stopwatch.ElapsedMilliseconds);
return affectedCount;
}
private VersionOperation CreateAddOperation(
EntityEntry entry,
EntityMetadata metadata,
string correlationId)
{
var validity = metadata.Validity!;
var entity = entry.Entity;
var validFrom = (DateTime)validity.ValidFromProperty.GetValue(entity)!;
var validTo = validity.ValidToProperty is not null
? (DateTime?)validity.ValidToProperty.GetValue(entity)
: null;
_logger.LogDebug(
"[{CorrelationId}] Creating ADD operation for {EntityType}, ValidFrom={ValidFrom}",
correlationId,
metadata.ClrType.Name,
validFrom);
return new VersionOperation
{
OperationType = VersionOperationType.Create,
EntityMetadata = metadata,
Entity = entity,
PrimaryKeyValue = metadata.PrimaryKey.GetValue(entity),
NewValidFrom = validFrom,
NewValidTo = validTo ?? validity.OpenEndedValue
};
}
private VersionOperation CreateModifyOperation(
EntityEntry entry,
EntityMetadata metadata,
string correlationId)
{
var validity = metadata.Validity!;
var entity = entry.Entity;
// Get original values (the version being superseded)
var originalValidFrom = (DateTime)entry.OriginalValues[validity.ValidFromProperty.PropertyName]!;
var originalValidTo = validity.ValidToProperty is not null
? (DateTime?)entry.OriginalValues[validity.ValidToProperty.PropertyName]
: null;
// New version starts now (or use a configured timestamp provider)
var versionBumpDate = DateTime.UtcNow;
_logger.LogDebug(
"[{CorrelationId}] Creating MODIFY operation for {EntityType}, bumping at {BumpDate}",
correlationId,
metadata.ClrType.Name,
versionBumpDate);
return new VersionOperation
{
OperationType = VersionOperationType.VersionBump,
EntityMetadata = metadata,
Entity = entity,
PrimaryKeyValue = metadata.PrimaryKey.GetValue(entity),
OriginalValidFrom = originalValidFrom,
OriginalValidTo = originalValidTo ?? validity.OpenEndedValue,
NewValidFrom = versionBumpDate,
NewValidTo = originalValidTo ?? validity.OpenEndedValue,
VersionBumpDate = versionBumpDate
};
}
private VersionOperation CreateDeleteOperation(
EntityEntry entry,
EntityMetadata metadata,
string correlationId)
{
var validity = metadata.Validity!;
var entity = entry.Entity;
var originalValidFrom = (DateTime)entry.OriginalValues[validity.ValidFromProperty.PropertyName]!;
var closeDate = DateTime.UtcNow;
_logger.LogDebug(
"[{CorrelationId}] Creating DELETE (close) operation for {EntityType}, closing at {CloseDate}",
correlationId,
metadata.ClrType.Name,
closeDate);
return new VersionOperation
{
OperationType = VersionOperationType.Close,
EntityMetadata = metadata,
Entity = entity,
PrimaryKeyValue = metadata.PrimaryKey.GetValue(entity),
OriginalValidFrom = originalValidFrom,
CloseDate = closeDate
};
}
}
4. Version Operation Model¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Represents a temporal versioning operation.
/// </summary>
public sealed class VersionOperation
{
/// <summary>
/// Gets the operation type.
/// </summary>
public VersionOperationType OperationType { get; init; }
/// <summary>
/// Gets the entity metadata.
/// </summary>
public EntityMetadata EntityMetadata { get; init; } = null!;
/// <summary>
/// Gets the entity instance.
/// </summary>
public object Entity { get; init; } = null!;
/// <summary>
/// Gets the primary key value.
/// </summary>
public object? PrimaryKeyValue { get; init; }
/// <summary>
/// Gets the original validity start (for Modify/Delete).
/// </summary>
public DateTime? OriginalValidFrom { get; init; }
/// <summary>
/// Gets the original validity end (for Modify/Delete).
/// </summary>
public DateTime? OriginalValidTo { get; init; }
/// <summary>
/// Gets the new validity start (for Create/Modify).
/// </summary>
public DateTime? NewValidFrom { get; init; }
/// <summary>
/// Gets the new validity end (for Create/Modify).
/// </summary>
public DateTime? NewValidTo { get; init; }
/// <summary>
/// Gets the version bump date (for Modify).
/// </summary>
public DateTime? VersionBumpDate { get; init; }
/// <summary>
/// Gets the close date (for Delete/soft delete).
/// </summary>
public DateTime? CloseDate { get; init; }
}
/// <summary>
/// Types of version operations.
/// </summary>
public enum VersionOperationType
{
/// <summary>
/// Create a new version (first version of entity).
/// </summary>
Create,
/// <summary>
/// Version bump (close current, create new).
/// </summary>
VersionBump,
/// <summary>
/// Close an existing version (soft delete).
/// </summary>
Close
}
5. Version Manager¶
5.1 Interface¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Manages temporal version operations.
/// </summary>
public interface IVersionManager
{
/// <summary>
/// Processes version operations and generates write commands.
/// </summary>
/// <param name="operations">The version operations to process.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Processed operations with resolved version details.</returns>
Task<IReadOnlyList<ProcessedVersionOperation>> ProcessOperationsAsync(
IReadOnlyList<VersionOperation> operations,
CancellationToken cancellationToken = default);
}
5.2 Implementation¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Processes temporal versioning logic.
/// </summary>
public sealed class VersionManager : IVersionManager
{
private readonly IMetadataRegistry _metadataRegistry;
private readonly IShardRegistry _shardRegistry;
private readonly ILogger<VersionManager> _logger;
public VersionManager(
IMetadataRegistry metadataRegistry,
IShardRegistry shardRegistry,
ILogger<VersionManager> logger)
{
_metadataRegistry = metadataRegistry;
_shardRegistry = shardRegistry;
_logger = logger;
}
public async Task<IReadOnlyList<ProcessedVersionOperation>> ProcessOperationsAsync(
IReadOnlyList<VersionOperation> operations,
CancellationToken cancellationToken = default)
{
var processed = new List<ProcessedVersionOperation>();
foreach (var operation in operations)
{
var result = operation.OperationType switch
{
VersionOperationType.Create => ProcessCreate(operation),
VersionOperationType.VersionBump => await ProcessVersionBumpAsync(operation, cancellationToken),
VersionOperationType.Close => ProcessClose(operation),
_ => throw new InvalidOperationException($"Unknown operation type: {operation.OperationType}")
};
processed.Add(result);
}
return processed;
}
private ProcessedVersionOperation ProcessCreate(VersionOperation operation)
{
var metadata = operation.EntityMetadata;
var validity = metadata.Validity!;
// Set validity properties on the entity
validity.ValidFromProperty.SetValue(operation.Entity, operation.NewValidFrom);
if (validity.ValidToProperty is not null)
{
validity.ValidToProperty.SetValue(operation.Entity, operation.NewValidTo);
}
return new ProcessedVersionOperation
{
SourceOperation = operation,
WriteCommands = new[]
{
new WriteCommand
{
CommandType = WriteCommandType.Insert,
EntityMetadata = metadata,
Entity = operation.Entity,
PrimaryKeyValue = operation.PrimaryKeyValue
}
}
};
}
private async Task<ProcessedVersionOperation> ProcessVersionBumpAsync(
VersionOperation operation,
CancellationToken cancellationToken)
{
var metadata = operation.EntityMetadata;
var validity = metadata.Validity!;
var bumpDate = operation.VersionBumpDate!.Value;
// Create commands for both invalidation and new version
var commands = new List<WriteCommand>();
// Command 1: Invalidate old version
// UPDATE SET {ValidToProperty} = @bumpDate WHERE PK = @pk AND {ValidFromProperty} = @originalValidFrom
commands.Add(new WriteCommand
{
CommandType = WriteCommandType.Invalidate,
EntityMetadata = metadata,
PrimaryKeyValue = operation.PrimaryKeyValue,
InvalidationDate = bumpDate,
OriginalValidFrom = operation.OriginalValidFrom
});
// Command 2: Insert new version
// Clone entity with new validity period
var newEntity = CloneEntity(operation.Entity, metadata);
validity.ValidFromProperty.SetValue(newEntity, bumpDate);
if (validity.ValidToProperty is not null)
{
validity.ValidToProperty.SetValue(newEntity, operation.NewValidTo);
}
commands.Add(new WriteCommand
{
CommandType = WriteCommandType.Insert,
EntityMetadata = metadata,
Entity = newEntity,
PrimaryKeyValue = operation.PrimaryKeyValue
});
return new ProcessedVersionOperation
{
SourceOperation = operation,
WriteCommands = commands
};
}
private ProcessedVersionOperation ProcessClose(VersionOperation operation)
{
var metadata = operation.EntityMetadata;
var closeDate = operation.CloseDate!.Value;
return new ProcessedVersionOperation
{
SourceOperation = operation,
WriteCommands = new[]
{
new WriteCommand
{
CommandType = WriteCommandType.Invalidate,
EntityMetadata = metadata,
PrimaryKeyValue = operation.PrimaryKeyValue,
InvalidationDate = closeDate,
OriginalValidFrom = operation.OriginalValidFrom
}
}
};
}
private object CloneEntity(object source, EntityMetadata metadata)
{
var clone = Activator.CreateInstance(metadata.ClrType)!;
// Copy all property values
foreach (var property in metadata.ClrType.GetProperties()
.Where(p => p.CanRead && p.CanWrite))
{
var value = property.GetValue(source);
property.SetValue(clone, value);
}
return clone;
}
}
5.3 Processed Operation and Write Command¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Result of version operation processing.
/// </summary>
public sealed class ProcessedVersionOperation
{
/// <summary>
/// Gets the source operation.
/// </summary>
public VersionOperation SourceOperation { get; init; } = null!;
/// <summary>
/// Gets the write commands to execute.
/// </summary>
public IReadOnlyList<WriteCommand> WriteCommands { get; init; } = Array.Empty<WriteCommand>();
}
/// <summary>
/// A database write command for a shard.
/// </summary>
public sealed class WriteCommand
{
/// <summary>
/// Gets the command type.
/// </summary>
public WriteCommandType CommandType { get; init; }
/// <summary>
/// Gets the entity metadata.
/// </summary>
public EntityMetadata EntityMetadata { get; init; } = null!;
/// <summary>
/// Gets the entity instance (for Insert).
/// </summary>
public object? Entity { get; init; }
/// <summary>
/// Gets the primary key value.
/// </summary>
public object? PrimaryKeyValue { get; init; }
/// <summary>
/// Gets the invalidation date (for Invalidate).
/// </summary>
public DateTime? InvalidationDate { get; init; }
/// <summary>
/// Gets the original validity start (for locating version to invalidate).
/// </summary>
public DateTime? OriginalValidFrom { get; init; }
/// <summary>
/// Gets the target shard (resolved by router).
/// </summary>
public ShardMetadata? TargetShard { get; set; }
}
/// <summary>
/// Types of write commands.
/// </summary>
public enum WriteCommandType
{
/// <summary>
/// Insert a new entity version.
/// </summary>
Insert,
/// <summary>
/// Invalidate (close) an existing version.
/// </summary>
Invalidate
}
6. Shard Write Router¶
6.1 Interface¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Routes write commands to appropriate shards.
/// </summary>
public interface IShardWriteRouter
{
/// <summary>
/// Routes write commands to target shards.
/// </summary>
/// <param name="operations">The processed operations.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Write commands with resolved target shards.</returns>
Task<IReadOnlyList<WriteCommand>> RouteWritesAsync(
IReadOnlyList<ProcessedVersionOperation> operations,
CancellationToken cancellationToken = default);
}
6.2 Implementation¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Routes write commands based on sharding strategy.
/// </summary>
public sealed class ShardWriteRouter : IShardWriteRouter
{
private readonly IMetadataRegistry _metadataRegistry;
private readonly ILogger<ShardWriteRouter> _logger;
public ShardWriteRouter(
IMetadataRegistry metadataRegistry,
ILogger<ShardWriteRouter> logger)
{
_metadataRegistry = metadataRegistry;
_logger = logger;
}
public Task<IReadOnlyList<WriteCommand>> RouteWritesAsync(
IReadOnlyList<ProcessedVersionOperation> operations,
CancellationToken cancellationToken = default)
{
var routedCommands = new List<WriteCommand>();
foreach (var operation in operations)
{
foreach (var command in operation.WriteCommands)
{
var targetShard = ResolveTargetShard(command, operation.SourceOperation);
command.TargetShard = targetShard;
routedCommands.Add(command);
_logger.LogDebug(
"Routed {CommandType} command for {EntityType} to shard {ShardId}",
command.CommandType,
command.EntityMetadata.ClrType.Name,
targetShard.ShardId);
}
}
return Task.FromResult<IReadOnlyList<WriteCommand>>(routedCommands);
}
private ShardMetadata ResolveTargetShard(WriteCommand command, VersionOperation sourceOperation)
{
var metadata = command.EntityMetadata;
if (!metadata.IsSharded)
{
// Non-sharded entity - use default shard
return _metadataRegistry.ShardRegistry.GetAllShards()
.First(s => !s.IsReadOnly);
}
var strategy = metadata.Sharding!.Strategy;
return command.CommandType switch
{
WriteCommandType.Insert => ResolveInsertShard(command, strategy, metadata),
WriteCommandType.Invalidate => ResolveInvalidateShard(command, sourceOperation, strategy, metadata),
_ => throw new InvalidOperationException($"Unknown command type: {command.CommandType}")
};
}
private ShardMetadata ResolveInsertShard(
WriteCommand command,
IShardingStrategy strategy,
EntityMetadata metadata)
{
return strategy.ResolveWriteShard(
metadata,
_metadataRegistry.ShardRegistry,
command.Entity!);
}
private ShardMetadata ResolveInvalidateShard(
WriteCommand command,
VersionOperation sourceOperation,
IShardingStrategy strategy,
EntityMetadata metadata)
{
// For invalidation, we need to find the shard containing the original version
// Use the original validity start date for date-based sharding
if (strategy.StrategyType == ShardingStrategyType.DateRange
&& sourceOperation.OriginalValidFrom.HasValue)
{
// Find shard containing the original validity start
var predicates = new Dictionary<string, object?>
{
[metadata.Validity!.ValidFromProperty.PropertyName] = sourceOperation.OriginalValidFrom
};
var shards = strategy.ResolveShards(
metadata,
_metadataRegistry.ShardRegistry,
predicates,
sourceOperation.OriginalValidFrom);
if (shards.Count == 1)
{
return shards[0];
}
}
// Fallback: query all shards to find the version
// This is less efficient but handles edge cases
return _metadataRegistry.ShardRegistry.GetAllShards()
.First(s => !s.IsReadOnly);
}
}
7. Distributed Write Executor¶
7.1 Interface¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Executes write commands across distributed shards.
/// </summary>
public interface IDistributedWriteExecutor
{
/// <summary>
/// Executes write commands.
/// </summary>
/// <param name="commands">The commands to execute.</param>
/// <param name="correlationId">Correlation ID for tracing.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The number of affected entities.</returns>
Task<int> ExecuteAsync(
IReadOnlyList<WriteCommand> commands,
string correlationId,
CancellationToken cancellationToken = default);
}
7.2 Implementation¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Executes distributed writes with best-effort consistency.
/// </summary>
public sealed class DistributedWriteExecutor : IDistributedWriteExecutor
{
private readonly DtdeOptions _options;
private readonly IDtdeDiagnostics _diagnostics;
private readonly ILogger<DistributedWriteExecutor> _logger;
public DistributedWriteExecutor(
DtdeOptions options,
IDtdeDiagnostics diagnostics,
ILogger<DistributedWriteExecutor> logger)
{
_options = options;
_diagnostics = diagnostics;
_logger = logger;
}
public async Task<int> ExecuteAsync(
IReadOnlyList<WriteCommand> commands,
string correlationId,
CancellationToken cancellationToken = default)
{
// Group commands by shard
var commandsByShard = commands
.GroupBy(c => c.TargetShard!.ShardId)
.ToDictionary(g => g.Key, g => g.ToList());
_logger.LogInformation(
"[{CorrelationId}] Executing {CommandCount} commands across {ShardCount} shards",
correlationId,
commands.Count,
commandsByShard.Count);
var affectedCount = 0;
var failedShards = new List<(string ShardId, Exception Exception)>();
// Execute commands shard by shard (sequential for consistency)
// Or parallel with compensation in advanced scenarios
foreach (var (shardId, shardCommands) in commandsByShard)
{
try
{
var shard = shardCommands[0].TargetShard!;
var affected = await ExecuteShardCommandsAsync(
shard,
shardCommands,
correlationId,
cancellationToken);
affectedCount += affected;
// Emit events for diagnostics
EmitDiagnosticEvents(shardCommands, shardId, correlationId);
}
catch (Exception ex)
{
_logger.LogError(
ex,
"[{CorrelationId}] Failed to execute commands on shard {ShardId}",
correlationId,
shardId);
failedShards.Add((shardId, ex));
}
}
// Handle failures
if (failedShards.Count > 0)
{
if (_options.FailOnPartialWrite)
{
// TODO: Implement compensation/rollback
throw new ShardOperationException(
$"Write failed on {failedShards.Count} shards",
failedShards[0].ShardId,
failedShards[0].Exception);
}
_logger.LogWarning(
"[{CorrelationId}] Partial write: {SuccessCount} succeeded, {FailCount} failed",
correlationId,
commandsByShard.Count - failedShards.Count,
failedShards.Count);
}
return affectedCount;
}
private async Task<int> ExecuteShardCommandsAsync(
ShardMetadata shard,
IReadOnlyList<WriteCommand> commands,
string correlationId,
CancellationToken cancellationToken)
{
await using var connection = new SqlConnection(shard.ConnectionString);
await connection.OpenAsync(cancellationToken);
await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
try
{
var affected = 0;
foreach (var command in commands)
{
affected += command.CommandType switch
{
WriteCommandType.Insert => await ExecuteInsertAsync(connection, transaction, command, cancellationToken),
WriteCommandType.Invalidate => await ExecuteInvalidateAsync(connection, transaction, command, cancellationToken),
_ => 0
};
}
await transaction.CommitAsync(cancellationToken);
_logger.LogDebug(
"[{CorrelationId}] Committed {Count} commands to shard {ShardId}",
correlationId,
commands.Count,
shard.ShardId);
return affected;
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
private async Task<int> ExecuteInsertAsync(
SqlConnection connection,
SqlTransaction transaction,
WriteCommand command,
CancellationToken cancellationToken)
{
var metadata = command.EntityMetadata;
var entity = command.Entity!;
// Build INSERT statement dynamically based on entity metadata
var columns = new List<string>();
var parameters = new DynamicParameters();
foreach (var property in metadata.ClrType.GetProperties()
.Where(p => p.CanRead && p.GetCustomAttribute<NotMappedAttribute>() is null))
{
var columnName = property.Name; // Could use column mapping from metadata
var value = property.GetValue(entity);
columns.Add(columnName);
parameters.Add($"@{columnName}", value);
}
var sql = $"""
INSERT INTO [{metadata.SchemaName}].[{metadata.TableName}]
({string.Join(", ", columns.Select(c => $"[{c}]"))})
VALUES ({string.Join(", ", columns.Select(c => $"@{c}"))})
""";
return await connection.ExecuteAsync(
sql,
parameters,
transaction);
}
private async Task<int> ExecuteInvalidateAsync(
SqlConnection connection,
SqlTransaction transaction,
WriteCommand command,
CancellationToken cancellationToken)
{
var metadata = command.EntityMetadata;
var validity = metadata.Validity!;
// Build UPDATE statement to close validity period
var sql = $"""
UPDATE [{metadata.SchemaName}].[{metadata.TableName}]
SET [{validity.ValidToProperty!.PropertyName}] = @InvalidationDate
WHERE [{metadata.PrimaryKey.PropertyName}] = @PrimaryKey
AND [{validity.ValidFromProperty.PropertyName}] = @OriginalValidFrom
AND [{validity.ValidToProperty.PropertyName}] > @InvalidationDate
""";
var parameters = new DynamicParameters();
parameters.Add("@InvalidationDate", command.InvalidationDate);
parameters.Add("@PrimaryKey", command.PrimaryKeyValue);
parameters.Add("@OriginalValidFrom", command.OriginalValidFrom);
return await connection.ExecuteAsync(
sql,
parameters,
transaction);
}
private void EmitDiagnosticEvents(
IReadOnlyList<WriteCommand> commands,
string shardId,
string correlationId)
{
foreach (var command in commands)
{
if (command.CommandType == WriteCommandType.Insert)
{
var validity = command.EntityMetadata.Validity!;
var validFrom = (DateTime)validity.ValidFromProperty.GetValue(command.Entity!)!;
var validTo = validity.ValidToProperty is not null
? (DateTime?)validity.ValidToProperty.GetValue(command.Entity!)
: null;
_diagnostics.EmitVersionCreated(new VersionCreatedEvent(
DateTime.UtcNow,
correlationId,
command.EntityMetadata.ClrType,
command.PrimaryKeyValue!,
validFrom,
validTo,
shardId));
}
else if (command.CommandType == WriteCommandType.Invalidate)
{
_diagnostics.EmitVersionInvalidated(new VersionInvalidatedEvent(
DateTime.UtcNow,
correlationId,
command.EntityMetadata.ClrType,
command.PrimaryKeyValue!,
DateTime.MaxValue, // Original was open-ended
command.InvalidationDate!.Value,
shardId));
}
}
}
}
8. Consistency Strategies¶
8.1 Best-Effort Consistency (Default)¶
/// <summary>
/// Best-effort consistency strategy.
/// Each shard is updated independently. Partial failures are logged.
/// </summary>
public sealed class BestEffortConsistencyStrategy : IConsistencyStrategy
{
public async Task<WriteResult> ExecuteAsync(
IReadOnlyList<ShardWriteGroup> groups,
CancellationToken cancellationToken)
{
var results = new List<ShardWriteResult>();
foreach (var group in groups)
{
try
{
await ExecuteShardWritesAsync(group, cancellationToken);
results.Add(new ShardWriteResult(group.ShardId, success: true));
}
catch (Exception ex)
{
results.Add(new ShardWriteResult(group.ShardId, success: false, exception: ex));
}
}
return new WriteResult(results);
}
}
8.2 Outbox Pattern (Advanced)¶
/// <summary>
/// Outbox-based consistency strategy.
/// Records operations in an outbox table for reliable delivery.
/// </summary>
public sealed class OutboxConsistencyStrategy : IConsistencyStrategy
{
public async Task<WriteResult> ExecuteAsync(
IReadOnlyList<ShardWriteGroup> groups,
CancellationToken cancellationToken)
{
// 1. Write all operations to outbox in local transaction
await WriteToOutboxAsync(groups, cancellationToken);
// 2. Process outbox (can be async/background)
var results = await ProcessOutboxAsync(groups, cancellationToken);
return new WriteResult(results);
}
}
9. Conflict Detection¶
namespace Dtde.EntityFramework.Update;
/// <summary>
/// Detects and handles version conflicts.
/// </summary>
public interface IConflictDetector
{
/// <summary>
/// Checks if the operation would cause a conflict.
/// </summary>
/// <param name="operation">The operation to check.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Conflict information if detected.</returns>
Task<VersionConflict?> DetectConflictAsync(
VersionOperation operation,
CancellationToken cancellationToken = default);
}
/// <summary>
/// Information about a version conflict.
/// </summary>
public sealed class VersionConflict
{
/// <summary>
/// Gets the conflict type.
/// </summary>
public ConflictType Type { get; init; }
/// <summary>
/// Gets the existing version that conflicts.
/// </summary>
public object? ExistingEntity { get; init; }
/// <summary>
/// Gets a description of the conflict.
/// </summary>
public string Description { get; init; } = string.Empty;
}
/// <summary>
/// Types of version conflicts.
/// </summary>
public enum ConflictType
{
/// <summary>
/// Validity periods overlap.
/// </summary>
OverlappingValidity,
/// <summary>
/// Version was modified by another operation.
/// </summary>
ConcurrentModification,
/// <summary>
/// Version was already closed.
/// </summary>
AlreadyClosed
}
10. Test Specifications¶
Following the MethodName_Condition_ExpectedResult pattern:
10.1 Update Processor Tests¶
// ProcessUpdatesAsync_AddedEntity_CreatesInsertCommand
// ProcessUpdatesAsync_ModifiedEntity_CreatesVersionBumpCommands
// ProcessUpdatesAsync_DeletedEntity_CreatesCloseCommand
// ProcessUpdatesAsync_NonTemporalEntity_SkipsProcessing
// ProcessUpdatesAsync_MultipleEntities_ProcessesAll
10.2 Version Manager Tests¶
// ProcessOperationsAsync_Create_SetsValidityProperties
// ProcessOperationsAsync_VersionBump_GeneratesTwoCommands
// ProcessOperationsAsync_Close_GeneratesInvalidateCommand
// ProcessOperationsAsync_VersionBump_ClonesEntity
10.3 Shard Write Router Tests¶
// RouteWritesAsync_InsertCommand_ResolvesFromStrategy
// RouteWritesAsync_InvalidateCommand_UsesOriginalValidFrom
// RouteWritesAsync_NonShardedEntity_UsesDefaultShard
// RouteWritesAsync_ReadOnlyShard_ExcludesFromInsert
10.4 Distributed Write Executor Tests¶
// ExecuteAsync_SingleShard_ExecutesInTransaction
// ExecuteAsync_MultipleShards_ExecutesSequentially
// ExecuteAsync_ShardFailure_LogsAndContinues
// ExecuteAsync_AllFailed_ThrowsException
// ExecuteAsync_Invalidate_UpdatesCorrectRow
Next Steps¶
Continue to 06 - Configuration & API for developer-facing API and configuration details.