CloudDataflow can not use "google.cloud.datastore" package?


Problem Description

I want to update Datastore within a transaction from a Cloud Dataflow pipeline, so I wrote the following.

def exe_dataflow():
....
  from google.cloud import datastore
  # call from pipeline
  def ds_test(content):
    datastore_client = datastore.Client()

    kind = 'test_out'
    name = 'change'
    task_key = datastore_client.key(kind, name)

    for _ in range(3):
        with datastore_client.transaction():
            current_value = datastore_client.get(task_key)
            current_value['v'] += content['v']
            datastore_client.put(current_value)

    # pipeline
....
      | 'datastore test' >> beam.Map(ds_test)

But an error occurred, and the log message below was displayed.

(7b75e0ef2db229da): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  ...(SNIP)...
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
AttributeError: 'module' object has no attribute 'datastore'

Can Cloud Dataflow not use the "google.cloud.datastore" package?

Added 2018/2/28:

I added --requirements_file to MyOptions:

  options = MyOptions(flags = ["--requirements_file", "./requirements.txt"])

and I created requirements.txt:

google-cloud-datastore==1.5.0

But another error occurred.

(366397598dcf7f02): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
...(SNIP)...
  File "my_dataflow.py", line 66, in to_entity
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/__init__.py", line 60, in <module>
    from google.cloud.datastore.batch import Batch
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/batch.py", line 24, in <module>
    from google.cloud.datastore import helpers
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/helpers.py", line 29, in <module>
    from google.cloud.datastore_v1.proto import datastore_pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/__init__.py", line 17, in <module>
    from google.cloud.datastore_v1 import types
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/types.py", line 21, in <module>
    from google.cloud.datastore_v1.proto import datastore_pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/proto/datastore_pb2.py", line 17, in <module>
    from google.cloud.datastore_v1.proto import entity_pb2 as google_dot_cloud_dot_datastore__v1_dot_proto_dot_entity__pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/proto/entity_pb2.py", line 28, in <module>
    dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/descriptor.py", line 824, in __new__
    return _message.default_pool.AddSerializedFile(serialized_pb)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/datastore_v1/proto/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
...(SNIP)...
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.

Recommended Answer

The recommended way to interact with Cloud Datastore from a Cloud Dataflow pipeline is to use the Datastore I/O API, which is available through the Dataflow SDK and provides methods for reading and writing data to a Cloud Datastore database.

Detailed documentation for the Datastore I/O package is available in the Dataflow SDK 2.x for Python reference; datastore.v1.datastoreio is the specific module you want to use. In short, it is a connector to Datastore that uses PTransforms to read, write, and delete a PCollection from Datastore, using the ReadFromDatastore() / WriteToDatastore() / DeleteFromDatastore() classes respectively.
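
For illustration, here is a minimal sketch of writing entities through WriteToDatastore from a Python 2 pipeline on Dataflow SDK 2.x / Apache Beam. The project ID, kind, and the shape of the input records are assumptions, and the entity-building step follows the pattern from the Beam examples (it relies on the googledatastore helper package being available on the workers):

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

PROJECT = 'my-project'  # hypothetical project id


def to_entity(content):
    # Convert a plain dict into a Datastore entity protobuf.
    entity = entity_pb2.Entity()
    # Key: kind 'test_out', name taken from the record (assumed field).
    datastore_helper.add_key_path(entity.key, 'test_out', content['name'])
    datastore_helper.add_properties(entity, {'v': content['v']})
    return entity


# 'options' is assumed to be the MyOptions instance from the question.
with beam.Pipeline(options=options) as p:
    (p
     | 'create test records' >> beam.Create([{'name': 'change', 'v': 1}])
     | 'to entity' >> beam.Map(to_entity)
     | 'write to datastore' >> WriteToDatastore(PROJECT))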

You should try using it instead of implementing the calls yourself. I suspect this may be the reason for the error you are seeing, since a Datastore implementation already exists in the Dataflow SDK:

"google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".

UPDATE:

It looks like those three classes collect several mutations and execute them in a single transaction, which you can check in the code describing the classes.

If the aim is to retrieve (get()) and then update (put()) a Datastore entity, you can probably work with the write_mutations() function, which is described in the documentation, and build a full batch of mutations performing the operations you are interested in.
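
As a rough sketch only (not the connector's documented transactional API): the get-then-put pattern can also be expressed with the connector classes by reading the entities with ReadFromDatastore, modifying the property in a Map step, and writing the result back with WriteToDatastore. The project ID, kind, and property name are assumptions, and unlike the transaction in the question this is not a per-entity atomic read-modify-write:

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore, WriteToDatastore
from google.cloud.proto.datastore.v1 import query_pb2

PROJECT = 'my-project'  # hypothetical project id
KIND = 'test_out'       # hypothetical kind


def make_query(kind):
    # Protobuf query selecting all entities of the given kind.
    query = query_pb2.Query()
    query.kind.add().name = kind
    return query


def bump_v(entity):
    # entity is an entity_pb2.Entity; increment its integer property 'v'.
    entity.properties['v'].integer_value += 1
    return entity


# 'options' is assumed to be the MyOptions instance from the question.
with beam.Pipeline(options=options) as p:
    (p
     | 'read entities' >> ReadFromDatastore(PROJECT, make_query(KIND))
     | 'update value' >> beam.Map(bump_v)
     | 'write entities' >> WriteToDatastore(PROJECT))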
