Skip to content

DTDE Development Plan - Query Engine

← Back to EF Core Integration | Next: Update Engine →


1. Query Engine Overview

The Query Engine is responsible for transforming EF Core LINQ queries into distributed, temporal-aware operations. It intercepts the query pipeline, rewrites expressions to include temporal filters, resolves target shards, executes queries in parallel, and merges results.

1.1 Query Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│                           LINQ Query                                         │
│  db.Products.Where(p => p.Category == "A").ValidAt(date).Take(10)           │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│  1. Expression Rewriter                                                      │
│     • Extract temporal context from ValidAt()                                │
│     • Inject validity predicates (configurable property names)               │
│     • Capture query definition for shard planning                            │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│  2. Shard Query Planner                                                      │
│     • Analyze predicates for shard key values                                │
│     • Resolve target shards using sharding strategy                          │
│     • Generate per-shard query definitions                                   │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│  3. Query Executor                                                           │
│     • Create per-shard DbContext instances                                   │
│     • Execute queries in parallel with bounded concurrency                   │
│     • Collect results with cancellation support                              │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│  4. Result Merger                                                            │
│     • Combine results from all shards                                        │
│     • Apply global ordering                                                  │
│     • Apply global pagination (Skip/Take)                                    │
│     • Return unified result set                                              │
└─────────────────────────────────────────────────────────────────────────────┘

2. Expression Rewriter

2.1 Rewriter Interface

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Rewrites LINQ expression trees to inject temporal predicates
/// and extract query information for shard planning.
/// </summary>
public interface IExpressionRewriter
{
    /// <summary>
    /// Rewrites the expression tree to include temporal filters.
    /// </summary>
    /// <param name="expression">The original expression.</param>
    /// <param name="temporalContext">The temporal context for filtering.</param>
    /// <returns>The rewritten expression and extracted query definition.</returns>
    ExpressionRewriteResult Rewrite(Expression expression, ITemporalContext temporalContext);
}

/// <summary>
/// Result of expression rewriting.
/// </summary>
public sealed class ExpressionRewriteResult
{
    /// <summary>
    /// Gets the rewritten expression with temporal predicates.
    /// </summary>
    public Expression RewrittenExpression { get; init; } = null!;

    /// <summary>
    /// Gets the extracted query definition for shard planning.
    /// </summary>
    public DtdeQueryDefinition QueryDefinition { get; init; } = null!;

    /// <summary>
    /// Gets whether temporal filters were applied.
    /// </summary>
    public bool TemporalFiltersApplied { get; init; }

    /// <summary>
    /// Gets the entities involved in the query.
    /// </summary>
    public IReadOnlyList<Type> InvolvedEntityTypes { get; init; } = Array.Empty<Type>();
}

2.2 Expression Rewriter Implementation

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Rewrites LINQ expressions to inject temporal validity predicates.
/// Uses configurable property names from entity metadata.
/// </summary>
public sealed class DtdeExpressionRewriter : ExpressionVisitor, IExpressionRewriter
{
    private readonly IMetadataRegistry _metadataRegistry;
    private readonly ILogger<DtdeExpressionRewriter> _logger;

    private ITemporalContext? _currentTemporalContext;
    private DateTime? _querySpecificTemporalPoint;
    private bool _includeHistory;
    private readonly List<Type> _involvedEntityTypes = new();
    private readonly Dictionary<string, object?> _extractedPredicates = new();

    public DtdeExpressionRewriter(
        IMetadataRegistry metadataRegistry,
        ILogger<DtdeExpressionRewriter> logger)
    {
        _metadataRegistry = metadataRegistry;
        _logger = logger;
    }

