概述
Workflow是LangChat Pro的可视化工作流编排引擎,支持通过拖拽方式构建复杂的AI应用流程。它采用DAG(有向无环图)模型,支持40+种节点类型,包括LLM调用、条件判断、HTTP请求、代码执行等。
核心概念
DAG(有向无环图)
Workflow采用DAG模型表示工作流,既保证节点按依赖关系的先后顺序执行,又避免出现循环依赖。
              ┌─────────┐
              │  Start  │
              │  Node   │
              └────┬────┘
                   │
     ┌────────┬────┴────┬────────┐
     ▼        ▼         ▼        ▼
┌────────┐ ┌─────┐   ┌──────┐ ┌──────┐
│  LLM   │ │ If  │   │ HTTP │ │ Code │
│  Node  │ │Node │   │ Node │ │ Node │
└────┬───┘ └──┬──┘   └──┬───┘ └──┬───┘
     │        │         │        │
     └───┬────┘         └────┬───┘
         ▼                   ▼
   ┌────────────┐      ┌───────────┐
   │    LLM     │      │   Merge   │
   │    Node    │      │   Node    │
   └─────┬──────┘      └─────┬─────┘
         │                   │
         └─────────┬─────────┘
                   ▼
              ┌──────────┐
              │   End    │
              │   Node   │
              └──────────┘
核心组件
Flow(工作流定义)
Copy
/**
 * Definition of a workflow: descriptive metadata plus the nodes and edges that form its graph.
 */
public class Flow {
private String id;
private String name; // workflow name
private String description; // description
private String version; // version
private List<FlowNode> nodes; // list of nodes
private List<FlowEdge> edges; // list of edges
private FlowConfig config; // workflow-level configuration
private String createdBy; // creator
}
/**
 * A single node of a {@link Flow}.
 */
public class FlowNode {
private String id;
private String type; // node type
private String name; // node name
private NodeConfig config; // node configuration
private String position; // position (used for UI rendering)
}
/**
 * A directed connection between two nodes of a {@link Flow}.
 */
public class FlowEdge {
private String id;
private String sourceId; // ID of the source node
private String targetId; // ID of the target node
private EdgeConfig config; // edge configuration
}
WorkflowExecutor
路径:langchat-workflow/langchat-workflow-core/src/main/java/cn/langchat/workflow/executor/WorkflowExecutor.java
职责: 执行工作流,协调各节点的执行顺序
核心方法:
Copy
/**
 * Entry point for running workflows and controlling running executions.
 */
public interface WorkflowExecutor {
/**
 * Executes a workflow.
 *
 * @param flow the workflow definition
 * @param input the input data
 * @param listener the execution listener
 * @return the execution result
 */
WorkflowResult execute(Flow flow, Map<String, Object> input, WorkflowExecutionListener listener);
/**
 * Pauses a workflow execution.
 *
 * @param executionId ID of the execution to pause
 */
void pause(String executionId);
/**
 * Resumes a workflow execution.
 *
 * @param executionId ID of the execution to resume
 */
void resume(String executionId);
/**
 * Stops a workflow execution.
 *
 * @param executionId ID of the execution to stop
 */
void stop(String executionId);
}
实现流程
Copy
/**
 * Default {@link WorkflowExecutor}: builds a DAG from the flow, then runs its nodes one after
 * another in topological order, merging every node's output into a shared result map.
 *
 * <p>NOTE(review): {@code isStopped} and {@code prepareNodeInput}, as well as the
 * {@code pause}/{@code resume}/{@code stop} overrides required by the interface, are not part of
 * this excerpt and are assumed to be defined elsewhere in the class.
 *
 * <p>NOTE(review): the loop below runs every node of the DAG and never consults
 * {@code context.getNextNodeId()}, so a branch selected by an "if" node is not honoured here.
 * Likewise, the state written to the store is not updated after the initial save in this
 * excerpt. Confirm whether that is handled elsewhere.
 */
@Service
public class WorkflowExecutorImpl implements WorkflowExecutor {

    @Resource
    private NodeExecutorFactory nodeExecutorFactory;

    @Resource
    private WorkflowStateStore stateStore;

