using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NPP.SmartSchedue.Api.Core.Repositories;
using NPP.SmartSchedue.Api.Contracts.Domain.Integration;
using ZhonTai.Admin.Core.Db.Transaction;
using FreeSql;
namespace NPP.SmartSchedue.Api.Repositories.Integration
{
///
/// 任务变更事件仓储实现类
///
/// 业务思考:
/// 1. 事件数据具有时序性特点,重点优化时间范围查询性能
/// 2. 支持事件状态流转和批量处理,提高系统吞吐量
/// 3. 实现智能事件优先级排序,确保重要事件及时处理
/// 4. 提供丰富的统计分析功能,支持变更监控和趋势分析
///
public class TaskChangeEventRepository : AppRepositoryBase, ITaskChangeEventRepository
{
/// Creates the repository and hands the unit-of-work manager to the base repository.
public TaskChangeEventRepository(UnitOfWorkManagerCloud unitOfWorkManager) : base(unitOfWorkManager)
{
}
#region 基础查询方法
///
/// Returns all change events recorded for one task, newest EventTime first.
///
/// taskId: identifier of the task whose events are requested.
public async Task> GetByTaskIdAsync(long taskId)
{
    var eventsForTask = Select
        .Where(evt => evt.TaskId == taskId)
        .OrderByDescending(evt => evt.EventTime);

    return await eventsForTask.ToListAsync();
}
///
/// Returns the change history attached to one integration record, newest EventTime first.
///
/// integrationRecordId: identifier of the integration record to look up.
public async Task> GetByIntegrationRecordIdAsync(long integrationRecordId)
{
    var recordHistory = Select
        .Where(evt => evt.IntegrationRecordId == integrationRecordId)
        .OrderByDescending(evt => evt.EventTime);

    return await recordHistory.ToListAsync();
}
///
/// Fetches events that are ready to be picked up for processing.
/// Only Pending events whose NextRetryTime is unset or already reached are returned,
/// ordered by EventPriority descending and then EventTime ascending.
///
/// limit: maximum number of events to return (default 100).
public async Task> GetPendingEventsAsync(int limit = 100)
{
    var readyQueue = Select
        .Where(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Pending)
        // Leave out events that are still waiting for their scheduled retry.
        .Where(evt => !evt.NextRetryTime.HasValue || evt.NextRetryTime.Value <= DateTime.Now)
        // Highest priority first; within a priority, the oldest event first.
        .OrderByDescending(evt => evt.EventPriority)
        .OrderBy(evt => evt.EventTime);

    return await readyQueue
        .Take(limit)
        .ToListAsync();
}
///
/// Returns the unfinished events (Pending or Processing) that the entity reports as overdue.
///
/// The status filter runs in the database; the IsOverdue check is evaluated on the loaded
/// entities, so it is applied in memory after the query completes.
public async Task> GetOverdueEventsAsync()
{
    // Await the query directly instead of chaining ContinueWith/task.Result:
    // a failed query now surfaces its original exception rather than an AggregateException.
    var unfinishedEvents = await Select
        .Where(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Pending ||
                    e.ProcessingStatus == TaskChangeEventProcessingStatus.Processing)
        .ToListAsync();

    return unfinishedEvents.Where(e => e.IsOverdue).ToList();
}
#endregion
#region 复合查询方法
///
/// Paged search over change events with optional filters.
/// Every filter argument that is null (or an empty list) is ignored.
///
/// integrationRecordId / rootIntegrationRecordId: exact-match filters.
/// taskIds, eventTypes, processingStatuses, triggeredByUserIds: "value is in list" filters.
/// startTime / endTime: inclusive bounds on EventTime.
/// minPriority / minImpactScore: inclusive lower bounds.
/// includeTestEvents: when false (default), events flagged IsTestEvent are excluded.
/// pageIndex / pageSize: passed to Page() as given.
/// sortBy: case-insensitive key, one of "priority", "impactscore", "status", "taskid";
///         any other value (including null) sorts by EventTime.
/// sortDirection: "ASC" (case-insensitive) sorts ascending; anything else sorts descending.
///
/// Returns the requested page of events and the total number of rows matching the filters.
public async Task<(List Events, long Total)> GetByMultipleConditionsAsync(
    long? integrationRecordId = null,
    long? rootIntegrationRecordId = null,
    List? taskIds = null,
    List? eventTypes = null,
    List? processingStatuses = null,
    DateTime? startTime = null,
    DateTime? endTime = null,
    List? triggeredByUserIds = null,
    int? minPriority = null,
    int? minImpactScore = null,
    bool includeTestEvents = false,
    int pageIndex = 1,
    int pageSize = 20,
    string sortBy = "EventTime",
    string sortDirection = "Desc")
{
    var query = Select;

    // Filters are appended only when the caller supplied a value for them.
    if (integrationRecordId.HasValue)
    {
        query = query.Where(e => e.IntegrationRecordId == integrationRecordId.Value);
    }

    if (rootIntegrationRecordId.HasValue)
    {
        query = query.Where(e => e.RootIntegrationRecordId == rootIntegrationRecordId.Value);
    }

    if (taskIds?.Any() == true)
    {
        query = query.Where(e => taskIds.Contains(e.TaskId));
    }

    if (eventTypes?.Any() == true)
    {
        query = query.Where(e => eventTypes.Contains(e.EventType));
    }

    if (processingStatuses?.Any() == true)
    {
        query = query.Where(e => processingStatuses.Contains(e.ProcessingStatus));
    }

    if (startTime.HasValue)
    {
        query = query.Where(e => e.EventTime >= startTime.Value);
    }

    if (endTime.HasValue)
    {
        query = query.Where(e => e.EventTime <= endTime.Value);
    }

    if (triggeredByUserIds?.Any() == true)
    {
        query = query.Where(e => e.TriggeredByUserId.HasValue && triggeredByUserIds.Contains(e.TriggeredByUserId.Value));
    }

    if (minPriority.HasValue)
    {
        query = query.Where(e => e.EventPriority >= minPriority.Value);
    }

    if (minImpactScore.HasValue)
    {
        query = query.Where(e => e.ImpactAssessmentScore >= minImpactScore.Value);
    }

    if (!includeTestEvents)
    {
        query = query.Where(e => !e.IsTestEvent);
    }

    // Sorting. Key and direction are compared culture-invariantly so that keys such as
    // "ImpactScore" are still recognised under cultures with special casing rules (e.g. tr-TR),
    // and a null key/direction falls back to the defaults instead of throwing.
    var ascending = string.Equals(sortDirection, "ASC", StringComparison.OrdinalIgnoreCase);
    var sortKey = (sortBy ?? string.Empty).ToLowerInvariant();

    var orderedQuery = sortKey switch
    {
        "priority" => ascending
            ? query.OrderBy(e => e.EventPriority)
            : query.OrderByDescending(e => e.EventPriority),
        "impactscore" => ascending
            ? query.OrderBy(e => e.ImpactAssessmentScore)
            : query.OrderByDescending(e => e.ImpactAssessmentScore),
        // Previously "status" was accepted as a key but never applied; it silently sorted by EventTime.
        "status" => ascending
            ? query.OrderBy(e => e.ProcessingStatus)
            : query.OrderByDescending(e => e.ProcessingStatus),
        "taskid" => ascending
            ? query.OrderBy(e => e.TaskId)
            : query.OrderByDescending(e => e.TaskId),
        _ => ascending
            ? query.OrderBy(e => e.EventTime)
            : query.OrderByDescending(e => e.EventTime)
    };

    // Count first (all matching rows), then fetch only the requested page.
    var total = await orderedQuery.CountAsync();
    var events = await orderedQuery
        .Page(pageIndex, pageSize)
        .ToListAsync();

    return (events, total);
}
///
/// Returns non-test events inside a time window whose impact score reaches a threshold.
/// Results are ordered by ImpactAssessmentScore descending, then EventPriority descending.
///
/// startTime / endTime: inclusive bounds on EventTime.
/// minImpactScore: inclusive lower bound on ImpactAssessmentScore (default 80).
/// limit: maximum number of events to return (default 50).
public async Task> GetHighImpactEventsAsync(
    DateTime startTime,
    DateTime endTime,
    int minImpactScore = 80,
    int limit = 50)
{
    var candidates = Select
        .Where(evt => evt.EventTime >= startTime && evt.EventTime <= endTime)
        .Where(evt => evt.ImpactAssessmentScore >= minImpactScore)
        .Where(evt => !evt.IsTestEvent);

    var ranked = candidates
        .OrderByDescending(evt => evt.ImpactAssessmentScore)
        .OrderByDescending(evt => evt.EventPriority);

    return await ranked.Take(limit).ToListAsync();
}
///
/// Returns the events that belong to one event group, ordered by their sequence within the group.
///
/// eventGroupId: identifier of the event group.
public async Task> GetBatchProcessingEventsAsync(long eventGroupId)
{
    var groupMembers = Select
        .Where(evt => evt.EventGroupId == eventGroupId)
        .OrderBy(evt => evt.EventGroupSequence);

    return await groupMembers.ToListAsync();
}
#endregion
#region 状态管理方法
///
/// Sets the processing status of many events in one UPDATE statement.
///
/// eventIds: ids of the events to update; a null or empty list is a no-op that returns 0.
/// newStatus: status value written to ProcessingStatus.
/// processedByUserId / processedByUserName / resultSummary: written only when supplied
/// (non-null id, non-empty strings).
///
/// Returns the number of affected rows.
/// NOTE(review): ProcessingCompletedTime is stamped for every status, including non-terminal
/// ones; confirm that this is intended.
public async Task BatchUpdateProcessingStatusAsync(
    List eventIds,
    string newStatus,
    long? processedByUserId = null,
    string? processedByUserName = null,
    string? resultSummary = null)
{
    // Guard against null as well as empty so a missing list does not throw.
    if (eventIds == null || eventIds.Count == 0)
    {
        return 0;
    }

    var updateQuery = UpdateDiy
        .Set(e => e.ProcessingStatus, newStatus)
        .Set(e => e.ProcessingCompletedTime, DateTime.Now)
        .Where(e => eventIds.Contains(e.Id));

    if (processedByUserId.HasValue)
    {
        updateQuery = updateQuery.Set(e => e.ProcessedByUserId, processedByUserId.Value);
    }

    if (!string.IsNullOrEmpty(processedByUserName))
    {
        updateQuery = updateQuery.Set(e => e.ProcessedByUserName, processedByUserName);
    }

    if (!string.IsNullOrEmpty(resultSummary))
    {
        updateQuery = updateQuery.Set(e => e.ProcessingResultSummary, resultSummary);
    }

    return await updateQuery.ExecuteAffrowsAsync();
}
///
/// Moves one event from Pending to Processing and records who picked it up and when.
///
/// eventId: id of the event to claim.
/// processedByUserId / processedByUserName: the user recorded as processor.
///
/// Returns true when a row was updated, i.e. the event existed and was still Pending.
public async Task MarkEventProcessingAsync(
    long eventId,
    long processedByUserId,
    string processedByUserName)
{
    var claim = UpdateDiy
        .Set(evt => evt.ProcessingStatus, TaskChangeEventProcessingStatus.Processing)
        .Set(evt => evt.ProcessingStartTime, DateTime.Now)
        .Set(evt => evt.ProcessedByUserId, processedByUserId)
        .Set(evt => evt.ProcessedByUserName, processedByUserName)
        .Where(evt => evt.Id == eventId)
        // Only an event that is still Pending may start processing.
        .Where(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Pending);

    var updatedRows = await claim.ExecuteAffrowsAsync();
    return updatedRows > 0;
}
///
/// Moves one event from Processing to Completed and stores the outcome.
///
/// eventId: id of the event to complete.
/// resultSummary: text written to ProcessingResultSummary.
/// triggeredReallocation: value written to TriggeredReallocation.
/// createdVersionId: written to CreatedNewVersionId only when supplied.
/// detailsJson: written to ProcessingDetailsJson only when non-empty.
///
/// Returns true when a row was updated, i.e. the event existed and was in Processing status.
public async Task CompleteEventProcessingAsync(
    long eventId,
    string resultSummary,
    bool triggeredReallocation = false,
    long? createdVersionId = null,
    string? detailsJson = null)
{
    var completion = UpdateDiy
        .Set(evt => evt.ProcessingStatus, TaskChangeEventProcessingStatus.Completed)
        .Set(evt => evt.ProcessingCompletedTime, DateTime.Now)
        .Set(evt => evt.ProcessingResultSummary, resultSummary)
        .Set(evt => evt.TriggeredReallocation, triggeredReallocation)
        .Where(evt => evt.Id == eventId)
        // Only an event currently in Processing may be completed.
        .Where(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Processing);

    // Optional columns are touched only when the caller provided a value.
    if (createdVersionId.HasValue)
    {
        completion = completion.Set(evt => evt.CreatedNewVersionId, createdVersionId.Value);
    }

    if (!string.IsNullOrEmpty(detailsJson))
    {
        completion = completion.Set(evt => evt.ProcessingDetailsJson, detailsJson);
    }

    return await completion.ExecuteAffrowsAsync() > 0;
}
///
/// Records a processing failure on one event and saves it.
///
/// eventId: id of the event that failed.
/// errorMessage: passed to the entity's FailProcessing method.
/// shouldRetry: when false, the event is forced into the Failed status after FailProcessing runs,
/// so the Pending-based queues in this repository no longer select it. When true (default) the
/// state chosen by FailProcessing is kept unchanged.
///
/// Returns false when no event with the given id exists, true otherwise.
public async Task FailEventProcessingAsync(
    long eventId,
    string errorMessage,
    bool shouldRetry = true)
{
    var eventEntity = await GetAsync(eventId);
    if (eventEntity == null)
    {
        return false;
    }

    // Failure bookkeeping lives on the entity.
    // NOTE(review): presumably this includes retry scheduling; confirm against the entity.
    eventEntity.FailProcessing(errorMessage);

    // Previously shouldRetry was accepted but never read. Honour an explicit opt-out by
    // overriding whatever state FailProcessing selected with the terminal Failed status.
    if (!shouldRetry)
    {
        eventEntity.ProcessingStatus = TaskChangeEventProcessingStatus.Failed;
    }

    await UpdateAsync(eventEntity);
    return true;
}
#endregion
#region 统计分析方法
///
/// Builds aggregate processing statistics for non-test events inside a time window.
/// The matching events are loaded and aggregated in memory.
///
/// startTime / endTime: inclusive bounds on EventTime; echoed back on the result.
///
/// Returns a statistics object; when no event matches, only StartTime and EndTime are set.
public async Task GetProcessingStatisticsAsync(
    DateTime startTime,
    DateTime endTime)
{
    var windowEvents = await Select
        .Where(evt => evt.EventTime >= startTime && evt.EventTime <= endTime)
        .Where(evt => !evt.IsTestEvent)
        .ToListAsync();

    var stats = new EventProcessingStatistics
    {
        StartTime = startTime,
        EndTime = endTime
    };

    if (windowEvents.Count == 0)
    {
        return stats;
    }

    // The completed subset is kept as a list because it feeds both a count and an average.
    var completed = windowEvents
        .Where(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Completed)
        .ToList();

    stats.TotalEvents = windowEvents.Count;
    stats.CompletedEvents = completed.Count;
    stats.FailedEvents = windowEvents.Count(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Failed);
    stats.PendingEvents = windowEvents.Count(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Pending);
    stats.OverdueEvents = windowEvents.Count(evt => evt.IsOverdue);

    // Average elapsed time is taken over completed events only; 0 when there are none.
    stats.AverageProcessingTime = completed.Any()
        ? (long)completed.Average(evt => evt.ProcessingElapsedMilliseconds)
        : 0;

    // windowEvents is known to be non-empty here, so the averages and the ratio are well defined.
    stats.AverageImpactScore = Math.Round((decimal)windowEvents.Average(evt => evt.ImpactAssessmentScore), 1);
    stats.SuccessRate = Math.Round((decimal)completed.Count / windowEvents.Count * 100, 2);

    stats.ReallocationTriggerCount = windowEvents.Count(evt => evt.TriggeredReallocation);
    stats.UniqueTaskCount = windowEvents.Select(evt => evt.TaskId).Distinct().Count();
    stats.UniqueOperatorCount = windowEvents
        .Where(evt => evt.TriggeredByUserId.HasValue)
        .Select(evt => evt.TriggeredByUserId.Value)
        .Distinct()
        .Count();

    return stats;
}
///
/// Builds per-event-type statistics for non-test events inside a time window.
/// The matching events are loaded and grouped in memory; the result is ordered by
/// TotalCount, largest first.
///
/// startTime / endTime: inclusive bounds on EventTime.
public async Task> GetEventTypeStatisticsAsync(
    DateTime startTime,
    DateTime endTime)
{
    var windowEvents = await Select
        .Where(evt => evt.EventTime >= startTime && evt.EventTime <= endTime)
        .Where(evt => !evt.IsTestEvent)
        .ToListAsync();

    var perType = windowEvents
        .GroupBy(evt => evt.EventType)
        .Select(group =>
        {
            var total = group.Count();
            var completed = group
                .Where(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Completed)
                .ToList();
            var reallocations = group.Count(evt => evt.TriggeredReallocation);

            return new EventTypeStatistics
            {
                EventType = group.Key,
                TotalCount = total,
                CompletedCount = completed.Count,
                FailedCount = group.Count(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Failed),
                PendingCount = group.Count(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Pending),
                AverageImpactScore = Math.Round((decimal)group.Average(evt => evt.ImpactAssessmentScore), 1),
                // Averaged over completed events only; 0 when the type has none.
                AverageProcessingTime = completed.Any()
                    ? (long)completed.Average(evt => evt.ProcessingElapsedMilliseconds)
                    : 0,
                // Share of this type's events that triggered a reallocation, as a percentage.
                ReallocationTriggerRate = total > 0
                    ? Math.Round((decimal)reallocations / total * 100, 2)
                    : 0
            };
        })
        .OrderByDescending(stat => stat.TotalCount)
        .ToList();

    return perType;
}
///
/// Builds a day-by-day trend of non-test events for the last N calendar days, ending today.
/// The result contains exactly one entry per day in ascending date order; days without any
/// event get an entry whose counters are all zero.
///
/// days: number of days to cover, counting today (default 30). Must not be negative.
///
/// Throws ArgumentOutOfRangeException when days is negative.
public async Task> GetProcessingTrendAsync(int days = 30)
{
    // Reject a negative window before querying; previously this only failed
    // (with the same exception type) after the database round trip.
    if (days < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(days), days, "days must not be negative.");
    }

