问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

如何实现logstash-input-jdbc 增量读取SQLserver数据库

发布网友 发布时间:2022-04-10 04:45

我来回答

2个回答

懂视网 时间:2022-05-03 08:17

需要学习的地方:使用logstash获取数据后,然后根据这些数据再从MySQL数据库中进行匹配,增加一些数据到logstash的数据流中,然后输出到es

在IoT物联网时代,我们经常会遇到从传感器采集数据的情况。这些传感器,可以上传物联网数据,比如温度,湿度。通常这些传感器带有自己的ID,但是它并不具有像地理位置等这样的信息。当物联网数据传到我们的数据平台时,我们希望对采集上来的数据进行数据的丰富,比如我们对物联网的数据加上它所在的位置等信息,这将对我们的数据分析非常有用。这些需要丰富的数据通常会存放于一个关系数据库的表格中,比如MySQL的数据库中。在今天的文章中,我们来介绍如何使用jdbc_streamline来丰富我们的数据。

安装Logstash

安装MySQL

把MySQL数据导入到Elasticsearch中”来进行安装及配置好自己的MySQL数据库。记得把相应的Connector放入相应的文件目录中。

安装好我们的MySQL后,创建一个叫做data的数据库,并创建一个叫做sensors的表格

在这里,我们创建一个记录。这个记录是用来丰富我们的物联网的数据的。当传感器传入的数据的id和这个表格id相匹配的时候,那么这个表格的数据将会被加入到事件中,从而丰富我们的事件数据。

Logstash配置文件

我们先定义一个配置这样一个叫做logstash_jdbc_streaming.con的文件,并把它置于自己的一个文件目录中,比如我把它存于我的一个叫做data的目录中:

# data/logstash_jdbc_streaming.conf

 input {
 http {
 id => "sensor_data_http_input"
 user => "sensor_data"
 password => "sensor_data"
 }
 }
 
 
 output {
 stdout { 
 codec => rubydebug 
 }
 }

在这里,我们使用一个叫做http的input。HTTP输入插件的参考文档位https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html。
在这种情况下,由于我们使用的是http输入插件的默认配置,因此我们刚刚指定了ID。我们应该保护此HTTP端点的安全,因为它将在Internet上公开,以允许传感器从任何地方发送数据。 我们可以配置用户和密码参数,以使用所需的用户名和密码保护此端。

我们可以先启动我们的Logstash。在Logstash的安装目录下运行:

./bin/logstash -f ~/data/logstash_jdbc_streaming.conf

使用此输入插件启动Logstash时,它将在端口8080上启动HTTP服务器,该HTTP服务器使用具有给定用户名和密码的基本身份验证进行保护。 我们可以使用curl命令将请求发送到此Logstash管道,如下所示:

curl -XPOST -u sensor_data:sensor_data --header "Content-Type:application/json" "http://localhost:8080/" -d '{"id":1,"time":1512102540000,"reading":17.00}'

当我们执行上面的命令后,我们可以在Logstash运行的terminal中看到显示结果。

使用jdbc_streaming丰富数据

我们接下来丰富我们的传感器传来的数据。首先,我们来进一步修改我们的Logstash配置文件:

# cat data/logstash_jdbc_streaming.conf

 input {
 http {
 id => "sensor_data_http_input"
 user => "sensor_data"
 password => "sensor_data"
 }
 }
 
 filter {
 jdbc_streaming {
 jdbc_connection_string => "jdbc:mysql://localhost:3306/data?useTimezone=true&&serverTimezone=UTC"
 jdbc_user => "root"
 jdbc_password => "YourDatabasePassword"
 jdbc_validate_connection => true
 jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 parameters => { "sensor_identifier" => "id"}
 statement => "select * from sensors where id= :sensor_identifier"
 target => lookupResult
 }
 }
 
 output {
 stdout { 
 codec => rubydebug 
 }
 }

这里必须注意的几点:

  1. 在配置jdbc connector时,必须把相应的驱动拷贝到Logstash安装目录下的
 $ pwd
 /Users/liuxg/elastic/logstash-7.3.0
 (base) liuxg:logstash-7.3.0 liuxg$ ls ./logstash-core/lib/jars/mysql-connector-java-8.0.17.jar 
 ./logstash-core/lib/jars/mysql-connector-java-8.0.17.jar
  1. 另外在配置文件中,不要配置jdbc_driver_library这个项。否则我们可能会看到如下的错误信息
