init:taskbot 1.1.0
This commit is contained in:
308
src/logic/syncManager.js
Normal file
308
src/logic/syncManager.js
Normal file
@@ -0,0 +1,308 @@
|
||||
const dbMap = require('../db/issueMap');
|
||||
const jiraService = require('../services/jira');
|
||||
const giteaService = require('../services/gitea');
|
||||
const { buildJiraFields } = require('./converter');
|
||||
const { getRepoConfig } = require('../config/mappings');
|
||||
const logger = require('../utils/logger');
|
||||
const config = require('../config/env');
|
||||
const { checkCircuitBreaker } = require('../utils/circuitBreaker');
|
||||
|
||||
// 处理Gitea Issue事件的主逻辑
|
||||
const processingIds = new Set();
|
||||
const LOCK_TIMEOUT = 10000;
|
||||
const RETRY_DELAY = 1500;
|
||||
const MAX_RETRIES = 3;
|
||||
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
|
||||
|
||||
//生成用于锁定和数据库查询的唯一标识
|
||||
function getIssueKey(repoKey, giteaId) {
|
||||
return `${repoKey}#${giteaId}`;
|
||||
}
|
||||
|
||||
//Gitea机器人检测
|
||||
function isGiteaBot(sender) {
|
||||
if (!sender) return false;
|
||||
const { botId, botName } = config.gitea;
|
||||
//ID和昵称二者只要有一个符合就判定为机器人
|
||||
const idMatch = botId && sender.id === botId;
|
||||
const nameMatch = botName && sender.username === botName;
|
||||
return !!(idMatch || nameMatch);
|
||||
}
|
||||
|
||||
async function handleIssueEvent(payload, retryCount = 0) {
|
||||
const { action, issue, repository, comment, sender } = payload;
|
||||
|
||||
//如果操作者是机器人,直接忽略
|
||||
if (isGiteaBot(sender)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 验证payload完整性
|
||||
if (!issue || !issue.number || !repository) {
|
||||
logger.error('Invalid payload: missing required fields');
|
||||
return;
|
||||
}
|
||||
|
||||
//熔断机制检查
|
||||
if (!checkCircuitBreaker()) {
|
||||
return;
|
||||
}
|
||||
|
||||
//构建仓库标识 (owner/repo格式)
|
||||
const repoKey = `${repository.owner.username}/${repository.name}`;
|
||||
|
||||
//获取该仓库的配置
|
||||
const repoConfig = getRepoConfig(repoKey);
|
||||
if (!repoConfig) {
|
||||
const errorMsg = `Repository "${repoKey}" is not configured in mappings.js`;
|
||||
logger.error(errorMsg);
|
||||
throw new Error(errorMsg);
|
||||
}
|
||||
|
||||
//检测是否为/resync
|
||||
const isResyncCommand = (action === 'created' && comment && comment.body.trim() === '/resync');
|
||||
const giteaId = issue.number;
|
||||
const issueKey = getIssueKey(repoKey, giteaId);
|
||||
|
||||
if (issue.title && issue.title.includes("#不同步")) {
|
||||
if (!isResyncCommand) {
|
||||
logger.info(`[GITEA->JIRA] Skipped ${issue.key}: title contains #不同步`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
//只处理特定的动作类型
|
||||
const allowedActions = ['opened', 'label_updated', 'milestoned', 'demilestoned', 'edited', 'closed', 'reopened', 'assigned', 'unassigned'];
|
||||
if (!allowedActions.includes(action) && !isResyncCommand) return;
|
||||
//检查是否已被其他进程锁定
|
||||
if (processingIds.has(issueKey)) {
|
||||
if (retryCount >= MAX_RETRIES) {
|
||||
logger.warn(`Lock timeout for ${repoKey}#${giteaId} after ${MAX_RETRIES} retries`, { action });
|
||||
return;
|
||||
}
|
||||
logger.info(`Issue ${repoKey}#${giteaId} is locked. Retry ${retryCount + 1}/${MAX_RETRIES} in ${RETRY_DELAY}ms...`);
|
||||
await sleep(RETRY_DELAY);
|
||||
return handleIssueEvent(payload, retryCount + 1);
|
||||
}
|
||||
|
||||
//加锁并设置超时自动释放
|
||||
processingIds.add(issueKey);
|
||||
const lockTimer = setTimeout(() => {
|
||||
if (processingIds.has(issueKey)) {
|
||||
logger.warn(`Force releasing lock for ${repoKey}#${giteaId} after ${LOCK_TIMEOUT}ms timeout`);
|
||||
processingIds.delete(issueKey);
|
||||
}
|
||||
}, LOCK_TIMEOUT);
|
||||
|
||||
try {
|
||||
|
||||
const mapping = dbMap.getJiraKey(repoKey, giteaId);
|
||||
const { transitions } = repoConfig;
|
||||
|
||||
//只有已存在的工单才能进行更新/关闭/重开操作
|
||||
if (mapping) {
|
||||
|
||||
//处理关闭事件
|
||||
if (action === 'closed') {
|
||||
if (transitions && transitions.close) {
|
||||
logger.sync(`[${repoKey}] [GITEA->JIRA] Closed ${mapping.jira_key}`);
|
||||
await jiraService.transitionIssue(mapping.jira_key, transitions.close);
|
||||
} else {
|
||||
//未配置状态流转,跳过关闭操作
|
||||
logger.info(`[${repoKey}] [GITEA->JIRA] Skipped close action for ${mapping.jira_key}: transitions.close not configured`);
|
||||
}
|
||||
}
|
||||
//处理重开事件
|
||||
else if (action === 'reopened') {
|
||||
if (transitions && transitions.reopen) {
|
||||
logger.sync(`[${repoKey}] [GITEA->JIRA] Reopened ${mapping.jira_key}`);
|
||||
await jiraService.transitionIssue(mapping.jira_key, transitions.reopen);
|
||||
} else {
|
||||
//未配置状态流转,跳过重开操作
|
||||
logger.info(`[${repoKey}] [GITEA->JIRA] Skipped reopen action for ${mapping.jira_key}: transitions.reopen not configured`);
|
||||
}
|
||||
}
|
||||
//处理指派,同步经办人并流转状态
|
||||
else if (action === 'assigned') {
|
||||
//Gitea支持多个指派人,Jira只支持一个经办人
|
||||
//获取完整的issue信息来获取所有指派人,并同步第一个到Jira
|
||||
try {
|
||||
const fullIssue = await giteaService.getIssue(
|
||||
repository.owner.username,
|
||||
repository.name,
|
||||
giteaId
|
||||
);
|
||||
|
||||
let assigneeToSync = null;
|
||||
if (fullIssue.assignees && fullIssue.assignees.length > 0) {
|
||||
assigneeToSync = fullIssue.assignees[0];
|
||||
} else if (fullIssue.assignee) {
|
||||
assigneeToSync = fullIssue.assignee;
|
||||
}
|
||||
|
||||
if (assigneeToSync) {
|
||||
const jiraUser = await jiraService.findUser(assigneeToSync.username);
|
||||
if (jiraUser) {
|
||||
logger.sync(`[${repoKey}] [GITEA->JIRA] Assigned ${mapping.jira_key} to ${jiraUser.name}`);
|
||||
await jiraService.updateIssue(mapping.jira_key, { assignee: { name: jiraUser.name } });
|
||||
}
|
||||
}
|
||||
|
||||
if (transitions && transitions.in_progress) {
|
||||
await jiraService.transitionIssue(mapping.jira_key, transitions.in_progress);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${repoKey}] Failed to handle assigned event for ${mapping.jira_key}:`, error.message);
|
||||
}
|
||||
}
|
||||
else if (action === 'unassigned') {
|
||||
//Gitea 支持多个指派人,Jira只支持一个经办人
|
||||
//当移除指派人时,需要检查是否还有其他指派人
|
||||
//如果还有,保持第一个指派人同步到Jira;如果没有了,才清空Jira的经办人
|
||||
try {
|
||||
const fullIssue = await giteaService.getIssue(
|
||||
repository.owner.username,
|
||||
repository.name,
|
||||
giteaId
|
||||
);
|
||||
|
||||
if (fullIssue.assignees && fullIssue.assignees.length > 0) {
|
||||
const firstAssignee = fullIssue.assignees[0];
|
||||
const jiraUser = await jiraService.findUser(firstAssignee.username);
|
||||
if (jiraUser) {
|
||||
logger.sync(`[${repoKey}] [GITEA->JIRA] ${mapping.jira_key} still has assignees, keeping first: ${jiraUser.name}`);
|
||||
await jiraService.updateIssue(mapping.jira_key, { assignee: { name: jiraUser.name } });
|
||||
}
|
||||
} else {
|
||||
//没有指派人了,清空Jira的经办人
|
||||
logger.sync(`[${repoKey}] [GITEA->JIRA] Unassigned ${mapping.jira_key} (no assignees left)`);
|
||||
await jiraService.updateIssue(mapping.jira_key, { assignee: null });
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${repoKey}] Failed to handle unassigned event for ${mapping.jira_key}:`, error.message);
|
||||
}
|
||||
}
|
||||
else if (isResyncCommand || (!['closed', 'reopened', 'assigned', 'unassigned'].includes(action))) {
|
||||
const jiraFields = buildJiraFields(
|
||||
issue.title,
|
||||
issue.body,
|
||||
issue.labels,
|
||||
issue.milestone,
|
||||
repoConfig
|
||||
);
|
||||
|
||||
//已有映射的工单,即使类型未配置也允许同步(只是不更新类型字段)
|
||||
//这样当类型从未配置变为已配置时,也能正常同步
|
||||
if (jiraFields) {
|
||||
// 处理指派人同步(resync 时)
|
||||
// Gitea 支持多个指派人,Jira 只支持一个经办人,取第一个
|
||||
if (isResyncCommand) {
|
||||
if (issue.assignees && issue.assignees.length > 0) {
|
||||
const firstAssignee = issue.assignees[0];
|
||||
const jiraUser = await jiraService.findUser(firstAssignee.username);
|
||||
if (jiraUser) jiraFields.assignee = { name: jiraUser.name };
|
||||
} else if (issue.assignee) {
|
||||
const jiraUser = await jiraService.findUser(issue.assignee.username);
|
||||
if (jiraUser) jiraFields.assignee = { name: jiraUser.name };
|
||||
}
|
||||
}
|
||||
|
||||
logger.sync(`[${repoKey}] [GITEA->JIRA] Updated ${mapping.jira_key}`);
|
||||
delete jiraFields.project;
|
||||
await jiraService.updateIssue(mapping.jira_key, jiraFields);
|
||||
} else {
|
||||
//类型未配置但已有映射,记录信息但不中断
|
||||
logger.info(`[${repoKey}] [GITEA->JIRA] #${giteaId} type not configured, skipping update`);
|
||||
}
|
||||
|
||||
if (isResyncCommand) {
|
||||
await giteaService.addComment(
|
||||
repository.owner.username,
|
||||
repository.name,
|
||||
giteaId,
|
||||
`手动同步:工单已存在,已更新 [${mapping.jira_key}](${config.jira.baseUrl}/browse/${mapping.jira_key})`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
//场景B:不存在->创建Jira
|
||||
//只有opened事件和resync命令可以创建新工单
|
||||
if (!isResyncCommand && action !== 'opened') {
|
||||
logger.info(`[${repoKey}] [GITEA->JIRA] Skipped #${giteaId}: no mapping exists and action is '${action}' (only 'opened' or resync can create)`);
|
||||
return;
|
||||
}
|
||||
|
||||
const jiraFields = buildJiraFields(
|
||||
issue.title,
|
||||
issue.body,
|
||||
issue.labels,
|
||||
issue.milestone,
|
||||
repoConfig
|
||||
);
|
||||
|
||||
//类型未配置,跳过同步
|
||||
if (!jiraFields) {
|
||||
logger.info(`[${repoKey}] [GITEA->JIRA] Skipped #${giteaId}: type not configured in mappings`);
|
||||
return;
|
||||
}
|
||||
if (issue.assignees && issue.assignees.length > 0) {
|
||||
const firstAssignee = issue.assignees[0];
|
||||
const jiraUser = await jiraService.findUser(firstAssignee.username);
|
||||
if (jiraUser) {
|
||||
jiraFields.assignee = { name: jiraUser.name };
|
||||
}
|
||||
} else if (issue.assignee) {
|
||||
const jiraUser = await jiraService.findUser(issue.assignee.username);
|
||||
if (jiraUser) {
|
||||
jiraFields.assignee = { name: jiraUser.name };
|
||||
}
|
||||
}
|
||||
|
||||
const newIssue = await jiraService.createIssue(jiraFields);
|
||||
|
||||
dbMap.saveMapping(repoKey, giteaId, newIssue.key, newIssue.id);
|
||||
logger.sync(`[${repoKey}] [GITEA->JIRA] Created ${newIssue.key} from #${giteaId}`);
|
||||
|
||||
const hasAssignee = (issue.assignees && issue.assignees.length > 0) || issue.assignee;
|
||||
if (hasAssignee && transitions.in_progress) {
|
||||
await jiraService.transitionIssue(newIssue.key, transitions.in_progress)
|
||||
.catch(e => logger.error('Initial transition failed', e.message));
|
||||
}
|
||||
|
||||
const giteaWebUrl = config.gitea.baseUrl.replace(/\/api\/v1\/?$/, '');
|
||||
const giteaIssueUrl = `${giteaWebUrl}/${repository.owner.username}/${repository.name}/issues/${giteaId}`;
|
||||
|
||||
const successMsg = isResyncCommand
|
||||
? `手动同步:已补建Jira工单 [${newIssue.key}](${config.jira.baseUrl}/browse/${newIssue.key})`
|
||||
: `Jira来源:[${newIssue.key}](${config.jira.baseUrl}/browse/${newIssue.key})\n由工单机器人创建`;
|
||||
|
||||
Promise.all([
|
||||
jiraService.addComment(newIssue.key, `Gitea来源: ${giteaIssueUrl}\n由工单机器人创建`),
|
||||
giteaService.addComment(
|
||||
repository.owner.username,
|
||||
repository.name,
|
||||
giteaId,
|
||||
successMsg
|
||||
)
|
||||
]).catch(err => logger.error('Comment write-back failed', err.message));
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
logger.error(`[${repoKey}] [GITEA->JIRA] Failed to sync #${giteaId}: ${error.message}`);
|
||||
|
||||
if (isResyncCommand) {
|
||||
await giteaService.addComment(
|
||||
repository.owner.username,
|
||||
repository.name,
|
||||
giteaId,
|
||||
`同步失败: ${error.message}`
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
clearTimeout(lockTimer);
|
||||
processingIds.delete(issueKey);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { handleIssueEvent };
|
||||
Reference in New Issue
Block a user