using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using NPP.SmartSchedue.Api.Core.Repositories; using NPP.SmartSchedue.Api.Contracts.Domain.Integration; using ZhonTai.Admin.Core.Db.Transaction; using FreeSql; namespace NPP.SmartSchedue.Api.Repositories.Integration { /// /// 任务变更事件仓储实现类 /// /// 业务思考: /// 1. 事件数据具有时序性特点,重点优化时间范围查询性能 /// 2. 支持事件状态流转和批量处理,提高系统吞吐量 /// 3. 实现智能事件优先级排序,确保重要事件及时处理 /// 4. 提供丰富的统计分析功能,支持变更监控和趋势分析 /// public class TaskChangeEventRepository : AppRepositoryBase, ITaskChangeEventRepository { public TaskChangeEventRepository(UnitOfWorkManagerCloud unitOfWorkManager) : base(unitOfWorkManager) { } #region 基础查询方法 /// /// 根据任务ID获取变更事件 /// 利用任务ID索引优化查询性能 /// public async Task> GetByTaskIdAsync(long taskId) { return await Select .Where(e => e.TaskId == taskId) .OrderByDescending(e => e.EventTime) .ToListAsync(); } /// /// 根据整合记录ID获取相关变更事件 /// 用于查看特定整合记录的所有变更历史 /// public async Task> GetByIntegrationRecordIdAsync(long integrationRecordId) { return await Select .Where(e => e.IntegrationRecordId == integrationRecordId) .OrderByDescending(e => e.EventTime) .ToListAsync(); } /// /// 获取待处理的变更事件 /// 按优先级和时间排序,支持事件处理队列 /// public async Task> GetPendingEventsAsync(int limit = 100) { return await Select .Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Pending) .Where(e => !e.NextRetryTime.HasValue || e.NextRetryTime.Value <= DateTime.Now) // 排除还在等待重试的事件 .OrderByDescending(e => e.EventPriority) // 优先级降序 .OrderBy(e => e.EventTime) // 时间升序(优先处理早期事件) .Take(limit) .ToListAsync(); } /// /// 获取超时未处理的事件 /// 支持事件监控和告警机制 /// public async Task> GetOverdueEventsAsync() { var now = DateTime.Now; return await Select .Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Pending || e.ProcessingStatus == TaskChangeEventProcessingStatus.Processing) .ToListAsync() .ContinueWith(task => { var events = task.Result; return events.Where(e => e.IsOverdue).ToList(); }); } #endregion #region 复合查询方法 /// /// 多条件综合查询变更事件 /// 支持灵活的业务查询需求 /// public async Task<(List Events, long Total)> GetByMultipleConditionsAsync( long? integrationRecordId = null, long? rootIntegrationRecordId = null, List? taskIds = null, List? eventTypes = null, List? processingStatuses = null, DateTime? startTime = null, DateTime? endTime = null, List? triggeredByUserIds = null, int? minPriority = null, int? minImpactScore = null, bool includeTestEvents = false, int pageIndex = 1, int pageSize = 20, string sortBy = "EventTime", string sortDirection = "Desc") { var query = Select; // 动态添加查询条件 if (integrationRecordId.HasValue) query = query.Where(e => e.IntegrationRecordId == integrationRecordId.Value); if (rootIntegrationRecordId.HasValue) query = query.Where(e => e.RootIntegrationRecordId == rootIntegrationRecordId.Value); if (taskIds?.Any() == true) query = query.Where(e => taskIds.Contains(e.TaskId)); if (eventTypes?.Any() == true) query = query.Where(e => eventTypes.Contains(e.EventType)); if (processingStatuses?.Any() == true) query = query.Where(e => processingStatuses.Contains(e.ProcessingStatus)); if (startTime.HasValue) query = query.Where(e => e.EventTime >= startTime.Value); if (endTime.HasValue) query = query.Where(e => e.EventTime <= endTime.Value); if (triggeredByUserIds?.Any() == true) query = query.Where(e => e.TriggeredByUserId.HasValue && triggeredByUserIds.Contains(e.TriggeredByUserId.Value)); if (minPriority.HasValue) query = query.Where(e => e.EventPriority >= minPriority.Value); if (minImpactScore.HasValue) query = query.Where(e => e.ImpactAssessmentScore >= minImpactScore.Value); if (!includeTestEvents) query = query.Where(e => !e.IsTestEvent); // 排序 var sortField = sortBy.ToLower() switch { "priority" => nameof(TaskChangeEventEntity.EventPriority), "impactscore" => nameof(TaskChangeEventEntity.ImpactAssessmentScore), "status" => nameof(TaskChangeEventEntity.ProcessingStatus), "taskid" => nameof(TaskChangeEventEntity.TaskId), _ => nameof(TaskChangeEventEntity.EventTime) }; ISelect orderedQuery; if (sortDirection.ToUpper() == "ASC") { orderedQuery = sortField switch { nameof(TaskChangeEventEntity.EventTime) => query.OrderBy(e => e.EventTime), nameof(TaskChangeEventEntity.TaskId) => query.OrderBy(e => e.TaskId), nameof(TaskChangeEventEntity.EventPriority) => query.OrderBy(e => e.EventPriority), nameof(TaskChangeEventEntity.ImpactAssessmentScore) => query.OrderBy(e => e.ImpactAssessmentScore), _ => query.OrderBy(e => e.EventTime) }; } else { orderedQuery = sortField switch { nameof(TaskChangeEventEntity.EventTime) => query.OrderByDescending(e => e.EventTime), nameof(TaskChangeEventEntity.TaskId) => query.OrderByDescending(e => e.TaskId), nameof(TaskChangeEventEntity.EventPriority) => query.OrderByDescending(e => e.EventPriority), nameof(TaskChangeEventEntity.ImpactAssessmentScore) => query.OrderByDescending(e => e.ImpactAssessmentScore), _ => query.OrderByDescending(e => e.EventTime) }; } var total = await orderedQuery.CountAsync(); var events = await orderedQuery .Page(pageIndex, pageSize) .ToListAsync(); return (events, total); } /// /// 获取指定时间段内的高影响事件 /// 支持重要事件监控和分析 /// public async Task> GetHighImpactEventsAsync( DateTime startTime, DateTime endTime, int minImpactScore = 80, int limit = 50) { return await Select .Where(e => e.EventTime >= startTime && e.EventTime <= endTime) .Where(e => e.ImpactAssessmentScore >= minImpactScore) .Where(e => !e.IsTestEvent) .OrderByDescending(e => e.ImpactAssessmentScore) .OrderByDescending(e => e.EventPriority) .Take(limit) .ToListAsync(); } /// /// 获取批量处理的事件组 /// 支持事件分组批量处理机制 /// public async Task> GetBatchProcessingEventsAsync(long eventGroupId) { return await Select .Where(e => e.EventGroupId == eventGroupId) .OrderBy(e => e.EventGroupSequence) .ToListAsync(); } #endregion #region 状态管理方法 /// /// 批量更新事件处理状态 /// 高效的批量状态更新操作 /// public async Task BatchUpdateProcessingStatusAsync( List eventIds, string newStatus, long? processedByUserId = null, string? processedByUserName = null, string? resultSummary = null) { if (!eventIds.Any()) return 0; var updateQuery = UpdateDiy .Set(e => e.ProcessingStatus, newStatus) .Set(e => e.ProcessingCompletedTime, DateTime.Now) .Where(e => eventIds.Contains(e.Id)); if (processedByUserId.HasValue) updateQuery = updateQuery.Set(e => e.ProcessedByUserId, processedByUserId.Value); if (!string.IsNullOrEmpty(processedByUserName)) updateQuery = updateQuery.Set(e => e.ProcessedByUserName, processedByUserName); if (!string.IsNullOrEmpty(resultSummary)) updateQuery = updateQuery.Set(e => e.ProcessingResultSummary, resultSummary); return await updateQuery.ExecuteAffrowsAsync(); } /// /// 标记事件开始处理 /// 设置处理开始时间和处理人员信息 /// public async Task MarkEventProcessingAsync( long eventId, long processedByUserId, string processedByUserName) { var affectedRows = await UpdateDiy .Set(e => e.ProcessingStatus, TaskChangeEventProcessingStatus.Processing) .Set(e => e.ProcessingStartTime, DateTime.Now) .Set(e => e.ProcessedByUserId, processedByUserId) .Set(e => e.ProcessedByUserName, processedByUserName) .Where(e => e.Id == eventId) .Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Pending) // 确保只有待处理的事件才能开始处理 .ExecuteAffrowsAsync(); return affectedRows > 0; } /// /// 完成事件处理 /// 设置处理完成状态和结果信息 /// public async Task CompleteEventProcessingAsync( long eventId, string resultSummary, bool triggeredReallocation = false, long? createdVersionId = null, string? detailsJson = null) { var updateQuery = UpdateDiy .Set(e => e.ProcessingStatus, TaskChangeEventProcessingStatus.Completed) .Set(e => e.ProcessingCompletedTime, DateTime.Now) .Set(e => e.ProcessingResultSummary, resultSummary) .Set(e => e.TriggeredReallocation, triggeredReallocation) .Where(e => e.Id == eventId) .Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Processing); if (createdVersionId.HasValue) updateQuery = updateQuery.Set(e => e.CreatedNewVersionId, createdVersionId.Value); if (!string.IsNullOrEmpty(detailsJson)) updateQuery = updateQuery.Set(e => e.ProcessingDetailsJson, detailsJson); var affectedRows = await updateQuery.ExecuteAffrowsAsync(); return affectedRows > 0; } /// /// 标记事件处理失败 /// 设置失败状态和错误信息,支持重试机制 /// public async Task FailEventProcessingAsync( long eventId, string errorMessage, bool shouldRetry = true) { var eventEntity = await GetAsync(eventId); if (eventEntity == null) return false; // 使用实体方法处理失败逻辑(包含重试机制) eventEntity.FailProcessing(errorMessage); await UpdateAsync(eventEntity); return true; } #endregion #region 统计分析方法 /// /// 获取事件处理统计数据 /// 支持监控面板和性能分析 /// public async Task GetProcessingStatisticsAsync( DateTime startTime, DateTime endTime) { var events = await Select .Where(e => e.EventTime >= startTime && e.EventTime <= endTime) .Where(e => !e.IsTestEvent) .ToListAsync(); if (!events.Any()) { return new EventProcessingStatistics { StartTime = startTime, EndTime = endTime }; } var completedEvents = events.Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Completed).ToList(); var failedEvents = events.Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Failed).ToList(); var pendingEvents = events.Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Pending).ToList(); var overdueEvents = events.Where(e => e.IsOverdue).ToList(); return new EventProcessingStatistics { StartTime = startTime, EndTime = endTime, TotalEvents = events.Count, CompletedEvents = completedEvents.Count, FailedEvents = failedEvents.Count, PendingEvents = pendingEvents.Count, OverdueEvents = overdueEvents.Count, AverageProcessingTime = completedEvents.Any() ? (long)completedEvents.Average(e => e.ProcessingElapsedMilliseconds) : 0, AverageImpactScore = events.Any() ? Math.Round((decimal)events.Average(e => e.ImpactAssessmentScore), 1) : 0, SuccessRate = events.Count > 0 ? Math.Round((decimal)completedEvents.Count / events.Count * 100, 2) : 0, ReallocationTriggerCount = events.Count(e => e.TriggeredReallocation), UniqueTaskCount = events.Select(e => e.TaskId).Distinct().Count(), UniqueOperatorCount = events.Where(e => e.TriggeredByUserId.HasValue) .Select(e => e.TriggeredByUserId.Value).Distinct().Count() }; } /// /// 获取事件类型分布统计 /// 分析不同事件类型的发生频率和处理效果 /// public async Task> GetEventTypeStatisticsAsync( DateTime startTime, DateTime endTime) { var events = await Select .Where(e => e.EventTime >= startTime && e.EventTime <= endTime) .Where(e => !e.IsTestEvent) .ToListAsync(); var typeGroups = events .GroupBy(e => e.EventType) .Select(g => new EventTypeStatistics { EventType = g.Key, TotalCount = g.Count(), CompletedCount = g.Count(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Completed), FailedCount = g.Count(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Failed), PendingCount = g.Count(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Pending), AverageImpactScore = Math.Round((decimal)g.Average(e => e.ImpactAssessmentScore), 1), AverageProcessingTime = g.Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Completed).Any() ? (long)g.Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Completed) .Average(e => e.ProcessingElapsedMilliseconds) : 0, ReallocationTriggerRate = g.Count() > 0 ? Math.Round((decimal)g.Count(e => e.TriggeredReallocation) / g.Count() * 100, 2) : 0 }) .OrderByDescending(s => s.TotalCount) .ToList(); return typeGroups; } /// /// 获取事件处理趋势数据 /// 生成趋势图表数据,支持性能监控 /// public async Task> GetProcessingTrendAsync(int days = 30) { var startDate = DateTime.Today.AddDays(-days + 1); var endDate = DateTime.Today.AddDays(1); var events = await Select .Where(e => e.EventTime >= startDate && e.EventTime < endDate) .Where(e => !e.IsTestEvent) .ToListAsync(); var trends = events .GroupBy(e => e.EventTime.Date) .Select(g => new EventProcessingTrend { Date = g.Key, TotalEvents = g.Count(), CompletedEvents = g.Count(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Completed), FailedEvents = g.Count(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Failed), AverageImpactScore = Math.Round((decimal)g.Average(e => e.ImpactAssessmentScore), 1), ReallocationTriggeredEvents = g.Count(e => e.TriggeredReallocation) }) .OrderBy(t => t.Date) .ToList(); // 填充没有数据的日期 var allDates = Enumerable.Range(0, days) .Select(i => startDate.AddDays(i)) .ToList(); var completeTrends = allDates .Select(date => trends.FirstOrDefault(t => t.Date == date) ?? new EventProcessingTrend { Date = date, TotalEvents = 0, CompletedEvents = 0, FailedEvents = 0, AverageImpactScore = 0, ReallocationTriggeredEvents = 0 }) .ToList(); return completeTrends; } #endregion #region 业务专用方法 /// /// 创建任务变更事件 /// 便捷的事件创建方法,自动设置基础信息 /// public async Task CreateTaskChangeEventAsync( long taskId, string taskCode, string eventType, long? integrationRecordId = null, long? rootIntegrationRecordId = null, string changedFields = "", Dictionary? beforeValues = null, Dictionary? afterValues = null, long? triggeredByUserId = null, string triggeredByUserName = "", string eventSource = "Manual", int eventPriority = 5) { var eventEntity = new TaskChangeEventEntity { TaskId = taskId, TaskCode = taskCode, IntegrationRecordId = integrationRecordId, RootIntegrationRecordId = rootIntegrationRecordId, EventType = eventType, ChangedFields = changedFields, EventTime = DateTime.Now, TriggeredByUserId = triggeredByUserId, TriggeredByUserName = triggeredByUserName, EventSource = eventSource, EventPriority = eventPriority, ProcessingStatus = TaskChangeEventProcessingStatus.Pending, RequiresNotification = true, NotificationStatus = "NotSent" }; // 设置变更前后值 if (beforeValues?.Any() == true) eventEntity.SetBeforeValues(beforeValues); if (afterValues?.Any() == true) eventEntity.SetAfterValues(afterValues); // 计算影响评估评分 eventEntity.ImpactAssessmentScore = eventEntity.CalculateImpactScore(); return await InsertAsync(eventEntity); } /// /// 获取可重试的失败事件 /// 支持自动重试机制 /// public async Task> GetRetryableFailedEventsAsync(int limit = 50) { return await Select .Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Pending) // 失败后重置为Pending状态 .Where(e => e.RetryCount < e.MaxRetryCount) .Where(e => !e.NextRetryTime.HasValue || e.NextRetryTime.Value <= DateTime.Now) .OrderByDescending(e => e.EventPriority) .OrderBy(e => e.NextRetryTime) .Take(limit) .ToListAsync(); } /// /// 清理过期的已完成事件 /// 定期数据清理,保持系统性能 /// public async Task ArchiveCompletedEventsAsync(DateTime beforeDate) { // 这里可以实现将完成的事件迁移到历史表的逻辑 // 暂时使用软删除处理 var archivedCount = await UpdateDiy .Set(e => e.IsDeleted, true) .Set(e => e.ModifiedTime, DateTime.Now) .Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Completed) .Where(e => e.ProcessingCompletedTime.HasValue && e.ProcessingCompletedTime.Value < beforeDate) .Where(e => !e.IsDeleted) .ExecuteAffrowsAsync(); return archivedCount; } #endregion } /// /// 任务变更事件仓储接口 /// public interface ITaskChangeEventRepository : ZhonTai.Admin.Core.Repositories.IRepositoryBase { Task> GetByTaskIdAsync(long taskId); Task> GetByIntegrationRecordIdAsync(long integrationRecordId); Task> GetPendingEventsAsync(int limit = 100); Task> GetOverdueEventsAsync(); Task<(List Events, long Total)> GetByMultipleConditionsAsync( long? integrationRecordId = null, long? rootIntegrationRecordId = null, List? taskIds = null, List? eventTypes = null, List? processingStatuses = null, DateTime? startTime = null, DateTime? endTime = null, List? triggeredByUserIds = null, int? minPriority = null, int? minImpactScore = null, bool includeTestEvents = false, int pageIndex = 1, int pageSize = 20, string sortBy = "EventTime", string sortDirection = "Desc"); Task> GetHighImpactEventsAsync(DateTime startTime, DateTime endTime, int minImpactScore = 80, int limit = 50); Task> GetBatchProcessingEventsAsync(long eventGroupId); Task BatchUpdateProcessingStatusAsync(List eventIds, string newStatus, long? processedByUserId = null, string? processedByUserName = null, string? resultSummary = null); Task MarkEventProcessingAsync(long eventId, long processedByUserId, string processedByUserName); Task CompleteEventProcessingAsync(long eventId, string resultSummary, bool triggeredReallocation = false, long? createdVersionId = null, string? detailsJson = null); Task FailEventProcessingAsync(long eventId, string errorMessage, bool shouldRetry = true); Task CreateTaskChangeEventAsync(long taskId, string taskCode, string eventType, long? integrationRecordId = null, long? rootIntegrationRecordId = null, string changedFields = "", Dictionary? beforeValues = null, Dictionary? afterValues = null, long? triggeredByUserId = null, string triggeredByUserName = "", string eventSource = "Manual", int eventPriority = 5); Task> GetRetryableFailedEventsAsync(int limit = 50); Task ArchiveCompletedEventsAsync(DateTime beforeDate); } #region 统计数据DTO类 /// /// 事件处理统计数据 /// public class EventProcessingStatistics { public DateTime StartTime { get; set; } public DateTime EndTime { get; set; } public int TotalEvents { get; set; } public int CompletedEvents { get; set; } public int FailedEvents { get; set; } public int PendingEvents { get; set; } public int OverdueEvents { get; set; } public long AverageProcessingTime { get; set; } public decimal AverageImpactScore { get; set; } public decimal SuccessRate { get; set; } public int ReallocationTriggerCount { get; set; } public int UniqueTaskCount { get; set; } public int UniqueOperatorCount { get; set; } } /// /// 事件类型统计数据 /// public class EventTypeStatistics { public string EventType { get; set; } = string.Empty; public int TotalCount { get; set; } public int CompletedCount { get; set; } public int FailedCount { get; set; } public int PendingCount { get; set; } public decimal AverageImpactScore { get; set; } public long AverageProcessingTime { get; set; } public decimal ReallocationTriggerRate { get; set; } } /// /// 事件处理趋势数据 /// public class EventProcessingTrend { public DateTime Date { get; set; } public int TotalEvents { get; set; } public int CompletedEvents { get; set; } public int FailedEvents { get; set; } public decimal AverageImpactScore { get; set; } public int ReallocationTriggeredEvents { get; set; } } #endregion }