如何使用插入/修改/删除将dynamodb设计为弹性搜索 [英] How to design dynamodb to Elastic search with Insert/Modify/Remove

查看:87
本文介绍了如何使用插入/修改/删除将dynamodb设计为弹性搜索的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使用Python将整个文档传递给弹性搜索?这是进行弹性搜索的正确方法吗?

在dynamodb中,id是主键

如何插入dynamodb下面是代码

import boto3
from boto3.dynamodb.conditions import Key, And, Attr
def lambda_handler(event, context):
    dynamodb = boto3.resource ('dynamodb')
    table =dynamodb.Table('newtable')
    with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
            batch.put_item(
                Item={
                    'id': '1',
                    'last_name': 'V',
                    'age': '2',
                }
            )
            batch.put_item(
                Item={
                    'id': '2',
                    'last_name': 'JJ',
                    'age': '7',
                }
            )
            batch.put_item(
                Item={
                    'id': '9',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )
            batch.put_item(
                Item={
                    'id': '10',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )

  • 如何将期望结果推入弹性搜索

  • 如果dynamodb内容发生变化,如何在ES中自动反映

我已经通过链接解决方案

您可以检查以下内容.我试图复制该问题,并且可以确认错误

ERROR: NameError("name 'event' is not defined")

根据您的示例和我自己的表,我使用了DynamoDb流中的模拟插入event:

{
  "Records": [
    {
      "eventID": "b8b993cf16d1aacb61b40411b39e0b1f",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "1"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "V"
          },
          "id": {
            "N": "1"
          },
          "age": {
            "S": "2"
          }
        },
        "SequenceNumber": "25200000000020406897812",
        "SizeBytes": 22,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "e5d5bec988945c06ffc879cf16b89bf7",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "9"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "ADD"
          },
          "id": {
            "N": "9"
          },
          "age": {
            "S": "95"
          }
        },
        "SequenceNumber": "25300000000020406897813",
        "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "f1a7c9736253b5ef28ced38ed5ff645b",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "JJ"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "SequenceNumber": "25400000000020406897819",
        "SizeBytes": 23,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "bfcbad9dc19883e4172e6dc25e66637b",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "10"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "ADD"
          },
          "id": {
            "N": "10"
          },
          "age": {
            "S": "95"
          }
        },
        "SequenceNumber": "25500000000020406897820",
        "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}

示例修改event:

{
  "Records": [
    {
      "eventID": "4e4629c88aa00e366c89a293d9c82d54",
      "eventName": "MODIFY",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595924589.0,
        "Keys": {
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "zhgdhfgdh"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "OldImage": {
          "last_name": {
            "S": "JJ"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "SequenceNumber": "25600000000020408264140",
        "SizeBytes": 49,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}

修改后的lambda函数代码,我现在可以确认不会产生错误:

import boto3
import json
import re

from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()

s3 = session.resource('s3')

awsauth = AWS4Auth(credentials.access_key,
                  credentials.secret_key,
                  session.region_name, 'es',
                  session_token=credentials.token)

    
es = Elasticsearch(
    ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'],
    use_ssl=True,
    verify_certs=True,
    http_auth=awsauth,
    connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                   "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    #dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:
            
        if record['eventName'] == "INSERT":
            insert_document(event, es, record)
        elif record['eventName'] == "REMOVE":
            remove_document(event, es, record)
        elif record['eventName'] == "MODIFY":
            modify_document(event, es, record)


# Process MODIFY events
def modify_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(event, event)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table,
             body=doc,
             id=docId,
             doc_type=table,
             refresh=True)

    print("Successly modified - Index: " , table , " - Document ID: " , docId)


def remove_document(event, es, record):
    
    table = getTable(record)
    
    print("Dynamo Table: " + table)

    docId = docid(event, event)
    print("Deleting document ID: ", docId)

    es.delete(index=table,
              id=docId,
              doc_type=table,
              refresh=True)

    print("Successly removed - Index: ", table, " - Document ID: " , docId)


# Process INSERT events
def insert_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)

        es.indices.create(table,
                          body='{"settings": { "index.mapping.coerce": true } }')

        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("New document to Index:")
    print(doc)

    newId = docid(event, record)
    
    es.index(index=table,
             body=doc,
             id=newId,
             doc_type=table,
             refresh=True)

    print("Successly inserted - Index: " , table + " - Document ID: " , newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
        for i in result:
            return i


def docid(event, record):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId

尚未验证,如果数据已正确写入,修改或插入ElasticSearch.但是我运行了ES域,并在lambda中使用了ES域,以验证lambda是否可以连接到它并运行查询.

lambda的INSERT事件输出示例:

Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}

Example output from lambda from MODIFY event:

更新的文档:

{
    "last_name": "zhgdhfgdh",
    "age": "7"
}
Successly modified - Index:  newtable  - Document ID:  
{}

我认为docid是否可以正常工作还需要进一步调查,因为它似乎返回了空dict:

 Document ID:  {}

How to pass this entire document into elastic search using Python? Is this this the right way to put into elastic search?

In dynamodb id is the primary key

How to insert in to dynamodb Below is the code

import boto3
from boto3.dynamodb.conditions import Key, And, Attr
def lambda_handler(event, context):
    dynamodb = boto3.resource ('dynamodb')
    table =dynamodb.Table('newtable')
    with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
            batch.put_item(
                Item={
                    'id': '1',
                    'last_name': 'V',
                    'age': '2',
                }
            )
            batch.put_item(
                Item={
                    'id': '2',
                    'last_name': 'JJ',
                    'age': '7',
                }
            )
            batch.put_item(
                Item={
                    'id': '9',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )
            batch.put_item(
                Item={
                    'id': '10',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )

  • How to push expected out into Elastic Search

  • How to reflect automatically in ES if dynamodb content changes

I have gone through the link https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/

Below is code I am getting error ERROR: NameError("name 'event' is not defined")

Code. * Before that trigger the below lambda function from the dynamodb table

import boto3
import json
import re
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()
# s3 = session.resource('s3')
awsauth = AWS4Auth(credentials.access_key,
                   credentials.secret_key,
                   session.region_name, 'es',
                   session_token=credentials.token)
es = Elasticsearch(
    ['https://xx-east-1.es.amazonaws.com'],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                   "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:

        try:

            if record['eventName'] == "INSERT":
                insert_document(es, record)
            elif record['eventName'] == "REMOVE":
                remove_document(es, record)
            elif record['eventName'] == "MODIFY":
                modify_document(es, record)

        except Exception as e:
            print("Failed to process:")
            print(json.dumps(record))
            print("ERROR: " + repr(e))
            continue


# Process MODIFY events
def modify_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table,
             body=doc,
             id=docId,
             doc_type=table,
             refresh=True)

    print("Successly modified - Index: " + table + " - Document ID: " + docId)


def remove_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("Deleting document ID: " + docId)

    es.delete(index=table,
              id=docId,
              doc_type=table,
              refresh=True)

    print("Successly removed - Index: " + table + " - Document ID: " + docId)


# Process INSERT events
def insert_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)

        es.indices.create(table,
                          body='{"settings": { "index.mapping.coerce": true } }')

        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("New document to Index:")
    print(doc)

    newId = docid(record)
    es.index(index=table,
             body=doc,
             id=newId,
             doc_type=table,
             refresh=True)

    print("Successly inserted - Index: " + table + " - Document ID: " + newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
        for i in result:
            return i


def docid(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId

Getting error at document and docid

Individually both are giving output

result = []
for r in event['Records']:
    tmp = {}

    for k, v in r['dynamodb']['NewImage'].items():
    #for k, v in r['dynamodb']['Keys'].items():
        if "S" in v.keys() or "BOOL" in v.keys():
            tmp[k] = v.get('S', v.get('BOOL', False))
        elif 'NULL' in v:
            tmp[k] = None

    result.append(tmp)
for i in result:
    print (i)

event = {'Records': [{'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}}, 'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}}, 'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}}, 'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}}, 'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}}, 'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}}, 'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}]}

解决方案

You can check the following. I tried to replicate the issue and can confirm the error of

ERROR: NameError("name 'event' is not defined")

I used simulated INSERT event from DynamoDb stream, based on your example and my own table:

{
  "Records": [
    {
      "eventID": "b8b993cf16d1aacb61b40411b39e0b1f",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "1"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "V"
          },
          "id": {
            "N": "1"
          },
          "age": {
            "S": "2"
          }
        },
        "SequenceNumber": "25200000000020406897812",
        "SizeBytes": 22,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "e5d5bec988945c06ffc879cf16b89bf7",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "9"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "ADD"
          },
          "id": {
            "N": "9"
          },
          "age": {
            "S": "95"
          }
        },
        "SequenceNumber": "25300000000020406897813",
        "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "f1a7c9736253b5ef28ced38ed5ff645b",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "JJ"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "SequenceNumber": "25400000000020406897819",
        "SizeBytes": 23,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "bfcbad9dc19883e4172e6dc25e66637b",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "10"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "ADD"
          },
          "id": {
            "N": "10"
          },
          "age": {
            "S": "95"
          }
        },
        "SequenceNumber": "25500000000020406897820",
        "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}

Example MODIFY event:

{
  "Records": [
    {
      "eventID": "4e4629c88aa00e366c89a293d9c82d54",
      "eventName": "MODIFY",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595924589.0,
        "Keys": {
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "zhgdhfgdh"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "OldImage": {
          "last_name": {
            "S": "JJ"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "SequenceNumber": "25600000000020408264140",
        "SizeBytes": 49,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}

Modified code of lambda function, which I can confirm does not produce errors now:

import boto3
import json
import re

from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()

s3 = session.resource('s3')

awsauth = AWS4Auth(credentials.access_key,
                  credentials.secret_key,
                  session.region_name, 'es',
                  session_token=credentials.token)

    
es = Elasticsearch(
    ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'],
    use_ssl=True,
    verify_certs=True,
    http_auth=awsauth,
    connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                   "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    #dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:
            
        if record['eventName'] == "INSERT":
            insert_document(event, es, record)
        elif record['eventName'] == "REMOVE":
            remove_document(event, es, record)
        elif record['eventName'] == "MODIFY":
            modify_document(event, es, record)


# Process MODIFY events
def modify_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(event, event)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table,
             body=doc,
             id=docId,
             doc_type=table,
             refresh=True)

    print("Successly modified - Index: " , table , " - Document ID: " , docId)


def remove_document(event, es, record):
    
    table = getTable(record)
    
    print("Dynamo Table: " + table)

    docId = docid(event, event)
    print("Deleting document ID: ", docId)

    es.delete(index=table,
              id=docId,
              doc_type=table,
              refresh=True)

    print("Successly removed - Index: ", table, " - Document ID: " , docId)


# Process INSERT events
def insert_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)

        es.indices.create(table,
                          body='{"settings": { "index.mapping.coerce": true } }')

        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("New document to Index:")
    print(doc)

    newId = docid(event, record)
    
    es.index(index=table,
             body=doc,
             id=newId,
             doc_type=table,
             refresh=True)

    print("Successly inserted - Index: " , table + " - Document ID: " , newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
        for i in result:
            return i


def docid(event, record):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId

I haven't verified if data is correctly written, modified or inserted into ElasticSearch. But I had ES domain running and used in the lambda to verify if lambda can connect to it and run the queries.

Example output from lambda for INSERT event:

Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}

Example output from lambda from MODIFY event:

Updated document:

{
    "last_name": "zhgdhfgdh",
    "age": "7"
}
Successly modified - Index:  newtable  - Document ID:  
{}

I think docid requires further investigation if it works correctly as it seems to return empty dict:

 Document ID:  {}

这篇关于如何使用插入/修改/删除将dynamodb设计为弹性搜索的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