    public ExpressionRewriteResult Rewrite(Expression expression, ITemporalContext temporalContext)
    {
        _currentTemporalContext = temporalContext;
        _querySpecificTemporalPoint = null;
        _includeHistory = temporalContext.IncludeHistory;
        _involvedEntityTypes.Clear();
        _extractedPredicates.Clear();

        // First pass: detect DTDE extension method calls
        var extensionVisitor = new DtdeExtensionMethodVisitor();
        extensionVisitor.Visit(expression);

        _querySpecificTemporalPoint = extensionVisitor.TemporalPoint;
        _includeHistory = extensionVisitor.IncludeHistory || _includeHistory;

        // Second pass: rewrite with temporal predicates
        var rewritten = Visit(expression);

        // Build query definition
        var queryDefinition = new DtdeQueryDefinition
        {
            OriginalExpression = expression,
            EffectiveTemporalPoint = _querySpecificTemporalPoint ?? temporalContext.CurrentPoint,
            IncludeHistory = _includeHistory,
            InvolvedEntityTypes = _involvedEntityTypes.ToList(),
            ExtractedPredicates = new Dictionary<string, object?>(_extractedPredicates),
            ShardHints = extensionVisitor.ShardHints
        };

        return new ExpressionRewriteResult
        {
            RewrittenExpression = rewritten,
            QueryDefinition = queryDefinition,
            TemporalFiltersApplied = !_includeHistory && queryDefinition.EffectiveTemporalPoint.HasValue,
            InvolvedEntityTypes = _involvedEntityTypes.ToList()
        };
    }

    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        // Handle DTDE extension methods
        if (IsDtdeExtensionMethod(node.Method))
        {
            return HandleDtdeExtensionMethod(node);
        }

        // Handle queryable source (DbSet<T>)
        if (IsQueryableSource(node))
        {
            return InjectTemporalFilter(node);
        }

        // Extract predicates for shard planning
        if (IsWhereMethod(node))
        {
            ExtractPredicates(node);
        }

        return base.VisitMethodCall(node);
    }

    protected override Expression VisitConstant(ConstantExpression node)
    {
        // Track entity types from DbSet access
        if (node.Type.IsGenericType 
            && node.Type.GetGenericTypeDefinition() == typeof(DbSet<>))
        {
            var entityType = node.Type.GetGenericArguments()[0];
            _involvedEntityTypes.Add(entityType);

            var metadata = _metadataRegistry.GetEntityMetadata(entityType);

            if (metadata?.IsTemporal == true)
            {
                _logger.LogDebug(
                    "Found temporal entity {EntityType} in query",
                    entityType.Name);
            }
        }

        return base.VisitConstant(node);
    }

    private Expression HandleDtdeExtensionMethod(MethodCallExpression node)
    {
        var methodName = node.Method.Name;

        return methodName switch
        {
            "ValidAt" => HandleValidAt(node),
            "WithVersions" => HandleWithVersions(node),
            "ValidBetween" => HandleValidBetween(node),
            "ShardHint" => HandleShardHint(node),
            _ => base.VisitMethodCall(node)
        };
    }

    private Expression HandleValidAt(MethodCallExpression node)
    {
        // Extract the date argument
        if (node.Arguments.Count >= 2 
            && node.Arguments[1] is ConstantExpression dateExpr)
        {
            _querySpecificTemporalPoint = (DateTime)dateExpr.Value!;
            _includeHistory = false;

            _logger.LogDebug(
                "ValidAt({Date}) detected in query",
                _querySpecificTemporalPoint);
        }

        // Continue visiting the source without the ValidAt call
        // The temporal filter will be injected at the source level
        return Visit(node.Arguments[0]);
    }

    private Expression HandleWithVersions(MethodCallExpression node)
    {
        _includeHistory = true;
        _querySpecificTemporalPoint = null;

        _logger.LogDebug("WithVersions() detected - including all versions");

        return Visit(node.Arguments[0]);
    }

    private Expression HandleValidBetween(MethodCallExpression node)
    {
        // Implementation for range queries
        throw new NotImplementedException();
    }

    private Expression HandleShardHint(MethodCallExpression node)
    {
        // Shard hints are captured but don't affect expression
        return Visit(node.Arguments[0]);
    }

    private Expression InjectTemporalFilter(MethodCallExpression node)
    {
        var entityType = node.Type.GetGenericArguments()[0];
        var metadata = _metadataRegistry.GetEntityMetadata(entityType);

        if (metadata?.IsTemporal != true || _includeHistory)
        {
            return base.VisitMethodCall(node);
        }

        var temporalPoint = _querySpecificTemporalPoint 
            ?? _currentTemporalContext?.CurrentPoint;

        if (temporalPoint is null)
        {
            return base.VisitMethodCall(node);
        }

        // Build the temporal predicate using configured property names
        var predicate = BuildTemporalPredicate(entityType, metadata.Validity!, temporalPoint.Value);

        // Wrap the source with a Where clause
        var whereMethod = typeof(Queryable)
            .GetMethods()
            .First(m => m.Name == "Where" && m.GetParameters().Length == 2)
            .MakeGenericMethod(entityType);

        var source = base.VisitMethodCall(node);

        return Expression.Call(null, whereMethod, source, predicate);
    }

    private LambdaExpression BuildTemporalPredicate(
        Type entityType,
        ValidityConfiguration validity,
        DateTime temporalPoint)
    {
        var parameter = Expression.Parameter(entityType, "e");

        // e.{ValidFromProperty} <= temporalPoint
        var validFromProperty = Expression.Property(
            parameter, 
            validity.ValidFromProperty.PropertyName);
        var dateConstant = Expression.Constant(temporalPoint);
        var validFromCondition = Expression.LessThanOrEqual(validFromProperty, dateConstant);

        Expression predicate = validFromCondition;

        if (validity.ValidToProperty is not null)
        {
            // e.{ValidToProperty} > temporalPoint
            var validToProperty = Expression.Property(
                parameter, 
                validity.ValidToProperty.PropertyName);
            var validToCondition = Expression.GreaterThan(validToProperty, dateConstant);

            predicate = Expression.AndAlso(validFromCondition, validToCondition);
        }

        _logger.LogDebug(
            "Built temporal predicate for {EntityType}: {ValidFrom} <= {Date} AND {ValidTo} > {Date}",
            entityType.Name,
            validity.ValidFromProperty.PropertyName,
            temporalPoint,
            validity.ValidToProperty?.PropertyName ?? "(open-ended)");

        return Expression.Lambda(predicate, parameter);
    }

    private void ExtractPredicates(MethodCallExpression node)
    {
        // Extract equality predicates for shard planning
        if (node.Arguments.Count >= 2 
            && node.Arguments[1] is UnaryExpression { Operand: LambdaExpression lambda })
        {
            var predicateExtractor = new PredicateExtractor(_metadataRegistry);
            var predicates = predicateExtractor.Extract(lambda.Body);

            foreach (var (key, value) in predicates)
            {
                _extractedPredicates[key] = value;
            }
        }
    }

    private static bool IsDtdeExtensionMethod(MethodInfo method)
    {
        return method.DeclaringType == typeof(QueryableExtensions);
    }

    private static bool IsQueryableSource(MethodCallExpression node)
    {
        // Check if this is the root queryable source
        return node.Method.DeclaringType == typeof(Queryable) 
            && node.Arguments.Count > 0
            && node.Arguments[0].Type.IsGenericType
            && typeof(IQueryable<>).IsAssignableFrom(
                node.Arguments[0].Type.GetGenericTypeDefinition());
    }

    private static bool IsWhereMethod(MethodCallExpression node)
    {
        return node.Method.Name == "Where" 
            && node.Method.DeclaringType == typeof(Queryable);
    }
}