    /**
     * Executes the workflow synchronously.
     *
     * @param flow     the workflow definition; must not be {@code null}
     * @param input    the initial data; {@code null} is treated as an empty map
     * @param listener receives per-node start/complete/error callbacks; may be {@code null}, in
     *                 which case no callbacks are made
     * @return the merged data of the input and all node outputs, plus the execution id
     * @throws IllegalArgumentException   if {@code flow} is {@code null} or contains a cycle
     * @throws WorkflowExecutionException if a node fails; the original exception is the cause
     */
    @Override
    public WorkflowResult execute(Flow flow, Map<String, Object> input, WorkflowExecutionListener listener) {
        if (flow == null) {
            throw new IllegalArgumentException("flow must not be null");
        }

        // 1. Build and validate the DAG.
        DAG dag = buildDAG(flow);

        // 2. Create the execution context.
        WorkflowContext context = new WorkflowContext();
        context.setFlow(flow);
        context.setInput(input);
        context.setExecutionId(LcIdUtil.getUUID());

        // 3. Persist the initial state.
        stateStore.save(context.getExecutionId(), new WorkflowState());

        // Index the nodes once so each lookup in the loop is O(1) instead of a linear scan.
        Map<String, FlowNode> nodesById = indexNodes(flow);

        // 4. Execute the nodes in topological order.
        List<String> executionOrder = dag.topologicalOrder();
        Map<String, Object> result = new HashMap<>();
        if (input != null) {
            result.putAll(input);
        }
        for (String nodeId : executionOrder) {
            FlowNode node = nodesById.get(nodeId);
            if (node == null) {
                throw new java.util.NoSuchElementException("No node with id '" + nodeId + "' in workflow");
            }

            // Honour a stop request before starting the next node.
            if (isStopped(context.getExecutionId())) {
                break;
            }

            if (listener != null) {
                listener.onNodeStart(nodeId);
            }
            try {
                Map<String, Object> nodeInput = prepareNodeInput(node, result, dag);
                Map<String, Object> nodeOutput = executeNode(node, nodeInput, context);
                if (nodeOutput == null) {
                    // Treat "no output" as an empty result rather than failing in putAll.
                    nodeOutput = new HashMap<>();
                }

                // Later nodes see the outputs of earlier ones; equal keys are overwritten.
                result.putAll(nodeOutput);
                if (listener != null) {
                    listener.onNodeComplete(nodeId, nodeOutput);
                }
            } catch (Exception e) {
                if (listener != null) {
                    listener.onNodeError(nodeId, e);
                }
                throw new WorkflowExecutionException("节点执行失败: " + nodeId, e);
            }
        }

        // 5. Return the accumulated result.
        return new WorkflowResult(result, context.getExecutionId());
    }

    /**
     * Maps node id to node. When several nodes share an id the first one wins, matching a
     * first-match lookup over the node list.
     */
    private Map<String, FlowNode> indexNodes(Flow flow) {
        Map<String, FlowNode> nodesById = new HashMap<>();
        if (flow.getNodes() != null) {
            for (FlowNode node : flow.getNodes()) {
                nodesById.putIfAbsent(node.getId(), node);
            }
        }
        return nodesById;
    }

    /**
     * Builds the DAG from the flow's nodes and edges and validates it.
     *
     * @throws IllegalArgumentException if validation fails
     */
    private DAG buildDAG(Flow flow) {
        DAG dag = new DAG();

        // Add nodes; a missing node list is treated as empty.
        if (flow.getNodes() != null) {
            for (FlowNode node : flow.getNodes()) {
                dag.addNode(node.getId(), node);
            }
        }

        // Add edges; a missing edge list is treated as empty.
        if (flow.getEdges() != null) {
            for (FlowEdge edge : flow.getEdges()) {
                dag.addEdge(edge.getSourceId(), edge.getTargetId());
            }
        }

        if (!dag.validate()) {
            throw new IllegalArgumentException("工作流包含循环依赖");
        }
        return dag;
    }

    /**
     * Executes a single node with the executor registered for its type.
     */
    private Map<String, Object> executeNode(
            FlowNode node,
            Map<String, Object> input,
            WorkflowContext context
    ) {
        NodeExecutor executor = nodeExecutorFactory.getExecutor(node.getType());
        return executor.execute(node, input, context);
    }
}
节点类型
1. 基础节点
StartNode(开始节点)
Copy
/**
 * Executor for the entry node of a workflow. It performs no work of its own and simply hands
 * the incoming data on to the rest of the flow.
 */
@Component
public class StartNodeExecutor implements NodeExecutor {

    /** Type identifier reported by {@link #getType()}. */
    private static final String NODE_TYPE = "start";

    @Override
    public String getType() {
        return NODE_TYPE;
    }

