在宝塔环境中实现 Elasticsearch 与数据库的实时同步,主要有以下几种方案:
一、同步方案概览
| 方案 | 实时性 | 复杂度 | 数据一致性 | 适用场景 |
|---|---|---|---|---|
| 应用层双写 | 最高 | 中等 | 最终一致 | 新建项目,可修改代码 |
| Logstash JDBC | 分钟级 | 低 | 延迟一致 | 已有项目,增量同步 |
| Canal | 秒级 | 高 | 最终一致 | MySQL 数据库,要求高实时性 |
| 数据库触发器 + 队列表 | 取决于轮询间隔(秒级~分钟级) | 高 | 最终一致(变更记录与业务数据同事务写入,不会丢失) | 数据库层面同步 |
二、方案一:应用层双写(推荐)
实现原理
在业务代码中同时写入数据库和 Elasticsearch。
PHP 实现示例
<?php
/**
* 应用层双写方案
* 在写入数据库的同时写入 Elasticsearch
*/
require '/www/wwwroot/your-site/vendor/autoload.php';
use Elasticsearch\ClientBuilder;
/**
 * Application-level dual-write service for articles.
 *
 * Every write goes to MySQL first; only when the database write succeeds is the
 * same change mirrored into the Elasticsearch "articles" index. A failed
 * Elasticsearch write never fails the caller: it is appended to a failure log
 * (one JSON object per line) and can be replayed with retryFailedSyncs().
 *
 * All public methods return an array with a boolean 'success' key; database
 * failures additionally carry an 'error' message.
 */
class DualWriteService {
    /** @var object Elasticsearch client returned by ClientBuilder::build(). */
    private $esClient;

    /** @var PDO MySQL connection opened by initDatabase(). */
    private $db;

    /** @var string File that receives one JSON line per failed Elasticsearch write. */
    private $failureLog = '/www/wwwlogs/es_sync_failures.log';

    /**
     * Builds the Elasticsearch client and opens the database connection.
     *
     * @throws Exception When the database connection cannot be established.
     */
    public function __construct() {
        $this->esClient = ClientBuilder::create()
            ->setHosts(['localhost:9200'])
            ->build();
        $this->initDatabase();
    }

