Skip to content

任务模型

本文介绍 Fredica 的异步任务系统,包括核心数据模型、状态机、DAG 调度机制、任务控制(取消/暂停/恢复)以及启动恢复流程。


概览

Fredica 的后台处理(下载、转码、字幕提取、AI 分析等)均通过异步任务队列驱动。整个系统由三层组成:

WorkflowRun(工作流运行实例)
  └─ Task × N(具体工作单元,支持 DAG 依赖)
       └─ TaskExecutor(执行逻辑,每种 type 对应一个实现)
  • WorkflowRun:一次处理流程的容器,汇总所有子任务的整体状态和进度。
  • Task:最小工作单元,携带执行参数(payload)和前置依赖(depends_on)。
  • TaskExecutor:Kotlin 接口,每种任务类型对应一个实现,由 WorkerEngine 在运行时调用。

Task 数据模型

定义位于 shared/src/commonMain/kotlin/.../db/Task.kt

核心字段

字段类型说明
idString (UUID)任务唯一标识
typeString任务类型,对应一个 TaskExecutor,如 DOWNLOAD_VIDEOFETCH_SUBTITLE
workflow_run_idString所属工作流运行实例 ID
material_idString关联素材 ID
statusString当前状态(见状态机)
priorityInt调度优先级,数字越大越优先;同优先级按 created_at 升序(FIFO)
depends_onString (JSON 数组)前置任务 ID 列表,全部 completed 才允许认领
payloadString (JSON)执行参数,由各 Executor 自定义格式
resultString? (JSON)执行结果,成功时由 Executor 写入,下游任务可读取
errorString?最后一次失败的错误信息(human-readable)
error_typeString?错误类型标签,如 TIMEOUTIO_ERROR,便于按类型统计
idempotency_keyString?幂等键;相同 key 的任务只插入一条(ON CONFLICT DO NOTHING
retry_countInt已重试次数(首次执行不计入)
max_retriesInt最大重试次数,超过后永久 failed;默认 0(不重试)
progressInt (0–100)执行进度,由 Executor 通过 TaskRepo.updateProgress() 实时写入
is_pausedBoolean是否处于暂停状态;仅在 status=running 时有意义
is_pausableBoolean是否支持暂停;由 Python 端透传,false 时前端禁用暂停按钮

时间戳字段

字段说明
created_at创建时间(Unix 秒)
claimed_at被认领时间
started_at开始执行时间(进入 running 时记录)
completed_at完成时间(进入 completed/failed/cancelled 时记录)
heartbeat_atWorker 最后心跳时间(预留,Phase 3 死亡检测用)

Task 状态机

状态说明

状态含义
pending等待被认领,满足依赖条件后可被 claimNext() 取走
claimed已被 Worker 认领,尚未开始执行(过渡态,通常极短暂)
running正在执行中,可能处于暂停状态(is_paused=true
completed执行成功,result 字段含输出数据
failed执行失败且重试次数耗尽,永久终态
cancelled用户主动取消,永久终态

pendingclaimed 在前端统一展示为"等待中";runningis_paused=true 展示为"已暂停"。


WorkflowRun 数据模型

定义位于 shared/src/commonMain/kotlin/.../db/WorkflowRun.kt

一次 WorkflowRun 代表对某个素材执行一次完整处理流程,是一组 Task 的容器。

字段类型说明
idString (UUID)运行实例唯一标识
material_idString关联素材 ID
templateString工作流模板标识,如 manual_download_bilibili_video
statusString汇总状态(见下方状态机)
total_tasksInt子任务总数(由 recalculate() 维护)
done_tasksInt已完成子任务数(completed 状态的任务数)
created_atLong创建时间(Unix 秒)
completed_atLong?进入 completed 状态的时间

WorkflowRun 状态机

WorkflowRun 的状态不由调用方直接写入,而是由 WorkflowRunRepo.recalculate() 根据子任务实际状态汇总计算,在每次 Task 状态变更后由 WorkerEngine 触发。


DAG 调度

Task.depends_on 是一个 JSON 数组,存放前置任务 ID:

json
["task-uuid-a", "task-uuid-b"]

TaskRepo.claimNext() 只会认领满足以下条件的任务:

  1. status = 'pending'
  2. depends_on 中所有任务均已 completed
  3. priority DESCcreated_at ASC 取第一条(原子操作,防并发重复认领)

这样,一个 WorkflowRun 下的多个 Task 自然形成有向无环图(DAG),无需额外调度器。

典型流水线示例

FETCH_SUBTITLE ──→ WEBEN_CONCEPT_EXTRACT
DOWNLOAD_VIDEO ──→ TRANSCODE_MP4

TaskExecutor 接口

定义位于 shared/src/commonMain/kotlin/.../worker/TaskExecutor.kt

kotlin
interface TaskExecutor {
    val taskType: String          // 与 task.type 对应,如 "DOWNLOAD_VIDEO"

    suspend fun execute(task: Task): ExecuteResult

    fun canSkip(task: Task): Boolean = false   // 前置结果已存在时跳过

    suspend fun onTaskFailed(task: Task, result: ExecuteResult) {}  // 失败/取消回调
}

ExecuteResult

kotlin
data class ExecuteResult(
    val result: String = "{}",    // 成功时写入 task.result 的 JSON
    val error: String? = null,    // 失败时的错误信息
    val errorType: String? = null // 错误类型标签
)

error == null 表示成功,否则失败。errorType 特殊值:

errorType含义
CANCELLED用户主动取消,不触发重试
AWAITING_CREDENTIAL等待用户配置凭据,不自动重试
PAYLOAD_ERRORpayload 解析失败,不重试
其他可重试(受 max_retries 限制)

onTaskFailed 回调

当任务永久失败或被取消时,WorkerEngine 会调用 executor.onTaskFailed(),供 Executor 处理业务副作用(如将关联的 WebenSource.analysisStatus 重置为 "failed")。

默认空实现,不需要副作用的 Executor 无需覆写。


WorkerEngine 调度引擎

定义位于 shared/src/commonMain/kotlin/.../worker/WorkerEngine.kt

启动
  └─ runStartupRecovery()(同步,见下文)
      └─ 主轮询协程(每 1s 尝试 claimNext)
           └─ 认领成功 → launch 独立协程 → Semaphore(maxWorkers) 限并发
                └─ dispatch(task)
                     ├─ canSkip? → completed(跳过)
                     ├─ running → execute()
                     ├─ 成功 → completed → recalculate()
                     ├─ 失败可重试 → pending(retry_count++)
                     └─ 失败不可重试 → failed → onTaskFailed() → recalculate()

队列为空时自动退避到 5s 轮询间隔,减少空轮询开销。


任务控制:取消 / 暂停 / 恢复

取消

  • pending/claimed 任务:直接更新 DB 状态为 cancelledTaskRepo.cancelPendingTasksByWorkflowRun()
  • running 任务:通过 TaskCancelService 向 Executor 发送 CompletableDeferred<Unit> 取消信号;Executor 检测到信号后返回 ExecuteResult(errorType="CANCELLED")
kotlin
// Executor 内部检测取消
if (cancelSignal.isCompleted) return@withContext null

暂停 / 恢复

通过 TaskPauseResumeService 管理每个运行中任务的 Channel 对:

kotlin
data class TaskPauseResumeChannels(
    val pause: Channel<Unit>,
    val resume: Channel<Unit>,
)

Executor 在长任务循环中监听 pauseChannel,收到信号后挂起等待 resumeChannel

Python 主动发起暂停(pause_request / resume_request)

上述暂停/恢复信号均由外部(前端/Kotlin)主动下发给 Python。但某些场景下 Python 需要反向通知 Kotlin,例如:

  • GPU 显存不足,需要等待资源释放后再继续
  • 子进程检测到外部依赖未就绪,主动挂起等待

为此,WebSocket 协议新增两种服务端推送消息(Python → Kotlin):

json
{"type": "pause_request", "reason": "gpu_oom"}
{"type": "resume_request"}

信号流

EventLoop 版TaskEndpointInEventLoopThread):

_run() 调 await self.request_pause(reason)
  → send_json({"type":"pause_request", "reason":...})  → Kotlin onPauseRequest 回调
  → wait_if_paused()  ← 挂起,等 Kotlin 回发 resume 命令(用户手动恢复)

SubProcess 版TaskEndpointInSubProcess):