    /**
     * @return the very same map instance that was passed in as {@code input}
     */
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        // Pass-through: nothing is computed or copied.
        return input;
    }
}
EndNode(结束节点)
Copy
/**
 * Executor for the terminal node of a workflow: marks the execution context as completed and
 * returns its input unchanged.
 */
@Component
public class EndNodeExecutor implements NodeExecutor {

    @Override
    public String getType() {
        return "end";
    }

    /**
     * @param context execution context to flag as completed; may be {@code null} (callers that
     *                run nodes without a context exist), in which case nothing is flagged
     * @return the same map instance that was passed in as {@code input}
     */
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        // Guard against context-less invocation instead of failing with a NullPointerException.
        if (context != null) {
            context.setCompleted(true);
        }
        return input;
    }
}
2. LLM节点
Copy
/**
 * Executor for "llm" nodes: sends the configured prompt to the chat service and returns the
 * generated text.
 *
 * <p>NOTE(review): assumes {@code TokenStream} is the LangChain4j (pre-1.0) type offering
 * {@code onNext}/{@code onComplete}/{@code onError}/{@code start} — confirm against the version
 * in use. {@code formatMessage} is assumed to be defined elsewhere in this class.
 */
@Component
public class LLMNodeExecutor implements NodeExecutor {

    @Resource
    private LcChatService lcChatService;

    @Override
    public String getType() {
        return "llm";
    }

    /**
     * Runs the chat request and blocks until the streamed answer is complete.
     *
     * @return a map with {@code "output"} (the full generated text) and {@code "model"} (the
     *         configured model id)
     * @throws java.util.concurrent.CompletionException if the stream reports an error; the
     *         reported error is the cause
     */
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        LLMConfig config = NodeConfigParser.parse(node.getConfig(), LLMConfig.class);

        // Build the request.
        LcChatReq req = new LcChatReq();
        req.setModelId(config.getModelId());
        req.setMessage(formatMessage(config.getPrompt(), input));
        req.setSystemMessage(config.getSystemMessage());
        req.setVariables(config.getVariables());

        TokenStream stream = lcChatService.streamingChat(req);

        // Registering handlers alone does nothing: the stream has to be started, and the answer
        // is only complete once the completion callback fires. Collect the tokens as they
        // arrive and wait for completion (or failure) before reading the buffer.
        StringBuilder answer = new StringBuilder();
        java.util.concurrent.CompletableFuture<String> finished =
                new java.util.concurrent.CompletableFuture<>();
        stream.onNext(answer::append)
                .onComplete(response -> finished.complete(answer.toString()))
                .onError(finished::completeExceptionally)
                .start();

        // NOTE(review): this wait is unbounded; consider applying the workflow timeout here.
        String text = finished.join();

        Map<String, Object> output = new HashMap<>();
        output.put("output", text);
        output.put("model", config.getModelId());
        return output;
    }
}
3. 条件节点
Copy
/**
 * Executor for "if" nodes: evaluates the configured condition against the node input and
 * records which branch should run next.
 */
@Component
public class IfNodeExecutor implements NodeExecutor {

    @Override
    public String getType() {
        return "if";
    }

    /**
     * @return a mutable map with {@code "condition"} (the evaluation result) and
     *         {@code "branch"} (the id configured for that outcome; {@code null} when the
     *         corresponding branch is not configured)
     */
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        IfConfig config = NodeConfigParser.parse(node.getConfig(), IfConfig.class);

        boolean condition = evaluateCondition(config.getCondition(), input);
        String branch = condition ? config.getTrueBranch() : config.getFalseBranch();

        // Record the next node on the context when one is available.
        if (context != null) {
            context.setNextNodeId(branch);
        }

        // HashMap rather than Map.of: Map.of rejects null values, and a branch may be unset.
        Map<String, Object> output = new HashMap<>();
        output.put("condition", condition);
        output.put("branch", branch);
        return output;
    }

    /**
     * Evaluates the condition expression (e.g. <code>${input.xxx} == value</code>) with the
     * expression utility. A {@code null} evaluation result counts as {@code false}.
     */
    private boolean evaluateCondition(String condition, Map<String, Object> input) {
        // Boolean.TRUE.equals avoids a NullPointerException on auto-unboxing a null result.
        return Boolean.TRUE.equals(SpelUtils.evaluate(condition, input, Boolean.class));
    }
}
4. HTTP节点
Copy
/**
 * Executor for "http" nodes: performs the configured HTTP request and exposes status, body and
 * headers of the response as node output.
 *
 * <p>NOTE(review): {@code formatUrl} and {@code formatBody} are assumed to be defined elsewhere
 * in this class.
 *
 * <p>SECURITY NOTE(review): the target URL is assembled from node configuration and workflow
 * input. If either can be influenced by untrusted users this allows server-side request
 * forgery; consider restricting the permitted hosts/schemes.
 */