    /**
     * Opens the PDO connection (exceptions enabled, associative fetch mode).
     *
     * @throws Exception Wrapping the original PDOException as "previous".
     */
    private function initDatabase() {
        $host = 'localhost';
        $dbname = 'your_database';
        $username = 'your_username';
        $password = 'your_password';
        try {
            $this->db = new PDO(
                "mysql:host={$host};dbname={$dbname};charset=utf8mb4",
                $username,
                $password,
                [
                    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                    PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC
                ]
            );
        } catch (PDOException $e) {
            // Keep the original exception attached so the root cause is not lost.
            throw new Exception("数据库连接失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Inserts an article into MySQL and then indexes it in Elasticsearch.
     *
     * @param array $articleData Keys read: title, content, author, category, tags, status (optional).
     * @return array ['success' => true, 'id' => string] or ['success' => false, 'error' => string].
     */
    public function addArticle($articleData) {
        $dbResult = $this->writeToDatabase($articleData);
        if (!$dbResult['success']) {
            return $dbResult;
        }
        $esResult = $this->writeToElasticsearch($articleData, $dbResult['id']);
        if (!$esResult['success']) {
            // The database row exists; queue the Elasticsearch write for a later retry.
            $this->logSyncFailure('add', $dbResult['id'], $articleData);
        }
        return $dbResult;
    }

    /**
     * Updates an article in MySQL and then in Elasticsearch.
     *
     * @param mixed $id         Article primary key.
     * @param array $updateData Same keys as addArticle(); every column is overwritten.
     * @return array Result of the database update.
     */
    public function updateArticle($id, $updateData) {
        $dbResult = $this->updateDatabase($id, $updateData);
        if ($dbResult['success']) {
            $esResult = $this->updateElasticsearch($id, $updateData);
            if (!$esResult['success']) {
                $this->logSyncFailure('update', $id, $updateData);
            }
        }
        return $dbResult;
    }

    /**
     * Deletes an article from MySQL and then from Elasticsearch.
     *
     * @param mixed $id Article primary key.
     * @return array Result of the database delete.
     */
    public function deleteArticle($id) {
        $dbResult = $this->deleteFromDatabase($id);
        if ($dbResult['success']) {
            $esResult = $this->deleteFromElasticsearch($id);
            if (!$esResult['success']) {
                $this->logSyncFailure('delete', $id);
            }
        }
        return $dbResult;
    }

    /**
     * Converts the tags value to the comma-separated form stored in MySQL.
     *
     * @param mixed $tags Array of tags or an already joined string.
     * @return mixed
     */
    private function tagsForDatabase($tags) {
        return is_array($tags) ? implode(',', $tags) : $tags;
    }

    /**
     * Converts the tags value to the list form indexed in Elasticsearch.
     * A missing or empty value yields an empty list instead of [''].
     *
     * @param mixed $tags Array of tags or a comma-separated string.
     * @return array
     */
    private function tagsForElasticsearch($tags) {
        if (is_array($tags)) {
            return $tags;
        }
        if ($tags === null || $tags === '') {
            return [];
        }
        return explode(',', $tags);
    }

    /**
     * Inserts the article row.
     *
     * @param array $data Article fields.
     * @return array ['success' => true, 'id' => string] or ['success' => false, 'error' => string].
     */
    private function writeToDatabase($data) {
        try {
            $sql = "INSERT INTO articles (title, content, author, category, tags, status, created_at, updated_at)
                    VALUES (:title, :content, :author, :category, :tags, :status, NOW(), NOW())";
            $stmt = $this->db->prepare($sql);
            $stmt->execute([
                ':title' => $data['title'],
                ':content' => $data['content'],
                ':author' => $data['author'],
                ':category' => $data['category'],
                ':tags' => $this->tagsForDatabase($data['tags']),
                ':status' => $data['status'] ?? 1
            ]);
            return [
                'success' => true,
                'id' => $this->db->lastInsertId()
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => $e->getMessage()
            ];
        }
    }

    /**
     * Indexes the full article document under the database id.
     *
     * NOTE(review): created_at/updated_at are taken from PHP's clock here while
     * the database row uses NOW(); the two can differ slightly (or by the retry
     * delay when this is replayed from the failure log).
     *
     * @param array $data Article fields.
     * @param mixed $id   Database primary key, used as the document id.
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function writeToElasticsearch($data, $id) {
        try {
            $now = date('Y-m-d H:i:s');
            $this->esClient->index([
                'index' => 'articles',
                'id' => $id,
                'body' => [
                    'id' => (int)$id,
                    'title' => $data['title'],
                    'content' => $data['content'],
                    'author' => $data['author'],
                    'category' => $data['category'],
                    'tags' => $this->tagsForElasticsearch($data['tags']),
                    'status' => $data['status'] ?? 1,
                    'created_at' => $now,
                    'updated_at' => $now
                ]
            ]);
            return ['success' => true];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => $e->getMessage()
            ];
        }
    }

    /**
     * Overwrites every editable column of the article row.
     *
     * @param mixed $id   Article primary key.
     * @param array $data Article fields.
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function updateDatabase($id, $data) {
        try {
            $sql = "UPDATE articles SET
                    title = :title,
                    content = :content,
                    author = :author,
                    category = :category,
                    tags = :tags,
                    status = :status,
                    updated_at = NOW()
                    WHERE id = :id";
            $stmt = $this->db->prepare($sql);
            $stmt->execute([
                ':title' => $data['title'],
                ':content' => $data['content'],
                ':author' => $data['author'],
                ':category' => $data['category'],
                ':tags' => $this->tagsForDatabase($data['tags']),
                ':status' => $data['status'] ?? 1,
                ':id' => $id
            ]);
            return ['success' => true];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => $e->getMessage()
            ];
        }
    }

    /**
     * Applies a partial-document update to the indexed article.
     *
     * @param mixed $id   Document id.
     * @param array $data Article fields.
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function updateElasticsearch($id, $data) {
        try {
            $this->esClient->update([
                'index' => 'articles',
                'id' => $id,
                'body' => [
                    'doc' => [
                        'title' => $data['title'],
                        'content' => $data['content'],
                        'author' => $data['author'],
                        'category' => $data['category'],
                        'tags' => $this->tagsForElasticsearch($data['tags']),
                        'status' => $data['status'] ?? 1,
                        'updated_at' => date('Y-m-d H:i:s')
                    ]
                ]
            ]);
            return ['success' => true];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => $e->getMessage()
            ];
        }
    }

    /**
     * Deletes the article row.
     *
     * @param mixed $id Article primary key.
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function deleteFromDatabase($id) {
        try {
            $stmt = $this->db->prepare("DELETE FROM articles WHERE id = :id");
            $stmt->execute([':id' => $id]);
            return ['success' => true];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => $e->getMessage()
            ];
        }
    }

    /**
     * Removes the article document from the index.
     *
     * A document that is already absent counts as success: the desired end
     * state is reached, and otherwise a logged delete could never be retried
     * successfully.
     *
     * @param mixed $id Document id.
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function deleteFromElasticsearch($id) {
        try {
            $this->esClient->delete([
                'index' => 'articles',
                'id' => $id
            ]);
            return ['success' => true];
        } catch (\Elasticsearch\Common\Exceptions\Missing404Exception $e) {
            return ['success' => true];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => $e->getMessage()
            ];
        }
    }

    /**
     * Appends one failed Elasticsearch operation to the failure log.
     *
     * @param string     $operation 'add', 'update' or 'delete'.
     * @param mixed      $id        Article id.
     * @param array|null $data      Payload needed to replay the operation.
     */
    private function logSyncFailure($operation, $id, $data = null) {
        $logData = [
            'timestamp' => date('Y-m-d H:i:s'),
            'operation' => $operation,
            'id' => $id,
            'data' => $data
        ];
        // LOCK_EX keeps concurrent requests from interleaving partial lines.
        file_put_contents(
            $this->failureLog,
            json_encode($logData) . "\n",
            FILE_APPEND | LOCK_EX
        );
    }

    /**
     * Replays one decoded failure-log entry.
     *
     * @param mixed $entry Decoded JSON line.
     * @return bool True only when the Elasticsearch helper reported success.
     */
    private function replayLoggedOperation($entry) {
        if (!is_array($entry) || !isset($entry['operation'], $entry['id'])) {
            return false;
        }
        $payload = $entry['data'] ?? null;
        switch ($entry['operation']) {
            case 'add':
                if (!is_array($payload)) {
                    return false;
                }
                $result = $this->writeToElasticsearch($payload, $entry['id']);
                break;
            case 'update':
                if (!is_array($payload)) {
                    return false;
                }
                $result = $this->updateElasticsearch($entry['id'], $payload);
                break;
            case 'delete':
                $result = $this->deleteFromElasticsearch($entry['id']);
                break;
            default:
                return false;
        }
        return !empty($result['success']);
    }

    /**
     * Replays every entry of the failure log against Elasticsearch.
     *
     * The Elasticsearch helpers report failure through their return value and
     * never throw, so success is decided from that value. Entries that fail
     * again (or cannot be decoded) are written back to the log instead of being
     * discarded.
     *
     * The log is drained under an exclusive lock before replaying, so failures
     * logged by other requests during the replay are appended to the emptied
     * file and are not lost. Entries that are written back land after those
     * newer entries.
     *
     * @return array ['success' => bool, 'retried' => int, 'failed' => int], or
     *               ['success' => true, 'message' => string] when no log exists.
     */
    public function retryFailedSyncs() {
        $logFile = $this->failureLog;
        if (!file_exists($logFile)) {
            return ['success' => true, 'message' => '无失败记录'];
        }

        $handle = fopen($logFile, 'c+');
        if ($handle === false) {
            return [
                'success' => false,
                'retried' => 0,
                'failed' => 0,
                'error' => '无法打开失败日志'
            ];
        }
        flock($handle, LOCK_EX);
        $contents = (string)stream_get_contents($handle);
        ftruncate($handle, 0);
        flock($handle, LOCK_UN);
        fclose($handle);

        $successCount = 0;
        $failCount = 0;
        $stillFailing = [];
        foreach (explode("\n", $contents) as $line) {
            if (trim($line) === '') {
                continue;
            }
            if ($this->replayLoggedOperation(json_decode($line, true))) {
                $successCount++;
            } else {
                $failCount++;
                $stillFailing[] = $line;
            }
        }

        if ($stillFailing) {
            file_put_contents(
                $logFile,
                implode("\n", $stillFailing) . "\n",
                FILE_APPEND | LOCK_EX
            );
        }

        return [
            'success' => true,
            'retried' => $successCount,
            'failed' => $failCount
        ];
    }
}
// Usage example: create the service, insert one article and report the outcome.
$syncService = new DualWriteService();

// Article payload; tags may be passed as a list and are joined for the database.
$article = [
    'title' => '测试文章标题',
    'content' => '这是文章内容...',
    'author' => '张三',
    'category' => '技术',
    'tags' => ['PHP', 'Elasticsearch', '搜索'],
    'status' => 1
];

$result = $syncService->addArticle($article);

// Print either the new id or the database error message.
echo $result['success']
    ? "文章添加成功,ID: " . $result['id']
    : "添加失败: " . $result['error'];
?>
三、方案二:Logstash JDBC 输入插件
安装和配置 Logstash
1. 在宝塔中安装 Logstash
# 进入宝塔终端
cd /www/server
# 下载 Logstash(版本需要与 ES 对应)
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.0-linux-x86_64.tar.gz
# 解压
tar -zxvf logstash-7.17.0-linux-x86_64.tar.gz
mv logstash-7.17.0 logstash
# 下载 MySQL JDBC 驱动
cd logstash
wget https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-java-8.0.28.tar.gz
tar -zxvf mysql-connector-java-8.0.28.tar.gz
2. 创建 Logstash 配置文件
创建 /www/server/logstash/config/mysql-sync.conf:
# Pipeline: poll MySQL "articles" incrementally and upsert the rows into the
# Elasticsearch "articles" index, using the primary key as document id.
# Limitation: rows that are hard-deleted in MySQL are never seen by this query,
# so they are not removed from Elasticsearch.
input {
  jdbc {
    # MySQL connection settings
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=utf8&useSSL=false"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    # JDBC driver location
    jdbc_driver_library => "/www/server/logstash/mysql-connector-java-8.0.28/mysql-connector-java-8.0.28.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Paging for large result sets
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    # Run the query once per minute inside this long-lived Logstash process
    schedule => "* * * * *"
    # Incremental query. With use_column_value the value remembered as
    # :sql_last_value is taken from the last row read, so the rows must be
    # ordered by the tracking column or newer rows can be skipped next run.
    statement => "SELECT * FROM articles WHERE updated_at > :sql_last_value OR created_at > :sql_last_value ORDER BY updated_at ASC"
    # Track progress by the updated_at column
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "updated_at"
    # Where the last tracked value is persisted between runs
    last_run_metadata_path => "/www/server/logstash/last_run_metadata.txt"
    # Time zone of the DATETIME values stored in MySQL
    jdbc_default_timezone => "Asia/Shanghai"
  }
}
filter {
  # The original date filter was dropped: updated_at arrives from the jdbc input
  # as a timestamp object rather than a "yyyy-MM-dd HH:mm:ss" string, and the
  # @timestamp field it targeted is removed below anyway.

  # Turn the comma-separated tags column into a list
  if [tags] {
    mutate {
      split => { "tags" => "," }
    }
  }
  # Drop Logstash bookkeeping fields that are not part of the article document
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  # Upsert into Elasticsearch; document_id makes repeated runs idempotent
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "articles"
    document_id => "%{id}"
    # Index template with the field mappings
    template => "/www/server/logstash/config/articles-template.json"
    template_name => "articles"
    template_overwrite => true
  }
  # Optional: also write every event to a file for debugging
  file {
    path => "/www/server/logstash/logs/mysql-sync.log"
  }
}
3. 创建 Elasticsearch 映射模板
创建 /www/server/logstash/config/articles-template.json:
{
"index_patterns": ["articles*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": {
"analyzer": {
"ik_smart_analyzer": {
"type": "custom",
"tokenizer": "ik_smart"
},
"ik_max_analyzer": {
"type": "custom",
"tokenizer": "ik_max_word"
}
}
}
},
"mappings": {
"properties": {
"id": {"type": "integer"},
"title": {
"type": "text",
"analyzer": "ik_smart_analyzer",
"search_analyzer": "ik_smart_analyzer"
},
"content": {
"type": "text",
"analyzer": "ik_max_analyzer",
"search_analyzer": "ik_smart_analyzer"
},
"author": {"type": "keyword"},
"category": {"type": "keyword"},
"tags": {"type": "keyword"},
"status": {"type": "integer"},
"created_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"},
"updated_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"}
}
}
}
4. 启动 Logstash
# 创建日志目录
mkdir -p /www/server/logstash/logs
# 启动 Logstash
cd /www/server/logstash
./bin/logstash -f config/mysql-sync.conf
# 或作为守护进程运行
nohup ./bin/logstash -f config/mysql-sync.conf > logs/logstash.log 2>&1 &
5. 创建宝塔计划任务
在宝塔面板中添加计划任务:
# 命令
cd /www/server/logstash && ./bin/logstash -f config/mysql-sync.conf
# 执行周期:每分钟四、方案三:Canal(MySQL Binlog 同步)
安装和配置 Canal
1. 安装 Canal
# Download Canal
cd /www/server
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
# The deployer archive unpacks bin/, conf/, lib/ and logs/ directly (no
# top-level folder), so extract it into its own directory.
mkdir -p /www/server/canal
tar -zxvf canal.deployer-1.1.6.tar.gz -C /www/server/canal
2. 配置 MySQL 开启 Binlog
-- 检查是否开启 binlog
SHOW VARIABLES LIKE 'log_bin';
-- 如果没有开启,在 MySQL 配置文件中添加
-- /etc/my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
3. 创建 Canal 用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
4. 配置 Canal
编辑 /www/server/canal/conf/example/instance.properties:
# Database connection
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8
# Tables to watch, as a regex of the form schema\\.table.
# The key must appear only once: when it is repeated, the later line silently
# replaces the earlier one. To watch every schema and table instead, use:
#   canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=your_database\\.articles
# Tables to exclude (none)
canal.instance.filter.black.regex=
5. 创建 PHP Canal 客户端
<?php
/**
* Canal PHP 客户端
* 监听 MySQL Binlog 并同步到 Elasticsearch
*/
/**
 * Canal consumer that replays MySQL row changes of the "articles" table into
 * the Elasticsearch "articles" index.
 *
 * The Composer autoloader must be loaded before this class is instantiated.
 *
 * NOTE(review): the \Canal\Client and \Canal\Protocol\* classes and method
 * names are kept exactly as in the original snippet; confirm them against the
 * Canal PHP client that is actually installed.
 */
