Skip to main content

创建新工作流节点指南(后端)

概述

本指南将帮助您快速创建新的工作流节点。基于SpringBoot3 + LangGraph4j + Hutool技术栈,新的节点系统采用了标准化的结构设计,使得创建节点变得非常简单和规范。

节点创建步骤

1. 创建节点文件夹

langchat-workflow/langchat-workflow-core/src/main/java/cn/langchat/workflow/core/node/ 目录下创建新的文件夹,文件夹名称使用小写格式:
mkdir my_custom_node

2. 创建必需的三个Java文件

每个节点都必须包含以下三个文件:

2.1 节点主类 (XxxNode.java)

这是节点的核心实现类,必须实现 WorkflowNode<T> 接口:
package cn.langchat.workflow.core.node.my_custom_node;

import cn.langchat.workflow.core.node.common.BaseContext;
import cn.langchat.workflow.core.node.common.CommonParamFields;
import cn.langchat.workflow.core.node.common.WorkflowNode;
import cn.langchat.workflow.core.message.WorkflowMessagePusher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author tycoding
 * @since 2025/8/28
 */
@Slf4j
@Component
public class MyCustomNode implements WorkflowNode<MyCustomContext> {

    @Override
    public String getNodeName() {
        return this.getClass().getSimpleName();
    }

    @Override
    public Class<MyCustomContext> getContextType() {
        return MyCustomContext.class;
    }

    @Override
    public Map<String, Object> process(MyCustomContext context, Map<String, Object> state) {
        // 表单校验
        BaseContext.ValidationResult validationResult = BaseContext.validateParams(
                context.getParamSchemas(),
                context.getParams(),
                CommonParamFields.INPUT
        );
        if (!validationResult.isValid()) {
            throw new RuntimeException("表单校验失败: " + validationResult.getFirstError());
        }

        // 获取输入内容
        String input = context.getInput(state);

        // 实现节点业务逻辑
        String result = processCustomLogic(input);

        // 返回处理结果(使用 WorkflowMessagePusher 统一输出)
        return WorkflowMessagePusher.addNodeOutput(state, context, result);
    }

    private String processCustomLogic(String input) {
        // 在这里实现具体的业务逻辑
        return "处理结果: " + input;
    }
}

2.2 节点上下文类 (XxxContext.java)

这是节点的上下文类,继承自 BaseContext
package cn.langchat.workflow.core.node.my_custom_node;

import cn.langchat.workflow.core.node.common.BaseContext;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.Map;

/**
 * @author tycoding
 * @since 2025/8/28
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class MyCustomContext extends BaseContext {

    /**
     * 获取自定义参数值
     */
    public String getCustomParam(Map<String, Object> state) {
        return getParamValue(state, MyCustomParamFields.CUSTOM_PARAM, String.class);
    }

    /**
     * 获取数值参数
     */
    public Integer getNumberParam(Map<String, Object> state) {
        Integer value = getParamValue(state, MyCustomParamFields.NUMBER_PARAM, Integer.class);
        return value != null ? value : 10; // 默认值
    }
}

2.3 节点参数字段枚举 (XxxParamFields.java)

这是节点参数的字段定义,使用枚举形式:
package cn.langchat.workflow.core.node.my_custom_node;

import lombok.Getter;

/**
 * 自定义节点参数字段枚举
 * 定义MyCustomNode特有的表单字段名
 *
 * @author tycoding
 * @since 2025/8/27
 */
@Getter
public enum MyCustomParamFields {

    /**
     * 自定义参数
     */
    CUSTOM_PARAM("customParam", "自定义参数"),

    /**
     * 数值参数
     */
    NUMBER_PARAM("numberParam", "数值参数"),
    ;

    /**
     * 字段名
     */
    private final String fieldName;

    /**
     * 字段描述
     */
    private final String description;

    MyCustomParamFields(String fieldName, String description) {
        this.fieldName = fieldName;
        this.description = description;
    }

    @Override
    public String toString() {
        return fieldName;
    }
}

3. 核心接口和基类说明

3.1 WorkflowNode<T> 接口

所有节点都必须实现这个接口:
public interface WorkflowNode<T extends BaseContext> {
    String getNodeName();
    Class<T> getContextType();
    Map<String, Object> process(T context, Map<String, Object> state);
}

3.2 BaseContext 基类

提供通用的上下文功能:
  • 参数验证
  • 参数值获取
  • 变量替换
  • 基础数据管理
  • getAlias() - 获取节点别名,用于变量输出

3.3 CommonParamFields 通用字段

定义所有节点共享的参数字段:
public enum CommonParamFields {
    INPUT("input", "输入内容");
}

4. 节点输出管理

节点输出通过 WorkflowMessagePusher 统一处理,这是所有消息推送和变量输出的单一入口:

4.1 单变量输出(推荐)

