项目中涉及的数据量比较大. 查询字段比较多, 用MySQL查询有点捉襟见肘, 了解到Elasticsearch 是一个实时的分布式搜索分析引擎, 它被用作全文检索、结构化搜索、分析以及这三个功能的组合, 就没用以前用过的Sphinx.怎么把数据从MySQL数据表同步到ElasticSearch下呢, Follow me.
使用的技术:
* Docker (Docker version 18.03.0-ce, build 0520e24)
* MySQL5.7 (安装在宿主机)
* Docker-elk (Docker for Elasticsearch, Logstash, Kibana)
* 想了解 ELK Stack 转 https://github.com/elastic/stack-docker
假设已安装配置好了Docker,Docker-composer,MySQL
创建要同步的MySQL表
CREATE TABLE `advertiser` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(34) DEFAULT NULL,
`cap` int(11) DEFAULT '0',
`updated_time` int(10) unsigned DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;
插入测试数据:
INSERT INTO `advertiser` (`id`, `name`, `cap`, `updated_time`)
VALUES
(1, 'Adv1-1', 1, 1527866545),
(2, 'Adv2-1', 2, 1527866545),
(3, 'Adv3-3-2', 3, 1527866545),
(4, 'Adv4-1527866548', 4, 1527866548),
(5, 'Adv5-3-7', 5, 1527866547);
我们需要一个整型时间戳字段: update_time, 稍后做同步时,用于更新数据.
安装配置docker-elk
docker-elk是一个Elasticsearch, Logstash, Kibana组合, 直接在
git clone git@github.com:deviantony/docker-elk.git
cd docker-elk
原项目中使用的Kibana6.2.4版本在启动时报错:
docker-compose up --build
......
......
kibana_1 | module.js:103
kibana_1 | throw e;
kibana_1 | ^
kibana_1 |
kibana_1 | SyntaxError: Error parsing /usr/share/kibana/node_modules/lodash/package.json: Unexpected token
dockerelk_kibana_1 exited with code 1
把kibana/Dockerfile中的版本由6.2.4改成6.2.1就不报错了
FROM docker.elastic.co/kibana/kibana-oss:6.2.4
改成
FROM docker.elastic.co/kibana/kibana-oss:6.2.1
就不报错了
下载MySQL jdbc库放到logstash/lib目录下
mkdir logstash/lib
cd logstash/lib
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.11.zip
unzip mysql-connector-java-8.0.11.zip
mv mysql-connector-java-8.0.11.jar ../
rm -rf mysql-connector-java-8.0.11
把jdbc库添加到logstash中, 修改docker-compose.yml中的
logstash:
build:
context: logstash/
volumes:
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
- ./logstash/pipeline:/usr/share/logstash/pipeline:ro
下添加
- ./logstash/lib/mysql-connector-java-8.0.11.jar:/home/comp/Downloads/mysql-connector-java-8.0.11.jar
配置Logstash定时同步MySQL表
修改logstash/pipeline/logstash.conf,把
input {
tcp {
port => 5000
}
}
## Add your filters / logstash plugins configuration here
output {
elasticsearch {
hosts => "elasticsearch:9200"
}
}
改成
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.31.116:3306/ultron"
# The user we wish to execute our statement as
jdbc_user => "ultron"
jdbc_password => "ultron"
# The path to our downloaded jdbc driver
jdbc_driver_library => "/home/comp/Downloads/mysql-connector-java-8.0.11.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 要同步的数据
statement => "SELECT * FROM advertiser WHERE updated_time > :sql_last_value order by updated_time"
# 每分钟同步一次
schedule => "* * * * *"
# 为了同步上一次同步完后,修改的数据, 使用以下配置 updated_time是整型时间戳
use_column_value => true
tracking_column => updated_time
}
}
output {
stdout { codec => json_lines }
elasticsearch {
"hosts" => "elasticsearch:9200"
"index" => "ultron"
"document_type" => "advertiser"
"document_id" => "%{id}"
}
}
运行
docker-compose up --build
查看效果
浏览器打开Kibana: http://localhost:5601 默认会转到 Create index pattern页面:
- Index pattern 输入 “Ultron*”
- 点击”Next step”
- Time Filter field name 的下拉选择”@timestamp”
- 点击”Create index pattern”
- 创建成功
转到Kibana的 Discover页面,在图中的下拉框架选择Ultron就能看到同步过去的数据了.
注意事项
- 从Github上拉到docker-elk后,需要修改 kibana/Dockerfile中的版本信息,现在拉的kibana-oss:6.2.4 版本会报错. 换成kibana-oss:6.2.1
- logstash/config/logstash.conf中为了同步上一次同步完后,修改的数据
- updated_time 是整型时间戳
- sql_last_value 会记录上一次的 updated_time最大值,所以当数据被添加和修改时,要把updated_time设置成更新数据的整型时间戳.
- 曾尝试把updated_time使用DATETIME类型但失败
参考:
https://qbox.io/blog/migrating-mysql-data-into-elasticsearch-using-logstash
https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html
https://logz.io/blog/elk-stack-on-docker/
https://github.com/deviantony/docker-elk