
Overview

Workflow is LangChat Pro's visual workflow orchestration engine. It lets you build complex AI application flows by drag and drop, uses a DAG (directed acyclic graph) execution model, and supports 40+ node types, including LLM calls, conditional branching, HTTP requests, code execution, and more.

Core Concepts

DAG (Directed Acyclic Graph)

Workflow represents each workflow as a DAG, which guarantees a well-defined execution order and rules out circular dependencies.
              ┌─────────┐
              │  Start  │
              │  Node   │
              └────┬────┘
                   │
     ┌────────┬────┴───┬────────┐
     ▼        ▼        ▼        ▼
┌────────┐ ┌──────┐ ┌──────┐ ┌──────┐
│  LLM   │ │  If  │ │ HTTP │ │ Code │
│  Node  │ │ Node │ │ Node │ │ Node │
└────┬───┘ └──┬───┘ └──┬───┘ └──┬───┘
     │        │        │        │
     └───┬────┘        └───┬────┘
         ▼                 ▼
   ┌──────────┐      ┌──────────┐
   │   LLM    │      │  Merge   │
   │   Node   │      │   Node   │
   └─────┬────┘      └─────┬────┘
         │                 │
         └────────┬────────┘
                  ▼
             ┌──────────┐
             │   End    │
             │   Node   │
             └──────────┘
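
The executor shown later relies on a DAG helper exposing addNode/addEdge/topologicalOrder/validate. Its implementation is not shown in the source; a minimal sketch using Kahn's algorithm (a topological ordering that doubles as cycle detection) might look like this:

import java.util.*;

public class DAG {
    private final Map<String, FlowNode> nodes = new HashMap<>();          // nodeId -> node
    private final Map<String, List<String>> adjacency = new HashMap<>();  // nodeId -> successors
    
    public void addNode(String id, FlowNode node) {
        nodes.put(id, node);
        adjacency.putIfAbsent(id, new ArrayList<>());
    }
    
    public void addEdge(String sourceId, String targetId) {
        adjacency.computeIfAbsent(sourceId, k -> new ArrayList<>()).add(targetId);
    }
    
    /** Kahn's algorithm: repeatedly emit nodes whose remaining in-degree is zero. */
    public List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new HashMap<>();
        nodes.keySet().forEach(id -> inDegree.put(id, 0));
        adjacency.values().forEach(targets ->
            targets.forEach(t -> inDegree.merge(t, 1, Integer::sum)));
        
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((id, d) -> { if (d == 0) ready.add(id); });
        
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String next : adjacency.getOrDefault(id, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        return order;
    }
    
    /** A cycle leaves some nodes unsorted, so validation reduces to a size check. */
    public boolean validate() {
        return topologicalOrder().size() == nodes.size();
    }
}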

Core Components

Flow (Workflow Definition)

public class Flow {
    private String id;
    private String name;               // workflow name
    private String description;        // description
    private String version;            // version
    private List<FlowNode> nodes;      // node list
    private List<FlowEdge> edges;      // edge list
    private FlowConfig config;         // configuration
    private String createdBy;          // creator
}

public class FlowNode {
    private String id;
    private String type;               // node type
    private String name;               // node name
    private NodeConfig config;         // node configuration
    private String position;           // position (for UI rendering)
}

public class FlowEdge {
    private String id;
    private String sourceId;           // source node ID
    private String targetId;           // target node ID
    private EdgeConfig config;         // edge configuration
}
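
For orientation, here is a minimal two-node flow assembled in code (assuming standard JavaBean setters on the classes above):

FlowNode start = new FlowNode();
start.setId("start-1");
start.setType("start");
start.setName("Start");

FlowNode llm = new FlowNode();
llm.setId("llm-1");
llm.setType("llm");
llm.setName("Summarize");

FlowEdge edge = new FlowEdge();
edge.setId("edge-1");
edge.setSourceId("start-1");
edge.setTargetId("llm-1");

Flow flow = new Flow();
flow.setId("flow-1");
flow.setName("Demo flow");
flow.setNodes(List.of(start, llm));
flow.setEdges(List.of(edge));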

WorkflowExecutor

Path: langchat-workflow/langchat-workflow-core/src/main/java/cn/langchat/workflow/executor/WorkflowExecutor.java
Responsibility: Executes workflows and coordinates the execution order of their nodes.
Core methods:
public interface WorkflowExecutor {
    
    /**
     * Execute a workflow
     *
     * @param flow     workflow definition
     * @param input    input data
     * @param listener execution listener
     * @return execution result
     */
    WorkflowResult execute(Flow flow, Map<String, Object> input, WorkflowExecutionListener listener);
    
    /**
     * Pause a running workflow
     */
    void pause(String executionId);
    
    /**
     * Resume a paused workflow
     */
    void resume(String executionId);
    
    /**
     * Stop a workflow
     */
    void stop(String executionId);
}
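
A usage sketch; workflowExecutor is the injected bean, log is an assumed logger, and the three listener callbacks are inferred from the implementation below:

Map<String, Object> input = Map.of("content", "text to summarize");

WorkflowResult result = workflowExecutor.execute(flow, input, new WorkflowExecutionListener() {
    @Override
    public void onNodeStart(String nodeId) {
        log.info("node {} started", nodeId);
    }
    
    @Override
    public void onNodeComplete(String nodeId, Map<String, Object> output) {
        log.info("node {} completed: {}", nodeId, output);
    }
    
    @Override
    public void onNodeError(String nodeId, Exception e) {
        log.error("node {} failed", nodeId, e);
    }
});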

Implementation

@Service
public class WorkflowExecutorImpl implements WorkflowExecutor {
    
    @Resource
    private NodeExecutorFactory nodeExecutorFactory;
    
    @Resource
    private WorkflowStateStore stateStore;
    
    @Override
    public WorkflowResult execute(Flow flow, Map<String, Object> input, WorkflowExecutionListener listener) {
        // 1. Build the DAG
        DAG dag = buildDAG(flow);
        
        // 2. Create the execution context
        WorkflowContext context = new WorkflowContext();
        context.setFlow(flow);
        context.setInput(input);
        context.setExecutionId(LcIdUtil.getUUID());
        
        // 3. Save the initial state
        stateStore.save(context.getExecutionId(), new WorkflowState());
        
        // 4. Execute nodes in topological order
        List<String> executionOrder = dag.topologicalOrder();
        Map<String, Object> result = new HashMap<>(input);
        
        for (String nodeId : executionOrder) {
            FlowNode node = flow.getNodes().stream()
                .filter(n -> n.getId().equals(nodeId))
                .findFirst()
                .orElseThrow();
            
            // Check whether execution has been stopped
            if (isStopped(context.getExecutionId())) {
                break;
            }
            
            // Execute the node
            listener.onNodeStart(nodeId);
            try {
                Map<String, Object> nodeInput = prepareNodeInput(node, result, dag);
                Map<String, Object> nodeOutput = executeNode(node, nodeInput, context);
                
                // Save the node output
                result.putAll(nodeOutput);
                listener.onNodeComplete(nodeId, nodeOutput);
                
            } catch (Exception e) {
                listener.onNodeError(nodeId, e);
                throw new WorkflowExecutionException("Node execution failed: " + nodeId, e);
            }
        }
        
        // 5. Return the result
        return new WorkflowResult(result, context.getExecutionId());
    }
    
    /**
     * Build the DAG
     */
    private DAG buildDAG(Flow flow) {
        DAG dag = new DAG();
        
        // Add nodes
        for (FlowNode node : flow.getNodes()) {
            dag.addNode(node.getId(), node);
        }
        
        // Add edges
        for (FlowEdge edge : flow.getEdges()) {
            dag.addEdge(edge.getSourceId(), edge.getTargetId());
        }
        
        // Validate the DAG
        if (!dag.validate()) {
            throw new IllegalArgumentException("Workflow contains a circular dependency");
        }
        
        return dag;
    }
    
    /**
     * Execute a single node
     */
    private Map<String, Object> executeNode(
        FlowNode node,
        Map<String, Object> input,
        WorkflowContext context
    ) {
        NodeExecutor executor = nodeExecutorFactory.getExecutor(node.getType());
        return executor.execute(node, input, context);
    }
}
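
Two pieces referenced above are not shown in the source. The NodeExecutor SPI can be inferred from the implementations in the next section, and prepareNodeInput/isStopped are sketched here under the assumption that every node sees the merged outputs accumulated so far:

// Inferred shape of the NodeExecutor SPI (matches the implementations below)
public interface NodeExecutor {
    String getType();
    Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context);
}

// Plausible helpers inside WorkflowExecutorImpl
private Map<String, Object> prepareNodeInput(FlowNode node, Map<String, Object> result, DAG dag) {
    // Simplest policy: expose everything produced so far. A stricter variant
    // would filter the map down to the outputs of this node's DAG predecessors.
    return new HashMap<>(result);
}

private boolean isStopped(String executionId) {
    WorkflowState state = stateStore.get(executionId);
    return state != null && state.getStatus() == FlowStatus.STOPPED;
}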

Node Types

1. Basic Nodes

StartNode (Start Node)

@Component
public class StartNodeExecutor implements NodeExecutor {
    
    @Override
    public String getType() {
        return "start";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        // Start node: pass the input straight through
        return input;
    }
}

EndNode (End Node)

@Component
public class EndNodeExecutor implements NodeExecutor {
    
    @Override
    public String getType() {
        return "end";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        // End node: mark the flow as completed
        context.setCompleted(true);
        return input;
    }
}

2. LLM Node

@Component
public class LLMNodeExecutor implements NodeExecutor {
    
    @Resource
    private LcChatService lcChatService;
    
    @Override
    public String getType() {
        return "llm";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        LLMConfig config = NodeConfigParser.parse(node.getConfig(), LLMConfig.class);
        
        // Build the request
        LcChatReq req = new LcChatReq();
        req.setModelId(config.getModelId());
        req.setMessage(formatMessage(config.getPrompt(), input));
        req.setSystemMessage(config.getSystemMessage());
        req.setVariables(config.getVariables());
        
        // Invoke the model
        TokenStream stream = lcChatService.streamingChat(req);
        
        // Collect the streamed tokens; the stream is asynchronous, so
        // block on a future until it completes (or fails)
        StringBuilder result = new StringBuilder();
        CompletableFuture<String> done = new CompletableFuture<>();
        stream.onNext(result::append)
              .onComplete(r -> done.complete(result.toString()))
              .onError(done::completeExceptionally)
              .start();
        
        Map<String, Object> output = new HashMap<>();
        output.put("output", done.join());
        output.put("model", config.getModelId());
        
        return output;
    }
}
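
formatMessage is referenced above but not shown; a plausible implementation substitutes ${input.xxx} placeholders with values from the node input (the placeholder syntax follows the node configuration example later on this page):

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Replaces ${input.key} placeholders in the prompt with values from the
// input map; unknown keys are replaced with an empty string.
private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{input\\.([a-zA-Z0-9_]+)}");

private String formatMessage(String prompt, Map<String, Object> input) {
    Matcher matcher = PLACEHOLDER.matcher(prompt);
    StringBuffer sb = new StringBuffer();
    while (matcher.find()) {
        Object value = input.get(matcher.group(1));
        matcher.appendReplacement(sb, Matcher.quoteReplacement(value == null ? "" : value.toString()));
    }
    matcher.appendTail(sb);
    return sb.toString();
}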

3. Condition Node

@Component
public class IfNodeExecutor implements NodeExecutor {
    
    @Override
    public String getType() {
        return "if";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        IfConfig config = NodeConfigParser.parse(node.getConfig(), IfConfig.class);
        
        // Evaluate the condition
        boolean condition = evaluateCondition(config.getCondition(), input);
        
        // Choose a branch based on the result
        String branch = condition ? config.getTrueBranch() : config.getFalseBranch();
        
        // Record the next node to execute
        context.setNextNodeId(branch);
        
        return Map.of("condition", condition, "branch", branch);
    }
    
    /**
     * Evaluate the condition
     */
    private boolean evaluateCondition(String condition, Map<String, Object> input) {
        // Supports simple expressions such as ${input.xxx} == value;
        // SpEL or another expression engine can be used here
        return SpelUtils.evaluate(condition, input, Boolean.class);
    }
}
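
SpelUtils is likewise not shown. A minimal version built on Spring's SpelExpressionParser could expose each input entry as a SpEL #variable, so a condition would be written as #score > 60 (the ${input.xxx} form mentioned above would need rewriting to #variables first):

import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.util.Map;

public final class SpelUtils {
    
    private static final ExpressionParser PARSER = new SpelExpressionParser();
    
    /** Evaluates an expression such as "#score > 60" against the input map. */
    public static <T> T evaluate(String expression, Map<String, Object> input, Class<T> type) {
        StandardEvaluationContext context = new StandardEvaluationContext();
        input.forEach(context::setVariable); // each map entry becomes a #variable
        return PARSER.parseExpression(expression).getValue(context, type);
    }
}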

4. HTTP Node

@Component
public class HttpNodeExecutor implements NodeExecutor {
    
    @Resource
    private RestTemplate restTemplate;
    
    @Override
    public String getType() {
        return "http";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        HttpConfig config = NodeConfigParser.parse(node.getConfig(), HttpConfig.class);
        
        // Build the request
        HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
        String url = formatUrl(config.getUrl(), input);
        Object body = config.getBody() != null ? formatBody(config.getBody(), input) : null;
        
        // Send the request
        HttpHeaders headers = new HttpHeaders();
        if (config.getHeaders() != null) {
            config.getHeaders().forEach(headers::add);
        }
        
        HttpEntity<Object> entity = new HttpEntity<>(body, headers);
        ResponseEntity<String> response = restTemplate.exchange(url, method, entity, String.class);
        
        // Return the result
        Map<String, Object> output = new HashMap<>();
        output.put("status", response.getStatusCodeValue());
        output.put("body", response.getBody());
        output.put("headers", response.getHeaders().toSingleValueMap());
        
        return output;
    }
}

5. Code Node

@Component
public class CodeNodeExecutor implements NodeExecutor {
    
    @Override
    public String getType() {
        return "code";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        CodeConfig config = NodeConfigParser.parse(node.getConfig(), CodeConfig.class);
        
        // Bind the node input as script variables
        Binding binding = new Binding();
        input.forEach(binding::setVariable);
        
        // Evaluate the inline script. GroovyShell evaluates code strings;
        // groovy.util.GroovyScriptEngine only runs script files.
        Object result = new GroovyShell(binding).evaluate(config.getCode());
        
        Map<String, Object> output = new HashMap<>();
        if (result instanceof Map) {
            output.putAll((Map<? extends String, ?>) result);
        } else {
            output.put("output", result);
        }
        
        return output;
    }
}

State Management

WorkflowState

public class WorkflowState {
    private String executionId;
    private String flowId;
    private FlowStatus status;           // execution status
    private String currentNodeId;        // current node
    private Map<String, Object> data;    // execution data
    private Date startTime;
    private Date endTime;
    private String errorMessage;
}

public enum FlowStatus {
    RUNNING,
    PAUSED,
    COMPLETED,
    FAILED,
    STOPPED
}

WorkflowStateStore

Responsibility: Stores and manages workflow execution state.
Implementation:
@Service
public class RedisWorkflowStateStore implements WorkflowStateStore {
    
    @Resource
    private RedisTemplate<String, String> redisTemplate;
    
    private static final String KEY_PREFIX = "workflow:state:";
    
    @Override
    public void save(String executionId, WorkflowState state) {
        String key = KEY_PREFIX + executionId;
        String value = JsonUtil.toJson(state);
        redisTemplate.opsForValue().set(key, value, Duration.ofHours(24));
    }
    
    @Override
    public WorkflowState get(String executionId) {
        String key = KEY_PREFIX + executionId;
        String value = redisTemplate.opsForValue().get(key);
        return value != null ? JsonUtil.toBean(value, WorkflowState.class) : null;
    }
    
    @Override
    public void updateStatus(String executionId, FlowStatus status) {
        WorkflowState state = get(executionId);
        if (state != null) {
            state.setStatus(status);
            save(executionId, state);
        }
    }
    
    @Override
    public void delete(String executionId) {
        String key = KEY_PREFIX + executionId;
        redisTemplate.delete(key);
    }
}
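
The pause/resume/stop methods declared on WorkflowExecutor map naturally onto this store; a sketch of how WorkflowExecutorImpl might implement them:

@Override
public void pause(String executionId) {
    stateStore.updateStatus(executionId, FlowStatus.PAUSED);
}

@Override
public void resume(String executionId) {
    stateStore.updateStatus(executionId, FlowStatus.RUNNING);
    // A full implementation would also re-schedule execution from the
    // node recorded in WorkflowState.currentNodeId.
}

@Override
public void stop(String executionId) {
    stateStore.updateStatus(executionId, FlowStatus.STOPPED);
    // The execute() loop observes this through its isStopped() check.
}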

WorkflowBuilder

Responsibility: Builds and validates workflows.
Core methods:
public interface WorkflowBuilder {
    
    /**
     * Create an empty workflow
     */
    Flow create();
    
    /**
     * Add a node
     */
    Flow addNode(Flow flow, FlowNode node);
    
    /**
     * Add an edge
     */
    Flow addEdge(Flow flow, FlowEdge edge);
    
    /**
     * Validate the workflow
     */
    boolean validate(Flow flow);
    
    /**
     * Export to JSON
     */
    String toJson(Flow flow);
    
    /**
     * Import from JSON
     */
    Flow fromJson(String json);
}
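
A round-trip usage sketch (workflowBuilder is the injected bean; the nodes and edge are built as in the model section above):

Flow flow = workflowBuilder.create();
flow = workflowBuilder.addNode(flow, startNode);
flow = workflowBuilder.addNode(flow, llmNode);
flow = workflowBuilder.addEdge(flow, edge);

if (!workflowBuilder.validate(flow)) {
    throw new IllegalArgumentException("invalid flow definition");
}

String json = workflowBuilder.toJson(flow);     // persist or export
Flow restored = workflowBuilder.fromJson(json); // import again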

Configuration

FlowConfig

public class FlowConfig {
    private Integer timeout;           // timeout in seconds
    private Integer maxRetries;        // maximum number of retries
    private Boolean enableLog;         // enable logging
    private Boolean enableMetrics;     // enable metrics collection
}

Node Configuration Example

{
  "id": "node-001",
  "type": "llm",
  "name": "LLM调用",
  "config": {
    "modelId": "model-gpt4",
    "prompt": "根据以下内容生成摘要:\n${input.content}",
    "systemMessage": "你是一个专业的AI助手",
    "temperature": 0.7,
    "maxTokens": 2000
  }
}

Streaming Execution

StreamWorkflowExecutor

Responsibility: Supports streaming workflow execution, emitting node results in real time.
Implementation:
@Service
public class StreamWorkflowExecutor implements WorkflowExecutor {
    
    @Override
    public WorkflowResult execute(Flow flow, Map<String, Object> input, WorkflowExecutionListener listener) {
        // Stream-style execution built on Reactor. Note that flatMap runs
        // nodes concurrently and ignores edge dependencies, so this sketch
        // suits flows whose nodes are independent of one another.
        
        return Flux.fromIterable(flow.getNodes())
            .flatMap(node -> executeNodeAsync(node, input, listener))
            .collectList()
            .map(results -> aggregateResults(results))
            .block(); // or return the Mono for a fully asynchronous pipeline
    }
    
    private Mono<Map<String, Object>> executeNodeAsync(
        FlowNode node,
        Map<String, Object> input,
        WorkflowExecutionListener listener
    ) {
        return Mono.fromCallable(() -> executeNode(node, input, null))
            .doOnSubscribe(s -> listener.onNodeStart(node.getId()))
            .doOnSuccess(output -> listener.onNodeComplete(node.getId(), output))
            .doOnError(e -> listener.onNodeError(node.getId(), e))
            .subscribeOn(Schedulers.boundedElastic());
    }
}
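
To stay non-blocking end to end, the same pipeline can return a Mono instead of calling block(). A sketch (executeReactive is an illustrative name, not part of the WorkflowExecutor interface; concatMap preserves node order where flatMap does not):

public Mono<WorkflowResult> executeReactive(Flow flow, Map<String, Object> input,
                                            WorkflowExecutionListener listener) {
    String executionId = LcIdUtil.getUUID();
    return Flux.fromIterable(flow.getNodes())
        .concatMap(node -> executeNodeAsync(node, input, listener))
        .reduce(new HashMap<String, Object>(input), (acc, output) -> {
            acc.putAll(output); // merge each node's output into the running result
            return acc;
        })
        .map(merged -> new WorkflowResult(merged, executionId));
}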

Extension Example

Custom Node

Step 1: Implement NodeExecutor
@Component
public class CustomNodeExecutor implements NodeExecutor {
    
    @Resource
    private SomeService someService;
    
    @Override
    public String getType() {
        return "custom";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        CustomConfig config = NodeConfigParser.parse(node.getConfig(), CustomConfig.class);
        
        // Custom logic
        String result = someService.execute(config, input);
        
        Map<String, Object> output = new HashMap<>();
        output.put("output", result);
        return output;
    }
}
Step 2: Annotate the class with @Component. Spring scans it and registers it with NodeExecutorFactory automatically; a sketch of what that factory might look like follows.
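
NodeExecutorFactory itself is not shown in the source; a typical Spring implementation collects every NodeExecutor bean and indexes it by type, something like:

@Component
public class NodeExecutorFactory {
    
    private final Map<String, NodeExecutor> executors;
    
    // Spring injects every NodeExecutor bean, including custom ones,
    // so registration is automatic.
    public NodeExecutorFactory(List<NodeExecutor> executorList) {
        this.executors = executorList.stream()
            .collect(Collectors.toMap(NodeExecutor::getType, e -> e));
    }
    
    public NodeExecutor getExecutor(String type) {
        NodeExecutor executor = executors.get(type);
        if (executor == null) {
            throw new IllegalArgumentException("Unknown node type: " + type);
        }
        return executor;
    }
}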

Best Practices

1. Node Design

  • Single responsibility: each node should do exactly one thing
  • Explicit I/O: define a clear input/output structure
  • Error handling: surface detailed error information
  • Testability: nodes should be easy to unit-test

2. Workflow Design

  • Keep it simple: avoid overly complex workflows
  • Name things clearly: give nodes and edges descriptive names
  • Branch sensibly: keep conditional logic easy to follow
  • Mind performance: look for opportunities to run nodes in parallel

3. State Management

  • Persistence: persist execution state to Redis
  • TTL: set a sensible expiration time
  • Monitoring: expose an endpoint for querying execution status
  • Cleanup: remove expired state promptly

4. Error Handling

  • Retries: configure a sensible retry count (see the sketch after this list)
  • Timeouts: avoid blocking indefinitely
  • Fallbacks: provide a degradation strategy
  • Logging: record errors in detail
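
As referenced above, a minimal retry-with-timeout wrapper around node execution might look like this (executeWithRetry and the executorService field are illustrative; timeout and maxRetries come from FlowConfig):

private Map<String, Object> executeWithRetry(FlowNode node, Map<String, Object> input,
                                             WorkflowContext context, FlowConfig config) {
    int maxRetries = config.getMaxRetries() != null ? config.getMaxRetries() : 0;
    long timeoutMs = config.getTimeout() != null ? config.getTimeout() * 1000L : 60_000L;
    
    for (int attempt = 0; ; attempt++) {
        Future<Map<String, Object>> future = executorService.submit(
            () -> executeNode(node, input, context));
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS); // timeout control
        } catch (Exception e) {
            future.cancel(true);
            if (attempt >= maxRetries) {
                throw new WorkflowExecutionException("Node execution failed: " + node.getId(), e);
            }
            // otherwise fall through and retry
        }
    }
}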

Performance Optimization

1. Parallel Execution

// Nodes with no dependencies on one another can run in parallel
List<FlowNode> parallelNodes = findIndependentNodes(dag);
List<Map<String, Object>> results = parallelNodes.parallelStream()
    .map(node -> executeNode(node, input, context))
    .collect(Collectors.toList());

2. Caching

@Cacheable(value = "workflow", key = "#node.id + ':' + #input.hashCode()")
public Map<String, Object> executeNode(FlowNode node, Map<String, Object> input, WorkflowContext context) {
    // node execution logic
}

3. Async Execution

@Async("workflowExecutor")
public CompletableFuture<WorkflowResult> executeAsync(Flow flow, Map<String, Object> input) {
    // assumes the executor tolerates a null listener; otherwise pass a no-op
    return CompletableFuture.completedFuture(execute(flow, input, null));
}
