在特定时间后,Flask-SQLALchemy自动更新记录 [英] Flask-SQLALchemy update record automatically after specific time
问题描述
我有一个这样的数据库模型:
I have a db models like this:
class Payment(db.Model):
id = db.Column(db.Integer(), primary_key=True)
user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
departure_time = db.Column(db.Date)
我想在datetime.utcnow()
传递了departure_time
中的日期值之后,更改所有ticket_status
中的值.
I want to change the value from all ticket_status
after datetime.utcnow()
passed the date value from departure_time
.
我试图这样编码:
class TicketStatus(enum.Enum):
UNUSED = 'UNUSED'
USED = 'USED'
EXPIRED = 'EXPIRED'
def __repr__(self):
return str(self.value)
class Payment(db.Model):
id = db.Column(db.Integer(), primary_key=True)
user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
ticket_status = db.Column(db.Enum(TicketStatus, name='ticket_status', default=TicketStatus.UNUSED))
departure_time = db.Column(db.Date)
# TODO | set ticket expirations time
def __init__(self):
if datetime.utcnow() > self.departure_time:
self.ticket_status = TicketStatus.EXPIRED.value
try:
db.session.add(self)
db.session.commit()
except Exception as e:
db.session.rollback()
我也这样尝试过:
def ticket_expiration(self, payment_id):
now = datetime.utcnow().strftime('%Y-%m-%d')
payment = Payment.query.filter_by(id=payment_id).first()
if payment.ticket_status.value == TicketStatus.USED.value:
pass
elif payment and str(payment.departure_time) < now:
payment.ticket_status = TicketStatus.EXPIRED.value
elif payment and str(payment.departure_time) >= now:
payment.ticket_status = TicketStatus.UNUSED.value
try:
db.session.commit()
except Exception as e:
db.session.rollback()
return str('ok')
但是,当datetime.utcnow()
传递了departure_time
中的日期值时,似乎没有任何作用.
But it seems no effect when the datetime.utcnow()
passed the date value from departure_time
.
所以我的问题是,如何在一段时间后自动更改行的值??
So the point of my questions is, how to change the value from a row automatically after a set of times..?
推荐答案
Finally I figure out this by using flask_apscheduler, and here is the snippet of my code that solved this questions:
安装 flask_apscheduler :
pip3 install flask_apscheduler
创建新模块 tasks.py
from datetime import datetime
from flask_apscheduler import APScheduler
from app import db
from app.models import Payment, TicketStatus
scheduler = APScheduler()
def ticket_expiration():
utc_now = datetime.utcnow().strftime('%Y-%m-%d')
app = scheduler.app
with app.app_context():
payment = Payment.query.all()
for data in payment:
try:
if data.ticket_status.value == TicketStatus.USED.value:
pass
elif str(data.departure_time) < utc_now:
data.ticket_status = TicketStatus.EXPIRED.value
elif str(data.departure_time) >= utc_now:
data.ticket_status = TicketStatus.UNUSED.value
except Exception as e:
print(str(e))
try:
db.session.commit()
except Exception as e:
db.session.rollback()
return str('ok')
,然后在 __ init __.py
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(Config)
# The other packages...
# The other packages...
scheduler.init_app(app)
scheduler.start()
return app
# import from other_module...
# To avoid SQLAlchemy circular import, do the import at the bottom.
from app.tasks import scheduler
这是 config.py 的地方:
class Config(object):
# The others config...
# The others config...
# Flask-apscheduler
JOBS = [
{
'id': 'ticket_expiration',
'func': 'app.tasks:ticket_expiration',
'trigger': 'interval',
'hours': 1, # call the task function every 1 hours
'replace_existing': True
}
]
SCHEDULER_JOBSTORES = {
'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db')
}
SCHEDULER_API_ENABLED = True
在上面的配置中,我们可以根据情况调用每1小时,几秒钟或其他时间更新db的函数,有关设置间隔时间的更多信息,我们可以查看它
In the config above, we can call the function to update db every 1 hours, seconds or others time according to our case, for more informations to set the interval time we can see it here.
我希望这个答案对将来会遇到此问题的人有所帮助.
I hope this answer helps someone who facing this in the future.
这篇关于在特定时间后,Flask-SQLALchemy自动更新记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!