Skip to main content

概述

消息推送模块(langchat-pusher)是LangChat Pro的多渠道消息通知服务,用于将AI生成的内容、工作流节点输出等消息推送到企业微信、钉钉、邮件等第三方渠道。该模块主要作为工作流的一个节点使用,实现自动化消息通知功能。

核心概念

推送渠道

渠道说明典型场景
DingTalk钉钉机器人团队通知、任务提醒
WxCp企业微信应用企业内部消息推送
Email邮件发送正式报告、详细文档

核心组件

MessageProvider

路径: langchat-pusher/src/main/java/cn/langchat/pusher/component/MessageProvider.java 职责: 提供统一的消息发送接口,根据配置路由到不同的推送渠道 核心方法:
@Component
public class MessageProvider {
    
    @Resource
    private AigcPusherService aigcPusherService;
    
    /**
     * 发送文本消息
     *
     * @param req 消息请求
     * - providerId: 推送配置ID
     * - content: 消息内容
     * - userId: 接收人ID(企业微信/钉钉)
     * - subject: 邮件主题(邮件)
     */
    public void sendText(MessageReq req) {
        if (StrUtil.hasBlank(req.getContent())) {
            throw new ServiceException("消息内容为空");
        }
        
        AigcPusher aigcPusher = aigcPusherService.getById(req.getProviderId());
        
        // 根据provider类型选择推送组件
        if (aigcPusher.getProvider().equals(PusherConst.DING_TALK)) {
            sendDingTalkMessage(aigcPusher, req);
        } else if (aigcPusher.getProvider().equals(PusherConst.EMAIL)) {
            sendEmailMessage(aigcPusher, req);
        } else if (aigcPusher.getProvider().equals(PusherConst.WX_CP)) {
            sendWxCpMessage(aigcPusher, req);
        }
    }
    
    /**
     * 发送钉钉消息
     */
    private void sendDingTalkMessage(AigcPusher aigcPusher, MessageReq req) {
        if (StrUtil.hasBlank(req.getUserId())) {
            req.setUserId("@all"); // 默认@所有人
        }
        DingTalkComponent dingTalkComponent = new DingTalkComponent();
        dingTalkComponent.sendText(
            aigcPusher, 
            req.getContent(), 
            false, 
            req.getUserId().split(",")
        );
    }
    
    /**
     * 发送邮件
     */
    private void sendEmailMessage(AigcPusher aigcPusher, MessageReq req) {
        if (StrUtil.hasBlank(req.getSubject())) {
            req.setSubject("LangChat"); // 默认主题
        }
        EmailComponent emailComponent = new EmailComponent();
        emailComponent.sendText(
            aigcPusher.getEmail(), 
            req.getSubject(), 
            req.getContent()
        );
    }
    
    /**
     * 发送企业微信消息
     */
    private void sendWxCpMessage(AigcPusher aigcPusher, MessageReq req) {
        if (StrUtil.hasBlank(req.getUserId())) {
            req.setUserId("@all"); // 默认@所有人
        }
        WxCpComponent wxCpComponent = new WxCpComponent();
        wxCpComponent.sendText(
            aigcPusher, 
            req.getContent(), 
            req.getUserId()
        );
    }
}

推送组件

DingTalkComponent

路径: langchat-pusher/src/main/java/cn/langchat/pusher/component/DingTalkComponent.java 职责: 钉钉机器人消息推送 核心方法:
@Component
public class DingTalkComponent {
    
    private static final String WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send?access_token=";
    
    /**
     * 获取签名后的URL
     */
    private String getSignUrl(AigcPusher properties) {
        long timestamp = System.currentTimeMillis();
        String stringToSign = timestamp + "\n" + properties.getSecret();
        byte[] signData = new HMac(HmacAlgorithm.HmacSHA256, 
            properties.getSecret().getBytes()).digest(stringToSign.getBytes());
        String sign = Base64.encode(signData);
        return WEBHOOK_URL + properties.getToken() + 
               "&timestamp=" + timestamp + "&sign=" + sign;
    }
    