大多数节点只输出一个 text 字段,使用 WorkflowMessagePusher.addNodeOutput()
@Override
public Map<String, Object> process(MyCustomContext context, Map<String, Object> state) {
    // ... 业务逻辑处理 ...
    String result = "处理结果";

    // 输出单个 text 变量(推荐方式)
    return WorkflowMessagePusher.addNodeOutput(state, context, result);
}
输出存储格式:
state["alias.text"] = "处理结果"     // 路径访问: {{#alias.text#}}
state["alias"] = {text: "处理结果"}  // 对象访问: {{#alias#}}

4.2 多变量输出

对于需要输出多个变量的节点,使用 WorkflowMessagePusher.addNodeOutputs()
@Override
public Map<String, Object> process(MyContext context, Map<String, Object> state) {
    // ... 业务逻辑处理 ...

    // 生成输出数据(字段由业务代码决定)
    Map<String, Object> output = new HashMap<>();
    output.put("name", "张三");
    output.put("age", 25);
    output.put("email", "zhangsan@example.com");

    // 同步到工作流状态
    return WorkflowMessagePusher.addNodeOutputs(state, context.getAlias(), output);
}
输出存储格式:
state["alias.name"] = "张三"
state["alias.age"] = 25
state["alias.email"] = "zhangsan@example.com"
state["alias"] = {name: "张三", age: 25, email: "zhangsan@example.com"}

4.3 推送消息流给前端

对于需要流式推送消息的节点(如LLM、代码执行等),使用 WorkflowMessagePusher.pushMessageFlux()pushSingleEvent()
@Override
public Map<String, Object> process(LlmContext context, Map<String, Object> state) {
    // ... 构建LLM请求 ...

    // 推送消息流给前端(用于流式输出)
    Flux<ServerSentEvent<StreamEvent>> messageFlux = buildStreamingResponse();
    return WorkflowMessagePusher.pushMessageFlux(state, messageFlux);
}

// 或者推送单个事件
Flux<ServerSentEvent<StreamEvent>> singleEventFlux =
    WorkflowMessagePusher.buildSingleEventFlux(streamEvent);
return WorkflowMessagePusher.pushMessageFlux(state, singleEventFlux);

4.4 动态输出(用户定义字段)

对于参数提取器等节点,用户在前端定义的字段会存储在 params 中。业务代码直接使用这些字段:
@Override
public Map<String, Object> process(ParameterExtractorContext context, Map<String, Object> state) {
    // 获取用户定义的提取字段
    List<?> extractionFields = context.getExtractionFields(state);

    // 调用模型进行提取
    Map<String, Object> extractedData = callLlmForExtraction(extractionFields);

    // 直接同步输出
    return WorkflowMessagePusher.addNodeOutputs(state, context.getAlias(), extractedData);
}

8. 开发规范

8.1 输出处理原则

  • 使用 WorkflowMessagePusher.addNodeOutput() - 用于单个 text 字段输出(推荐)
  • 使用 WorkflowMessagePusher.addNodeOutputs() - 用于多个字段输出
  • 使用 WorkflowMessagePusher.pushMessageFlux() - 用于推送消息流给前端
  • 直接生成输出数据 - 不依赖于框架级别的 OutputConfig,让业务代码决定输出结构
  • 对于动态输出 - 在业务代码中直接解析前端配置的字段
不应该做的:
  • ❌ 直接调用 WorkflowState.addBusinessMessageFlux() - 改用 WorkflowMessagePusher
  • ❌ 直接调用 OutputResolver - 改用 WorkflowMessagePusher
  • ❌ 手动构建 Flux 对象 - WorkflowMessagePusher 已经封装了这些细节

8.2 参数获取和验证

// 获取参数
String input = context.getInput(state);
String customParam = context.getCustomParam(state);

// 进行参数验证
BaseContext.ValidationResult validationResult = BaseContext.validateParams(
    context.getParamSchemas(),
    context.getParams(),
    CommonParamFields.INPUT,
    MyCustomParamFields.CUSTOM_PARAM
);
if (!validationResult.isValid()) {
    throw new RuntimeException("表单校验失败: " + validationResult.getFirstError());
}

8.3 输出同步

// 单变量输出(推荐)
return WorkflowMessagePusher.addNodeOutput(state, context, result);

// 多变量输出
Map<String, Object> output = new HashMap<>();
output.put("field1", value1);
output.put("field2", value2);
return WorkflowMessagePusher.addNodeOutputs(state, context.getAlias(), output);