    var startDate = DateTime.Today.AddDays(-days + 1);
    var endDate = DateTime.Today.AddDays(1); // exclusive upper bound: start of tomorrow

    var events = await Select
        .Where(e => e.EventTime >= startDate && e.EventTime < endDate)
        .Where(e => !e.IsTestEvent)
        .ToListAsync();

    // One aggregate per calendar day, keyed by date so the gap-filling pass below is a
    // constant-time lookup per day instead of a scan over all aggregates.
    var trendsByDate = events
        .GroupBy(e => e.EventTime.Date)
        .ToDictionary(
            g => g.Key,
            g => new EventProcessingTrend
            {
                Date = g.Key,
                TotalEvents = g.Count(),
                CompletedEvents = g.Count(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Completed),
                FailedEvents = g.Count(e => e.ProcessingStatus == TaskChangeEventProcessingStatus.Failed),
                AverageImpactScore = Math.Round((decimal)g.Average(e => e.ImpactAssessmentScore), 1),
                ReallocationTriggeredEvents = g.Count(e => e.TriggeredReallocation)
            });

    // Walk every day of the window in order and fill the days that had no events.
    var completeTrends = Enumerable.Range(0, days)
        .Select(offset =>
        {
            var date = startDate.AddDays(offset);
            return trendsByDate.TryGetValue(date, out var trend)
                ? trend
                : new EventProcessingTrend
                {
                    Date = date,
                    TotalEvents = 0,
                    CompletedEvents = 0,
                    FailedEvents = 0,
                    AverageImpactScore = 0,
                    ReallocationTriggeredEvents = 0
                };
        })
        .ToList();

