feat: 添加单源爬取功能并优化数据库同步

新增单源爬取功能,支持在界面上单独更新每个数据源
添加数据库同步脚本,支持主从数据库结构同步和数据同步
优化华能集团爬虫的页面导航和稳定性
新增系统托盘功能,支持最小化到托盘
This commit is contained in:
dmy
2026-01-14 16:25:01 +08:00
parent bcd7af4e69
commit 3f6d10061d
8 changed files with 691 additions and 41 deletions

View File

@@ -1,15 +1,19 @@
import { Controller, Post, Get } from '@nestjs/common';
import { Controller, Post, Get, Param, Body } from '@nestjs/common';
import { BidCrawlerService } from './services/bid-crawler.service';
@Controller('api/crawler')
export class CrawlerController {
private isCrawling = false;
private crawlingSources = new Set<string>();
constructor(private readonly crawlerService: BidCrawlerService) {}
// GET /api/crawler/status
// Reports the global crawl flag plus the set of sources currently
// being crawled individually (the rendered diff had interleaved the
// old and new return statements; this is the post-change version).
@Get('status')
getStatus() {
return {
isCrawling: this.isCrawling,
crawlingSources: Array.from(this.crawlingSources)
};
}
@Post('run')
@@ -20,12 +24,12 @@ export class CrawlerController {
this.isCrawling = true;
// We don't await this because we want it to run in the background
// We don't await this because we want it to run in the background
// and return immediately, or we can await if we want to user to wait.
// Given the requirement "Immediate Crawl", usually implies triggering it.
// However, for a better UI experience, we might want to wait or just trigger.
// Let's await it so that user knows when it's done (or failed),
// assuming it doesn't take too long for the mock.
// Let's await it so that user knows when it's done (or failed),
// assuming it doesn't take too long for the mock.
// Real crawling might take long, so background is better.
// For this prototype, I'll await it to show completion.
try {
@@ -35,4 +39,20 @@ export class CrawlerController {
this.isCrawling = false;
}
}
// POST /api/crawler/crawl/:sourceName
// Triggers a crawl of a single named source. If that source is already
// mid-crawl, responds with a message instead of starting a second run.
@Post('crawl/:sourceName')
async crawlSingleSource(@Param('sourceName') sourceName: string) {
const alreadyRunning = this.crawlingSources.has(sourceName);
if (alreadyRunning) {
return { message: `Source ${sourceName} is already being crawled` };
}
this.crawlingSources.add(sourceName);
try {
return await this.crawlerService.crawlSingleSource(sourceName);
} finally {
// Always release the per-source lock, whether the crawl succeeded or threw.
this.crawlingSources.delete(sourceName);
}
}
}

View File

@@ -216,6 +216,97 @@ export class BidCrawlerService {
}
}
/**
 * Crawl a single source by its display name.
 *
 * Launches a dedicated (non-headless) browser, runs the matching crawler,
 * upserts every extracted item, records crawl statistics, and always closes
 * the browser. Returns a summary object; throws only when no crawler matches
 * the given sourceName.
 */