子进程调 subprocess_request_pause(status_queue, resume_event, reason)
  → resume_event.clear()(子进程自己先清,确保后续 wait() 会阻塞)
  → status_queue.put({"type":"pause_request", "reason":...})

父进程 _on_subprocess_message 识别特殊消息
  → send_json({"type":"pause_request"})  → Kotlin onPauseRequest 回调
  → 同步 _is_pause 标志位

子进程 resume_event.wait()  ← 挂起
  ← 用户手动恢复 → Kotlin 发 resume 命令 → call_resume() set resume_event → 子进程解除

子进程版本的关键时序:clearputwait,避免父进程还未处理消息时子进程的 wait() 因 event 仍为 set 而直接返回。

Kotlin 侧(websocketTask 新增参数)

kotlin
suspend fun websocketTask(
    // ...原有参数...
    onPauseRequest: (suspend (reason: String) -> Unit)? = null,
    onResumeRequest: (suspend () -> Unit)? = null,
): String?

Executor 使用示例

kotlin
val result = PythonUtil.Py314Embed.PyUtilServer.websocketTask(
    pth = "/asr/transcribe",
    paramJson = "...",
    onProgress = { pct -> TaskService.repo.updateProgress(task.id, pct) },
    onPauseRequest = { reason ->
        TaskService.repo.updatePaused(task.id, true)
        logger.info("Task ${task.id} self-paused by Python: $reason")
    },
    onResumeRequest = {
        TaskService.repo.updatePaused(task.id, false)
    },
    cancelSignal = cancelSignal,
    pauseChannel = channels.pause,
    resumeChannel = channels.resume,
)

