Logstash input jdbc is duplicating results

Problem description

I'm using the Logstash jdbc input plugin to read two (or more) databases and send the data to Elasticsearch, and I'm using Kibana 4 to visualize the data.

This is my logstash config:

input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }

  jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }
}
filter {

}
output {

    if [type] == "A" {
        elasticsearch {
            host => "localhost"
            protocol => "http"
            index => "logstash-servera-%{+YYYY.MM.dd}"
        }    
    }
    if [type] == "B" {
        elasticsearch {
            host => "localhost"
            protocol => "http"
            index => "logstash-serverb-%{+YYYY.MM.dd}"
        }    
    }

  stdout { codec => rubydebug }
}

The problem is that every time I run Logstash, it saves all the data again, including the data that is already in Elasticsearch.

After a run with the WHERE clause date > '2015-09-10', I stopped Logstash and ran it again (with --debug), this time using the 'special parameter' :sql_last_start. After Logstash starts up, it shows this in the log:

Executing JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',\nCASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  \n       WHEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t   ELSE 'ERRO'\n   END AS 'TIPO_MENSAGEM',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=>"107", :method=>"execute_statement"}

This time I ran it with the 'real' statement, which is:

SELECT 
    SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
CASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  
       WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
       ELSE 'ERRO'
   END AS 'TIPO_MENSAGEM',
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA

FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO 
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC

Does anyone know how to solve it?

Thanks!

Solution

sql_last_start is now sql_last_value. The special parameter sql_last_start has been renamed to sql_last_value for clarity, because it is no longer limited to datetime values and can track other column types as well. So the solution may now look something like this:

input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    # Track the value of the date column rather than the time of the last run
    use_column_value => true
    tracking_column => "date"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_value"
    # clean_run => true resets sql_last_value to zero, or to the initial value if the data type is a date (the default is false)
    clean_run => false
  }
  jdbc {
    # for type B ...
  }
}

I have tested this with a SQL Server database.

For the first run, set clean_run => true to avoid a data type error: during development, a value of a different data type may already be stored in the sql_last_value variable.
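
To make the mechanism concrete, here is a minimal Python sketch (not Logstash code) that imitates what the answer relies on: each run selects only rows whose tracking column is greater than the remembered value, then remembers the value from the last row it read. Everything in it is an assumption made for illustration: sqlite3 stands in for SQL Server, the names run_once, state and INITIAL_VALUE are hypothetical, and the state lives in a dict, whereas the plugin persists its value between runs.

```python
import sqlite3

# Hypothetical stand-in for the plugin's persisted state; this sketch keeps
# the last tracked value in a plain dict instead.
INITIAL_VALUE = "1970-01-01 00:00:00"


def run_once(conn, state, clean_run=False):
    """Imitate one scheduled run: fetch only rows newer than the tracked value."""
    if clean_run or "sql_last_value" not in state:
        state["sql_last_value"] = INITIAL_VALUE
    rows = conn.execute(
        "SELECT id, date, content, status FROM test_table "
        "WHERE date > :sql_last_value ORDER BY date ASC",
        {"sql_last_value": state["sql_last_value"]},
    ).fetchall()
    if rows:
        # Remember the tracking column (date) of the last row that was read.
        state["sql_last_value"] = rows[-1][1]
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_table (id INTEGER, date TEXT, content TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO test_table VALUES (?, ?, ?, ?)",
    [(1, "2015-09-10 10:00:00", "a", "ok"), (2, "2015-09-10 11:00:00", "b", "ok")],
)

state = {}
first = run_once(conn, state)    # both existing rows
second = run_once(conn, state)   # nothing new, so nothing is re-sent
conn.execute("INSERT INTO test_table VALUES (3, '2015-09-10 12:00:00', 'c', 'ok')")
third = run_once(conn, state)    # only the newly inserted row
fourth = run_once(conn, state, clean_run=True)  # state reset: all rows again
print(len(first), len(second), len(third), len(fourth))
```

The last call shows why clean_run => true should not stay enabled after the first run: with the state reset on every start, every row is read and indexed again, which is the duplication described in the question. Depending on the plugin version, a date tracking column may also need tracking_column_type => "timestamp", so it is worth checking the documentation for the version you run.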