    return completeTrends;
}
#endregion
#region 业务专用方法
///
/// Creates and inserts a new change event for a task.
/// The event starts in Pending status with EventTime set to the current local time,
/// RequiresNotification set to true and NotificationStatus set to "NotSent".
/// Its ImpactAssessmentScore is taken from the entity's CalculateImpactScore method.
///
/// beforeValues / afterValues: handed to the entity only when non-null and non-empty.
/// All other arguments are copied to the entity property of the same name.
///
/// Returns the result of the insert.
public async Task CreateTaskChangeEventAsync(
    long taskId,
    string taskCode,
    string eventType,
    long? integrationRecordId = null,
    long? rootIntegrationRecordId = null,
    string changedFields = "",
    Dictionary? beforeValues = null,
    Dictionary? afterValues = null,
    long? triggeredByUserId = null,
    string triggeredByUserName = "",
    string eventSource = "Manual",
    int eventPriority = 5)
{
    var newEvent = new TaskChangeEventEntity
    {
        TaskId = taskId,
        TaskCode = taskCode,
        IntegrationRecordId = integrationRecordId,
        RootIntegrationRecordId = rootIntegrationRecordId,
        EventType = eventType,
        ChangedFields = changedFields,
        EventTime = DateTime.Now,
        TriggeredByUserId = triggeredByUserId,
        TriggeredByUserName = triggeredByUserName,
        EventSource = eventSource,
        EventPriority = eventPriority,
        ProcessingStatus = TaskChangeEventProcessingStatus.Pending,
        RequiresNotification = true,
        NotificationStatus = "NotSent"
    };

    // Attach the before/after snapshots only when they carry data.
    var hasBefore = beforeValues != null && beforeValues.Any();
    if (hasBefore)
    {
        newEvent.SetBeforeValues(beforeValues);
    }

    var hasAfter = afterValues != null && afterValues.Any();
    if (hasAfter)
    {
        newEvent.SetAfterValues(afterValues);
    }

    // The score is computed after the snapshots are attached, matching the original order.
    var impactScore = newEvent.CalculateImpactScore();
    newEvent.ImpactAssessmentScore = impactScore;

    return await InsertAsync(newEvent);
}
///
/// Fetches events that are eligible for another processing attempt.
/// An event qualifies when it is in Pending status (per the original note, failed events are
/// reset to Pending), has used fewer retries than its MaxRetryCount, and its NextRetryTime is
/// unset or already reached. Ordered by EventPriority descending, then NextRetryTime ascending.
///
/// limit: maximum number of events to return (default 50).
public async Task> GetRetryableFailedEventsAsync(int limit = 50)
{
    var retryCandidates = Select
        .Where(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Pending)
        .Where(evt => evt.RetryCount < evt.MaxRetryCount)
        .Where(evt => !evt.NextRetryTime.HasValue || evt.NextRetryTime.Value <= DateTime.Now);

    var ordered = retryCandidates
        .OrderByDescending(evt => evt.EventPriority)
        .OrderBy(evt => evt.NextRetryTime);

    return await ordered.Take(limit).ToListAsync();
}
///
/// Soft-deletes completed events whose completion time is earlier than a cut-off.
/// Rows are flagged IsDeleted (and ModifiedTime is stamped) rather than removed;
/// moving them to a history table is left as a possible future implementation.
///
/// beforeDate: exclusive upper bound on ProcessingCompletedTime.
///
/// Returns the number of rows that were flagged.
public async Task ArchiveCompletedEventsAsync(DateTime beforeDate)
{
    var softDelete = UpdateDiy
        .Set(evt => evt.IsDeleted, true)
        .Set(evt => evt.ModifiedTime, DateTime.Now)
        .Where(evt => evt.ProcessingStatus == TaskChangeEventProcessingStatus.Completed)
        .Where(evt => evt.ProcessingCompletedTime.HasValue && evt.ProcessingCompletedTime.Value < beforeDate)
        // Skip rows that are already flagged as deleted.
        .Where(evt => !evt.IsDeleted);

    return await softDelete.ExecuteAffrowsAsync();
}
#endregion
}
///
/// Repository contract for task change events.
/// Extends the base repository with event queries, status transitions, statistics and archiving.
///
public interface ITaskChangeEventRepository : ZhonTai.Admin.Core.Repositories.IRepositoryBase
{
/// Events of one task, newest EventTime first.
Task> GetByTaskIdAsync(long taskId);
/// Events of one integration record, newest EventTime first.
Task> GetByIntegrationRecordIdAsync(long integrationRecordId);
/// Pending events whose retry time (if any) has been reached, highest priority first.
Task> GetPendingEventsAsync(int limit = 100);
/// Pending or Processing events that report themselves as overdue.
Task> GetOverdueEventsAsync();
/// Paged, filtered and sorted search; returns the page and the total number of matches.
Task<(List Events, long Total)> GetByMultipleConditionsAsync(
long? integrationRecordId = null,
long? rootIntegrationRecordId = null,
List? taskIds = null,
List? eventTypes = null,
List? processingStatuses = null,
DateTime? startTime = null,
DateTime? endTime = null,
List? triggeredByUserIds = null,
int? minPriority = null,
int? minImpactScore = null,
bool includeTestEvents = false,
int pageIndex = 1,
int pageSize = 20,
string sortBy = "EventTime",
string sortDirection = "Desc");
/// Non-test events in a time window with an impact score of at least minImpactScore.
Task> GetHighImpactEventsAsync(DateTime startTime, DateTime endTime, int minImpactScore = 80, int limit = 50);
/// Events of one event group, ordered by their sequence within the group.
Task> GetBatchProcessingEventsAsync(long eventGroupId);
/// Sets the processing status of several events at once; returns the affected row count.
Task BatchUpdateProcessingStatusAsync(List eventIds, string newStatus, long? processedByUserId = null, string? processedByUserName = null, string? resultSummary = null);
/// Moves a Pending event to Processing; true when a row was updated.
Task MarkEventProcessingAsync(long eventId, long processedByUserId, string processedByUserName);
/// Moves a Processing event to Completed and stores the outcome; true when a row was updated.
Task CompleteEventProcessingAsync(long eventId, string resultSummary, bool triggeredReallocation = false, long? createdVersionId = null, string? detailsJson = null);
/// Records a processing failure on an event; false when the event does not exist.
Task FailEventProcessingAsync(long eventId, string errorMessage, bool shouldRetry = true);
/// Creates and inserts a new Pending change event.
Task CreateTaskChangeEventAsync(long taskId, string taskCode, string eventType, long? integrationRecordId = null, long? rootIntegrationRecordId = null, string changedFields = "", Dictionary? beforeValues = null, Dictionary? afterValues = null, long? triggeredByUserId = null, string triggeredByUserName = "", string eventSource = "Manual", int eventPriority = 5);
/// Pending events that still have retries left and whose retry time (if any) has been reached.
Task> GetRetryableFailedEventsAsync(int limit = 50);
/// Soft-deletes completed events finished before the given date; returns the affected row count.
Task ArchiveCompletedEventsAsync(DateTime beforeDate);
}
#region 统计数据DTO类
///
/// Aggregate processing statistics for a time window.
/// The per-property notes describe how GetProcessingStatisticsAsync in this file fills them.
///
public class EventProcessingStatistics
{
// Start of the queried window (echoed from the request).
public DateTime StartTime { get; set; }
// End of the queried window (echoed from the request).
public DateTime EndTime { get; set; }
// Number of non-test events in the window.
public int TotalEvents { get; set; }
// Events in Completed status.
public int CompletedEvents { get; set; }
// Events in Failed status.
public int FailedEvents { get; set; }
// Events in Pending status.
public int PendingEvents { get; set; }
// Events whose IsOverdue flag is true.
public int OverdueEvents { get; set; }
// Mean ProcessingElapsedMilliseconds over completed events, truncated to long; 0 when none completed.
public long AverageProcessingTime { get; set; }
// Mean ImpactAssessmentScore over all events, rounded to 1 decimal place.
public decimal AverageImpactScore { get; set; }
// Completed / total * 100, rounded to 2 decimal places.
public decimal SuccessRate { get; set; }
// Events with TriggeredReallocation set.
public int ReallocationTriggerCount { get; set; }
// Number of distinct TaskId values.
public int UniqueTaskCount { get; set; }
// Number of distinct non-null TriggeredByUserId values.
public int UniqueOperatorCount { get; set; }
}
///
/// Statistics for a single event type.
/// The per-property notes describe how GetEventTypeStatisticsAsync in this file fills them.
///
public class EventTypeStatistics
{
// The event type this row aggregates (the grouping key).
public string EventType { get; set; } = string.Empty;
// Number of events of this type.
public int TotalCount { get; set; }
// Events of this type in Completed status.
public int CompletedCount { get; set; }
// Events of this type in Failed status.
public int FailedCount { get; set; }
// Events of this type in Pending status.
public int PendingCount { get; set; }
// Mean ImpactAssessmentScore, rounded to 1 decimal place.
public decimal AverageImpactScore { get; set; }
// Mean ProcessingElapsedMilliseconds over completed events, truncated to long; 0 when none completed.
public long AverageProcessingTime { get; set; }
// Events with TriggeredReallocation set / total * 100, rounded to 2 decimal places.
public decimal ReallocationTriggerRate { get; set; }
}
///
/// One day's entry in the processing trend.
/// The per-property notes describe how GetProcessingTrendAsync in this file fills them;
/// days without events get an entry with all counters set to 0.
///
public class EventProcessingTrend
{
// Calendar day this entry covers (the date part of EventTime).
public DateTime Date { get; set; }
// Number of non-test events on this day.
public int TotalEvents { get; set; }
// Events on this day in Completed status.
public int CompletedEvents { get; set; }
// Events on this day in Failed status.
public int FailedEvents { get; set; }
// Mean ImpactAssessmentScore for the day, rounded to 1 decimal place.
public decimal AverageImpactScore { get; set; }
// Events on this day with TriggeredReallocation set.
public int ReallocationTriggeredEvents { get; set; }
}
#endregion
}