Incremental indexing using Logstash: handling the delete scenario


Problem Description


I am using the Logstash configuration below for incremental indexing. Whenever a row is inserted or updated, I can fetch those rows from the MSSQL server and index them as documents in Elasticsearch, but the challenge is the delete operation.

Logstash configuration file

input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
    jdbc_user => "xxxx"
    jdbc_password => "xxxx"
    jdbc_paging_enabled => true
    tracking_column => "modified_date"
    tracking_column_type => "timestamp"
    use_column_value => true
    clean_run => true
    schedule => "*/1 * * * *"
    statement => "SELECT * FROM [dbo].[xxxx] WHERE modified_date > :sql_last_value"
  }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  elasticsearch {
    hosts => "xxxxx"
    user => "xxxxx"
    password => "xxxxx"
    index => "xxxxx"
    document_type => "_doc"
    document_id => "%{id}"
  }
  stdout { codec => rubydebug }
}

How can I delete the documents that were deleted in the MSSQL server using this incremental indexing approach with Logstash? I have no idea how to handle the delete operation in particular.

Could anyone please suggest how to achieve this?

Solution

I was able to handle insert, update, and delete using the configuration below. It may be helpful to anyone trying the same. Note that this relies on the source table soft-deleting rows via an is_deleted flag, which the filter maps to an Elasticsearch delete action.

input {
  jdbc {
    jdbc_driver_library => ""
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://xxxxx:1433;databaseName=xxxxx;"
    jdbc_user => "xxxxx"
    jdbc_password => "xxxxx"
    jdbc_paging_enabled => true
    tracking_column => "modified_date"
    tracking_column_type => "timestamp"
    use_column_value => true
    clean_run => true
    schedule => "*/1 * * * *"
    statement => "SELECT * FROM [dbo].[xxxx] WHERE modified_date > :sql_last_value"
  }
}
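For the delete case to reach Logstash at all, the incremental query above must still return the removed row, which means the table has to use soft deletes rather than physical DELETEs. A minimal sketch of that pattern, assuming the placeholder table [dbo].[xxxx] and the is_deleted and modified_date columns implied by the configuration (names are illustrative):

```sql
-- Hypothetical soft-delete setup; table and column names are placeholders.
ALTER TABLE [dbo].[xxxx] ADD is_deleted BIT NOT NULL DEFAULT 0;

-- Instead of DELETE, flag the row and bump modified_date so the next
-- incremental run (modified_date > :sql_last_value) picks it up.
UPDATE [dbo].[xxxx]
SET is_deleted = 1, modified_date = GETDATE()
WHERE id = @id;
```

Once Elasticsearch has processed the delete action, the flagged row can be purged from the source table by a separate cleanup job if desired.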

filter {
  if [is_deleted] {
    mutate {
      add_field => { "[@metadata][elasticsearch_action]" => "delete" }
    }
  } else {
    mutate {
      add_field => { "[@metadata][elasticsearch_action]" => "index" }
    }
  }
  mutate {
    remove_field => [ "is_deleted", "@version", "@timestamp" ]
  }
}
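The filter's routing decision is simple enough to restate outside Logstash. The sketch below is a hypothetical Python equivalent, assuming each source row arrives as a dict and that the JDBC input has already converted the BIT column to a boolean (field names mirror the configuration above):

```python
# Simulates the filter: is_deleted decides the Elasticsearch action,
# then the flag is stripped from the document, as remove_field does.

def route_event(row):
    """Return (elasticsearch_action, document) for one source row."""
    action = "delete" if row.get("is_deleted") else "index"
    doc = {k: v for k, v in row.items() if k != "is_deleted"}
    return action, doc

print(route_event({"id": 1, "name": "a", "is_deleted": True}))
print(route_event({"id": 2, "name": "b", "is_deleted": False}))
```

Storing the action under [@metadata] keeps it out of the indexed document while still making it available to the output block.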

output {
  elasticsearch {
    hosts => "xxxxx"
    user => "elastic"
    password => "xxxxx"
    index => "xxxxx"
    action => "%{[@metadata][elasticsearch_action]}"
    document_type => "_doc"
    document_id => "%{id}"
  }
  stdout { codec => rubydebug }
}

Thanks to everyone, particularly Claudio M.