// 推送消息流
Flux<ServerSentEvent<StreamEvent>> messageFlux = buildMessageFlux();
return WorkflowMessagePusher.pushMessageFlux(state, messageFlux);
前端通过以下格式引用节点输出变量:
{{#alias.field#}}
示例:
  • {{#llm_a1b2.text#}} - 引用 LLM 节点的 text 输出
  • {{#extract_7c9d.name#}} - 引用参数提取器节点的 name 字段
  • {{#sys.message#}} - 引用系统消息

9. 高级功能

9.1 自定义参数获取

在Context类中添加自定义方法,简化参数获取:
public String getCustomParam(Map<String, Object> state) {
    return getParamValue(state, MyCustomParamFields.CUSTOM_PARAM, String.class);
}

9.2 类型转换

系统自动处理前端组件类型到Java类型的转换:
  • Input/TextArea → String
  • InputNumber → Integer
  • Switch/Checkbox → Boolean
  • Select → String

9.3 变量替换

支持在工作流状态中进行变量替换:
// 自动处理 {{#variable#}} 格式的变量
String result = VariableReplacer.replace(input, state);

9.4 解析前端配置的动态字段

对于参数提取器等节点,前端发送的动态字段配置存储在 params 中,业务代码直接解析:
// 获取前端配置的字段列表
List<Map<String, Object>> fields = context.getExtractionFields(state);

// 在业务逻辑中解析这些字段
for (Map<String, Object> field : fields) {
    String fieldName = (String) field.get("name");
    String fieldType = (String) field.get("type");
    String description = (String) field.get("description");
    // 使用这些字段进行处理
}

10. 完整示例

文件结构

my_custom_node/
├── MyCustomNode.java      # 节点主类
├── MyCustomContext.java   # 节点上下文
└── MyCustomParamFields.java # 参数字段枚举

节点主类 (MyCustomNode.java)

@Slf4j
@Component
public class MyCustomNode implements WorkflowNode<MyCustomContext> {

    @Override
    public String getNodeName() {
        return this.getClass().getSimpleName();
    }

    @Override
    public Class<MyCustomContext> getContextType() {
        return MyCustomContext.class;
    }

    @Override
    public Map<String, Object> process(MyCustomContext context, Map<String, Object> state) {
        // 参数验证
        BaseContext.ValidationResult validationResult = BaseContext.validateParams(
                context.getParamSchemas(),
                context.getParams(),
                CommonParamFields.INPUT
        );
        if (!validationResult.isValid()) {
            throw new RuntimeException("表单校验失败: " + validationResult.getFirstError());
        }

        // 获取参数
        String input = context.getInput(state);
        String customParam = context.getCustomParam(state);

        // 业务逻辑处理
        String result = processBusinessLogic(input, customParam);

        // 返回结果
        return WorkflowMessagePusher.addNodeOutput(state, context, result);
    }

    private String processBusinessLogic(String input, String customParam) {
        log.info("处理输入: {}, 自定义参数: {}", input, customParam);
        return String.format("处理结果: %s + %s", input, customParam);
    }
}

11. 测试和验证

确保节点正确注册并能够被工作流引擎调用:
  • 检查是否添加了 @Component 注解
  • 确认包路径正确
  • 验证 Spring 容器是否正确扫描

12. 注意事项

  1. 命名规范: 类名使用PascalCase,文件夹使用小写下划线
  2. 继承关系: Context类必须继承BaseContext
  3. 接口实现: Node类必须实现WorkflowNode接口
  4. 参数验证: 始终进行参数验证
  5. 异常处理: 合理处理异常情况
  6. 日志记录: 使用日志记录关键操作
  7. 类型安全: 使用泛型确保类型安全
  8. 输出统一: 使用 WorkflowMessagePusher 处理节点输出(单一职责入口)
  9. 不要过度设计: 直接在业务代码中处理动态输出,不依赖复杂的框架
  10. 不要直接调用 WorkflowState: 改用 WorkflowMessagePusher,让它处理所有消息和变量操作

13. 故障排除

节点不生效

  • 检查是否正确添加了 @Component 注解
  • 确认包路径是否正确
  • 验证Spring容器是否正确扫描

参数获取失败

  • 检查ParamFields枚举定义
  • 确认参数名称匹配
  • 验证参数类型转换

输出变量无法引用

  • 检查是否使用了 WorkflowMessagePusher.addNodeOutput()WorkflowMessagePusher.addNodeOutputs()
  • 确认 context.getAlias() 返回正确的节点别名
  • 验证变量引用格式:{{#alias.field#}}

消息推送失败

  • 检查是否使用了 WorkflowMessagePusher(不要直接调用 WorkflowState.addBusinessMessageFlux()
  • 确认 Flux 对象正确构建
  • 验证 StreamEvent 中包含了正确的数据

更多资源

  • 查看现有节点实现作为参考:
    • llm/LlmNode.java - LLM调用示例
    • reply_node/ReplyNode.java - 简单输出示例
    • code/CodeNode.java - 复杂输出示例
    • form_generator_node/FormGeneratorNode.java - 消息推送示例
  • 阅读WorkflowMessagePusher文档了解完整API
  • 参考Spring Boot文档了解依赖注入
  • 查看Hutool工具类文档