2.3 Query Definition

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Captures all information needed to plan and execute a distributed query.
/// </summary>
public sealed class DtdeQueryDefinition
{
    /// <summary>
    /// Gets the original LINQ expression before rewriting.
    /// </summary>
    public Expression OriginalExpression { get; init; } = null!;

    /// <summary>
    /// Gets the effective temporal point for filtering.
    /// </summary>
    public DateTime? EffectiveTemporalPoint { get; init; }

    /// <summary>
    /// Gets whether historical versions should be included.
    /// </summary>
    public bool IncludeHistory { get; init; }

    /// <summary>
    /// Gets the entity types involved in the query.
    /// </summary>
    public IReadOnlyList<Type> InvolvedEntityTypes { get; init; } = Array.Empty<Type>();

    /// <summary>
    /// Gets predicates extracted from Where clauses for shard planning.
    /// Key is "EntityType.PropertyName", value is the constant value.
    /// </summary>
    public IReadOnlyDictionary<string, object?> ExtractedPredicates { get; init; } 
        = new Dictionary<string, object?>();

    /// <summary>
    /// Gets explicit shard hints from the query.
    /// </summary>
    public IReadOnlyList<string>? ShardHints { get; init; }

    /// <summary>
    /// Gets ordering specifications.
    /// </summary>
    public IReadOnlyList<OrderingSpec> Orderings { get; init; } = Array.Empty<OrderingSpec>();

    /// <summary>
    /// Gets the Skip value for pagination.
    /// </summary>
    public int? Skip { get; init; }

    /// <summary>
    /// Gets the Take value for pagination.
    /// </summary>
    public int? Take { get; init; }
}

/// <summary>
/// Specification for query ordering.
/// </summary>
public sealed class OrderingSpec
{
    public string PropertyName { get; init; } = null!;
    public bool Descending { get; init; }
}

3. Shard Query Planner