class CanalClient {
    /** @var object Elasticsearch client returned by ClientBuilder::build(). */
    private $esClient;

    /**
     * Builds the Elasticsearch client. The builder is referenced by its fully
     * qualified name because this snippet has no `use` statement of its own.
     */
    public function __construct() {
        $this->esClient = \Elasticsearch\ClientBuilder::create()
            ->setHosts(['localhost:9200'])
            ->build();
    }

    /**
     * Connects to the Canal server and processes row changes forever.
     * Never returns; intended to run as a long-lived CLI process.
     */
    public function startSync() {
        $client = new \Canal\Client();
        $client->connect("127.0.0.1", 11111);
        $client->subscribe("example", "your_database.articles", "");
        while (true) {
            $message = $client->get(100);
            $entries = $message->getEntries();
            if (!$entries) {
                // Back off only when idle; a non-empty batch is followed
                // immediately by the next fetch so a backlog drains quickly.
                sleep(1);
                continue;
            }
            foreach ($entries as $entry) {
                if ($entry->getEntryType() == \Canal\Protocol\EntryType::ROWDATA) {
                    $this->processRowChange($entry);
                }
            }
        }
    }

    /**
     * Decodes one binlog entry and dispatches each changed row by event type.
     *
     * @param object $entry Canal entry of type ROWDATA.
     */
    private function processRowChange($entry) {
        $rowChange = \Canal\Protocol\RowChange::parseFromString($entry->getStoreValue());
        foreach ($rowChange->getRowDatasList() as $rowData) {
            switch ($rowChange->getEventType()) {
                case \Canal\Protocol\EventType::INSERT:
                    $this->handleInsert($rowData);
                    break;
                case \Canal\Protocol\EventType::UPDATE:
                    $this->handleUpdate($rowData);
                    break;
                case \Canal\Protocol\EventType::DELETE:
                    $this->handleDelete($rowData);
                    break;
            }
        }
    }

