在龙卷风的Python APScheduler运行异步功能 [英] APScheduler run async function in Tornado Python

查看:3307
本文介绍了在龙卷风的Python APScheduler运行异步功能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想开发一个小应用程序,它会从API收集天气数据。我已经使用APScheduler执行函数每隔x分钟。我使用Python龙卷风框架。

I am trying to develop a small app which will gather weather data from an API. I have used APScheduler to execute the function every x minutes. I use Python Tornado framework.

我得到的错误是:

INFO     Job "GetWeather (trigger: interval[0:01:00], next run at: 2015-03-28 11:40:58 CET)" executed successfully
ERROR    Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0335C978>, <tornado.concurrent.Future object at 0x03374430>)
Traceback (most recent call last):
  File "C:\Python34\Lib\site-packages\tornado\ioloop.py", line 568, in _run_callback
    ret = callback()
  File "C:\Python34\Lib\site-packages\tornado\stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
greenlet.error: cannot switch to a different thread

我认为这是从协程从GetWeather()来作为,如果我删除了这一切asycn功能,它的工作原理。

Which I think is coming from the Coroutine from GetWeather() as, if I remove all asycn features from it, it works.

我使用的电机来读取所需的坐标和MongoDB中通过他们通过API和存储的天气数据。

I am using Motor to read needed coordinates and pass them through the API and store weather data in MongoDB.

import os.path, logging
import tornado.web
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado import gen
from tornado.options import define, options
from apscheduler.schedulers.tornado import TornadoScheduler
import motor

client = motor.MotorClient()
db = client['apitest']

console_log = logging.getLogger(__name__)

define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")

class MainRequest (tornado.web.RequestHandler):
    def get(self):
        self.write("Hello")

scheduler = TornadoScheduler()

class ScheduledTasks(object):
    def get(self):
        print("This is the scheduler");

def AddJobs():
    scheduler.add_job(GetWeather, 'interval', minutes=1)

def StartScheduler():
    scheduler.start();

def StopScheduler():
    scheduler.stop();

class Weather(tornado.web.RequestHandler):
    def get(self):
        self.write("This is the Weather Robot!")
        GetWeather()

@gen.coroutine
def GetWeather():
    '''
    Getting city weather from forecast.io API
    '''
    console_log.debug('Start: weather robot')    
    cursor = FindCities()

    while (yield cursor.fetch_next):
        city = cursor.next_object()
        lat = str(city["lat"])
        lon = str(city["lon"])     
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("https://api.forecast.io/forecast/3925d0668cf520768ca855951f1097cd/%s,%s" %(lat, lon))

        if response.error:
            print ("Error:", response.error)
            # Store all cities with errors in order to save them in the log file
        else:         
            json = tornado.escape.json_decode(response.body)
            temperature =  json["currently"]["temperature"]
            summary = json["currently"]["summary"]
            db.cities.update({'_id': city["_id"]}, {'$set': {'temperature': temperature, 'summary': summary}})

    console_log.debug('End: weather robot')
    return

def FindCities():
    '''
    cities = [{
                "_id" : ObjectId("55165d07258058ee8dca2172"),
                "name" : "London",
                "country" : "United Kingdom",
                "lat" : 51.507351,
                "lon" : -0.127758
            },
            {
                "_id" : ObjectId("55165d07258058ee8dca2173"),
                "name" : "Barcelona",
                "country" : "Spain",
                "lat" : 41.385064,
                "lon" : 2.173403
            }      
    '''
    cities = db.cities.find().sort([('_id', -1)])
    return cities

def main():
    logging.basicConfig(level=logging.DEBUG,format='%(levelname)-8s %(message)s')
    app = tornado.web.Application(
            [
                (r'/robots/weather', Weather),
                (r'/', MainRequest)
            ],
            cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
            login_url="/auth/login",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            xsrf_cookies=True,
            debug=options.debug,
        )
    app.listen(options.port)
    AddJobs()
    StartScheduler()
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()

任何想法,我做错了什么?正如我在APScheduler code看到,TornadoScheduler()运行在龙卷风IOLoop ...(<一个href=\"https://bitbucket.org/agronholm/apscheduler/src/a34075b0037dba46735bae67f598ec6133003ef1/apscheduler/schedulers/tornado.py?at=master\" rel=\"nofollow\">https://bitbucket.org/agronholm/apscheduler/src/a34075b0037dba46735bae67f598ec6133003ef1/apscheduler/schedulers/tornado.py?at=master)

哦!我忘了说,这个想法是能够通过APScheduler或手动两种执行任务。

Oh! I forgot to say that the idea is to be able to execute the task via APScheduler or manually both.

非常感谢!

推荐答案

默认情况下,TornadoScheduler奔跑安排在一个线程池的任务。特定的任务,但是,使用IOLoop等期望在相同的线程中运行。为了解决这个问题,你可以使用龙卷风IOLoop的add_callback()方法来安排任务是在IOLoop的线程尽快运行。

By default, TornadoScheduler runs scheduled tasks in a thread pool. Your specific task, however, uses the IOLoop and so expects to be run in the same thread. To fix this, you can use the add_callback() method of the tornado IOLoop to schedule a task to be run in the IOLoop's thread as soon as possible.

像这样:

def your_scheduled_task():
    IOLoop.instance().add_callback(your_real_task_function)

甚至更好:

scheduler.add_job(IOLoop.instance().add_callback, 'interval', minutes=1, args=[GetWeather])

这篇关于在龙卷风的Python APScheduler运行异步功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