3.1 Planner Interface

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Plans query execution across shards.
/// </summary>
public interface IShardQueryPlanner
{
    /// <summary>
    /// Creates an execution plan for the query.
    /// </summary>
    /// <param name="queryDefinition">The query definition.</param>
    /// <returns>The execution plan with per-shard queries.</returns>
    ShardQueryPlan CreatePlan(DtdeQueryDefinition queryDefinition);
}

/// <summary>
/// Execution plan for a distributed query.
/// </summary>
public sealed class ShardQueryPlan
{
    /// <summary>
    /// Gets the per-shard query specifications.
    /// </summary>
    public IReadOnlyList<ShardQuery> ShardQueries { get; init; } = Array.Empty<ShardQuery>();

    /// <summary>
    /// Gets whether global ordering is required after merge.
    /// </summary>
    public bool RequiresGlobalOrdering { get; init; }

    /// <summary>
    /// Gets whether global pagination is required after merge.
    /// </summary>
    public bool RequiresGlobalPagination { get; init; }

    /// <summary>
    /// Gets the global ordering specifications.
    /// </summary>
    public IReadOnlyList<OrderingSpec> GlobalOrderings { get; init; } = Array.Empty<OrderingSpec>();

    /// <summary>
    /// Gets the global Skip value.
    /// </summary>
    public int? GlobalSkip { get; init; }

    /// <summary>
    /// Gets the global Take value.
    /// </summary>
    public int? GlobalTake { get; init; }

    /// <summary>
    /// Gets the total number of shards to query.
    /// </summary>
    public int TotalShards => ShardQueries.Count;
}

/// <summary>
/// Query specification for a single shard.
/// </summary>
public sealed class ShardQuery
{
    /// <summary>
    /// Gets the target shard metadata.
    /// </summary>
    public ShardMetadata Shard { get; init; } = null!;

    /// <summary>
    /// Gets the expression to execute against this shard.
    /// </summary>
    public Expression Expression { get; init; } = null!;

    /// <summary>
    /// Gets the entity type being queried.
    /// </summary>
    public Type EntityType { get; init; } = null!;

    /// <summary>
    /// Gets the per-shard Take limit (for optimization).
    /// </summary>
    public int? PerShardTake { get; init; }
}

3.2 Planner Implementation

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Plans distributed query execution across shards.
/// </summary>
public sealed class ShardQueryPlanner : IShardQueryPlanner
{
    private readonly IMetadataRegistry _metadataRegistry;
    private readonly ILogger<ShardQueryPlanner> _logger;

    public ShardQueryPlanner(
        IMetadataRegistry metadataRegistry,
        ILogger<ShardQueryPlanner> logger)
    {
        _metadataRegistry = metadataRegistry;
        _logger = logger;
    }

    public ShardQueryPlan CreatePlan(DtdeQueryDefinition queryDefinition)
    {
        var stopwatch = Stopwatch.StartNew();

        // Resolve target shards for each involved entity type
        var allTargetShards = new HashSet<ShardMetadata>();

        foreach (var entityType in queryDefinition.InvolvedEntityTypes)
        {
            var entityMetadata = _metadataRegistry.GetEntityMetadata(entityType);

            if (entityMetadata?.IsSharded != true)
            {
                // Non-sharded entity - use default shard or all shards
                var defaultShards = _metadataRegistry.ShardRegistry.GetAllShards();

                foreach (var shard in defaultShards)
                {
                    allTargetShards.Add(shard);
                }

                continue;
            }

            // Use sharding strategy to resolve target shards
            var strategy = entityMetadata.Sharding!.Strategy;
            var predicates = ExtractEntityPredicates(queryDefinition, entityType);

            var resolvedShards = strategy.ResolveShards(
                entityMetadata,
                _metadataRegistry.ShardRegistry,
                predicates,
                queryDefinition.EffectiveTemporalPoint);

            foreach (var shard in resolvedShards)
            {
                allTargetShards.Add(shard);
            }
        }

        // Apply shard hints if present
        if (queryDefinition.ShardHints is { Count: > 0 })
        {
            allTargetShards = allTargetShards
                .Where(s => queryDefinition.ShardHints.Contains(s.ShardId))
                .ToHashSet();

            _logger.LogDebug(
                "Applied shard hints, reduced to {Count} shards",
                allTargetShards.Count);
        }

        // Build per-shard queries
        var shardQueries = allTargetShards
            .OrderBy(s => s.Priority)
            .ThenBy(s => s.Tier)
            .Select(shard => BuildShardQuery(shard, queryDefinition))
            .ToList();

        // Determine if global operations are needed
        var requiresGlobalOrdering = queryDefinition.Orderings.Count > 0 && shardQueries.Count > 1;
        var requiresGlobalPagination = (queryDefinition.Skip.HasValue || queryDefinition.Take.HasValue) 
            && shardQueries.Count > 1;

        stopwatch.Stop();

        _logger.LogInformation(
            "Query plan created: {ShardCount} shards, GlobalOrder={Order}, GlobalPage={Page}, Duration={Duration}ms",
            shardQueries.Count,
            requiresGlobalOrdering,
            requiresGlobalPagination,
            stopwatch.ElapsedMilliseconds);

        return new ShardQueryPlan
        {
            ShardQueries = shardQueries,
            RequiresGlobalOrdering = requiresGlobalOrdering,
            RequiresGlobalPagination = requiresGlobalPagination,
            GlobalOrderings = queryDefinition.Orderings,
            GlobalSkip = requiresGlobalPagination ? queryDefinition.Skip : null,
            GlobalTake = requiresGlobalPagination ? queryDefinition.Take : null
        };
    }