    /**
     * Flattens a Canal column list into a column-name => value array.
     *
     * @param iterable $columns Column objects exposing getName() and getValue().
     * @return array
     */
    private function columnsToRow($columns) {
        $row = [];
        foreach ($columns as $column) {
            $row[$column->getName()] = $column->getValue();
        }
        return $row;
    }

    /**
     * Indexes a newly inserted row.
     *
     * @param object $rowData Canal row data.
     */
    private function handleInsert($rowData) {
        $row = $this->columnsToRow($rowData->getAfterColumnsList());
        // Pass the id explicitly so the success/failure output and log carry it.
        $this->syncToElasticsearch('index', $row, $row['id'] ?? null);
    }

    /**
     * Mirrors an updated row, using its post-update column values.
     *
     * @param object $rowData Canal row data.
     */
    private function handleUpdate($rowData) {
        $row = $this->columnsToRow($rowData->getAfterColumnsList());
        $this->syncToElasticsearch('update', $row, $row['id'] ?? null);
    }

    /**
     * Removes a deleted row, identified by its pre-delete id column.
     *
     * @param object $rowData Canal row data.
     */
    private function handleDelete($rowData) {
        $row = $this->columnsToRow($rowData->getBeforeColumnsList());
        $this->syncToElasticsearch('delete', null, $row['id'] ?? null);
    }