@Component
public class HttpNodeExecutor implements NodeExecutor {

    @Resource
    private RestTemplate restTemplate;

    @Override
    public String getType() {
        return "http";
    }

    /**
     * @return a map with {@code "status"} (numeric status code), {@code "body"} (response body
     *         as text) and {@code "headers"} (first value of every response header)
     * @throws IllegalArgumentException if the configured method is not a valid HTTP method name
     */
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        HttpConfig config = NodeConfigParser.parse(node.getConfig(), HttpConfig.class);

        // Build the request.
        HttpMethod method = resolveMethod(config.getMethod());
        String url = formatUrl(config.getUrl(), input);
        Object body = config.getBody() != null ? formatBody(config.getBody(), input) : null;

        HttpHeaders headers = new HttpHeaders();
        if (config.getHeaders() != null) {
            config.getHeaders().forEach(headers::add);
        }

        // Send the request.
        HttpEntity<Object> entity = new HttpEntity<>(body, headers);
        ResponseEntity<String> response = restTemplate.exchange(url, method, entity, String.class);

        // Expose the response.
        Map<String, Object> output = new HashMap<>();
        // getStatusCodeValue() is deprecated in recent Spring versions; go through getStatusCode().
        output.put("status", response.getStatusCode().value());
        output.put("body", response.getBody());
        output.put("headers", response.getHeaders().toSingleValueMap());
        return output;
    }

    /**
     * Parses the configured method name case-insensitively; a missing or blank name means GET.
     */
    private HttpMethod resolveMethod(String configuredMethod) {
        if (configuredMethod == null || configuredMethod.trim().isEmpty()) {
            return HttpMethod.GET;
        }
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale
        // (e.g. "i" would not become "I" under a Turkish locale).
        return HttpMethod.valueOf(configuredMethod.trim().toUpperCase(java.util.Locale.ROOT));
    }
}
5. 代码节点
Copy
/**
 * Executor for "code" nodes: runs the configured script with the node input bound as script
 * variables and returns the script result as node output.
 *
 * <p>SECURITY NOTE(review): the script text comes from node configuration and is executed
 * as-is. If workflow definitions can be authored by untrusted users this is arbitrary code
 * execution; run such scripts in a sandbox or restrict who may define them.
 */
@Component
public class CodeNodeExecutor implements NodeExecutor {

    @Resource
    private GroovyScriptEngine groovyScriptEngine;

    @Override
    public String getType() {
        return "code";
    }

    /**
     * @return the entries of the script result when it is a map (keys converted to strings),
     *         otherwise a map holding the result under {@code "output"}
     */
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        CodeConfig config = NodeConfigParser.parse(node.getConfig(), CodeConfig.class);

        // Bind every input entry as a script variable.
        Binding binding = new Binding();
        if (input != null) {
            input.forEach(binding::setVariable);
        }

        // Run the script.
        Object result = groovyScriptEngine.eval(config.getCode(), binding);

        Map<String, Object> output = new HashMap<>();
        if (result instanceof Map) {
            // A script may return a map whose keys are not java.lang.String. Copy entry by
            // entry and stringify the keys instead of using an unchecked cast, which would let
            // foreign key types leak into a Map<String, Object>.
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) result).entrySet()) {
                output.put(String.valueOf(entry.getKey()), entry.getValue());
            }
        } else {
            output.put("output", result);
        }
        return output;
    }
}
状态管理
WorkflowState
Copy
/**
 * State record of a single workflow execution.
 */
public class WorkflowState {
private String executionId;
private String flowId;
private FlowStatus status; // execution status
private String currentNodeId; // current node
private Map<String, Object> data; // execution data
private Date startTime;
private Date endTime;
private String errorMessage;
}
/**
 * Status values of a workflow execution.
 */
public enum FlowStatus {
RUNNING, // running
PAUSED, // paused
COMPLETED, // completed
FAILED, // failed
STOPPED // stopped
}
WorkflowStateStore
职责: 存储和管理工作流执行状态
实现:
/**
 * {@link WorkflowStateStore} that keeps each execution's state as a JSON string in Redis under
 * the key {@code workflow:state:<executionId>}.
 */