    private ShardQuery BuildShardQuery(
        ShardMetadata shard,
        DtdeQueryDefinition queryDefinition)
    {
        // Calculate per-shard take for optimization
        // If we need Skip(10).Take(5) globally, each shard should return up to 15 rows
        int? perShardTake = null;

        if (queryDefinition.Skip.HasValue || queryDefinition.Take.HasValue)
        {
            perShardTake = (queryDefinition.Skip ?? 0) + (queryDefinition.Take ?? 0);
        }

        return new ShardQuery
        {
            Shard = shard,
            Expression = BuildShardExpression(queryDefinition, perShardTake),
            EntityType = queryDefinition.InvolvedEntityTypes.FirstOrDefault() ?? typeof(object),
            PerShardTake = perShardTake
        };
    }

    private Expression BuildShardExpression(
        DtdeQueryDefinition queryDefinition,
        int? perShardTake)
    {
        var expression = queryDefinition.OriginalExpression;

        // For multi-shard queries with pagination:
        // - Keep ordering in per-shard query (for partial sorting)
        // - Replace Skip/Take with just Take(skip + take)
        if (perShardTake.HasValue)
        {
            expression = ReplacePagination(expression, skip: null, take: perShardTake);
        }

        return expression;
    }

    private Expression ReplacePagination(Expression expression, int? skip, int? take)
    {
        // Implementation replaces Skip/Take calls in the expression tree
        throw new NotImplementedException();
    }

    private IReadOnlyDictionary<string, object?> ExtractEntityPredicates(
        DtdeQueryDefinition queryDefinition,
        Type entityType)
    {
        var prefix = $"{entityType.Name}.";

        return queryDefinition.ExtractedPredicates
            .Where(kvp => kvp.Key.StartsWith(prefix))
            .ToDictionary(
                kvp => kvp.Key.Substring(prefix.Length),
                kvp => kvp.Value);
    }
}

4. Query Executor

4.1 Executor Interface

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Executes distributed queries across shards.
/// </summary>
public interface IDtdeQueryExecutor
{
    /// <summary>
    /// Executes the query plan and returns merged results.
    /// </summary>
    /// <typeparam name="TEntity">The entity type.</typeparam>
    /// <param name="plan">The query execution plan.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The merged query results.</returns>
    Task<IReadOnlyList<TEntity>> ExecuteAsync<TEntity>(
        ShardQueryPlan plan,
        CancellationToken cancellationToken = default)
        where TEntity : class;

    /// <summary>
    /// Executes the query plan and returns a count.
    /// </summary>
    /// <param name="plan">The query execution plan.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The total count across all shards.</returns>
    Task<int> ExecuteCountAsync(
        ShardQueryPlan plan,
        CancellationToken cancellationToken = default);
}

4.2 Executor Implementation

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Executes distributed queries with bounded parallelism.
/// </summary>
public sealed class DtdeQueryExecutor : IDtdeQueryExecutor
{
    private readonly DtdeOptions _options;
    private readonly IResultMerger _resultMerger;
    private readonly IDbContextFactory<ShardDbContext> _contextFactory;
    private readonly IDtdeDiagnostics _diagnostics;
    private readonly ILogger<DtdeQueryExecutor> _logger;