    /**
     * Applies one operation to Elasticsearch; failures are echoed and logged,
     * never rethrown, so the consumer loop keeps running.
     *
     * @param string     $operation 'index', 'update' or 'delete'.
     * @param array|null $data      Row values (not needed for delete).
     * @param mixed      $id        Document id; falls back to $data['id'].
     */
    private function syncToElasticsearch($operation, $data = null, $id = null) {
        try {
            if ($id === null && is_array($data) && isset($data['id'])) {
                $id = $data['id'];
            }
            if ($id === null || $id === '') {
                // Without an id, index() would create a document under a
                // generated id that later updates/deletes could never address.
                throw new InvalidArgumentException('missing primary key "id"');
            }
            switch ($operation) {
                case 'index':
                    $this->esClient->index([
                        'index' => 'articles',
                        'id' => $id,
                        'body' => $this->transformData($data)
                    ]);
                    break;
                case 'update':
                    $this->esClient->update([
                        'index' => 'articles',
                        'id' => $id,
                        'body' => [
                            'doc' => $this->transformData($data),
                            // Rows not indexed yet (e.g. created before the
                            // sync started) are created instead of failing.
                            'doc_as_upsert' => true
                        ]
                    ]);
                    break;
                case 'delete':
                    try {
                        $this->esClient->delete([
                            'index' => 'articles',
                            'id' => $id
                        ]);
                    } catch (\Elasticsearch\Common\Exceptions\Missing404Exception $e) {
                        // Already absent: the desired end state is reached.
                    }
                    break;
            }
            echo "同步成功: {$operation} ID: {$id}\n";
        } catch (Exception $e) {
            echo "同步失败: {$e->getMessage()}\n";
            $this->logFailure($operation, $id, $data);
        }
    }