    /**
     * 发送文本消息
     *
     * @param properties 推送配置
     * @param content    消息内容
     * @param isAtAll    是否@所有人
     * @param atMobiles  需要@的手机号列表
     */
    public void sendText(AigcPusher properties, String content, 
                       boolean isAtAll, String... atMobiles) {
        try {
            if (StrUtil.isBlank(content)) {
                return;
            }
            
            DefaultDingTalkClient client = new DefaultDingTalkClient(getSignUrl(properties));
            OapiRobotSendRequest request = new OapiRobotSendRequest();
            
            // 构建文本消息
            OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
            text.setContent(content);
            request.setMsgtype("text");
            request.setText(text);
            
            // 设置@信息
            OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
            at.setAtMobiles(Arrays.asList(atMobiles));
            at.setIsAtAll(isAtAll);
            request.setAt(at);
            
            client.execute(request);
        } catch (Exception e) {
            log.error("钉钉机器人消息发送失败", e);
        }
    }
    
    /**
     * 发送Markdown消息
     */
    public void sendMarkdown(AigcPusher properties, String title, String content,
                          boolean isAtAll, String... atMobiles) {
        try {
            if (StrUtil.isBlank(content)) {
                return;
            }
            
            DingTalkClient client = new DefaultDingTalkClient(getSignUrl(properties));
            OapiRobotSendRequest request = new OapiRobotSendRequest();
            
            // 构建Markdown消息
            OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
            markdown.setTitle(title);
            markdown.setText(content);
            request.setMsgtype("markdown");
            request.setMarkdown(markdown);
            
            // 设置@信息
            OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
            at.setAtMobiles(Arrays.asList(atMobiles));
            at.setIsAtAll(isAtAll);
            request.setAt(at);
            
            client.execute(request);
        } catch (Exception e) {
            log.error("钉钉机器人markdown消息发送失败", e);
        }
    }
}

EmailComponent

路径: langchat-pusher/src/main/java/cn/langchat/pusher/component/EmailComponent.java 职责: 邮件消息推送 核心方法:
@Component
public class EmailComponent {
    
    @Resource
    private JavaMailSender mailSender;
    
    /**
     * 发送文本邮件
     *
     * @param emailEntity 邮件配置
     * @param subject     邮件主题
     * @param content     邮件内容
     */
    public void sendText(EmailEntity emailEntity, String subject, String content) {
        SimpleMailMessage message = new SimpleMailMessage();
        
        // 发送人
        message.setFrom(emailEntity.getFrom());
        
        // 收件人
        message.setTo(emailEntity.getTo().toArray(new String[0]));
        
        // 抄送
        if (CollUtil.isNotEmpty(emailEntity.getCc())) {
            message.setCc(emailEntity.getCc().toArray(new String[0]));
        }
        
        // 密送
        if (CollUtil.isNotEmpty(emailEntity.getBcc())) {
            message.setBcc(emailEntity.getBcc().toArray(new String[0]));
        }
        
        // 主题
        message.setSubject(subject);
        
        // 内容
        message.setText(content);
        
        // 发送
        mailSender.send(message);
    }
    
    /**
     * 发送HTML邮件
     */
    public void sendHtml(EmailEntity emailEntity, String subject, String htmlContent) {
        MimeMessage message = mailSender.createMimeMessage();
        
        try {
            MimeMessageHelper helper = new MimeMessageHelper(message, true, "UTF-8");
            
            // 发送人
            helper.setFrom(emailEntity.getFrom());
            
            // 收件人
            helper.setTo(emailEntity.getTo().toArray(new String[0]));
            
            // 抄送
            if (CollUtil.isNotEmpty(emailEntity.getCc())) {
                helper.setCc(emailEntity.getCc().toArray(new String[0]));
            }
            
            // 密送
            if (CollUtil.isNotEmpty(emailEntity.getBcc())) {
                helper.setBcc(emailEntity.getBcc().toArray(new String[0]));
            }
            
            // 主题
            helper.setSubject(subject);
            
            // HTML内容
            helper.setText(htmlContent, true);
            
            // 发送
            mailSender.send(message);
        } catch (Exception e) {
            log.error("邮件发送失败", e);
        }
    }
}

WxCpComponent

路径: langchat-pusher/src/main/java/cn/langchat/pusher/component/WxCpComponent.java 职责: 企业微信消息推送 核心方法:
@Component
public class WxCpComponent {
    