async crawlSingleSource(sourceName: string) {
this.logger.log(`Starting single source crawl for: ${sourceName}`);
// Read proxy configuration from environment variables.
const proxyHost = this.configService.get<string>('PROXY_HOST');
const proxyPort = this.configService.get<string>('PROXY_PORT');
const proxyUsername = this.configService.get<string>('PROXY_USERNAME');
const proxyPassword = this.configService.get<string>('PROXY_PASSWORD');
// Browser launch flags.
// FIX: the certificate flags were misspelled ('--ignore-certifcate-errors');
// Chromium silently ignores unknown switches, so the intended
// certificate-error suppression never took effect.
const args = [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-blink-features=AutomationControlled',
'--disable-infobars',
'--window-position=0,0',
'--ignore-certificate-errors',
'--ignore-certificate-errors-spki-list',
];
if (proxyHost && proxyPort) {
// Embed credentials in the proxy URL only when both are configured.
const proxyUrl = proxyUsername && proxyPassword
? `http://${proxyUsername}:${proxyPassword}@${proxyHost}:${proxyPort}`
: `http://${proxyHost}:${proxyPort}`;
args.push(`--proxy-server=${proxyUrl}`);
this.logger.log(`Using proxy: ${proxyHost}:${proxyPort}`);
}
const browser = await puppeteer.launch({
headless: false,
args,
});
// Look the crawler up by its display name; close the browser before
// throwing so a bad sourceName does not leak a browser process.
const crawlers = [ChdtpCrawler, ChngCrawler, SzecpCrawler, CdtCrawler, EpsCrawler, CnncecpCrawler, CgnpcCrawler, CeicCrawler, EspicCrawler, PowerbeijingCrawler, SdiccCrawler, CnoocCrawler];
const targetCrawler = crawlers.find(c => c.name === sourceName);
if (!targetCrawler) {
await browser.close();
throw new Error(`Crawler not found for source: ${sourceName}`);
}
try {
this.logger.log(`Crawling: ${targetCrawler.name}`);
const results = await targetCrawler.crawl(browser);
this.logger.log(`Extracted ${results.length} items from ${targetCrawler.name}`);
// Compute the most recent publishDate among the results (null when empty).
const latestPublishDate = results.length > 0
? results.reduce((latest, item) => {
const itemDate = new Date(item.publishDate);
return itemDate > latest ? itemDate : latest;
}, new Date(0))
: null;
// Upsert every extracted item, tagged with this crawler's name.
for (const item of results) {
await this.bidsService.createOrUpdate({
title: item.title,
url: item.url,
publishDate: item.publishDate,
source: targetCrawler.name,
});
}
// Persist crawl statistics for this source.
await this.saveCrawlInfo(targetCrawler.name, results.length, latestPublishDate);
return {
success: true,
source: targetCrawler.name,
count: results.length,
latestPublishDate,
};
} catch (err) {
this.logger.error(`Error crawling ${targetCrawler.name}: ${err.message}`);
// Record the failure so the UI can surface the error per source.
await this.saveCrawlInfo(targetCrawler.name, 0, null, err.message);
return {
success: false,
source: targetCrawler.name,
count: 0,
error: err.message,
};
} finally {
// The browser is always closed, regardless of crawl outcome.
await browser.close();
}
}
private async saveCrawlInfo(
source: string,
count: number,

View File

@@ -4,53 +4,75 @@ import { ChdtpResult } from './chdtp_target';
// 模拟人类鼠标移动
async function simulateHumanMouseMovement(page: puppeteer.Page) {
const viewport = page.viewport();
if (!viewport) return;
try {
const viewport = page.viewport();
if (!viewport) return;
const movements = 5 + Math.floor(Math.random() * 5); // 5-10次随机移动
const movements = 5 + Math.floor(Math.random() * 5); // 5-10次随机移动
for (let i = 0; i < movements; i++) {
const x = Math.floor(Math.random() * viewport.width);
const y = Math.floor(Math.random() * viewport.height);
await page.mouse.move(x, y, {
steps: 10 + Math.floor(Math.random() * 20) // 10-30步使移动更平滑
});
// 随机停顿 100-500ms
await new Promise(r => setTimeout(r, 100 + Math.random() * 400));
for (let i = 0; i < movements; i++) {
// 检查页面是否仍然有效
if (page.isClosed()) {
console.log('Page was closed during mouse movement simulation');
return;
}
const x = Math.floor(Math.random() * viewport.width);
const y = Math.floor(Math.random() * viewport.height);
await page.mouse.move(x, y, {
steps: 10 + Math.floor(Math.random() * 20) // 10-30步使移动更平滑
});
// 随机停顿 100-500ms
await new Promise(r => setTimeout(r, 100 + Math.random() * 400));
}
} catch (error) {
console.log('Mouse movement simulation interrupted:', error.message);
}
}
// 模拟人类滚动
async function simulateHumanScrolling(page: puppeteer.Page) {
const scrollCount = 3 + Math.floor(Math.random() * 5); // 3-7次滚动
try {
const scrollCount = 3 + Math.floor(Math.random() * 5); // 3-7次滚动
for (let i = 0; i < scrollCount; i++) {
const scrollDistance = 100 + Math.floor(Math.random() * 400); // 100-500px
await page.evaluate((distance) => {
window.scrollBy({
top: distance,
behavior: 'smooth'
for (let i = 0; i < scrollCount; i++) {
// 检查页面是否仍然有效
if (page.isClosed()) {
console.log('Page was closed during scrolling simulation');
return;
}
const scrollDistance = 100 + Math.floor(Math.random() * 400); // 100-500px
await page.evaluate((distance) => {
window.scrollBy({
top: distance,
behavior: 'smooth'
});
}, scrollDistance);
// 随机停顿 500-1500ms
await new Promise(r => setTimeout(r, 500 + Math.random() * 1000));
}
// 滚动回顶部
if (!page.isClosed()) {
await page.evaluate(() => {
window.scrollTo({ top: 0, behavior: 'smooth' });
});
}, scrollDistance);
// 随机停顿 500-1500ms
await new Promise(r => setTimeout(r, 500 + Math.random() * 1000));
await new Promise(r => setTimeout(r, 1000));
}
} catch (error) {
console.log('Scrolling simulation interrupted:', error.message);
}
// 滚动回顶部
await page.evaluate(() => {
window.scrollTo({ top: 0, behavior: 'smooth' });
});
await new Promise(r => setTimeout(r, 1000));
}
export const ChngCrawler = {
name: '华能集团电子商务平台',
url: 'https://ec.chng.com.cn/ecmall/index.html#/purchase/home?top=0',
baseUrl: 'https://ec.chng.com.cn/ecmall/index.html',
url: 'https://ec.chng.com.cn/channel/home/#/purchase?top=0',
baseUrl: 'https://ec.chng.com.cn/channel/home/#',
async crawl(browser: puppeteer.Browser): Promise<ChdtpResult[]> {
const logger = new Logger('ChngCrawler');
@@ -106,14 +128,16 @@ export const ChngCrawler = {
await page.authenticate({ username, password });
}
}
// 模拟人类行为
// 模拟人类行为
logger.log('Simulating human mouse movements...');
await simulateHumanMouseMovement(page);
logger.log('Simulating human scrolling...');
await simulateHumanScrolling(page);
await page.waitForNavigation({ waitUntil: 'domcontentloaded' }).catch(() => {});
// 等待页面稳定,不强制等待导航
await new Promise(r => setTimeout(r, 3000));
// 模拟人类行为
logger.log('Simulating human mouse movements...');
await simulateHumanMouseMovement(page);
@@ -215,8 +239,24 @@ export const ChngCrawler = {
const nextButton = await page.$('svg[data-icon="right"]');
if (!nextButton) break;
// 点击下一页前保存当前页面状态
const currentUrl = page.url();
await nextButton.click();
await new Promise(r => setTimeout(r, 5000));
// 等待页面导航完成
try {
await page.waitForFunction(
(oldUrl) => window.location.href !== oldUrl,
{ timeout: 10000 },
currentUrl
);
} catch (e) {
logger.warn('Navigation timeout, continuing anyway');
}
// 等待页面内容加载
await new Promise(r => setTimeout(r, 15000));
currentPage++;
}

252
src/scripts/sync.ts Normal file
View File

@@ -0,0 +1,252 @@
import 'dotenv/config';
import { DataSource, DataSourceOptions } from 'typeorm';
import mysql from 'mysql2/promise';
import { BidItem } from '../bids/entities/bid-item.entity';
import { Keyword } from '../keywords/keyword.entity';
import { AiRecommendation } from '../ai/entities/ai-recommendation.entity';
import { CrawlInfoAdd } from '../crawler/entities/crawl-info-add.entity';
// Master (source-of-truth) database connection options.
// Every value comes from the environment, with local-dev fallbacks.
// synchronize is off: this script manages schema itself (see syncSchema).
const masterDbConfig: DataSourceOptions = {
type: process.env.DATABASE_TYPE as any || 'mariadb',
host: process.env.DATABASE_HOST || 'localhost',
port: parseInt(process.env.DATABASE_PORT || '3306'),
username: process.env.DATABASE_USERNAME || 'root',
password: process.env.DATABASE_PASSWORD || 'root',
database: process.env.DATABASE_NAME || 'bidding',
entities: [BidItem, Keyword, AiRecommendation, CrawlInfoAdd],
synchronize: false,
};
// Slave (replica target) database connection options, read from
// SLAVE_DATABASE_* environment variables with local-dev fallbacks.
// Entity list mirrors the master config so the same repositories work
// against both connections. synchronize stays off — syncSchema below
// rebuilds the slave schema from the master instead.
const slaveDbConfig: DataSourceOptions = {
type: process.env.SLAVE_DATABASE_TYPE as any || 'mariadb',
host: process.env.SLAVE_DATABASE_HOST || 'localhost',
port: parseInt(process.env.SLAVE_DATABASE_PORT || '3306'),
username: process.env.SLAVE_DATABASE_USERNAME || 'root',
password: process.env.SLAVE_DATABASE_PASSWORD || 'root',
database: process.env.SLAVE_DATABASE_NAME || 'bidding_slave',
entities: [BidItem, Keyword, AiRecommendation, CrawlInfoAdd],
synchronize: false,
};
// Minimal timestamped console logger used throughout this script.
// Each level prefixes the message with a tag and an ISO-8601 timestamp.
const logger = {
  log(message: string, ...args: any[]) {
    console.log(`[INFO] ${new Date().toISOString()} - ${message}`, ...args);
  },
  error(message: string, ...args: any[]) {
    console.error(`[ERROR] ${new Date().toISOString()} - ${message}`, ...args);
  },
  warn(message: string, ...args: any[]) {
    console.warn(`[WARN] ${new Date().toISOString()} - ${message}`, ...args);
  },
};
/**
 * Mirror one table's rows from the master into the slave database.
 *
 * Deletes slave rows whose id no longer exists on master, then upserts
 * every master row into the slave via repository save.
 *
 * @param masterDataSource  initialized connection to the master DB
 * @param slaveDataSource   initialized connection to the slave DB
 * @param entityClass       TypeORM entity class mapping this table
 * @param tableName         human-readable table name (logging only)
 * @returns number of rows synced (i.e. master row count)
 *
 * Note: the unused generic parameter <T> was removed, and the per-row
 * save loop was replaced with a single chunked save — same upsert
 * semantics, far fewer sequential round trips.
 */
async function syncTable(
masterDataSource: DataSource,
slaveDataSource: DataSource,
entityClass: any,
tableName: string,
): Promise<number> {
const masterRepo = masterDataSource.getRepository(entityClass);
const slaveRepo = slaveDataSource.getRepository(entityClass);
logger.log(`开始同步表: ${tableName}`);
// Load the full contents of both sides.
const masterData = await masterRepo.find();
logger.log(`主数据库 ${tableName} 表中有 ${masterData.length} 条记录`);
const slaveData = await slaveRepo.find();
logger.log(`Slave 数据库 ${tableName} 表中有 ${slaveData.length} 条记录`);
// Ids present on master — anything else on the slave is stale.
const masterIds = new Set(masterData.map((item: any) => item.id));
// Remove slave rows that no longer exist on master.
const toDelete = slaveData.filter((item: any) => !masterIds.has(item.id));
if (toDelete.length > 0) {
await slaveRepo.remove(toDelete);
logger.log(`从 slave 数据库删除了 ${toDelete.length}${tableName} 记录`);
}
// Upsert all master rows in chunks instead of one query per row.
await slaveRepo.save(masterData, { chunk: 200 });
const syncedCount = masterData.length;
logger.log(`成功同步 ${syncedCount}${tableName} 记录到 slave 数据库`);
return syncedCount;
}
// Create the target database if it does not already exist.
// Uses a raw mysql2 connection because TypeORM cannot initialize a
// DataSource against a database that is missing.
// FIX: the connection is now closed in a finally block (previously it
// leaked if CREATE DATABASE threw), and backticks in the database name
// are escaped so an unusual name cannot break the identifier quoting.
async function createDatabaseIfNotExists(config: DataSourceOptions) {
const connection = await mysql.createConnection({
host: (config as any).host,
port: (config as any).port,
user: (config as any).username,
password: (config as any).password,
});
try {
const dbName = String((config as any).database).replace(/`/g, '``');
await connection.query(`CREATE DATABASE IF NOT EXISTS \`${dbName}\``);
} finally {
// Always release the raw connection, even when the CREATE fails.
await connection.end();
}
}
// Mirror the master database's table structure onto the slave.
//
// Strategy, per master table: back up any existing slave rows into a
// timestamped temp table, DROP the slave table, re-CREATE it from the
// master's SHOW CREATE TABLE output, then best-effort restore the
// backed-up rows. Finally the slave connection is re-initialized so
// TypeORM's cached metadata matches the rebuilt schema, and that fresh
// DataSource is returned (callers must use the return value).
async function syncSchema(masterDataSource: DataSource, slaveDataSource: DataSource): Promise<DataSource> {
logger.log('开始同步表结构...');
// Enumerate every table in the master schema.
const tables = await masterDataSource.query(`
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '${(masterDbConfig as any).database}'
`);
for (const table of tables) {
const tableName = table.TABLE_NAME;
logger.log(`同步表结构: ${tableName}`);
// Fetch the master's canonical CREATE TABLE statement.
const createTableResult = await masterDataSource.query(`
SHOW CREATE TABLE \`${tableName}\`
`);
let createTableSql = createTableResult[0]['Create Table'];
// Translate MariaDB-specific syntax for the slave:
// map the uuid column type to CHAR(36).
// NOTE(review): this word-boundary replace would also rewrite a column
// literally named `uuid` — confirm no table uses that column name.
createTableSql = createTableSql.replace(/\buuid\b/gi, 'CHAR(36)');
// Does the table already exist on the slave?
const tableExists = await slaveDataSource.query(`
SELECT COUNT(*) as count
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '${(slaveDbConfig as any).database}'
AND TABLE_NAME = '${tableName}'
`);
// Timestamped name keeps repeated runs from colliding.
const tempTableName = `temp_${tableName}_${Date.now()}`;
if (tableExists[0].count > 0) {
// Table exists: snapshot its rows into the temp table before dropping it.
logger.log(`备份表 ${tableName} 的数据到 ${tempTableName}...`);
await slaveDataSource.query(`CREATE TABLE ${tempTableName} AS SELECT * FROM \`${tableName}\``);
logger.log(`备份完成,共备份 ${await slaveDataSource.query(`SELECT COUNT(*) as count FROM ${tempTableName}`).then(r => r[0].count)} 条记录`);
}
// Drop and re-create the slave table from the master's definition.
await slaveDataSource.query(`DROP TABLE IF EXISTS \`${tableName}\``);
await slaveDataSource.query(createTableSql);
// If rows were backed up, try to restore them into the new structure.
if (tableExists[0].count > 0) {
try {
logger.log(`从 ${tempTableName} 恢复数据到 ${tableName}...`);
// Restore using the temp table's own column list; if the new schema
// dropped or renamed columns, the INSERT below fails and we fall
// through to the catch, which preserves the temp table.
const columns = await slaveDataSource.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '${(slaveDbConfig as any).database}'
AND TABLE_NAME = '${tempTableName}'
`);
const columnNames = columns.map((c: any) => `\`${c.COLUMN_NAME}\``).join(', ');
// Copy the backed-up rows into the re-created table.
await slaveDataSource.query(`
INSERT INTO \`${tableName}\` (${columnNames})
SELECT ${columnNames} FROM ${tempTableName}
`);
const restoredCount = await slaveDataSource.query(`SELECT COUNT(*) as count FROM \`${tableName}\``);
logger.log(`数据恢复完成,共恢复 ${restoredCount[0].count} 条记录`);
// Restore succeeded — the temp table is no longer needed.
await slaveDataSource.query(`DROP TABLE IF EXISTS ${tempTableName}`);
} catch (error) {
// Deliberate: keep the temp table on failure so the data can be
// recovered by hand.
logger.warn(`恢复数据失败: ${error.message}`);
logger.warn(`临时表 ${tempTableName} 保留,请手动处理`);
}
}
}
logger.log('表结构同步完成');
// Re-initialize the slave connection so TypeORM's metadata cache
// reflects the re-created tables.
logger.log('重新初始化 slave 数据库连接...');
await slaveDataSource.destroy();
await slaveDataSource.initialize();
logger.log('Slave 数据库连接重新初始化完成');
return slaveDataSource;
}
// Orchestrate the full master → slave sync: ensure the slave database
// exists, connect to both sides, mirror the schema, then mirror each
// table's data. Exits the process: code 0 on success, 1 on failure.
//
// FIX: cleanup moved to a finally block with independent best-effort
// destroys — previously, if masterDataSource.destroy() threw in the
// error path, the slave connection was never closed.
async function syncDatabase() {
let masterDataSource: DataSource | null = null;
let slaveDataSource: DataSource | null = null;
let exitCode = 0;
try {
logger.log('开始数据库同步...');
// Ensure the slave database exists before TypeORM connects to it.
logger.log('检查并创建 slave 数据库...');
await createDatabaseIfNotExists(slaveDbConfig);
logger.log('Slave 数据库准备就绪');
// Connect to the master.
masterDataSource = new DataSource(masterDbConfig);
await masterDataSource.initialize();
logger.log('主数据库连接成功');
// Connect to the slave.
slaveDataSource = new DataSource(slaveDbConfig);
await slaveDataSource.initialize();
logger.log('Slave 数据库连接成功');
// syncSchema re-initializes the slave connection; use its return value.
slaveDataSource = await syncSchema(masterDataSource, slaveDataSource);
// Data sync, table by table.
const tables = [
{ entity: BidItem, name: 'bid_items' },
{ entity: Keyword, name: 'keywords' },
{ entity: AiRecommendation, name: 'ai_recommendations' },
{ entity: CrawlInfoAdd, name: 'crawl_info_add' },
];
let totalSynced = 0;
for (const table of tables) {
const count = await syncTable(masterDataSource, slaveDataSource, table.entity, table.name);
totalSynced += count;
}
logger.log(`数据库同步完成,共同步 ${totalSynced} 条记录`);
} catch (error) {
logger.error('数据库同步失败:', error);
exitCode = 1;
} finally {
// Best-effort cleanup: close each connection independently so a
// failure destroying one cannot leak the other.
if (masterDataSource && masterDataSource.isInitialized) {
await masterDataSource.destroy().catch((e) => logger.warn('关闭主数据库连接失败:', e));
}
if (slaveDataSource && slaveDataSource.isInitialized) {
await slaveDataSource.destroy().catch((e) => logger.warn('关闭 slave 数据库连接失败:', e));
}
process.exit(exitCode);
}
}
// Entry point: kick off the synchronization when this script is run.
syncDatabase();