
MySQL query errors when connecting from Celery task running on Heroku

I’m seeing wrong query results when executing queries against an external MySQL database, but only when connecting from Celery tasks running on Heroku. The same tasks, when run on my own machine, do not show these errors, and the errors only appear about half of the time (although when they fail, all tasks are wrong).

The tasks are managed by Celery via Redis, and the MySQL database does not itself run on Heroku. Both my local machine and Heroku connect to the same MySQL database.
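
For reference, the Celery app is wired to Redis roughly as in the sketch below; the app name and broker URL are placeholders, not the project’s actual configuration:

from celery import Celery

# Hypothetical wiring: Redis serves as both broker and result backend.
celery_app = Celery('stats',
                    broker='redis://localhost:6379/0',
                    backend='redis://localhost:6379/0')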

I connect to the database using SQLAlchemy with the pymysql driver:

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base

DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'  # defined in stats_config

engine = create_engine(stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

The tasks are executed one by one.

Here is an example of a task that sometimes produces wrong results:

from datetime import datetime

from celery import shared_task
from sqlalchemy import func

@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):

    db_session.close()
    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(
        UsersTransactionsVK.date_added >= start_date,
        UsersTransactionsVK.date_added <= end_date,
        UsersTransactionsVK.payed == 'Yes').scalar()
    gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(
        UsersTransactionsStripe.date_added >= start_date,
        UsersTransactionsStripe.date_added <= end_date,
        UsersTransactionsStripe.payed == 'Yes').scalar()
    gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(
        UsersTransactions.date_added >= start_date,
        UsersTransactions.date_added <= end_date,
        UsersTransactions.on_hold == 'No').scalar()

    if gross_rev_trans_VK is None:
        gross_rev_trans_VK = 0

    if gross_rev_trans_Stripe is None:
        gross_rev_trans_Stripe = 0

    if gross_rev_trans is None:
        gross_rev_trans = 0

    print('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)

    total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans

    return {'total_rev': str(total_gross_rev / 100),
            'current': 100,
            'total': 100,
            'statistic': 'get_gross_revenue',
            'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}

# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
    if request.method == "POST":
        task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}
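
The taskstatus_get_gross_revenue endpoint referenced by url_for is not shown in the question. A minimal sketch of what such a polling endpoint could look like; the route path and payload shape are assumptions:

@app.route('/status/get-gross-revenue/<task_id>')
def taskstatus_get_gross_revenue(task_id):
    # Ask the Celery result backend for the task's current state.
    task = get_gross_revenue_task.AsyncResult(task_id)
    if task.state == 'SUCCESS':
        return json.dumps(task.result), 200
    return json.dumps({'state': task.state}), 202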

These are simple and fast tasks, completing within a few seconds.

The tasks fail by producing small differences. For example, for a task whose correct result would be 30111, a broken run produces 29811 instead. It is always the code that uses `db_session` that is affected.

What I tried:

  • I am already using the same time zone in both environments by executing:

    db_session.execute("SET SESSION time_zone = 'Europe/Berlin'")
    
  • I checked for errors in the worker logs. Although there are some entries like

    2013 Lost connection to MySQL
    
    sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically
    
    2014 commands out of sync
    

    I haven’t found a correlation between these SQL errors and the wrong results; wrong task results can appear without a lost connection.

  • A very dirty fix is to hard-code the expected result for one of the tasks, execute that task first, and re-submit everything if it produces an incorrect result.

  • This is probably a cache or isolation level problem with the way I use the SQLAlchemy session. Because I only ever need to use SELECT (no inserts or updates), I also tried different settings for the isolation level, before running tasks, such as

    #db_session.close()
    #db_session.commit()
    #db_session.execute('SET TRANSACTION READ ONLY')
    

    These statements raise an error when I run them on Heroku, but they work when I run them on my Windows machine.

    I also tried to alter the connection itself with isolation_level="READ UNCOMMITTED", without any result (see the sketch after this list for the engine-level way to set this).

  • I am certain that the workers are not reusing the same db_session.

  • It seems that only tasks that use db_session directly in the query can return wrong results. Code using the query attribute on the Base class (a db_session.query_property() object, e.g. Users.query) does not appear to have these issues. I thought the two were basically the same thing?
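
For completeness, the supported way to pin an isolation level in SQLAlchemy is on the engine rather than through raw SET TRANSACTION statements; a minimal sketch, reusing the same DB_URI placeholder as above:

from sqlalchemy import create_engine

# Applied to every connection handed out by the pool; valid MySQL values
# include 'READ UNCOMMITTED', 'READ COMMITTED', 'REPEATABLE READ',
# 'SERIALIZABLE' and 'AUTOCOMMIT'.
engine = create_engine(stats_config.DB_URI, isolation_level='READ COMMITTED')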


Answer

You are re-using sessions between tasks in different workers. Create your session per Celery worker, or even per task.

Note that a task class is instantiated only once per worker, so its state persists across runs. You can use this to cache a session per task, so you don’t have to recreate the session each time the task runs. This is easiest done with a custom task class; the Celery documentation uses database connection caching as an example there.

To do this with a SQLAlchemy session, use:

from celery import Task
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))

class SQLASessionTask(Task):
    _session = None

    @property
    def session(self):
        if self._session is None:
            engine = create_engine(
                stats_config.DB_URI, convert_unicode=True, echo_pool=True)
            self._session = Session(bind=engine)
        return self._session

Use this as:

@shared_task(base=SQLASessionTask, bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
    db_session = self.session
    # ... etc.

This creates a SQLAlchemy session for the current task only if it needs one, the moment you access self.session.
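
One refinement, not part of the original answer: since the cached session lives for the whole worker, you may want to reset its state after each run so nothing leaks between tasks. Celery’s after_return hook is a natural place for this; a sketch extending the class above:

class SQLASessionTask(Task):
    # ... _session and session property as above ...

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Runs after every task invocation; close() ends the transaction
        # and releases the pooled connection, so the next run starts fresh.
        if self._session is not None:
            self._session.close()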
