What does "object of type '_UnwindowedValues' has no len()" mean?


Question

I'm using Dataflow 0.5.5 Python and ran into the following error in very simple code:

print(len(row_list))

row_list is a list. Exactly the same code, with the same data and the same pipeline, runs perfectly fine on DirectRunner but throws the following exception on DataflowRunner. What does it mean, and how can I solve it?

job name: `beamapp-root-0216042234-124125`

    (f14756f20f567f62): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
    with op.scoped_metrics_container:
  File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
    op.start()
  File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
    def start(self):
  File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
    with self.scoped_start_state:
  File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
    self.output(windowed_value)
  File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317)
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021)
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/executor.py", line 766, in dataflow_worker.executor.BatchGroupAlsoByWindowsOperation.process (dataflow_worker/executor.c:25558)
    self.output(wvalue.with_value((k, wvalue.value)))
  File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317)
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021)
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/executor.py", line 545, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18474)
    with self.scoped_process_state:
  File "dataflow_worker/executor.py", line 546, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18428)
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 195, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:5137)
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 262, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7078)
    self.reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 274, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:7467)
    raise type(exn), args, sys.exc_info()[2]
  File "apache_beam/runners/common.py", line 258, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:6967)
    self._dofn_simple_invoker(element)
  File "apache_beam/runners/common.py", line 198, in apache_beam.runners.common.DoFnRunner._dofn_simple_invoker (apache_beam/runners/common.c:5283)
    self._process_outputs(element, self.dofn_process(element.value))
  File "apache_beam/runners/common.py", line 286, in apache_beam.runners.common.DoFnRunner._process_outputs (apache_beam/runners/common.c:7678)
    for result in results:
  File "trip_augmentation_test.py", line 120, in get_osm_way
TypeError: object of type '_UnwindowedValues' has no len() [while running 'Pull way info from mapserver']

Here is the code: trip_augmentation_test.py

#!/usr/bin/env python
# coding: utf-8

from __future__ import absolute_import

import argparse
import logging
import json

import apache_beam as beam
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import SetupOptions


def get_osm_way(pairs_same_group):

  import requests
  from requests.adapters import HTTPAdapter
  from requests.packages.urllib3.exceptions import InsecureRequestWarning
  from multiprocessing.pool import ThreadPool
  import time
  #disable InsecureRequestWarning for a cleaner output
  requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

  print('processing hardwareid={} trips'.format(pairs_same_group[0]))

  row_list = pairs_same_group[1]
  print(row_list)
  http_request_num = len(row_list) ######### this line ran into the above error##########
  with requests.Session() as s:
      s.mount('https://ip address',HTTPAdapter(pool_maxsize=http_request_num))  ##### a host name is needed for this http persistent connection
      pool = ThreadPool(processes=1)

      for row in row_list:
          hardwareid=row['HardwareId']
          tripid=row['TripId']
          latlonArr = row['LatLonStrArr'].split(',');
          print('gps points num: {}'.format(len(latlonArr)))
          cor_array = []
          for latlon in latlonArr:
              lat = latlon.split(';')[0]
              lon = latlon.split(';')[1]
              cor_array.append('{{"x":"{}","y":"{}"}}'.format(lon, lat))
          url = 'https://<ip address>/functionname?coordinates=[{}]'.format(','.join(cor_array))
          print(url)
          print("Requesting")
          r = pool.apply_async(thread_get, (s, url)).get()
          print ("Got response")
          print(r) 
          if r.status_code==200:
              yield (hardwareid,tripid,r.text)
          else:
              yield (hardwareid,tripid,None)


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      help=('Input BigQuery table to process specified as: '
                            'PROJECT:DATASET.TABLE or DATASET.TABLE.'))
  parser.add_argument(
      '--output',
      required=True,
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(argv)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  p = beam.Pipeline(options=pipeline_options)  

  (p
    | 'Read trip from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=known_args.input))
    | 'Convert' >> beam.Map(lambda row: (row['HardwareId'],row))
    | 'Group devices' >> beam.GroupByKey()
    | 'Pull way info from mapserver' >> beam.FlatMap(get_osm_way)
    | 'Map way info to dictionary' >> beam.FlatMap(convert_to_dict)
    | 'Save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            known_args.output,            schema='HardwareId:INTEGER,TripId:INTEGER,OrderBy:INTEGER,IndexRatio:FLOAT,IsEstimate:BOOLEAN,IsOverRide:BOOLEAN,MaxSpeed:FLOAT,Provider:STRING,RoadName:STRING,WayId:STRING,LastEdited:TIMESTAMP,WayLatLons:STRING,BigDataComment:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
  )
  # Run the pipeline (all operations are deferred until run() is called).
  p.run()


if __name__ == '__main__':   
  logging.getLogger().setLevel(logging.INFO)
  run()

Here is the pipeline invocation (I'm using Google Cloud Datalab):

!python trip_augmentation_test.py \
--output 'my-project:my-dataset.mytable'  \
--input 'SELECT HardwareId,TripId, LatLonStrArr FROM [my-project:my-dataset.mytable] ' \
--project 'my-project' \
--runner 'DataflowRunner' \   ###  if just change this to DirectRunner, everything's fine
--temp_location 'gs://mybucket/tripway_temp' \
--staging_location 'gs://mybucket/tripway_staging' \
--worker_machine_type 'n1-standard-2' \
--profile_cpu True \
--profile_memory True 

Follow-up

I logged the type of row_list. It turns out that on DataflowRunner it is <class 'apache_beam.transforms.trigger._UnwindowedValues'>, while on DirectRunner it is list. Is this inconsistency expected?

Recommended answer

This kind of abstraction is necessary in Big Data systems like Beam / Dataflow (and others). Consider that the number of elements in the list could be arbitrarily large.

_UnwindowedValues provides an iterable interface to this set of elements, which could be of any size and may not fit in memory as a whole.

The fact that the Direct Runner returned a list is an inconsistency that was fixed a couple of Beam versions ago. In Dataflow, the result of GroupByKey does not come in the form of a list and does not support len, but it is iterable.
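To make the difference concrete, here is a minimal sketch of how an iterable-only object behaves. GroupedValues is a hypothetical stand-in written for this illustration, not Beam's actual _UnwindowedValues class, and count_rows is likewise an invented helper name:

```python
class GroupedValues(object):
    """Hypothetical stand-in for an iterable-only grouped result (not Beam's class)."""

    def __init__(self, values):
        self._values = values

    def __iter__(self):
        # Yield elements one at a time; __len__ is deliberately not defined.
        for value in self._values:
            yield value


def count_rows(rows):
    """Return (row_list, count) for any iterable, whether or not it supports len()."""
    row_list = list(rows)  # materializes the whole group in memory
    return row_list, len(row_list)


grouped = GroupedValues([{'TripId': 1}, {'TripId': 2}, {'TripId': 3}])

try:
    len(grouped)
    has_len = True
except TypeError:
    # Same kind of error as in the traceback: the object has no len().
    has_len = False

row_list, http_request_num = count_rows(grouped)
```

Code written against "any iterable" rather than "a list" behaves the same way regardless of which container type a runner hands to the function.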

In short, before doing http_request_num = len(row_list), you can coerce it into a type that supports len, e.g.:

row_list = list(pairs_same_group[1])
http_request_num = len(row_list)

But keep in mind that the list may be very large, and list() loads every element of the group into memory at once.
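If a group might be too large to hold in memory, one alternative is to consume the values in a single pass and never ask for their length. The sketch below assumes the per-row work does not need the total count up front; process_group is a hypothetical name, and the tuple it yields is only a placeholder for the real per-row result:

```python
def process_group(pair):
    """Yield one result per row, consuming the grouped values lazily."""
    key, rows = pair
    for index, row in enumerate(rows):
        # The per-row work (building the URL, sending the request) would go here.
        yield (key, row['TripId'], index)


# A plain iterator has no len(), so this exercises the streaming path.
rows = iter([{'TripId': 7}, {'TripId': 8}])
results = list(process_group(('hw-1', rows)))
```

With this approach the connection pool size in the question's code would have to be a fixed cap rather than len(row_list), which also avoids sizing the pool from an unbounded group.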