    @Resource
    private WxCpService wxCpService;
    
    /**
     * 发送文本消息
     *
     * @param properties 推送配置
     * @param content    消息内容
     * @param userId     接收人ID(@all表示所有人)
     */
    public void sendText(AigcPusher properties, String content, String userId) {
        try {
            // 构建文本消息
            WxCpMessage message = WxCpMessage.TEXT()
                .content(content)
                .build();
            
            // 发送消息
            WxCpMessageSendResult result = wxCpService.getMessageService()
                .send(properties.getAgentId(), userId, message);
            
            if (result.getErrorCode() != 0) {
                log.error("企业微信消息发送失败: {}", result.getErrorMsg());
            }
        } catch (Exception e) {
            log.error("企业微信消息发送异常", e);
        }
    }
    
    /**
     * 发送Markdown消息
     */
    public void sendMarkdown(AigcPusher properties, String content, String userId) {
        try {
            // 构建Markdown消息
            WxCpMessage message = WxCpMessage.MARKDOWN()
                .content(content)
                .build();
            
            // 发送消息
            WxCpMessageSendResult result = wxCpService.getMessageService()
                .send(properties.getAgentId(), userId, message);
            
            if (result.getErrorCode() != 0) {
                log.error("企业微信Markdown消息发送失败: {}", result.getErrorMsg());
            }
        } catch (Exception e) {
            log.error("企业微信Markdown消息发送异常", e);
        }
    }
}

数据模型

AigcPusher

路径: langchat-pusher/src/main/java/cn/langchat/pusher/entity/AigcPusher.java 字段说明:
字段类型说明
idString主键ID
nameString推送配置名称
providerString推送渠道:DingTalk、WxCp、Email
tokenStringToken(钉钉)
secretStringSecret(钉钉)
corpIdString企业ID(企业微信)
agentIdInteger应用ID(企业微信)
emailEmailEntity邮件配置
descriptionString描述
creatorString创建人
createTimeLong创建时间

MessageReq

路径: langchat-pusher/src/main/java/cn/langchat/pusher/entity/MessageReq.java 字段说明:
字段类型说明
providerIdString推送配置ID
contentString消息内容
userIdString接收人ID(企业微信/钉钉)
subjectString邮件主题(邮件)

EmailEntity

路径: langchat-pusher/src/main/java/cn/langchat/pusher/entity/EmailEntity.java 字段说明:
字段类型说明
hostStringSMTP服务器地址
portIntegerSMTP端口
usernameString用户名
passwordString密码
fromString发件人
toList<String>收件人列表
ccList<String>抄送列表
bccList<String>密送列表

推送流程

完整流程

┌─────────────────────────────────────────────────────────────┐
│                  Workflow节点                           │
│            (LLM生成或其他节点输出)                        │
└────────────────────┬────────────────────────────────────┘

                     │ MessageReq

┌─────────────────────────────────────────────────────────────┐
│             MessageProvider                            │
│  (统一消息发送接口)                                   │
└────────────────────┬────────────────────────────────────┘

                     │ 查询推送配置

┌─────────────────────────────────────────────────────────────┐
│          AigcPusherService                          │
│  (获取推送配置)                                       │
└────────────────────┬────────────────────────────────────┘

                     │ AigcPusher

         ┌──────────────────────────┐
         │    路由到对应渠道      │
         └───────────┬────────────┘

    ┌────────────────┼────────────────┬─────────────┐
    ▼                ▼                ▼             ▼
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│DingTalk  │   │  WxCp    │   │  Email   │   │  ...     │
│Component │   │Component │   │Component │   │          │
└────┬─────┘   └────┬─────┘   └────┬─────┘   └──────────┘
     │              │              │
     │              │              │
     ▼              ▼              ▼
┌──────────┐   ┌──────────┐   ┌──────────┐
│ 钉钉API  │   │企业微信API│   │ SMTP服务器 │
└──────────┘   └──────────┘   └──────────┘

在Workflow中使用

Workflow节点配置

{
  "id": "node-pusher-001",
  "type": "pusher",
  "name": "发送钉钉通知",
  "config": {
    "providerId": "pusher-dingtalk-001",
    "content": "${content}",
    "userId": "13800138000,13800138001",
    "isAtAll": false
  }
}