    /**
     * Maps a database row (all values as delivered by Canal) to the document
     * shape of the "articles" index.
     *
     * @param array $data Row values keyed by column name.
     * @return array
     */
    private function transformData($data) {
        return [
            'id' => (int)$data['id'],
            'title' => $data['title'],
            'content' => $data['content'],
            'author' => $data['author'],
            'category' => $data['category'],
            'tags' => !empty($data['tags']) ? explode(',', $data['tags']) : [],
            'status' => (int)$data['status'],
            'created_at' => $data['created_at'],
            'updated_at' => $data['updated_at']
        ];
    }

    /**
     * Appends a failed operation to the Canal failure log (one JSON line each).
     *
     * @param string     $operation Operation that failed.
     * @param mixed      $id        Document id, if known.
     * @param array|null $data      Row values, if any.
     */
    private function logFailure($operation, $id, $data) {
        file_put_contents(
            '/www/wwwlogs/canal_sync_failures.log',
            json_encode([
                'timestamp' => date('Y-m-d H:i:s'),
                'operation' => $operation,
                'id' => $id,
                'data' => $data
            ]) . "\n",
            FILE_APPEND | LOCK_EX
        );
    }
}
// Start the Canal client. startSync() loops forever, so this script is meant
// to run as a long-lived command-line process rather than a web request.
$canalClient = new CanalClient();
$canalClient->startSync();
?>
五、方案四:数据库触发器 + 消息队列
1. 创建消息表
-- Queue table filled by the triggers on `articles`; one row per changed record.
-- error_message stores the last failure text written by the PHP processor
-- (its markAsFailed() updates this column, so the column must exist).
CREATE TABLE es_sync_queue (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    record_id BIGINT NOT NULL,
    operation ENUM('INSERT', 'UPDATE', 'DELETE') NOT NULL,
    sync_status TINYINT DEFAULT 0 COMMENT '0:未同步, 1:已同步, 2:同步失败',
    error_message TEXT NULL COMMENT '最近一次同步失败的错误信息',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status (sync_status),
    INDEX idx_created (created_at)
);
2. 创建数据库触发器
-- INSERT 触发器
DELIMITER $$
CREATE TRIGGER after_article_insert
AFTER INSERT ON articles
FOR EACH ROW
BEGIN
INSERT INTO es_sync_queue (table_name, record_id, operation, sync_status)
VALUES ('articles', NEW.id, 'INSERT', 0);
END$$
DELIMITER ;
-- UPDATE 触发器
DELIMITER $$
CREATE TRIGGER after_article_update
AFTER UPDATE ON articles
FOR EACH ROW
BEGIN
INSERT INTO es_sync_queue (table_name, record_id, operation, sync_status)
VALUES ('articles', NEW.id, 'UPDATE', 0);
END$$
DELIMITER ;
-- DELETE 触发器
DELIMITER $$
CREATE TRIGGER after_article_delete
AFTER DELETE ON articles
FOR EACH ROW
BEGIN
INSERT INTO es_sync_queue (table_name, record_id, operation, sync_status)
VALUES ('articles', OLD.id, 'DELETE', 0);
END$$
DELIMITER ;
3. PHP 消息队列处理器
<?php
/**
* 消息队列同步处理器
*/
/**
 * Worker that drains the es_sync_queue table and applies each queued change
 * to Elasticsearch.
 *
 * Queue rows use sync_status 0 = pending, 1 = synced, 2 = failed. The failure
 * path writes the error text to an `error_message` column, which therefore
 * has to exist on es_sync_queue.
 *
 * The Composer autoloader must be loaded before this class is instantiated.
 * Run a single worker at a time: rows are not claimed or locked while they
 * are being processed.
 */
