CloudDataflow不能使用"google.cloud.datastore"包裹? [英] CloudDataflow can not use "google.cloud.datastore" package?
问题描述
我想将带有事务的数据存储放在CloudDataflow上. 所以,我在下面写道.
I want to put datastore with transaction on CloudDataflow. So, I wrote below.
def exe_dataflow():
....
from google.cloud import datastore
# call from pipeline
def ds_test(content):
datastore_client = datastore.Client()
kind = 'test_out'
name = 'change'
task_key = datastore_client.key(kind, name)
for _ in range(3):
with datastore_client.transaction():
current_value = client.get(task_key)
current_value['v'] += content['v']
datastore_client.put(task)
# pipeline
....
| 'datastore test' >> beam.Map(ds_test)
但是,发生了错误,并且日志消息显示如下.
But, Error occured and log message was displayed as below.
(7b75e0ef2db229da): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
...(SNIP)...
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
AttributeError: 'module' object has no attribute 'datastore'
CloudDataflow不能使用"google.cloud.datastore"包吗?
CloudDataflow can not use "google.cloud.datastore" package?
添加2018/2/28.
我将--requirements_file添加到MyOption
I add --requirements_file to MyOption
options = MyOptions(flags = ["--requirements_file", "./requirements.txt"])
然后我创建了requirements.txt
and I make requirements.txt
google-cloud-datastore==1.5.0
但是,另一个错误发生了.
But, Another error occurred.
(366397598dcf7f02): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
op.start()
...(SNIP)...
File "my_dataflow.py", line 66, in to_entity
File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/__init__.py", line 60, in <module>
from google.cloud.datastore.batch import Batch
File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/batch.py", line 24, in <module>
from google.cloud.datastore import helpers
File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/helpers.py", line 29, in <module>
from google.cloud.datastore_v1.proto import datastore_pb2
File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/__init__.py", line 17, in <module>
from google.cloud.datastore_v1 import types
File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/types.py", line 21, in <module>
from google.cloud.datastore_v1.proto import datastore_pb2
File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/proto/datastore_pb2.py", line 17, in <module>
from google.cloud.datastore_v1.proto import entity_pb2 as google_dot_cloud_dot_datastore__v1_dot_proto_dot_entity__pb2
File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/proto/entity_pb2.py", line 28, in <module>
dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
File "/usr/local/lib/python2.7/dist-packages/google/protobuf/descriptor.py", line 824, in __new__
return _message.default_pool.AddSerializedFile(serialized_pb)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/datastore_v1/proto/entity.proto":
google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
...(SNIP)...
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
推荐答案
从Cloud Dataflow管道与Cloud Datastore进行交互的推荐方法是使用
The recommended way to interact with Cloud Datastore from a Cloud Dataflow Pipeline is to use the Datastore I/O API, which is available through the Dataflow SDK and provides some methods to read and write data to a Cloud Datastore database.
您可以在 datastore.v1.datastoreio
module 是您要使用的特定模块.我共享的链接中包含大量信息,但简而言之,它是数据存储区的连接器,它使用PTransform
来读取/写入/删除来自数据存储区的PCollection
,分别使用 ReadFromDatastore()
/ WriteToDatastore()
/ DeleteFromDatastore()
类.
You can find detailed documentation for the Datastore I/O package for Dataflow SDK 2.x for Python in this other link. The datastore.v1.datastoreio
module is the specific module that you want to use. There is plenty of information in the links I am sharing, but in short, it is a connector to Datastore that uses PTransform
to read / write / delete a PCollection
from Datastore using the classes ReadFromDatastore()
/ WriteToDatastore()
/ DeleteFromDatastore()
respectively.
您应该尝试使用它而不是自己实现调用.我怀疑这可能是您看到错误的原因,因为Dataflow SDK中已经存在数据存储实现:
You should try using it instead of implementing the calls yourself. I suspect this may be the reason for the error you are seeing, as a Datastore implementation already exists in the Dataflow SDK:
"google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
更新:
UPDATE:
It looks like those three classes collect several mutations and executes them in a single transaction. You can check that in the code describing the classes.
如果目标是检索(get()
)然后更新(put()
)数据存储区实体,则可以使用文档中所述,您可以使用完整的批量突变执行您感兴趣的操作.
If the aim is to retrieve (get()
) and then update (put()
) a Datastore entity, you can probably work with the write_mutations()
function, which is described in the documentation, and you can work with a full batch of mutations performing the operations you are interested in.
这篇关于CloudDataflow不能使用"google.cloud.datastore"包裹?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!