@Service
public class RedisWorkflowStateStore implements WorkflowStateStore {

    private static final String KEY_PREFIX = "workflow:state:";

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    /** Serializes the state to JSON and stores it with a 24-hour expiry. */
    @Override
    public void save(String executionId, WorkflowState state) {
        String redisKey = keyFor(executionId);
        String payload = JsonUtil.toJson(state);
        redisTemplate.opsForValue().set(redisKey, payload, Duration.ofHours(24));
    }

    /** Returns the stored state, or {@code null} when nothing is stored for the id. */
    @Override
    public WorkflowState get(String executionId) {
        String payload = redisTemplate.opsForValue().get(keyFor(executionId));
        if (payload == null) {
            return null;
        }
        return JsonUtil.toBean(payload, WorkflowState.class);
    }

    /**
     * Loads the state, replaces its status and saves it again. Does nothing when no state is
     * stored for the id.
     */
    @Override
    public void updateStatus(String executionId, FlowStatus status) {
        WorkflowState current = get(executionId);
        if (current == null) {
            return;
        }
        current.setStatus(status);
        save(executionId, current);
    }

    /** Removes the stored state for the id. */
    @Override
    public void delete(String executionId) {
        redisTemplate.delete(keyFor(executionId));
    }

    /** Builds the Redis key for an execution id. */
    private static String keyFor(String executionId) {
        return KEY_PREFIX + executionId;
    }
}
WorkflowBuilder
职责: 构建和验证工作流
核心方法:
/**
 * Operations for assembling, validating and (de)serializing workflow definitions.
 */
public interface WorkflowBuilder {
/**
 * Creates an empty workflow.
 */
Flow create();
/**
 * Adds a node.
 */
Flow addNode(Flow flow, FlowNode node);
/**
 * Adds an edge.
 */
Flow addEdge(Flow flow, FlowEdge edge);
/**
 * Validates the workflow.
 */
boolean validate(Flow flow);
/**
 * Exports the workflow as JSON.
 */
String toJson(Flow flow);
/**
 * Imports a workflow from JSON.
 */
Flow fromJson(String json);
}
配置说明
FlowConfig
Copy
/**
 * Workflow-level configuration options.
 */
public class FlowConfig {
private Integer timeout; // timeout (seconds)
private Integer maxRetries; // maximum number of retries
private Boolean enableLog; // whether logging is enabled
private Boolean enableMetrics; // whether metrics collection is enabled
}
节点配置示例
Copy
{
"id": "node-001",
"type": "llm",
"name": "LLM调用",
"config": {
"modelId": "model-gpt4",
"prompt": "根据以下内容生成摘要:\n${input.content}",
"systemMessage": "你是一个专业的AI助手",
"temperature": 0.7,
"maxTokens": 2000
}
}
流式执行
StreamWorkflowExecutor
职责: 支持流式工作流执行,实时返回节点执行结果
实现:
/**
 * Reactor-based {@link WorkflowExecutor} that runs the nodes of a flow asynchronously and
 * reports each node's outcome to the listener as soon as it is available.
 *
 * <p>NOTE(review): {@code executeNode}, {@code aggregateResults} and the
 * {@code pause}/{@code resume}/{@code stop} overrides are not part of this excerpt and are
 * assumed to be defined elsewhere in the class.
 *
 * <p>NOTE(review): every node is started from the flow's node list with the original
 * {@code input}; the edges of the flow are not consulted here, so dependency order between
 * nodes is not enforced and no node sees another node's output. The single context instance is
 * shared by all concurrently running nodes. Confirm this is intended.
 */
@Service
public class StreamWorkflowExecutor implements WorkflowExecutor {

    /**
     * @param listener receives per-node callbacks; may be {@code null}, in which case no
     *                 callbacks are made
     */
    @Override
    public WorkflowResult execute(Flow flow, Map<String, Object> input, WorkflowExecutionListener listener) {
        // Node executors dereference the context (for example to mark completion or record the
        // next node), so give them a real one instead of null.
        WorkflowContext context = new WorkflowContext();
        context.setFlow(flow);
        context.setInput(input);
        context.setExecutionId(LcIdUtil.getUUID());

        return Flux.fromIterable(flow.getNodes())
                .flatMap(node -> executeNodeAsync(node, input, context, listener))
                .collectList()
                .map(results -> aggregateResults(results))
                .block(); // or return the Mono for a fully asynchronous API
    }