class QueueSyncService {
    /** @var PDO MySQL connection opened by initDatabase(). */
    private $db;

    /** @var object Elasticsearch client returned by ClientBuilder::build(). */
    private $esClient;

    /** @var string[] Tables whose rows may be read and indexed (index name = table name). */
    private $syncTables = ['articles'];

    /**
     * Opens the database connection and builds the Elasticsearch client.
     *
     * @throws Exception When the database connection cannot be established.
     */
    public function __construct() {
        $this->initDatabase();
        $this->esClient = \Elasticsearch\ClientBuilder::create()
            ->setHosts(['localhost:9200'])
            ->build();
    }

    /**
     * Opens the PDO connection. The original class called this method without
     * defining it, which is a fatal error on construction.
     *
     * Associative fetch mode matters here: fetched rows are sent to
     * Elasticsearch as document bodies and must not carry numeric keys.
     *
     * @throws Exception Wrapping the original PDOException as "previous".
     */
    private function initDatabase() {
        $host = 'localhost';
        $dbname = 'your_database';
        $username = 'your_username';
        $password = 'your_password';
        try {
            $this->db = new PDO(
                "mysql:host={$host};dbname={$dbname};charset=utf8mb4",
                $username,
                $password,
                [
                    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                    PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC
                ]
            );
        } catch (PDOException $e) {
            throw new Exception("数据库连接失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Processes the oldest pending queue rows.
     *
     * The original query ended in FOR UPDATE, but it ran outside a transaction,
     * so the lock was released as soon as the statement finished and protected
     * nothing; it is omitted here.
     *
     * @param int $batchSize Maximum number of rows to process.
     * @return int Number of rows fetched (whether or not they synced successfully).
     */
    public function processQueue($batchSize = 100) {
        $records = $this->fetchByStatus(0, $batchSize);
        foreach ($records as $record) {
            $this->processRecord($record);
        }
        return count($records);
    }

    /**
     * Fetches queue rows with the given status, oldest first.
     *
     * @param int $status sync_status value to select.
     * @param int $limit  Maximum number of rows.
     * @return array[]
     */
    private function fetchByStatus($status, $limit) {
        $sql = "SELECT * FROM es_sync_queue
                WHERE sync_status = :status
                ORDER BY id ASC
                LIMIT :limit";
        $stmt = $this->db->prepare($sql);
        $stmt->bindValue(':status', (int)$status, PDO::PARAM_INT);
        $stmt->bindValue(':limit', (int)$limit, PDO::PARAM_INT);
        $stmt->execute();
        return $stmt->fetchAll();
    }

    /**
     * Applies one queue row to Elasticsearch and records the outcome.
     *
     * @param array $record Row from es_sync_queue.
     * @return bool True when the row was synced, false when it was marked failed.
     */
    private function processRecord($record) {
        try {
            switch ($record['operation']) {
                case 'INSERT':
                case 'UPDATE':
                    $this->syncUpsert($record);
                    break;
                case 'DELETE':
                    $this->syncDelete($record);
                    break;
            }
            $this->markAsSynced($record['id']);
            return true;
        } catch (Exception $e) {
            $this->markAsFailed($record['id'], $e->getMessage());
            return false;
        }
    }

    /**
     * Re-reads the current row from MySQL and indexes it as a full document.
     * When the row no longer exists nothing is indexed.
     *
     * @param array $record Row from es_sync_queue.
     */
    private function syncUpsert($record) {
        $data = $this->getRecordData($record['table_name'], $record['record_id']);
        if ($data) {
            $this->esClient->index([
                'index' => $record['table_name'],
                'id' => $record['record_id'],
                'body' => $this->transformData($data)
            ]);
        }
    }

    /**
     * Deletes the document; an already absent document is not an error,
     * otherwise such a row could never leave the failed state.
     *
     * @param array $record Row from es_sync_queue.
     */
    private function syncDelete($record) {
        try {
            $this->esClient->delete([
                'index' => $record['table_name'],
                'id' => $record['record_id']
            ]);
        } catch (\Elasticsearch\Common\Exceptions\Missing404Exception $e) {
            // Desired end state already reached.
        }
    }

    /**
     * Loads one source row by primary key.
     *
     * The table name cannot be bound as a parameter, so it is checked against
     * the whitelist before being placed into the statement.
     *
     * @param string $tableName Source table.
     * @param mixed  $recordId  Primary key value.
     * @return array|false The row, or false when it does not exist.
     * @throws InvalidArgumentException When the table is not whitelisted.
     */
    private function getRecordData($tableName, $recordId) {
        if (!in_array($tableName, $this->syncTables, true)) {
            throw new InvalidArgumentException("Table not allowed for sync: {$tableName}");
        }
        $sql = "SELECT * FROM {$tableName} WHERE id = :id";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([':id' => $recordId]);
        return $stmt->fetch();
    }

    /**
     * Adapts a database row for indexing: the comma-separated tags column
     * becomes a list, and an empty or NULL value becomes an empty list.
     *
     * @param array $data Database row.
     * @return array
     */
    private function transformData($data) {
        if (array_key_exists('tags', $data)) {
            $tags = $data['tags'];
            $data['tags'] = ($tags === null || $tags === '') ? [] : explode(',', $tags);
        }
        return $data;
    }

    /**
     * Marks a queue row as synced.
     *
     * @param mixed $queueId es_sync_queue.id.
     */
    private function markAsSynced($queueId) {
        $stmt = $this->db->prepare("UPDATE es_sync_queue SET sync_status = 1 WHERE id = :id");
        $stmt->execute([':id' => $queueId]);
    }

    /**
     * Marks a queue row as failed and stores the error text.
     *
     * @param mixed  $queueId es_sync_queue.id.
     * @param string $error   Failure message.
     */
    private function markAsFailed($queueId, $error) {
        $sql = "UPDATE es_sync_queue SET sync_status = 2, error_message = :error WHERE id = :id";
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            ':id' => $queueId,
            ':error' => $error
        ]);
    }

    /**
     * Retries the oldest rows previously marked as failed.
     *
     * processRecord() handles its own exceptions, so success is taken from its
     * return value; rows that fail again stay in the failed state.
     *
     * @param int $batchSize Maximum number of rows to retry.
     * @return int Number of rows that synced successfully this time.
     */
    public function retryFailed($batchSize = 50) {
        $successCount = 0;
        foreach ($this->fetchByStatus(2, $batchSize) as $record) {
            if ($this->processRecord($record)) {
                $successCount++;
            }
        }
        return $successCount;
    }
}
// Entry point intended to be run from a BaoTa scheduled task.
$syncService = new QueueSyncService();
// Process up to 100 pending queue rows in this run.
$processed = $syncService->processQueue(100);
echo "已处理 {$processed} 条同步记录";
// Then retry up to 50 rows that were previously marked as failed.
$retried = $syncService->retryFailed(50);
echo "重试成功 {$retried} 条记录";
?>
六、方案对比和选择建议
方案选择指南
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 新建项目 | 应用层双写 | 代码可控,实时性最好 |
| 已有项目,可修改代码 | 应用层双写 + 全量初始化 | 渐进式改造 |
| MySQL 数据库,要求高实时性 | Canal | 基于 Binlog,对业务无侵入 |
| 简单同步,可接受分钟级延迟 | Logstash JDBC | 配置简单,稳定可靠 |
| 数据库层面同步 | 触发器 + 消息队列 | 变更记录与业务数据同事务写入、不会丢失(ES 侧为最终一致),对应用透明 |
宝塔环境推荐配置
对于大多数宝塔用户,我推荐:
- 首选:应用层双写方案
- 备选:Logstash JDBC 方案
- 高级需求:Canal 方案
性能优化建议
/**
 * Indexes many records with a single Elasticsearch bulk request.
 *
 * Each record contributes two body entries: the action line (index into
 * "articles" under the record's id) followed by the document itself.
 *
 * A bulk request does not fail as a whole when individual items fail, so the
 * caller should inspect the 'errors' flag and 'items' of the returned response.
 *
 * @param array[] $records Rows to index; each must contain an 'id' key.
 * @return array Bulk response; for empty input a response-shaped array with
 *               no items, because Elasticsearch rejects an empty bulk body.
 */
public function bulkSync($records) {
    if (empty($records)) {
        return ['errors' => false, 'items' => []];
    }
    $params = ['body' => []];
    foreach ($records as $record) {
        $params['body'][] = [
            'index' => [
                '_index' => 'articles',
                '_id' => $record['id']
            ]
        ];
        $params['body'][] = $this->transformData($record);
    }
    return $this->esClient->bulk($params);
}
选择最适合你项目需求和团队技术能力的方案。对于宝塔环境,应用层双写和 Logstash 方案相对更容易实施和维护。
评论 (0)