I’m seeing wrong results when executing queries against an external MySQL database, but only when connecting from Celery tasks running on Heroku. The same tasks, when run on my own machine, do not show these errors, and the errors only appear about half of the time (although when they fail, all tasks are wrong).
The tasks are managed by Celery via Redis, and the MySQL database does not itself run on Heroku. Both my local machine and Heroku connect to the same MySQL database.
I connect to the database through SQLAlchemy, using the pymysql driver:
```python
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'

engine = create_engine(
    stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(
    autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
```
The tasks are executed one by one.
Here is an example of a task with different results:
```python
@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
    db_session.close()
    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    gross_rev_trans_VK = db_session.query(
        func.sum(UsersTransactionsVK.amount)).filter(
            UsersTransactionsVK.date_added >= start_date,
            UsersTransactionsVK.date_added <= end_date,
            UsersTransactionsVK.payed == 'Yes').scalar()

    gross_rev_trans_Stripe = db_session.query(
        func.sum(UsersTransactionsStripe.amount)).filter(
            UsersTransactionsStripe.date_added >= start_date,
            UsersTransactionsStripe.date_added <= end_date,
            UsersTransactionsStripe.payed == 'Yes').scalar()

    gross_rev_trans = db_session.query(
        func.sum(UsersTransactions.amount)).filter(
            UsersTransactions.date_added >= start_date,
            UsersTransactions.date_added <= end_date,
            UsersTransactions.on_hold == 'No').scalar()

    if gross_rev_trans_VK is None:
        gross_rev_trans_VK = 0
    if gross_rev_trans_Stripe is None:
        gross_rev_trans_Stripe = 0
    if gross_rev_trans is None:
        gross_rev_trans = 0

    print('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)

    total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans

    return {'total_rev': str(total_gross_rev / 100),
            'current': 100,
            'total': 100,
            'statistic': 'get_gross_revenue',
            'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}


# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
    if request.method == "POST":
        task = get_gross_revenue_task.apply_async(
            [session['g_start_date'], session['g_end_date'],
             session['START_TIME_FORM']])
        return json.dumps({}), 202, {'Location': url_for(
            'taskstatus_get_gross_revenue', task_id=task.id)}
```
These are simple and fast tasks, completing within a few seconds.
The tasks fail by producing small differences. For example, for a task where the correct result would be 30111, the task produces 29811 instead when things break. It is always the code that uses `db_session` that returns the wrong results.
What I tried:
I already make sure both environments use the same time zone by executing:
```python
db_session.execute("SET SESSION time_zone = 'Europe/Berlin'")
```
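A quick sanity check (not part of my original code) to confirm the setting actually applies to the connection the session is using:

```python
# Ask MySQL what time zone this session is really using; with the
# SET SESSION statement above this should print 'Europe/Berlin'.
tz = db_session.execute("SELECT @@session.time_zone").scalar()
print('session time_zone:', tz)
```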
I checked for errors in the worker logs. There are some entries like:
```
2013 Lost connection to MySQL
sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically
2014 commands out of sync
```
However, I haven’t found a correlation between these SQL errors and the wrong results; wrong task results can appear without a lost connection.
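For the 2013 lost-connection entries specifically, SQLAlchemy 1.2+ can test pooled connections before handing them out; a minimal sketch of the engine setup with that enabled (this guards against stale connections, though it would not by itself explain the wrong results):

```python
# Variant of the engine setup above: pool_pre_ping issues a cheap
# liveness check before each connection checkout (SQLAlchemy 1.2+),
# and pool_recycle caps how long a pooled connection may live.
engine = create_engine(
    stats_config.DB_URI,
    convert_unicode=True,
    echo_pool=True,
    pool_pre_ping=True,
    pool_recycle=3600,
)
```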
A very dirty workaround is to hard-code the expected result for one of the tasks, execute that task first, and re-submit everything if the produced result is incorrect.
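Roughly, that guard looks like this (a sketch only; the expected value, dates, and helper name are placeholders, not my real code):

```python
# Hypothetical probe: run a task whose correct result is known for a
# fixed date range, and only trust this batch if it comes back right.
EXPECTED_TOTAL_REV = '301.11'  # known-correct value for the probe range

def worker_results_look_sane():
    probe = get_gross_revenue_task.apply_async(
        ['01-01-2018', '31-01-2018', datetime.today()])
    return probe.get(timeout=30)['total_rev'] == EXPECTED_TOTAL_REV
```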
This is probably a cache or isolation-level problem with the way I use the SQLAlchemy session. Because I only ever need SELECT (no inserts or updates), I also tried different isolation-level settings before running the tasks, such as:
```python
# db_session.close()
# db_session.commit()
# db_session.execute('SET TRANSACTION READ ONLY')
```
These raise an error when I run them on Heroku, but they work on my Windows machine.
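For reference, SQLAlchemy also exposes isolation control directly, engine-wide or per connection, which is what I mean by altering the connection below; a minimal sketch of that API (assuming SQLAlchemy 1.x, matching the `convert_unicode` usage above):

```python
# Engine-wide isolation level (applies to every pooled connection):
engine = create_engine(
    stats_config.DB_URI, isolation_level="READ COMMITTED")

# Or per connection, via execution options:
with engine.connect() as conn:
    conn = conn.execution_options(isolation_level="READ COMMITTED")
    rows = conn.execute("SELECT 1").fetchall()
```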
I also tried to alter the connection itself with `isolation_level="READ UNCOMMITTED"`, without any result.

I am certain that the workers are not reusing the same `db_session`.

It seems that only tasks which use `db_session` in the query can return wrong results. Code using the `query` attribute on the `Base` class (a `db_session.query_property()` object, e.g. `Users.query`) does not appear to have issues. I thought these were basically the same thing?
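For what it’s worth, the two styles should indeed resolve to the same scoped session; a minimal illustration, using `Users` as one of the mapped models from the setup above:

```python
# Users.query is built by the db_session.query_property() descriptor,
# which asks the scoped_session registry for the current session, the
# same object db_session() returns for the current thread/scope.
q_explicit = db_session.query(Users)
q_property = Users.query

# Both queries are bound to the same underlying Session object:
assert q_property.session is db_session()
assert q_explicit.session is db_session()
```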
Answer
You are re-using sessions between tasks in different workers. Create your session per Celery worker, or even per task.
Note that task objects are instantiated once per worker process, so you can use the task instance to cache a session; you don’t have to recreate the session each time the task runs. This is easiest done with a custom task class; the Celery documentation uses database connection caching as an example there.
To do this with a SQLAlchemy session, use:
```python
from celery import Task
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))

class SQLASessionTask(Task):
    _session = None

    @property
    def session(self):
        if self._session is None:
            engine = create_engine(
                stats_config.DB_URI, convert_unicode=True, echo_pool=True)
            self._session = Session(bind=engine)
        return self._session
```
Use this as:
```python
@shared_task(base=SQLASessionTask, bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
    db_session = self.session
    # ... etc.
```
This creates a SQLAlchemy session for the current task only if it needs one, the moment you access `self.session`.
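One optional refinement (my addition, not part of the original answer): Celery calls `Task.after_return()` after every task execution, so the class above can also close the session when a task finishes, returning its connection to the pool so no state leaks into the next run:

```python
class SQLASessionTask(Task):
    _session = None

    @property
    def session(self):
        if self._session is None:
            engine = create_engine(
                stats_config.DB_URI, convert_unicode=True, echo_pool=True)
            self._session = Session(bind=engine)
        return self._session

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Celery calls this after each task run; close the session so
        # its connection goes back to the pool. The session itself is
        # kept and will check out a fresh connection on next use.
        if self._session is not None:
            self._session.close()
```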