AttributeError:“模块"对象没有属性"_strptime"(在Python2.7中使用threading.Thread使用threading time.strptime) [英] AttributeError: 'module' object has no attribute '_strptime' (threading time.strptime using threading.Thread in Python2.7)

查看:76
本文介绍了AttributeError:“模块"对象没有属性"_strptime"(在Python2.7中使用threading.Thread使用threading time.strptime)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道是否有人遇到过这个问题并有解决方案.

I am wondering if anyone has ever faced this issue and has a solution to it.

Traceback主要错误:AttributeError:'模块'对象没有属性'_strptime'

Traceback main error: AttributeError: 'module' object has no attribute '_strptime'

在Python2.7中的线程中运行time.strptime时 与此问题类似: https://mail.python.org /pipermail/python-list/2015-October/697689.html

When running a time.strptime in Thread in Python2.7 Similar to this issue: https://mail.python.org/pipermail/python-list/2015-October/697689.html

from datetime import datetime
import functools
import logging
import time
from threading import currentThread, Thread
import sys

log = logging.getLogger(__name__)



def _worker_process_queue(worker_fnc, input_queue, output_queue):
    """Function to get item from input queue and call `worker_fnc` on it and store the result of it in output queue

    Args:
        worker_fnc (function): task function
        input_queue (list): List that stores all inputs that needs to be passed to worker_fnc
        output_queue (list): List that stores all results/output from worker_fnc

    NOTE:
        Using List and not Queue because the operations being performed on the shared List are all atomic
        (.pop, .append)
        http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
        https://stackoverflow.com/questions/6319207/are-lists-thread-safe

    """
    err = None
    task_output = None
    _task = input_queue.pop()           # NOTE: This list operation is thread-safe
    task_input = _task['task_input']
    task_index = _task['task_index']
    try:
        log.debug('Thread: {} is running: {} with kwargs: {}'.format(currentThread(), worker_fnc, task_input))
        task_output = worker_fnc(**task_input)
    except Exception as e:
        err = e
        log.exception("run_tasks_concurrently._worker unhandled Exception: {}".format(e))
    finally:
        # NOTE: This list operation is thread-safe
        output_queue.append({'task_output': task_output, 'task_error': err, 'task_index': task_index})


def run_tasks_concurrently(pool_size, worker_fnc, tasks):
    """Run I/O Tasks (e.g. GET requests calls) concurrently using Threads

    This function blocks until all tasks are completed

    Args:
        pool_size (int): Total amount of threads to spawn at a given time
        worker_fnc (function): Worker function that will get called at each thread.
                               NOTE: Make sure that `worker_fnc` is thread safe
                                     https://en.wikipedia.org/wiki/Thread_safety
        tasks (list): List of dict, where keys and values of the dict are same as kwargs
                      of `worker_fnc`

    Returns:
        list: List of items, where each item is the return of `worker_fnc`. Order of this list is the same as `tasks`

    Logic:

        1. Create a input queue (list to store all inputs that needs to be passed to worker_fnc)
        2. Populate the input queue with items from `tasks`
        2. Create a output queue (list to store all outputs after running worker_fnc)
        2. For each task start a thread (that will run in background), total of `pool_size` threads will run at a time
        3. Within each thread:
            a. Get item from input queue and call `worker_fnc` on it and store the result of it in output queue
            b. Handle any exception and store it in the output queue
        4. After all threads have run, get all results from output queue and order then in order of input
        5. Throw RunTime exception if any unhandled exception is found in one of the task

    Example:

        >>> import requests
        >>> def worker(ip):
        >>>     return {'ip': ip, 'code': requests.get('http://{}'.format(ip)).status_code}
        >>> pool_size = 5
        >>> tasks = [{'ip': "172.217.0.{}".format(i)} for i in range(pool_size)]
        >>> print(run_tasks_concurrently(pool_size, worker, tasks))
        [{'ip': '172.217.0.0', 'code': 200}, {'ip': '172.217.0.1', 'code': 200}, ...]

    """
    # Populate the tasks in the input_queue
    output_queue = []
    input_queue = []
    size_of_tasks = len(tasks)
    for i, task in reversed(list(enumerate(tasks))):          # Reversing so that worker can pop and use last entry
        input_queue.append({'task_input': task, 'task_index': i})

    start_time = datetime.now()
    log.debug("Starting run_tasks_concurrently {} tasks with pool_size {}".format(size_of_tasks, pool_size))

    # For each task start a thread (that will run in background), total of `pool_size` threads will run at a time
    threads = []
    all_chunked_tasks = chunks(tasks, pool_size)
    for i, chunked_tasks in enumerate(all_chunked_tasks):
        for chunked_tasks in chunked_tasks:
            t = Thread(target=_worker_process_queue, args=(worker_fnc, input_queue, output_queue))
            t.daemon = True
            t.start()
            threads.append(t)

        # Block the code until chunked tasks threads complete
        log.debug('Waiting for tasks {}->{} to finish out of total {} tasks'
                  .format(pool_size * i, pool_size * (i + 1), size_of_tasks))
        [t.join() for t in threads]

    # After all threads have run, get all results from output queue and order then in order of input
    results = [None] * size_of_tasks
    unhandled_exceptions = []
    for _result in output_queue:
        task_error = _result.get('task_error')
        task_index = _result['task_index']
        # Order the result in the order of tasks
        results[task_index] = _result['task_output']
        if task_error:
            unhandled_exceptions.append((task_error, task_index))

    # Throw RunTime exception if any unhandled exception is found in one of the task
    if unhandled_exceptions:
        error_msgs = ["For give task input {} unhandled exception {}".format(tasks[task_index], task_error)
                      for task_error, task_index in unhandled_exceptions]
        error_msgs = "\n".join(error_msgs)
        raise RuntimeError("Unhandled exceptions caught in threads:\n{}".format(error_msgs))

    # Get the result, close all threads and return results
    end_time = datetime.now() - start_time
    log.debug("runtime for run_tasks_concurrently: {} result length: {}".format(end_time, len(tasks)))
    return results