    /**
     * Wraps the execution of one node in a {@link Mono} that runs on the bounded-elastic
     * scheduler and forwards start, success and error to the listener when one is present.
     */
    private Mono<Map<String, Object>> executeNodeAsync(
            FlowNode node,
            Map<String, Object> input,
            WorkflowContext context,
            WorkflowExecutionListener listener
    ) {
        Mono<Map<String, Object>> execution = Mono.fromCallable(() -> executeNode(node, input, context));
        if (listener != null) {
            execution = execution
                    .doOnSubscribe(s -> listener.onNodeStart(node.getId()))
                    .doOnSuccess(output -> listener.onNodeComplete(node.getId(), output))
                    .doOnError(e -> listener.onNodeError(node.getId(), e));
        }
        return execution.subscribeOn(Schedulers.boundedElastic());
    }
}
扩展示例
自定义节点
步骤1: 实现NodeExecutor
/**
 * Example of a user-defined node executor: parses its own configuration type, delegates the
 * work to a service and publishes the service result under {@code "output"}.
 */
@Component
public class CustomNodeExecutor implements NodeExecutor {

    /** Type identifier reported by {@link #getType()}. */
    private static final String NODE_TYPE = "custom";

    @Resource
    private SomeService someService;

    @Override
    public String getType() {
        return NODE_TYPE;
    }

    /**
     * @return a new mutable map containing the service result under the key {@code "output"}
     */
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        CustomConfig customConfig = NodeConfigParser.parse(node.getConfig(), CustomConfig.class);

        // Custom logic lives in the service.
        String serviceResult = someService.execute(customConfig, input);

        Map<String, Object> nodeOutput = new HashMap<>();
        nodeOutput.put("output", serviceResult);
        return nodeOutput;
    }
}
最佳实践
1. 节点设计
- 单一职责: 每个节点只做一件事
- 明确输入输出: 定义清晰的输入输出结构
- 错误处理: 提供详细的错误信息
- 可测试: 节点应该易于单元测试
2. 工作流设计
- 避免复杂: 工作流不宜过于复杂
- 清晰命名: 节点和边要有清晰的名称
- 合理分支: 条件分支要逻辑清晰
- 性能考虑: 考虑并行执行可能性
3. 状态管理
- 持久化: 状态应该持久化到Redis
- TTL: 设置合理的过期时间
- 监控: 提供执行状态查询接口
- 清理: 及时清理过期状态
4. 错误处理
- 重试机制: 设置合理的重试次数
- 超时控制: 避免长时间阻塞
- 降级策略: 提供降级方案
- 日志记录: 详细记录错误信息
性能优化
1. 并行执行
Copy
// 支持并行执行无依赖的节点
// Execute nodes that do not depend on each other in parallel.
// NOTE(review): all parallel tasks receive the same `input` and the same `context` instance;
// verify that node executors do not mutate the shared context concurrently.
// NOTE(review): parallelStream() runs on the JVM-wide common pool by default; if nodes block on
// I/O (HTTP, LLM calls) a dedicated executor may be more appropriate — confirm.
List<FlowNode> parallelNodes = findIndependentNodes(dag);
List<Map<String, Object>> results = parallelNodes.parallelStream()
.map(node -> executeNode(node, input, context))
.collect(Collectors.toList());
2. 缓存优化
Copy
// Caches node results in the "workflow" cache, keyed by node id plus the hash code of the input.
// NOTE(review): the key uses input.hashCode(), so two different inputs with the same hash code
// would share one cache entry; consider a key derived from the input's content.
// NOTE(review): presumably only suitable for nodes whose result depends solely on their input
// and that have no side effects — confirm which node types should be cached.
@Cacheable(value = "workflow", key = "#node.id + ':' + #input.hashCode()")
public Map<String, Object> executeNode(FlowNode node, Map<String, Object> input, WorkflowContext context) {
// node execution logic (placeholder in this example)
}
3. 异步执行
Copy
// Asynchronous wrapper around execute(...): runs the workflow and returns the result wrapped in
// an already-completed CompletableFuture.
// NOTE(review): presumably relies on Spring's @Async to move the call onto the executor named
// "workflowExecutor"; without that proxying the call runs on the caller's thread — confirm.
// NOTE(review): null is passed as the listener; verify that execute(...) accepts a null
// listener, otherwise supply a no-op listener here.
@Async("workflowExecutor")
public CompletableFuture<WorkflowResult> executeAsync(Flow flow, Map<String, Object> input) {
return CompletableFuture.completedFuture(execute(flow, input, null));
}

