ElasticSearch56.3[SOURCE]+logstash6.4.2[RPM]的logstash-input-jdbc实现mysql数据同步


ElasticSearch安装忽略

安装logstash
官方:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
1.下载公共密钥
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
2.添加yum源
vim  /etc/yum.repos.d/logstash.repo
文件中写入
[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md


#yum makecache
3.使用yum安装
yum install logstash


4.验证是否安装成功
进入 logstash 安装目录
cd /usr/share/logstash
运行
bin/logstash -e 'input { stdin { } } output { stdout {} }'
等待几秒钟 出现  
The stdin plugin is now waiting for input:
然后输入 
hello world
得到类似的结果
2016-11-24T08:01:55.949Z bogon hello world
安装logstash-input-jdbc插件
1.修改ruby仓库镜像
如果没有安装 gem 的话 安装gem 
安装ruby 
yum install ruby
装好以后测试一下 
ruby -v 
ruby 1.8.7 (2012-06-29 patchlevel 370) [i386-linux] 
ok 已经变成1.8.7 了
安装rubygems 
yum install rubygems
替换国内的镜像
gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/
验证是否成功
gem sources -l
 
修改Gemfile的数据源地址
whereis logstash # 查看logstash安装的位置, 我的在 /usr/share/logstash目录
cd /usr/share/logstash
vim Gemfile
修改 source 的值 为: "https://gems.ruby-china.com/"
vim  Gemfile
 # 找到 remote 修改它的值为:
https://gems.ruby-china.com/
或者直接替换源这样你不用改你的 Gemfile 的 source。
#gem install bundler
$ bundle config mirror.https://rubygems.org https://gems.ruby-china.com/
然后开始安装
/usr/share/logstash/bin/logstash-plugin  install logstash-input-jdbc
如果镜像地址没有改错的话应该可以直接安装
或者  进入源码地址的release页面logstash-input-jdbc


2.开始同步 mysql 数据进行全量更新, 一次同步表中所有的记录
需要建立 两个文件  一个  .conf后缀的文件    一个 .sql 后缀的文件
 一个 mysql 的Java 驱动包  : mysql-connector-java-5.1.47-bin.jar    到这里下载:https://dev.mysql.com/downloads/connector/j/


需要同步数据库的表结构:


root@mysql6683 16:29:  [test]> desc test.cc;
+-------+-------------+------+-----+---------+-------+
| Field | Type        | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
| id    | int(11)     | YES  |     | NULL    |       |
| name  | varchar(20) | YES  |     | NULL    |       |
+-------+-------------+------+-----+---------+-------+
2 rows in set (0.00 sec)


root@mysql6683 16:29:  [test]> select * from test.cc;
+------+------+
| id   | name |
+------+------+
|    1 | rsc  |
|    2 | gmy  |
|    3 | ryx  |
+------+------+
3 rows in set (0.00 sec)




logstash-mysql.conf  内容:
里面的参数可以参考 logstash-input-jdbc官方参考文档
vim /usr/share/logstash/config/logstash-mysql.conf
input {
    stdin {
    }
    jdbc {
    # 数据库地址  端口  数据库名
      jdbc_connection_string => "jdbc:mysql://localhost:3306/test"    #test表示使用test数据库
    # 数据库用户名      
    jdbc_user => "root"
    # 数据库密码
      jdbc_password => "123456"
    # mysql java驱动地址 
      jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.47-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # sql 语句文件
      statement_filepath => "/usr/share/logstash/config/test.sql" #通过引用外部文件执行sq语句
      #statement => "SELECT * FROM hexin_erp_order"   #也可以通过直接输入查询语句
      schedule => "* * * * *"
      type => "jdbc"
    }
}
output {
 stdout {
        codec => json_lines
    }
   elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts"
     document_type => "contact"
        document_id => "%{id}"
    }
}
cat /usr/share/logstash/config/test.sql
select * from cc




++++++++++++++++成功配置文件+++++++++++++++++++
input {
    stdin {
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://192.168.66.83:3306/test"
      jdbc_user => "root"
      jdbc_password => "123456"
      jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.47-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "/usr/share/logstash/config/test.sql"
      schedule => "* * * * *"
      type => "jdbc"
    }
}
output {
 stdout {
        codec => "json_lines"
    }
 elasticsearch {
        hosts  => "192.168.66.83:9200"
        index => "contacts"
        document_type => "contact"
        document_id => "%{id}"
    }
}
+++++++++++++++++++++++++++++++++++




注意: 在你的数据库里 要有一个数据库名字和logstash-mysql.conf 里的对应就可以了, 表名和 test.sql 里的对应就可以了   在表中  有一个id字段是为了和test.sql 中document_id => "%{id}" 这个参数对应 可以执行修改
然后开始执行
/usr/share/logstash/bin/logstash -f /usr/share/logstash/config/logstash-mysql.conf
如果出现错误 或者没有结果 可以进入 logs 目录中查看日志
 


出现类似这样的内容  就可以了
本例是对一个数据库表进行同步,如果需要同步多个表的数据,可以创建多个配置文件,也可以在一个配置文件中指定多个 jdbc input。配置中的所有项目都必须重新复制一遍。


增量更新
这个例子中的SQL执行的全量更新,如果需要进行增量更新,就需要对SQL进行一些修改。
input { 
stdin { 

jdbc { 
# 数据库地址端口数据库名 
jdbc_connection_string => "jdbc:mysql://localhost:3306/shen" 
# 数据库用户名 
jdbc_user => "root" 
# 数据库密码 
jdbc_password => "rootroot" 
# mysql java驱动地址 
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar" 
# 驱动类的名称 
jdbc_driver_class => "com.mysql.jdbc.Driver" 
jdbc_paging_enabled => "true" 
jdbc_page_size => "50000" 
#是否记录上次运行的结果 
record_last_run => true 
#记录上次运行结果的文件位置 
last_run_metadata_path => "" 
#是否使用数据库某一列的值, 
use_column_value => true 
tracking_column => "id" 
#numeric或者timestamp 
tracking_column_type => "numeric" 
#如果为true则会清除 last_run_metadata_path 的记录,即重新开始同步数据 
clean_run => false 
#sql_last_value根据tracking类型,默认为0或者1970-1-1 
statement => "SELECT * FROM TABLE WHERE id > :last_sql_value" 
# sql 语句文件,对于复杂的查询,可以放在文件中。 
# statement_filepath => "filename.sql" 
# 设置监听间隔,语法与Linux系统Cron相同 
schedule => "* * * * *" 


output { 
stdout { 
codec => json_lines 

elasticsearch { 
hosts=> "localhost:9200" 
index => "contacts" 
document_type => "contact" 
document_id => "%{id}" 


增量更新会忽略对历史数据的更新,如果业务中历史数据经常发生变化,则可以通过全量更新的方法。


查看是否同步成功:
[root@node6683 ~]# curl http://192.168.66.83:9200/contacts/contact/1?pretty=true
{
  "_index" : "contacts",
  "_type" : "contact",
  "_id" : "1",
  "_version" : 39,
  "found" : true,
  "_source" : {
    "@version" : "1",
    "@timestamp" : "2018-10-23T08:54:00.154Z",
    "id" : 1,
    "type" : "jdbc",
    "name" : "rsc"
  }
}


url 参数说明  contacts 是对应logstash-mysql.conf中 index => "contacts"  contact 对应document_type => "contact"   1表示id为1 的数据

分割线
感谢打赏
江西数库信息技术有限公司
YWSOS.COM 平台代运维解决方案
 评论
 发表评论
姓   名:

Powered by AKCMS