def chunks(l, n):
    """Yield successive n-sized chunks from l.

    Args:
        l (list): list of items that needs to be evenly chunked
        n (int): chunk size

    Returns:
        list: list of list

    Example:

        >>> chunks(range(5), 2)
        [[0,1], [2,3], [4,]]
        >>>

    """
    for i in range(0, len(l), n):
        yield l[i:i + n]

def a_worker_fnc(x):
    time.strptime('Tue, 31 Jul 2018 17:15:24 GMT', '%a, %d %b %Y %X GMT')
    return x*x


def main():
    print(sys.version)
    t1 = datetime.now()
    logging.basicConfig(level=logging.NOTSET, format="%(asctime)s %(levelname)s %(name)s %(process)d/%(thread)d: %(message)s")
    no_of_jobs,no_of_workers = 2,2
    log.info('Starting script for # of jobs: {} running # of workers at a time {}...'.format(no_of_jobs, no_of_workers))

    tasks = [{"x": x}for x in range(no_of_jobs)]
    print(run_tasks_concurrently(no_of_workers, a_worker_fnc, tasks))
    print(datetime.now() - t1)

if __name__ == '__main__':
    main()

追踪

2.7.8 |Continuum Analytics, Inc.| (default, Aug 21 2014, 18:22:21) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
2018-07-31 17:18:07,888 INFO __main__ 21304/139920311363328: Starting script for # of jobs: 2 running # of workers at a time 2...
2018-07-31 17:18:07,888 DEBUG __main__ 21304/139920311363328: Starting run_tasks_concurrently 2 tasks with pool_size 2
2018-07-31 17:18:07,889 DEBUG __main__ 21304/139920173688576: Thread: <Thread(Thread-1, started daemon 139920173688576)> is running: <function a_worker_fnc at 0x7f41b440f140> with kwargs: {'x': 0}
2018-07-31 17:18:07,890 DEBUG __main__ 21304/139920161081088: Thread: <Thread(Thread-2, started daemon 139920161081088)> is running: <function a_worker_fnc at 0x7f41b440f140> with kwargs: {'x': 1}
2018-07-31 17:18:07,890 DEBUG __main__ 21304/139920311363328: Waiting for tasks 0->2 to finish out of total 2 tasks
2018-07-31 17:18:07,890 ERROR __main__ 21304/139920161081088: run_tasks_concurrently._worker unhandled Exception: 'module' object has no attribute '_strptime_time'
Traceback (most recent call last):
  File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 34, in _worker_process_queue
    task_output = worker_fnc(**task_input)
  File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 152, in a_worker_fnc
    time.strptime('Tue, 31 Jul 2018 17:15:24 GMT', '%a, %d %b %Y %X GMT')
AttributeError: 'module' object has no attribute '_strptime_time'
Traceback (most recent call last):
  File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 168, in <module>
    main()
  File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 164, in main
    print(run_tasks_concurrently(no_of_workers, a_worker_fnc, tasks))
  File "/home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py", line 123, in run_tasks_concurrently
    raise RuntimeError("Unhandled exceptions caught in threads:\n{}".format(error_msgs))
RuntimeError: Unhandled exceptions caught in threads:
For give task input {'x': 1} unhandled exception 'module' object has no attribute '_strptime_time'

与Python3.4传递相同

Same in Python3.4 passes

(pyenv34) barikak@dev-dsk:~% python /home/barikak/workspace/py27sandbox/concurrency_vs_threading_vs_multiprocessing/threading_example.py
3.4.0 |Continuum Analytics, Inc.| (default, Mar 17 2014, 16:13:14)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-54)]
2018-07-31 17:22:41,863 INFO __main__ 22009/140545970165504: Starting script for # of jobs: 2 running # of workers at a time 2...
2018-07-31 17:22:41,863 DEBUG __main__ 22009/140545970165504: Starting run_tasks_concurrently 2 tasks with pool_size 2
2018-07-31 17:22:41,863 DEBUG __main__ 22009/140545839482624: Thread: <Thread(Thread-1, started daemon 140545839482624)> is running: <function a_worker_fnc at 0x7fd360d8c6a8> with kwargs: {'x': 0}
2018-07-31 17:22:41,864 DEBUG __main__ 22009/140545758328576: Thread: <Thread(Thread-2, started daemon 140545758328576)> is running: <function a_worker_fnc at 0x7fd360d8c6a8> with kwargs: {'x': 1}
2018-07-31 17:22:41,864 DEBUG __main__ 22009/140545970165504: Waiting for tasks 0->2 to finish out of total 2 tasks
2018-07-31 17:22:41,869 DEBUG __main__ 22009/140545970165504: runtime for run_tasks_concurrently: 0:00:00.005774 result length: 2
[0, 1]
0:00:00.006079
(pyenv34) barikak@dev-dsk:~%

推荐答案

解决方案是做什么 https://stackoverflow.com/a/22476843/558397 已描述

Solution is to do what https://stackoverflow.com/a/22476843/558397 has described

import _strptime
from datetime import datetime

# then, in threaded block
datetime.strptime(date, format)

这篇关于AttributeError:“模块"对象没有属性"_strptime"(在Python2.7中使用threading.Thread使用threading time.strptime)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