Logstash input jdbc is duplicating results
Problem description
I'm using the logstash input jdbc plugin to read from two (or more) databases and send the data to Elasticsearch, and Kibana 4 to visualize that data.
This is my logstash config:
input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status FROM test_table"
  }
  jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status FROM test_table"
  }
}
filter {
}
output {
  if [type] == "A" {
    elasticsearch {
      host => "localhost"
      protocol => http
      index => "logstash-servera-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "B" {
    elasticsearch {
      host => "localhost"
      protocol => http
      index => "logstash-serverb-%{+YYYY.MM.dd}"
    }
  }
  stdout { codec => rubydebug }
}
The problem is that every time logstash runs, it saves all the data that is already in Elasticsearch again.
After a run with the where clause date > '2015-09-10', I stopped logstash and ran it again (with --debug) using the 'special parameter' :sql_last_start. After startup, logstash begins showing this in the log:
←[36mExecuting JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',\nCASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'\nWHEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\tELSE 'ERRO'\nEND AS 'TIPO_MENSAGEM',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO\n\nFROM RECARGA R (NOLOCK)\nJOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO\nwhere R.DT_RECARGA > :sql_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=>"107", :method=>"execute_statement"}←[0m
This time I ran it with the 'real' statement, which is:
SELECT
  SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
  CASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'
       WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
       ELSE 'ERRO'
  END AS 'TIPO_MENSAGEM',
  AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA
FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC
Does anyone know how to solve this?
Thanks!
Solution
sql_last_start is now sql_last_value.
Please check here: the special parameter sql_last_start has been renamed to sql_last_value for better clarity, since it is not limited to datetime columns but can track other column types as well.
So now the solution may look something like this:
input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    use_column_value => true
    tracking_column => date
    statement => "SELECT id, date, content, status FROM test_table WHERE date > :sql_last_value"
    # clean_run => true resets sql_last_value to zero or its initial value
    # if the datatype is date (the default is false)
    clean_run => false
  }
  jdbc {
    # same for type B ...
  }
}
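Between scheduled runs the plugin persists the last seen sql_last_value in a metadata file, controlled by the last_run_metadata_path option. A minimal sketch of that (the Windows path shown here is illustrative, not from the original config):

```conf
input {
  jdbc {
    # ... driver and connection settings as above ...
    use_column_value => true
    tracking_column => date
    # File where the plugin stores sql_last_value between runs.
    # Deleting this file (or setting clean_run => true) restarts the import.
    last_run_metadata_path => "C:\DEV\logstash\.jdbc_last_run_type_a"
    statement => "SELECT id, date, content, status FROM test_table WHERE date > :sql_last_value"
  }
}
```

With two jdbc inputs in one pipeline, giving each its own last_run_metadata_path keeps the type A and type B tracking values from overwriting each other.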
I have tested this with a SQL Server DB.
Please run with clean_run => true the first time to avoid datatype errors, since during development we may have different datatype values stored in the sql_last_value variable.
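That first-run advice could be sketched as follows (the same pipeline as the solution, with only clean_run flipped; set it back to false once the initial import has finished):

```conf
input {
  jdbc {
    # ... driver and connection settings as above ...
    use_column_value => true
    tracking_column => date
    statement => "SELECT id, date, content, status FROM test_table WHERE date > :sql_last_value"
    # First run only: discard any previously stored sql_last_value so the
    # tracking column starts from its initial value; then revert to false.
    clean_run => true
  }
}
```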