Workflow节点执行器

@Component
public class PusherNodeExecutor implements NodeExecutor {
    
    @Resource
    private MessageProvider messageProvider;
    
    @Override
    public String getType() {
        return "pusher";
    }
    
    @Override
    public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
        PusherConfig config = NodeConfigParser.parse(node.getConfig(), PusherConfig.class);
        
        // 替换占位符
        String content = replacePlaceholder(config.getContent(), input);
        
        // 构建请求
        MessageReq req = new MessageReq();
        req.setProviderId(config.getProviderId());
        req.setContent(content);
        req.setUserId(config.getUserId());
        req.setSubject(config.getSubject());
        
        // 发送消息
        messageProvider.sendText(req);
        
        Map<String, Object> output = new HashMap<>();
        output.put("success", true);
        output.put("providerId", config.getProviderId());
        
        return output;
    }
    
    /**
     * 替换占位符
     * 例如: ${content} -> input中的content值
     */
    private String replacePlaceholder(String template, Map<String, Object> input) {
        String result = template;
        for (Map.Entry<String, Object> entry : input.entrySet()) {
            String placeholder = "${" + entry.getKey() + "}";
            if (result.contains(placeholder)) {
                result = result.replace(placeholder, String.valueOf(entry.getValue()));
            }
        }
        return result;
    }
}

配置说明

钉钉机器人配置

{
  "id": "pusher-dingtalk-001",
  "name": "钉钉通知",
  "provider": "DingTalk",
  "token": "xxxxxxxxxxxxxxxxxxxx",
  "secret": "SECxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
  "description": "团队通知"
}

企业微信配置

{
  "id": "pusher-wxcp-001",
  "name": "企业微信通知",
  "provider": "WxCp",
  "corpId": "wwxxxxxxxxxxxxxxxxxxxx",
  "agentId": 1000001,
  "description": "企业内部消息"
}

邮件配置

{
  "id": "pusher-email-001",
  "name": "邮件通知",
  "provider": "Email",
  "email": {
    "host": "smtp.exmail.qq.com",
    "port": 465,
    "username": "[email protected]",
    "password": "password",
    "from": "LangChat <[email protected]>",
    "to": ["[email protected]", "[email protected]"],
    "cc": ["[email protected]"],
    "bcc": []
  },
  "description": "正式报告"
}

最佳实践

1. 消息格式

  • 标题清晰: 使用简洁明了的标题
  • 内容简洁: 避免过长的消息内容
  • Markdown: 使用Markdown格式提高可读性
  • 占位符: 使用占位符实现动态内容

2. 安全考虑

  • 密钥保护: 不要在代码中硬编码token和secret
  • 权限控制: 限制机器人权限范围
  • 消息过滤: 过滤敏感信息
  • 频率限制: 避免频繁推送造成骚扰

3. 错误处理

  • 重试机制: 发送失败时自动重试
  • 日志记录: 详细记录发送日志
  • 异常捕获: 捕获所有异常避免影响主流程
  • 告警通知: 发送失败时发送告警

4. 性能优化

  • 异步发送: 使用异步方式发送消息
  • 批量发送: 支持批量发送提高效率
  • 连接池: 使用连接池管理连接
  • 缓存配置: 缓存推送配置减少查询

扩展示例

添加新推送渠道

步骤1: 添加常量
public interface PusherConst {
    String WECHAT_WORK = "WeChatWork"; // 新增渠道
}
步骤2: 创建推送组件
@Component
public class WeChatWorkComponent {
    
    public void sendText(AigcPusher properties, String content, String userId) {
        // 实现微信企业号消息发送逻辑
    }
}
步骤3: 在MessageProvider中添加路由
public void sendText(MessageReq req) {
    AigcPusher aigcPusher = aigcPusherService.getById(req.getProviderId());
    
    if (aigcPusher.getProvider().equals(PusherConst.DING_TALK)) {
        sendDingTalkMessage(aigcPusher, req);
    } else if (aigcPusher.getProvider().equals(PusherConst.WECHAT_WORK)) {
        sendWeChatWorkMessage(aigcPusher, req); // 新增
    }
}

参考文档