两种子场景

场景Python 行为Kotlin 回调职责
主动暂停,等用户手动恢复pause_request + wait_if_paused()onPauseRequest 更新 is_paused=true;用户点恢复后正常走 TaskPauseResumeService.resume()
自行暂停又自行恢复(等资源)pause_request + wait,资源就绪后发 resume_requestonPauseRequest/onResumeRequest 仅同步 is_paused 状态,无需用户介入

信号注册生命周期

WebSocketTaskExecutor 基类统一管理信号的注册与注销,Executor 子类无需手动处理:

kotlin
// 基类 execute() 内部
val cancelSignal = TaskCancelService.register(task.id)
val channels = TaskPauseResumeService.register(task.id)
try {
    return executeWithSignals(task, cancelSignal, channels)
} finally {
    TaskCancelService.unregister(task.id)
    TaskPauseResumeService.unregister(task.id)
}

启动恢复

APP 强杀后重启时,WorkerEngine.start() 在启动轮询前同步执行四道恢复保护:

步骤操作原因
1. 快照记录所有非终态任务为重启日志提供原始状态
2. 重置僵尸任务running → failedclaimed → pending执行结果不确定,running 标记失败;claimed 可安全重新入队
3. 写重启日志写入 restart_task_log可追溯哪些任务被中断
4. 孤立任务对账无对应 WorkflowRun 的非终态任务 → failed防止僵尸任务永远占用队列
5. WorkflowRun 对账reconcileNonTerminal() 批量修正汇总状态补偿上次会话中 recalculate() 失败遗留的落后状态

恢复步骤同步完成后 start() 才返回,确保调用方在此后插入的新任务不会被误取消。


幂等键

Task.idempotency_key 防止并发重复提交相同任务。

语义:只对活跃任务去重

幂等键仅对非终态任务(pending / claimed / running)生效。已进入终态(completed / failed / cancelled)的任务不会阻塞相同 key 的新任务创建。

插入新任务时:
  idempotency_key 有值
    → 查询是否存在相同 key 且 status NOT IN (completed, failed, cancelled) 的任务
    → 存在 → 跳过插入,logger.info 提示
    → 不存在 → 正常插入
  idempotency_key 为 null → 不做去重,直接插入

典型用法:以 "${materialId}:DOWNLOAD_VIDEO" 作为幂等键,确保同一素材的下载任务不会并发重复入队。任务完成后,相同 key 可以重新触发(例如用户手动重新处理)。

已完成资源的跳过逻辑由 TaskExecutor.canSkip() 负责,不依赖幂等键。


职责边界

职责
TaskDb只操作 task 表,不含 recalculate()
WorkflowRunDb管理 workflow_run 表,含 recalculate()
WorkerEngine任务完成后调用 WorkflowRunService.repo.recalculate()不是 TaskService
TaskCancelService运行中任务取消信号注册表(commonMain
TaskPauseResumeService暂停/恢复 Channel 注册表(commonMain
WebSocketTaskExecutor信号注册/注销生命周期基类,Executor 子类继承后只需实现 executeWithSignals()

Fredica — AI 视频工坊