[2019-10-10T14:41:53,015][ERROR][logstash.javapipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<TypeError: failed to coerce jdk.internal.loader.ClassLoaders$AppClassLoader to java.net.URLClassLoader>, :backtrace=>["org/jruby/java/addons/KernelJavaAddons.java:29:in `to_java'", "/Users/liuxg/elastic/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.7/lib/logstash/plugin_mixins/jdbc_streaming.rb:48:in `prepare_jdbc_connection'", "/Users/liuxg/elastic/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.7/lib/logstash/filters/jdbc_streaming.rb:200:in `prepare_connected_jdbc_cache'", "/Users/liuxg/elastic/logstash-7.3.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.7/lib/logstash/filters/jdbc_streaming.rb:116:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:56:in `register'", "/Users/liuxg/elastic/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:192:in `block in register_plugins'", "org/jruby/RubyArray.java:1792:in `each'", "/Users/liuxg/elastic/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:191:in `register_plugins'", "/Users/liuxg/elastic/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:463:in `maybe_setup_out_plugins'", "/Users/liuxg/elastic/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:204:in `start_workers'", "/Users/liuxg/elastic/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:146:in `run'", "/Users/liuxg/elastic/logstash-7.3.0/logstash-core/lib/logstash/java_pipeline.rb:105:in `block in start'"], :thread=>"#<Thread:0x3fa8c5a3 run>"}

如果有运行错误,比如timezone错误,需要在JDBC请求时加入serverTimezone

当我们完成上面的一步的时候,我们重新运行我们的Logstash:

./bin/logstash -f ~/data/logstash_jdbc_streaming.conf

在另外一个terminal上,打入如下的命令:

curl -XPOST -u sensor_data:sensor_data --header "Content-Type:application/json" "http://localhost:8080/" -d '{"id":1,"time":1512102540000,"reading":17.00}'

我们可以看到如下的输出在Logstash的terminal上.

在上面,我们可以看到丰富后的数据在lookupResult中出现了。我们可以进一步改造我们的配置文件,并对从收据库中收集的数据更进一步地加工:

# cat data/logstash_jdbc_streaming.conf

 input {
 http {
 id => "sensor_data_http_input"
 user => "sensor_data"
 password => "sensor_data"
 }
 }
 
 filter {
 jdbc_streaming {
 jdbc_connection_string => "jdbc:mysql://localhost:3306/data?useTimezone=true&&serverTimezone=UTC"
 jdbc_user => "root"
 jdbc_password => "YourDabaePassword"
 jdbc_validate_connection => true
 jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 parameters => { "sensor_identifier" => "id"}
 statement => "select * from sensors where id= :sensor_identifier"
 target => lookupResult
 }
 
 mutate {
 rename => {"[lookupResult][0][sensorType]" => "sensorType"}
 rename => {"[lookupResult][0][customer]" => "customer"}
 rename => {"[lookupResult][0][department]" => "department"}
 rename => {"[lookupResult][0][buildingName]" => "buildingName"}
 rename => {"[lookupResult][0][room]" => "room"}
 rename => {"[lookupResult][0][floor]" => "floor"}
 rename => {"[lookupResult][0][locationOnFloor]" => "locationOnFloor"}
 add_field => {
  "location" => "%{[lookupResult][0][latitude]},%{[lookupResult][0][longitude]}"
 }
 remove_field => ["lookupResult", "headers", "host"]
 } 
 }
 
 output {
 stdout { 
 codec => rubydebug 
 }
 }

重新运行我们的Logstash应用,并在另外一个terminal中打入curl指令,可以看出来这是经过我们转换后的数据。我们删除了一下并不需要的数据,同时,我们也把经纬度信息组合为我们需要的location字段。可以为未来我们的位置信息查询提供方便。

输出至Elasticsearch

到目前为止,我们只显示我们的数据到stdout,仅供我们调试所使用。我们可以进一步改造我们的配置文件:

cat data/logstash_jdbc_streaming.conf

 input {
 http {
 id => "sensor_data_http_input"
 user => "sensor_data"
 password => "sensor_data"
 }
 }
 
 filter {
 jdbc_streaming {
 jdbc_connection_string => "jdbc:mysql://localhost:3306/data?useTimezone=true&&serverTimezone=UTC"
 jdbc_user => "root"
 jdbc_password => "YourPassword"
 jdbc_validate_connection => true
 jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
 parameters => { "sensor_identifier" => "id"}
 statement => "select * from sensors where id= :sensor_identifier"
 target => lookupResult
 }
 
 mutate {
 rename => {"[lookupResult][0][sensorType]" => "sensorType"}
 rename => {"[lookupResult][0][customer]" => "customer"}
 rename => {"[lookupResult][0][department]" => "department"}
 rename => {"[lookupResult][0][buildingName]" => "buildingName"}
 rename => {"[lookupResult][0][room]" => "room"}
 rename => {"[lookupResult][0][floor]" => "floor"}
 rename => {"[lookupResult][0][locationOnFloor]" => "locationOnFloor"}
 add_field => {
  "location" => "%{[lookupResult][0][latitude]},%{[lookupResult][0][longitude]}"
 }
 remove_field => ["lookupResult", "headers", "host"]
 } 
 }
 
 output {
 stdout { 
 codec => rubydebug 
 }
 
 elasticsearch {
 hosts => ["localhost:9200"]
 index => "sensor_data-%{+YYYY.MM.dd}"
 user => "elastic"
 password => "elastic"
 } 
 }

这次,我们加入了elasticsearch作为一个输出。打开我们的Kibana可以看出来,我们已经成功地把数据输出到Elasticsearch之中了。

在实际的使用中,我们可以不间断地运用这个jdbc_streaming把我们的从物联网中收集的数据进行丰富。

Logstash:运用jdbc_streaming来丰富我们的数据

标签:html   time   位置   情况下   bae   cal   plugins   http服务器   组合   

热心网友 时间:2022-05-03 05:25

我的logstash input 配置文件如下 input { jdbc { type => "testdb" jdbc_driver_library => "/app/sqljdbc_6.0/enu/sqljdbc42.jar" jdbc_driver_class
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
如果银行拒贷有哪些办法 小天鹅滚筒洗衣机水位多少合适 阴阳师百闻牌攻略大全 百闻牌式神卡组阵容大全 阴阳师百闻牌三大妖狐阵容推荐 妖狐流派怎么搭配?-新手攻略-安族网... 阴阳师百闻牌妖狐快攻阵容 怎么搭配攻略推荐 阴阳师百闻牌妖狐技能攻略 妖狐属性及卡组搭配推荐-新手攻略-安族网... 阴阳师百闻牌妖狐最强卡组 阵容怎么搭配攻略 阴阳师百闻牌妖狐卡组推荐 怎么搭配攻略分享 带鹏字的公司名字大全 鹏字开头公司起名 叶罗丽娃娃玩具店在哪 问:海星能不能吃哒。、 干海星哪些部位不能吃 新买的奥克斯空调,遥控器上有个笑脸按钮是干什么的?能省电吗?_百度知 ... logstash 能将信息送到oracle吗 加拿大酒店管理专业比较好的大学有哪些? 加拿大大学酒店管理专业的排名 请问加拿大几个大学的酒店管理专业行不行的?推荐几个加拿大有酒店管理专业的学校 谢谢 请问加拿大世界排名在前五十的有麦吉尔大学和多伦多大学这两个大学的酒店管理专业怎么样呢 豆芽放在沙子里为什么中不出来 多伦多大学breath requirement 加拿大多伦多大学本科都有哪些常见专业 加拿大就业率高的城市和专业有哪些 加拿大什么专业就业情况较好? 加拿大哪间大学的酒店管理专业最好? 加拿大 餐饮学校 加拿大多伦多大学里有旅游管理专业(不是酒店管理,偏 加拿大哪所大学的酒店管理专业最好?最好是名校!!! 请问加拿大世界排名在前五十的有麦吉尔大学和多伦多大学这两个大学的酒店管理专业怎么样呢 多伦多大学有酒店管理专业吗?? 多伦多大学有酒店管理专业吗 能具体说说吗 段永平持有oppo多少股份 “疯狂”的茅台,竟指引黄峥创办拼多多? 国内和段永平联系紧密的上市公司 段永平的四大门徒是什么? 曾经和“股神”巴菲特,共进午餐的3个中国人,现在过得怎样呢? 段永平 苹果 多少股票 段永平还持股vivo吗 投资的真谛 集体跳槽:如何面对员工“叛逃”? “疯狂”的茅台,于黄峥有重大意义? 带有“竖折折钩”的字有哪些? 竖折折钩怎样写 笔画里有横折折钩,竖折折折钩的字有哪些? 如何删除logstash sincedb文件 角度,清新,山川,闪电,值日这些的拚音怎么写? 牛批哄哄带闪电的拼音 黎岗闪电的拼音怎么写的 lightning怎么用拼音读音 闪这个字的拼音 雷电的拼音