    public DtdeQueryExecutor(
        DtdeOptions options,
        IResultMerger resultMerger,
        IDbContextFactory<ShardDbContext> contextFactory,
        IDtdeDiagnostics diagnostics,
        ILogger<DtdeQueryExecutor> logger)
    {
        _options = options;
        _resultMerger = resultMerger;
        _contextFactory = contextFactory;
        _diagnostics = diagnostics;
        _logger = logger;
    }

    public async Task<IReadOnlyList<TEntity>> ExecuteAsync<TEntity>(
        ShardQueryPlan plan,
        CancellationToken cancellationToken = default)
        where TEntity : class
    {
        var correlationId = Guid.NewGuid().ToString("N")[..8];
        var stopwatch = Stopwatch.StartNew();

        _logger.LogInformation(
            "[{CorrelationId}] Executing query across {ShardCount} shards",
            correlationId,
            plan.TotalShards);

        // Execute queries with bounded parallelism
        var semaphore = new SemaphoreSlim(_options.MaxParallelShards);
        var shardResults = new ConcurrentDictionary<string, ShardQueryResult<TEntity>>();

        var tasks = plan.ShardQueries.Select(async shardQuery =>
        {
            await semaphore.WaitAsync(cancellationToken);

            try
            {
                var result = await ExecuteShardQueryAsync<TEntity>(
                    shardQuery,
                    correlationId,
                    cancellationToken);

                shardResults[shardQuery.Shard.ShardId] = result;
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);

        // Collect all results
        var allResults = shardResults.Values.ToList();
        var totalRows = allResults.Sum(r => r.Results.Count);

        _logger.LogInformation(
            "[{CorrelationId}] Collected {TotalRows} rows from {ShardCount} shards",
            correlationId,
            totalRows,
            allResults.Count);

        // Merge results
        var merged = await _resultMerger.MergeAsync(
            allResults.Select(r => r.Results).ToList(),
            plan,
            cancellationToken);

        stopwatch.Stop();

        // Emit diagnostics
        _diagnostics.EmitQueryExecuted(new QueryExecutedEvent(
            DateTime.UtcNow,
            correlationId,
            typeof(TEntity),
            plan.TotalShards,
            merged.Count,
            stopwatch.Elapsed,
            allResults.ToDictionary(r => r.ShardId, r => r.Duration)));

        _logger.LogInformation(
            "[{CorrelationId}] Query completed: {ResultCount} results, {Duration}ms",
            correlationId,
            merged.Count,
            stopwatch.ElapsedMilliseconds);

        return merged;
    }

    public async Task<int> ExecuteCountAsync(
        ShardQueryPlan plan,
        CancellationToken cancellationToken = default)
    {
        var semaphore = new SemaphoreSlim(_options.MaxParallelShards);
        var counts = new ConcurrentBag<int>();

        var tasks = plan.ShardQueries.Select(async shardQuery =>
        {
            await semaphore.WaitAsync(cancellationToken);

            try
            {
                var count = await ExecuteShardCountAsync(shardQuery, cancellationToken);
                counts.Add(count);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);

        return counts.Sum();
    }

    private async Task<ShardQueryResult<TEntity>> ExecuteShardQueryAsync<TEntity>(
        ShardQuery shardQuery,
        string correlationId,
        CancellationToken cancellationToken)
        where TEntity : class
    {
        var shardStopwatch = Stopwatch.StartNew();

        _logger.LogDebug(
            "[{CorrelationId}] Executing query on shard {ShardId}",
            correlationId,
            shardQuery.Shard.ShardId);

        try
        {
            await using var context = CreateShardContext(shardQuery.Shard);

            // Build queryable from expression
            var queryable = BuildQueryable<TEntity>(context, shardQuery.Expression);

            // Apply per-shard take if specified
            if (shardQuery.PerShardTake.HasValue)
            {
                queryable = queryable.Take(shardQuery.PerShardTake.Value);
            }

            var results = await queryable.ToListAsync(cancellationToken);

            shardStopwatch.Stop();

            _logger.LogDebug(
                "[{CorrelationId}] Shard {ShardId} returned {Count} rows in {Duration}ms",
                correlationId,
                shardQuery.Shard.ShardId,
                results.Count,
                shardStopwatch.ElapsedMilliseconds);

            return new ShardQueryResult<TEntity>
            {
                ShardId = shardQuery.Shard.ShardId,
                Results = results,
                Duration = shardStopwatch.Elapsed
            };
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex,
                "[{CorrelationId}] Query failed on shard {ShardId}",
                correlationId,
                shardQuery.Shard.ShardId);

            throw new ShardOperationException(
                $"Query failed on shard {shardQuery.Shard.ShardId}: {ex.Message}",
                shardQuery.Shard.ShardId,
                ex);
        }
    }

    private async Task<int> ExecuteShardCountAsync(
        ShardQuery shardQuery,
        CancellationToken cancellationToken)
    {
        await using var context = CreateShardContext(shardQuery.Shard);

        var queryable = BuildQueryable<object>(context, shardQuery.Expression);

        return await queryable.CountAsync(cancellationToken);
    }

    private ShardDbContext CreateShardContext(ShardMetadata shard)
    {
        var optionsBuilder = new DbContextOptionsBuilder<ShardDbContext>();
        optionsBuilder.UseSqlServer(shard.ConnectionString);

        return new ShardDbContext(optionsBuilder.Options);
    }

    private IQueryable<TEntity> BuildQueryable<TEntity>(
        DbContext context,
        Expression expression)
        where TEntity : class
    {
        // Replace the original provider with this context's provider
        var set = context.Set<TEntity>();

        var visitor = new QueryProviderReplacer(set.Provider);
        var newExpression = visitor.Visit(expression);

        return set.Provider.CreateQuery<TEntity>(newExpression);
    }
}

/// <summary>
/// Result from a single shard query.
/// </summary>
internal sealed class ShardQueryResult<TEntity>
{
    public string ShardId { get; init; } = null!;
    public IReadOnlyList<TEntity> Results { get; init; } = Array.Empty<TEntity>();
    public TimeSpan Duration { get; init; }
}

5. Result Merger

5.1 Merger Interface

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Merges query results from multiple shards.
/// </summary>
public interface IResultMerger
{
    /// <summary>
    /// Merges results from multiple shards with global ordering and pagination.
    /// </summary>
    /// <typeparam name="TEntity">The entity type.</typeparam>
    /// <param name="shardResults">Results from each shard.</param>
    /// <param name="plan">The query execution plan.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The merged and paginated results.</returns>
    Task<IReadOnlyList<TEntity>> MergeAsync<TEntity>(
        IReadOnlyList<IReadOnlyList<TEntity>> shardResults,
        ShardQueryPlan plan,
        CancellationToken cancellationToken = default);
}

5.2 Merger Implementation

namespace Dtde.EntityFramework.Query;

/// <summary>
/// Merges query results with sorting and pagination.
/// </summary>
public sealed class ResultMerger : IResultMerger
{
    private readonly ILogger<ResultMerger> _logger;

    public ResultMerger(ILogger<ResultMerger> logger)
    {
        _logger = logger;
    }

    public Task<IReadOnlyList<TEntity>> MergeAsync<TEntity>(
        IReadOnlyList<IReadOnlyList<TEntity>> shardResults,
        ShardQueryPlan plan,
        CancellationToken cancellationToken = default)
    {
        // Concatenate all results
        IEnumerable<TEntity> merged = shardResults.SelectMany(r => r);

        // Apply global ordering if required
        if (plan.RequiresGlobalOrdering && plan.GlobalOrderings.Count > 0)
        {
            merged = ApplyOrdering(merged, plan.GlobalOrderings);

            _logger.LogDebug(
                "Applied global ordering: {Orderings}",
                string.Join(", ", plan.GlobalOrderings.Select(o => 
                    $"{o.PropertyName} {(o.Descending ? "DESC" : "ASC")}")));
        }

        // Apply global pagination if required
        if (plan.RequiresGlobalPagination)
        {
            if (plan.GlobalSkip.HasValue)
            {
                merged = merged.Skip(plan.GlobalSkip.Value);

                _logger.LogDebug("Applied global Skip({Skip})", plan.GlobalSkip.Value);
            }

            if (plan.GlobalTake.HasValue)
            {
                merged = merged.Take(plan.GlobalTake.Value);

                _logger.LogDebug("Applied global Take({Take})", plan.GlobalTake.Value);
            }
        }

        IReadOnlyList<TEntity> result = merged.ToList();

        return Task.FromResult(result);
    }

    private IEnumerable<TEntity> ApplyOrdering<TEntity>(
        IEnumerable<TEntity> source,
        IReadOnlyList<OrderingSpec> orderings)
    {
        if (orderings.Count == 0)
        {
            return source;
        }

        IOrderedEnumerable<TEntity>? ordered = null;

        foreach (var ordering in orderings)
        {
            var property = typeof(TEntity).GetProperty(ordering.PropertyName)
                ?? throw new InvalidOperationException(
                    $"Property {ordering.PropertyName} not found on {typeof(TEntity).Name}");

            Func<TEntity, object?> selector = e => property.GetValue(e);

            if (ordered is null)
            {
                ordered = ordering.Descending
                    ? source.OrderByDescending(selector)
                    : source.OrderBy(selector);
            }
            else
            {
                ordered = ordering.Descending
                    ? ordered.ThenByDescending(selector)
                    : ordered.ThenBy(selector);
            }
        }

        return ordered ?? source;
    }
}

6. Diagnostics

6.1 Diagnostics Interface

namespace Dtde.EntityFramework.Diagnostics;

/// <summary>
/// Diagnostics service for DTDE operations.
/// </summary>
public interface IDtdeDiagnostics
{
    /// <summary>
    /// Emits a query executed event.
    /// </summary>
    /// <param name="event">The event data.</param>
    void EmitQueryExecuted(QueryExecutedEvent @event);

    /// <summary>
    /// Emits a shard resolved event.
    /// </summary>
    /// <param name="event">The event data.</param>
    void EmitShardResolved(ShardResolvedEvent @event);

    /// <summary>
    /// Subscribes to diagnostic events.
    /// </summary>
    /// <param name="observer">The event observer.</param>
    /// <returns>A disposable subscription.</returns>
    IDisposable Subscribe(IDtdeEventObserver observer);
}

/// <summary>
/// Observer for DTDE diagnostic events.
/// </summary>
public interface IDtdeEventObserver
{
    void OnQueryExecuted(QueryExecutedEvent @event);
    void OnShardResolved(ShardResolvedEvent @event);
    void OnVersionCreated(VersionCreatedEvent @event);
    void OnVersionInvalidated(VersionInvalidatedEvent @event);
}

7. Performance Considerations

7.1 Query Optimization Strategies

Strategy When to Apply Expected Improvement
Single Shard Optimization Equality predicate on shard key Avoid fan-out entirely
Per-Shard Take Limit Pagination queries Reduce data transfer
Shard Priority Hot/warm/cold tiers Query hot shards first
Connection Pooling All queries Reduce connection overhead
Expression Caching Repeated query patterns Reduce expression parsing

7.2 Bounded Parallelism

// Configuration for bounded parallelism
public sealed class DtdeOptions
{
    /// <summary>
    /// Maximum concurrent shard queries (default: 10).
    /// </summary>
    public int MaxParallelShards { get; init; } = 10;

    /// <summary>
    /// Connection timeout per shard (default: 30s).
    /// </summary>
    public TimeSpan ShardConnectionTimeout { get; init; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Query timeout per shard (default: 60s).
    /// </summary>
    public TimeSpan ShardQueryTimeout { get; init; } = TimeSpan.FromSeconds(60);
}

8. Test Specifications

Following the MethodName_Condition_ExpectedResult pattern:

8.1 Expression Rewriter Tests

// Rewrite_WithValidAt_InjectsTemporalPredicate
// Rewrite_WithWithVersions_SkipsTemporalFilter
// Rewrite_WithContextTemporalPoint_UsesContextValue
// Rewrite_WithQueryTemporalPoint_OverridesContext
// Rewrite_NonTemporalEntity_NoPredicateInjected
// Rewrite_CustomPropertyNames_UsesConfiguredNames

8.2 Shard Query Planner Tests

// CreatePlan_WithTemporalPoint_ResolvesCorrectShards
// CreatePlan_WithShardKeyPredicate_SingleShardSelected
// CreatePlan_WithNoPredicates_AllShardsSelected
// CreatePlan_WithPagination_SetsPerShardTake
// CreatePlan_WithShardHints_FiltersToHintedShards

8.3 Query Executor Tests

// ExecuteAsync_SingleShard_ReturnsResults
// ExecuteAsync_MultipleShards_MergesResults
// ExecuteAsync_WithCancellation_CancelsGracefully
// ExecuteAsync_ShardFailure_ThrowsShardOperationException
// ExecuteCountAsync_AggregatesFromAllShards

8.4 Result Merger Tests

// MergeAsync_WithOrdering_SortsGlobally
// MergeAsync_WithPagination_AppliesSkipTake
// MergeAsync_NoOrdering_ReturnsAllResults
// MergeAsync_MultipleOrderings_AppliesThenBy

Next Steps

Continue to 05 - Update Engine for temporal versioning and write pipeline details.