概述
消息推送模块(langchat-pusher)是LangChat Pro的多渠道消息通知服务,用于将AI生成的内容、工作流节点输出等消息推送到企业微信、钉钉、邮件等第三方渠道。该模块主要作为工作流的一个节点使用,实现自动化消息通知功能。核心概念
推送渠道
| 渠道 | 说明 | 典型场景 |
|---|---|---|
| DingTalk | 钉钉机器人 | 团队通知、任务提醒 |
| WxCp | 企业微信应用 | 企业内部消息推送 |
| 邮件发送 | 正式报告、详细文档 |
核心组件
MessageProvider
路径:langchat-pusher/src/main/java/cn/langchat/pusher/component/MessageProvider.java
职责: 提供统一的消息发送接口,根据配置路由到不同的推送渠道
核心方法:
Copy
@Component
public class MessageProvider {
@Resource
private AigcPusherService aigcPusherService;
/**
* 发送文本消息
*
* @param req 消息请求
* - providerId: 推送配置ID
* - content: 消息内容
* - userId: 接收人ID(企业微信/钉钉)
* - subject: 邮件主题(邮件)
*/
public void sendText(MessageReq req) {
if (StrUtil.hasBlank(req.getContent())) {
throw new ServiceException("消息内容为空");
}
AigcPusher aigcPusher = aigcPusherService.getById(req.getProviderId());
// 根据provider类型选择推送组件
if (aigcPusher.getProvider().equals(PusherConst.DING_TALK)) {
sendDingTalkMessage(aigcPusher, req);
} else if (aigcPusher.getProvider().equals(PusherConst.EMAIL)) {
sendEmailMessage(aigcPusher, req);
} else if (aigcPusher.getProvider().equals(PusherConst.WX_CP)) {
sendWxCpMessage(aigcPusher, req);
}
}
/**
* 发送钉钉消息
*/
private void sendDingTalkMessage(AigcPusher aigcPusher, MessageReq req) {
if (StrUtil.hasBlank(req.getUserId())) {
req.setUserId("@all"); // 默认@所有人
}
DingTalkComponent dingTalkComponent = new DingTalkComponent();
dingTalkComponent.sendText(
aigcPusher,
req.getContent(),
false,
req.getUserId().split(",")
);
}
/**
* 发送邮件
*/
private void sendEmailMessage(AigcPusher aigcPusher, MessageReq req) {
if (StrUtil.hasBlank(req.getSubject())) {
req.setSubject("LangChat"); // 默认主题
}
EmailComponent emailComponent = new EmailComponent();
emailComponent.sendText(
aigcPusher.getEmail(),
req.getSubject(),
req.getContent()
);
}
/**
* 发送企业微信消息
*/
private void sendWxCpMessage(AigcPusher aigcPusher, MessageReq req) {
if (StrUtil.hasBlank(req.getUserId())) {
req.setUserId("@all"); // 默认@所有人
}
WxCpComponent wxCpComponent = new WxCpComponent();
wxCpComponent.sendText(
aigcPusher,
req.getContent(),
req.getUserId()
);
}
}
推送组件
DingTalkComponent
路径:langchat-pusher/src/main/java/cn/langchat/pusher/component/DingTalkComponent.java
职责: 钉钉机器人消息推送
核心方法:
Copy
@Component
public class DingTalkComponent {
private static final String WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send?access_token=";
/**
* 获取签名后的URL
*/
private String getSignUrl(AigcPusher properties) {
long timestamp = System.currentTimeMillis();
String stringToSign = timestamp + "\n" + properties.getSecret();
byte[] signData = new HMac(HmacAlgorithm.HmacSHA256,
properties.getSecret().getBytes()).digest(stringToSign.getBytes());
String sign = Base64.encode(signData);
return WEBHOOK_URL + properties.getToken() +
"×tamp=" + timestamp + "&sign=" + sign;
}
/**
* 发送文本消息
*
* @param properties 推送配置
* @param content 消息内容
* @param isAtAll 是否@所有人
* @param atMobiles 需要@的手机号列表
*/
public void sendText(AigcPusher properties, String content,
boolean isAtAll, String... atMobiles) {
try {
if (StrUtil.isBlank(content)) {
return;
}
DefaultDingTalkClient client = new DefaultDingTalkClient(getSignUrl(properties));
OapiRobotSendRequest request = new OapiRobotSendRequest();
// 构建文本消息
OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
text.setContent(content);
request.setMsgtype("text");
request.setText(text);
// 设置@信息
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
at.setAtMobiles(Arrays.asList(atMobiles));
at.setIsAtAll(isAtAll);
request.setAt(at);
client.execute(request);
} catch (Exception e) {
log.error("钉钉机器人消息发送失败", e);
}
}
/**
* 发送Markdown消息
*/
public void sendMarkdown(AigcPusher properties, String title, String content,
boolean isAtAll, String... atMobiles) {
try {
if (StrUtil.isBlank(content)) {
return;
}
DingTalkClient client = new DefaultDingTalkClient(getSignUrl(properties));
OapiRobotSendRequest request = new OapiRobotSendRequest();
// 构建Markdown消息
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
markdown.setTitle(title);
markdown.setText(content);
request.setMsgtype("markdown");
request.setMarkdown(markdown);
// 设置@信息
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
at.setAtMobiles(Arrays.asList(atMobiles));
at.setIsAtAll(isAtAll);
request.setAt(at);
client.execute(request);
} catch (Exception e) {
log.error("钉钉机器人markdown消息发送失败", e);
}
}
}
EmailComponent
路径:langchat-pusher/src/main/java/cn/langchat/pusher/component/EmailComponent.java
职责: 邮件消息推送
核心方法:
Copy
@Component
public class EmailComponent {
@Resource
private JavaMailSender mailSender;
/**
* 发送文本邮件
*
* @param emailEntity 邮件配置
* @param subject 邮件主题
* @param content 邮件内容
*/
public void sendText(EmailEntity emailEntity, String subject, String content) {
SimpleMailMessage message = new SimpleMailMessage();
// 发送人
message.setFrom(emailEntity.getFrom());
// 收件人
message.setTo(emailEntity.getTo().toArray(new String[0]));
// 抄送
if (CollUtil.isNotEmpty(emailEntity.getCc())) {
message.setCc(emailEntity.getCc().toArray(new String[0]));
}
// 密送
if (CollUtil.isNotEmpty(emailEntity.getBcc())) {
message.setBcc(emailEntity.getBcc().toArray(new String[0]));
}
// 主题
message.setSubject(subject);
// 内容
message.setText(content);
// 发送
mailSender.send(message);
}
/**
* 发送HTML邮件
*/
public void sendHtml(EmailEntity emailEntity, String subject, String htmlContent) {
MimeMessage message = mailSender.createMimeMessage();
try {
MimeMessageHelper helper = new MimeMessageHelper(message, true, "UTF-8");
// 发送人
helper.setFrom(emailEntity.getFrom());
// 收件人
helper.setTo(emailEntity.getTo().toArray(new String[0]));
// 抄送
if (CollUtil.isNotEmpty(emailEntity.getCc())) {
helper.setCc(emailEntity.getCc().toArray(new String[0]));
}
// 密送
if (CollUtil.isNotEmpty(emailEntity.getBcc())) {
helper.setBcc(emailEntity.getBcc().toArray(new String[0]));
}
// 主题
helper.setSubject(subject);
// HTML内容
helper.setText(htmlContent, true);
// 发送
mailSender.send(message);
} catch (Exception e) {
log.error("邮件发送失败", e);
}
}
}
WxCpComponent
路径:langchat-pusher/src/main/java/cn/langchat/pusher/component/WxCpComponent.java
职责: 企业微信消息推送
核心方法:
Copy
@Component
public class WxCpComponent {
@Resource
private WxCpService wxCpService;
/**
* 发送文本消息
*
* @param properties 推送配置
* @param content 消息内容
* @param userId 接收人ID(@all表示所有人)
*/
public void sendText(AigcPusher properties, String content, String userId) {
try {
// 构建文本消息
WxCpMessage message = WxCpMessage.TEXT()
.content(content)
.build();
// 发送消息
WxCpMessageSendResult result = wxCpService.getMessageService()
.send(properties.getAgentId(), userId, message);
if (result.getErrorCode() != 0) {
log.error("企业微信消息发送失败: {}", result.getErrorMsg());
}
} catch (Exception e) {
log.error("企业微信消息发送异常", e);
}
}
/**
* 发送Markdown消息
*/
public void sendMarkdown(AigcPusher properties, String content, String userId) {
try {
// 构建Markdown消息
WxCpMessage message = WxCpMessage.MARKDOWN()
.content(content)
.build();
// 发送消息
WxCpMessageSendResult result = wxCpService.getMessageService()
.send(properties.getAgentId(), userId, message);
if (result.getErrorCode() != 0) {
log.error("企业微信Markdown消息发送失败: {}", result.getErrorMsg());
}
} catch (Exception e) {
log.error("企业微信Markdown消息发送异常", e);
}
}
}
数据模型
AigcPusher
路径:langchat-pusher/src/main/java/cn/langchat/pusher/entity/AigcPusher.java
字段说明:
| 字段 | 类型 | 说明 |
|---|---|---|
| id | String | 主键ID |
| name | String | 推送配置名称 |
| provider | String | 推送渠道:DingTalk、WxCp、Email |
| token | String | Token(钉钉) |
| secret | String | Secret(钉钉) |
| corpId | String | 企业ID(企业微信) |
| agentId | Integer | 应用ID(企业微信) |
| EmailEntity | 邮件配置 | |
| description | String | 描述 |
| creator | String | 创建人 |
| createTime | Long | 创建时间 |
MessageReq
路径:langchat-pusher/src/main/java/cn/langchat/pusher/entity/MessageReq.java
字段说明:
| 字段 | 类型 | 说明 |
|---|---|---|
| providerId | String | 推送配置ID |
| content | String | 消息内容 |
| userId | String | 接收人ID(企业微信/钉钉) |
| subject | String | 邮件主题(邮件) |
EmailEntity
路径:langchat-pusher/src/main/java/cn/langchat/pusher/entity/EmailEntity.java
字段说明:
| 字段 | 类型 | 说明 |
|---|---|---|
| host | String | SMTP服务器地址 |
| port | Integer | SMTP端口 |
| username | String | 用户名 |
| password | String | 密码 |
| from | String | 发件人 |
| to | List<String> | 收件人列表 |
| cc | List<String> | 抄送列表 |
| bcc | List<String> | 密送列表 |
推送流程
完整流程
Copy
┌─────────────────────────────────────────────────────────────┐
│ Workflow节点 │
│ (LLM生成或其他节点输出) │
└────────────────────┬────────────────────────────────────┘
│
│ MessageReq
▼
┌─────────────────────────────────────────────────────────────┐
│ MessageProvider │
│ (统一消息发送接口) │
└────────────────────┬────────────────────────────────────┘
│
│ 查询推送配置
▼
┌─────────────────────────────────────────────────────────────┐
│ AigcPusherService │
│ (获取推送配置) │
└────────────────────┬────────────────────────────────────┘
│
│ AigcPusher
▼
┌──────────────────────────┐
│ 路由到对应渠道 │
└───────────┬────────────┘
│
┌────────────────┼────────────────┬─────────────┐
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│DingTalk │ │ WxCp │ │ Email │ │ ... │
│Component │ │Component │ │Component │ │ │
└────┬─────┘ └────┬─────┘ └────┬─────┘ └──────────┘
│ │ │
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 钉钉API │ │企业微信API│ │ SMTP服务器 │
└──────────┘ └──────────┘ └──────────┘
在Workflow中使用
Workflow节点配置
Copy
{
"id": "node-pusher-001",
"type": "pusher",
"name": "发送钉钉通知",
"config": {
"providerId": "pusher-dingtalk-001",
"content": "${content}",
"userId": "13800138000,13800138001",
"isAtAll": false
}
}
Workflow节点执行器
Copy
@Component
public class PusherNodeExecutor implements NodeExecutor {
@Resource
private MessageProvider messageProvider;
@Override
public String getType() {
return "pusher";
}
@Override
public Map<String, Object> execute(FlowNode node, Map<String, Object> input, WorkflowContext context) {
PusherConfig config = NodeConfigParser.parse(node.getConfig(), PusherConfig.class);
// 替换占位符
String content = replacePlaceholder(config.getContent(), input);
// 构建请求
MessageReq req = new MessageReq();
req.setProviderId(config.getProviderId());
req.setContent(content);
req.setUserId(config.getUserId());
req.setSubject(config.getSubject());
// 发送消息
messageProvider.sendText(req);
Map<String, Object> output = new HashMap<>();
output.put("success", true);
output.put("providerId", config.getProviderId());
return output;
}
/**
* 替换占位符
* 例如: ${content} -> input中的content值
*/
private String replacePlaceholder(String template, Map<String, Object> input) {
String result = template;
for (Map.Entry<String, Object> entry : input.entrySet()) {
String placeholder = "${" + entry.getKey() + "}";
if (result.contains(placeholder)) {
result = result.replace(placeholder, String.valueOf(entry.getValue()));
}
}
return result;
}
}
配置说明
钉钉机器人配置
Copy
{
"id": "pusher-dingtalk-001",
"name": "钉钉通知",
"provider": "DingTalk",
"token": "xxxxxxxxxxxxxxxxxxxx",
"secret": "SECxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"description": "团队通知"
}
企业微信配置
Copy
{
"id": "pusher-wxcp-001",
"name": "企业微信通知",
"provider": "WxCp",
"corpId": "wwxxxxxxxxxxxxxxxxxxxx",
"agentId": 1000001,
"description": "企业内部消息"
}
邮件配置
Copy
{
"id": "pusher-email-001",
"name": "邮件通知",
"provider": "Email",
"email": {
"host": "smtp.exmail.qq.com",
"port": 465,
"username": "[email protected]",
"password": "password",
"from": "LangChat <[email protected]>",
"to": ["[email protected]", "[email protected]"],
"cc": ["[email protected]"],
"bcc": []
},
"description": "正式报告"
}
最佳实践
1. 消息格式
- 标题清晰: 使用简洁明了的标题
- 内容简洁: 避免过长的消息内容
- Markdown: 使用Markdown格式提高可读性
- 占位符: 使用占位符实现动态内容
2. 安全考虑
- 密钥保护: 不要在代码中硬编码token和secret
- 权限控制: 限制机器人权限范围
- 消息过滤: 过滤敏感信息
- 频率限制: 避免频繁推送造成骚扰
3. 错误处理
- 重试机制: 发送失败时自动重试
- 日志记录: 详细记录发送日志
- 异常捕获: 捕获所有异常避免影响主流程
- 告警通知: 发送失败时发送告警
4. 性能优化
- 异步发送: 使用异步方式发送消息
- 批量发送: 支持批量发送提高效率
- 连接池: 使用连接池管理连接
- 缓存配置: 缓存推送配置减少查询
扩展示例
添加新推送渠道
步骤1: 添加常量Copy
public interface PusherConst {
String WECHAT_WORK = "WeChatWork"; // 新增渠道
}
Copy
@Component
public class WeChatWorkComponent {
public void sendText(AigcPusher properties, String content, String userId) {
// 实现微信企业号消息发送逻辑
}
}
Copy
public void sendText(MessageReq req) {
AigcPusher aigcPusher = aigcPusherService.getById(req.getProviderId());
if (aigcPusher.getProvider().equals(PusherConst.DING_TALK)) {
sendDingTalkMessage(aigcPusher, req);
} else if (aigcPusher.getProvider().equals(PusherConst.WECHAT_WORK)) {
sendWeChatWorkMessage(aigcPusher, req); // 新增
}
}

