ElasticSearch安装忽略
安装logstash官方:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
1.下载公共密钥
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
2.添加yum源
vim /etc/yum.repos.d/logstash.repo
文件中写入
[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
#yum makecache
3.使用yum安装
yum install logstash
4.验证是否安装成功
进入 logstash 安装目录
cd /usr/share/logstash
运行
bin/logstash -e 'input { stdin { } } output { stdout {} }'
等待几秒钟 出现
The stdin plugin is now waiting for input:
然后输入
hello world
得到类似的结果
2016-11-24T08:01:55.949Z bogon hello world
安装logstash-input-jdbc插件
1.修改ruby仓库镜像
如果没有安装 gem 的话 安装gem
安装ruby
yum install ruby
装好以后测试一下
ruby -v
ruby 1.8.7 (2012-06-29 patchlevel 370) [i386-linux]
ok 已经变成1.8.7 了
安装rubygems
yum install rubygems
替换国内的镜像
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
验证是否成功
gem sources -l
修改Gemfile的数据源地址
whereis logstash # 查看logstash安装的位置, 我的在 /usr/share/logstash目录
cd /usr/share/logstash
vim Gemfile
修改 source 的值 为: "https://gems.ruby-china.com/"
vim Gemfile
# 找到 remote 修改它的值为:
https://gems.ruby-china.com/
或者直接替换源这样你不用改你的 Gemfile 的 source。
#gem install bundler
$ bundle config mirror.https://rubygems.org https://gems.ruby-china.com/
然后开始安装
/usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
如果镜像地址没有改错的话应该可以直接安装
或者 进入源码地址的release页面logstash-input-jdbc
2.开始同步 mysql 数据进行全量更新, 一次同步表中所有的记录
需要建立 两个文件 一个 .conf后缀的文件 一个 .sql 后缀的文件
一个 mysql 的Java 驱动包 : mysql-connector-java-5.1.47-bin.jar 到这里下载:https://dev.mysql.com/downloads/connector/j/
需要同步数据库的表结构:
root@mysql6683 16:29: [test]> desc test.cc;
+-------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
| id | int(11) | YES | | NULL | |
| name | varchar(20) | YES | | NULL | |
+-------+-------------+------+-----+---------+-------+
2 rows in set (0.00 sec)
root@mysql6683 16:29: [test]> select * from test.cc;
+------+------+
| id | name |
+------+------+
| 1 | rsc |
| 2 | gmy |
| 3 | ryx |
+------+------+
3 rows in set (0.00 sec)
logstash-mysql.conf 内容:
里面的参数可以参考 logstash-input-jdbc官方参考文档
vim /usr/share/logstash/config/logstash-mysql.conf
input {
stdin {
}
jdbc {
# 数据库地址 端口 数据库名
jdbc_connection_string => "jdbc:mysql://localhost:3306/test" #test表示使用test数据库
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "123456"
# mysql java驱动地址
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.47-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# sql 语句文件
statement_filepath => "/usr/share/logstash/config/test.sql" #通过引用外部文件执行sq语句
#statement => "SELECT * FROM hexin_erp_order" #也可以通过直接输入查询语句
schedule => "* * * * *"
type => "jdbc"
}
}
output {
stdout {
codec => json_lines
}
elasticsearch {
hosts => "localhost:9200"
index => "contacts"
document_type => "contact"
document_id => "%{id}"
}
}
cat /usr/share/logstash/config/test.sql
select * from cc
++++++++++++++++成功配置文件+++++++++++++++++++
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.66.83:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.47-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/share/logstash/config/test.sql"
schedule => "* * * * *"
type => "jdbc"
}
}
output {
stdout {
codec => "json_lines"
}
elasticsearch {
hosts => "192.168.66.83:9200"
index => "contacts"
document_type => "contact"
document_id => "%{id}"
}
}
+++++++++++++++++++++++++++++++++++
注意: 在你的数据库里 要有一个数据库名字和logstash-mysql.conf 里的对应就可以了, 表名和 test.sql 里的对应就可以了 在表中 有一个id字段是为了和test.sql 中document_id => "%{id}" 这个参数对应 可以执行修改
然后开始执行
/usr/share/logstash/bin/logstash -f /usr/share/logstash/config/logstash-mysql.conf
如果出现错误 或者没有结果 可以进入 logs 目录中查看日志
出现类似这样的内容 就可以了
本例是对一个数据库表进行同步,如果需要同步多个表的数据,可以创建多个配置文件,也可以在一个配置文件中指定多个 jdbc input。配置中的所有项目都必须重新复制一遍。
增量更新
这个例子中的SQL执行的全量更新,如果需要进行增量更新,就需要对SQL进行一些修改。
input {
stdin {
}
jdbc {
# 数据库地址端口数据库名
jdbc_connection_string => "jdbc:mysql://localhost:3306/shen"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "rootroot"
# mysql java驱动地址
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar"
# 驱动类的名称
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#是否记录上次运行的结果
record_last_run => true
#记录上次运行结果的文件位置
last_run_metadata_path => ""
#是否使用数据库某一列的值,
use_column_value => true
tracking_column => "id"
#numeric或者timestamp
tracking_column_type => "numeric"
#如果为true则会清除 last_run_metadata_path 的记录,即重新开始同步数据
clean_run => false
#sql_last_value根据tracking类型,默认为0或者1970-1-1
statement => "SELECT * FROM TABLE WHERE id > :last_sql_value"
# sql 语句文件,对于复杂的查询,可以放在文件中。
# statement_filepath => "filename.sql"
# 设置监听间隔,语法与Linux系统Cron相同
schedule => "* * * * *"
}
}
output {
stdout {
codec => json_lines
}
elasticsearch {
hosts=> "localhost:9200"
index => "contacts"
document_type => "contact"
document_id => "%{id}"
}
}
增量更新会忽略对历史数据的更新,如果业务中历史数据经常发生变化,则可以通过全量更新的方法。
查看是否同步成功:
[root@node6683 ~]# curl http://192.168.66.83:9200/contacts/contact/1?pretty=true
{
"_index" : "contacts",
"_type" : "contact",
"_id" : "1",
"_version" : 39,
"found" : true,
"_source" : {
"@version" : "1",
"@timestamp" : "2018-10-23T08:54:00.154Z",
"id" : 1,
"type" : "jdbc",
"name" : "rsc"
}
}
url 参数说明 contacts 是对应logstash-mysql.conf中 index => "contacts" contact 对应document_type => "contact" 1表示id为1 的数据