In a Ruby on Rails project, creating a job to run in the background is as simple as:
Defining the job class and its execution code
Sending a job instance into a queue
Having a worker execute that instance
But everything that can go wrong when multiple users try to access the same shared state, a.k.a. concurrency issues, can equally go wrong when multiple processes / threads (let's call them workers) try to execute a background task.
Also, there might be reasons why you want a background task to be executed only once. How do you prevent it from being enqueued multiple times?
Let’s see various scenarios of concurrent access to shared state and possible solutions.
This blog post is long, yet it still does not exhaust the subject.
Before I delve into details, here is the list of tools suggested as possible solutions to various problems:
Synchronize access to parts of the code using a Mutex.
Use ActiveRecord#increment! to update an integer value atomically.
Enqueued Jobs table.
Sidekiq::Shutdown exception.
Other Solutions like Sidekiq Enterprise, Sidekiq Unique Jobs, Sidekiq Iteration.
A Ruby on Rails application. Repository here.
A simple data model: User, stored in the users table, and UserBalance, stored in the user_balances table.
The User:
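A minimal sketch of the model; the actual code in the repository may differ:

```ruby
# app/models/user.rb
class User < ApplicationRecord
  has_one :user_balance, dependent: :destroy

  validates :email, presence: true, uniqueness: true
end
```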
The UserBalance:
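Again, a minimal sketch of the model, not necessarily the repository version:

```ruby
# app/models/user_balance.rb
class UserBalance < ApplicationRecord
  belongs_to :user

  validates :balance, presence: true
end
```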
I keep track of a user balance.
I have a Sidekiq job in my Ruby on Rails project.
You can imagine any background task in your project.
This job is neither thread- nor process-safe.
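The job could look like the following sketch. The job name and the log format are assumptions; the point is the unguarded read-modify-write:

```ruby
# app/jobs/increase_user_balance_job.rb
class IncreaseUserBalanceJob < ApplicationJob
  queue_as :default

  def perform(user_id)
    user = User.find(user_id)
    puts "#{Thread.current.object_id} -> Calculating balance for user: #{user.email}"

    user_balance = user.user_balance
    # Read-modify-write without any locking: two workers can read the
    # same balance and both write back balance + 1, losing an update.
    user_balance.balance += 1
    user_balance.save!
  end
end
```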
We can prove it by running this code in a rails console. Assuming that we have created a User whose balance is 0, running the following code should increase it to 5.
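The script could be something like this sketch (the email matches the sample output below; the rest is an assumption):

```ruby
# script/threads_increase_balance.rb
# Run with: bundle exec rails runner script/threads_increase_balance.rb
user = User.find_by!(email: "panagiotis@matsinopoulos.gr")

# Run the job body from 5 threads in parallel.
threads = 5.times.map do
  Thread.new { IncreaseUserBalanceJob.new.perform(user.id) }
end
threads.each(&:join)

puts "user_balance: #{user.user_balance.reload.balance}"
```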
This is an instance of running this script:
bundle exec rails runner script/threads_increase_balance.rb
12780 -> Calculating balance for user: panagiotis@matsinopoulos.gr
12860 -> Calculating balance for user: panagiotis@matsinopoulos.gr
12840 -> Calculating balance for user: panagiotis@matsinopoulos.gr
12800 -> Calculating balance for user: panagiotis@matsinopoulos.gr
12820 -> Calculating balance for user: panagiotis@matsinopoulos.gr
user_balance: 1
This is because all threads are accessing the balance concurrently. They are not serialized one behind the other.
There are solutions, but not all of them are good.
I can make the job use a Mutex to access the shared balance resource.
The #perform() needs to take an extra argument, the mutex. And the script that fires the 5 jobs should be something like this:
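A sketch of the shape of this solution (an assumption, not the repository code):

```ruby
class IncreaseUserBalanceJob < ApplicationJob
  def perform(user_id, mutex)
    # Only one thread at a time can enter this critical section.
    mutex.synchronize do
      user = User.find(user_id)
      user_balance = user.user_balance
      user_balance.balance += 1
      user_balance.save!
    end
  end
end

# script: all 5 threads share the SAME mutex instance. Note that this
# can only ever work within a single process.
mutex = Mutex.new
user = User.find_by!(email: "panagiotis@matsinopoulos.gr")
threads = 5.times.map do
  Thread.new { IncreaseUserBalanceJob.new.perform(user.id, mutex) }
end
threads.each(&:join)
```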
Bad!
This is bad. I don't like it. I would have to do the same for all my background jobs. Also, and most importantly, it is not process-safe. I can't share the same mutex across different processes. And #perform taking the mutex as input is not an option anyway. A mutex is not something that can be serialized to be stored into a queue, like Redis.
#increment! (I don't like it)
I can use the ActiveRecord#increment! method, which is atomic. That's not bad initially.
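A sketch of the job using #increment!:

```ruby
class IncreaseUserBalanceJob < ApplicationJob
  def perform(user_id)
    user = User.find(user_id)
    # increment! issues a single atomic SQL statement, roughly:
    #   UPDATE user_balances SET balance = balance + 1 WHERE id = ?
    user.user_balance.increment!(:balance)
  end
end
```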
It works as expected with the script that spawns threads to update the balance in parallel.
And it works well if we put these jobs in the background. I use this script to do it (I make sure that I have sidekiq running):
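A sketch of such a script; the file name is an assumption:

```ruby
# script/background_jobs_increase_balance.rb
# Run with: bundle exec rails runner script/background_jobs_increase_balance.rb
user = User.find_by!(email: "panagiotis@matsinopoulos.gr")

# Enqueue 5 jobs; the running Sidekiq workers will pick them up.
5.times { IncreaseUserBalanceJob.perform_later(user.id) }
```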
However, I still don’t like that.
First reason? #increment!() does not call the ActiveRecord callbacks.
Second and most important reason? It doesn't work when the job's business logic is a little more elaborate than just an increment.
I generalize my job to do some complex logic to calculate the balance. Imagine that it reads other models, possibly updates others, and finally comes up with a new value to set the balance to. It is not just an increment; it is a value that does not depend on the previous / existing balance.
So, I am on a new problem now.
This is the new, more complex job:
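A sketch of such a job; calculate_balance() is a hypothetical stand-in for the elaborate business logic:

```ruby
# app/jobs/calculate_user_balance_job.rb
class CalculateUserBalanceJob < ApplicationJob
  def perform(user_id)
    user = User.find(user_id)
    puts "****** #{provider_job_id} Calculating balance for user: #{user.email}"

    new_balance = calculate_balance(user)
    user.user_balance.update!(balance: new_balance)
  end

  private

  # Stand-in for logic that reads other models, takes time, and
  # produces a value that does NOT depend on the previous balance.
  def calculate_balance(user)
    sleep(rand(1..3))
    rand(100)
  end
end
```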
Here, the job sets the balance to a business-dependent value.
And because there are 5 jobs running in parallel, we don’t know which one has set the correct value, whatever correct means in the business that we run.
BTW, the script prints a log of the jobs and what balance each job calculated.
The business logic inside the CalculateUserBalanceJob might be long and complex, as I said, and I can't have two (or more) instances of this job running at the same time. I should NOT allow two (or more) instances of this job to run at the same time.
How can I solve this problem now?
I am going to assume that this job is going to run as a singleton, i.e. at most one instance of it at any point in time. I am optimistic. But if, for any reason, this is violated, then I need a way to know, and to make sure that I don't suffer any serious implications.
The tool I use is called optimistic locking.
I will add the column :lock_version to the UserBalance model. This will make sure that only the first process that loads a UserBalance instance will succeed in updating that instance. All others will fail with an ActiveRecord::StaleObjectError exception.
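The migration can be as simple as the following sketch. Rails picks up a column named lock_version automatically; no model change is needed. The migration class name and Rails version are assumptions:

```ruby
class AddLockVersionToUserBalances < ActiveRecord::Migration[7.1]
  def change
    add_column :user_balances, :lock_version, :integer, default: 0, null: false
  end
end
```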
Without any modification to my job code itself, if I try the script that fires 5 background jobs, 4 of them will fail. And Sidekiq will keep on retrying them until they succeed. They will eventually all succeed, since on every retry one succeeds and the rest fail again, until none are left.
So, this looks good. But I don't like the fact that my jobs fail. Sidekiq will retry them up to 25 times, with exponential wait time in between retries.
Lots of exceptions and noise.
Is there anything better I can do?
I can rescue the ActiveRecord::StaleObjectError, reload the object and retry, maybe after waiting for some time. In other words, I could do by hand the job that Sidekiq does for me with retries.
Nah! Very dirty code, and it raises questions like:
How many retries?
How long to wait between retries?
There has to be something better.
I will keep the optimistic locking with the lock_version. I really like it in general. It keeps me from doing updates on objects that might be accessed by more than one process. In a multi-user Web application, for example, the majority of objects would benefit from optimistic locking.
Hence, I am leaving optimistic locking in, but I also introduce pessimistic locking. This will lock all processes out except one. The other processes will wait behind a database-enforced lock mechanism.
Ruby on Rails allows me to do record-level locking, which works well for my case.
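A sketch of the job with pessimistic locking added; calculate_balance() is, again, a hypothetical stand-in for the business logic:

```ruby
class CalculateUserBalanceJob < ApplicationJob
  def perform(user_id)
    ActiveRecord::Base.transaction do
      # SELECT ... FOR UPDATE: any other worker doing the same blocks
      # here until this transaction commits or rolls back.
      user = User.lock.find(user_id)
      puts "****** #{provider_job_id} Calculating balance for user: #{user.email}"

      new_balance = calculate_balance(user)
      user.user_balance.update!(balance: new_balance)
    end
  end

  private

  def calculate_balance(user)
    sleep(rand(1..3))
    rand(100)
  end
end
```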
That’s almost perfect. Isn’t it?
When we run the script, in the Sidekiq console, the ”****** <job-id> Calculating balance for user:…” message does not appear for all 5 jobs at once. Only after a job finishes does the next message appear. And the jobs do not fail with ActiveRecord::StaleObjectError. So, the jobs have been serialized behind the User.lock.find(user_id) statement, which does a select * from users where id = ? for update.
Good! That is a very good solution for locking processes out when updating a record instance, like a User with a specific id.
Not Good For: But what happens if I don’t want to update a record, but to create one?
Let’s see another problem. I have a job that creates a User. But I want to make sure that it runs, again, as a singleton, a unique job at a given point in time. This is an example:
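A sketch of such a job, with the complex business logic elided; the repository version may differ:

```ruby
# app/jobs/create_user_job.rb
class CreateUserJob < ApplicationJob
  def perform(user_email)
    ActiveRecord::Base.transaction do
      # Idempotency check: if another process has already created the
      # user, there is nothing to do.
      return if User.exists?(email: user_email)

      # ... imagine lots of complex business logic here ...
      User.create!(email: user_email)
    end
  end
end
```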
If I just spawn 5 such jobs in Sidekiq, simulating a bad, but possible, situation, 4 of them will fail, because the email is unique. They will retry and retry until they find that another process has created the user.
Note: One might say, “Hey you! Why don’t you use User.find_or_create_by!(email: user_email)?”. My answer is that I would, if it were that simple, i.e. if I didn’t have any complex business logic when creating a User. The point is to find a solution to a more complex problem: a job/task that does a lot of things before committing/creating one or more things into the database. So, the example of creating a User by email might not be representative, and this is exactly why I am not using find_or_create_by(): to simulate a more complex scenario.
How do I lock a process out while another one is executing, in this creation case? There is no way I can do a select for update, because there are no records to lock. I am not doing any update.
I am going to use Postgres Advisory Locks.
Side note to those who use MySQL or MS SQL Server: I asked an LLM to tell me whether MySQL and MS SQL Server have equivalent to Advisory Locks tools. Here is what it replied to me:
MySQL: MySQL doesn't have a direct equivalent to Advisory Locks, but you can achieve similar functionality using GET_LOCK() and RELEASE_LOCK() functions. These allow you to create named locks that can be used across sessions, much like Advisory Locks in PostgreSQL.
SQL Server: SQL Server doesn't have an exact match either, but you can use sp_getapplock and sp_releaseapplock to create application-level locks. These functions allow you to define locks that aren't tied to specific database objects, making them somewhat comparable to Advisory Locks.
Here is how I can use it in the CreateUserJob:
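A sketch of the job with the advisory lock in place. ADVISORY_LOCK_NAMESPACES is the application-level hash of integer namespaces; its values and location here are assumptions:

```ruby
ADVISORY_LOCK_NAMESPACES = { create_user_job: 1 }.freeze # hypothetical values

class CreateUserJob < ApplicationJob
  def perform(user_email)
    ActiveRecord::Base.transaction do
      # LOCKING >
      lock_namespace = ADVISORY_LOCK_NAMESPACES[:create_user_job]
      lock_key = user_email
      lock_comment = "CreateUserJob: #{provider_job_id} - #{user_email}"
      lock_query = <<~SQL.squish
        SELECT pg_advisory_xact_lock(:lock_namespace, hashtext(:key)) /* #{lock_comment} */
      SQL
      lock_query_args = { lock_namespace: lock_namespace, key: lock_key }
      sql = ActiveRecord::Base.sanitize_sql_array([lock_query, lock_query_args])
      ActiveRecord::Base.connection.execute(sql)
      # < LOCKING

      return if User.exists?(email: user_email)

      # ... complex business logic ...
      User.create!(email: user_email)
    end
  end
end
```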
The behavior of this process is exactly the same as when I used the Rails pessimistic locking with User.lock.find(). Each process is queued behind the other.
I repeat here the lines that do the locking:
# LOCKING >
lock_namespace = ADVISORY_LOCK_NAMESPACES[:create_user_job]
lock_key = user_email
lock_comment = "CreateUserJob: #{provider_job_id} - #{user_email}"
lock_query = <<~SQL.squish
  SELECT pg_advisory_xact_lock(:lock_namespace, hashtext(:key)) /* #{lock_comment} */
SQL
lock_query_args = {
  lock_namespace: lock_namespace,
  key: lock_key
}
sql = ActiveRecord::Base.sanitize_sql_array([ lock_query, lock_query_args ])
ActiveRecord::Base.connection.execute(sql)
# < LOCKING
The crux of the story here is the SELECT pg_advisory_xact_lock(). I am not going to explain how it works; you can read the Postgres documentation.
What I am going to emphasize is what I did on top of it:
I use the namespace to say that I am locking out CreateUserJob(s) for the key user_email. Note that pg_advisory_xact_lock() takes integers as arguments. That’s why I have to convert the namespace :create_user_job and the email to integers. The first I do with an application-level hash, and the second using the hashtext() function in Postgres.
I also annotate the SQL statement with an SQL comment.
Important Note: hashtext() is an internal function in Postgres. It is not publicly documented, which means that it might break in the future. Also, as a hashing function, it might produce collisions. I am looking at alternatives to generate a unique integer from a string. But if you can provide an integer for the key, rather than a string converted with hashtext() to an integer, that would be ideal.
Suggestion: Factor out these lines of code into a reusable Ruby method. Like this:
module AdvisoryLock
  class << self
    def lock(lock_namespace:, lock_key:, lock_comment:)
      lock_query = <<~SQL.squish
        SELECT pg_advisory_xact_lock(:lock_namespace, hashtext(:key)) /* #{lock_comment} */
      SQL
      lock_query_args = {
        lock_namespace: lock_namespace,
        key: lock_key
      }
      sql = ActiveRecord::Base.sanitize_sql_array([lock_query, lock_query_args])
      ActiveRecord::Base.connection.execute(sql)
    end
  end
end
Then it is only a matter of calling AdvisoryLock.lock(…) at the beginning of your transaction.
Disadvantages? Almost none. As long as your server is configured to accept many advisory locks, you will not hit any problem. However, this solution is not a good fit if the job is very long-running, with numerous updates in the db. See the next problem.
Good! I even use it sometimes instead of the Ruby on Rails pessimistic locking. This is because pessimistic locking requires you to lock some rows in the database, preventing others from updating them, which might not be necessary.
Important Important Important! Exactly as it is mentioned in the Best Practices section of the Sidekiq documentation,
Make your jobs idempotent and transactional
This is exactly what I did with the CreateUserJob.
It is idempotent, because I am checking whether a user already exists or not. If it exists, I don’t do anything. And it runs inside a transaction.
Hey! This solution is not good for the next problem. What if my job is a long-running job? What if it needs to run for 20 minutes, or 40 minutes, or 2 hours? What if it is a batch job?
Important! I don’t like long-running batch jobs. If I have one, I try to break it into smaller chunks, chunks that can finish in about 1 or 2 minutes maximum.
But, let’s suppose that we have a job that is long-running and cannot be broken into smaller jobs.
Keeping the advisory lock and the transaction open for 1 hour or 2 hours is not good for many reasons.
Before I enqueue a job with something like LongRunningBatchJob.perform_later(batch_id), I check whether the job has already been enqueued.
So, I want something like this:
unless job_enqueued?(LongRunningBatchJob.name, batch_id)
  LongRunningBatchJob.perform_later(batch_id)
end
But this ☝️ needs to be atomic, i.e. in between the job_enqueued?() check and the …perform_later() call, the job should not be enqueued by anyone else.
Hence, we need to find a way to lock access to the queue while checking and then enqueuing.
Can I use the Sidekiq API to do that? No, I can’t. Sidekiq uses Redis to store background tasks, and I don’t see the Sidekiq API having any such facility as LongRunningJob.perform_later_if_not_already_enqueued().
And it’s not only that we want to check whether the job is already enqueued. It might have been enqueued and already started, in which case it is not literally enqueued in Sidekiq terms. So, generally, I believe that Sidekiq is not very helpful here.
Hence, I am going to use a Postgres table to implement this feature.
enqueued_jobs table:
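A sketch of the migration; the exact columns in the repository may differ:

```ruby
class CreateEnqueuedJobs < ActiveRecord::Migration[7.1]
  def change
    create_table :enqueued_jobs do |t|
      t.references :queueable, polymorphic: true, null: false
      t.string :job_class_name, null: false
      t.timestamps
    end

    # Belt and braces: the same job for the same queueable can only
    # appear once.
    add_index :enqueued_jobs,
              [:queueable_type, :queueable_id, :job_class_name],
              unique: true,
              name: "index_enqueued_jobs_uniqueness"
  end
end
```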
EnqueuedJob model:
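A minimal sketch of the model:

```ruby
# app/models/enqueued_job.rb
class EnqueuedJob < ApplicationRecord
  belongs_to :queueable, polymorphic: true

  validates :job_class_name, presence: true
end
```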
This is a model that has a polymorphic association to units of work, other models that are called queueables. For example, to a User, or to a Batch.
Each enqueued job is for:
a queueable, such as a User, a Batch, an Order. This is what we usually give as argument to the job's #perform.
a job_class_name, such as CreateUserJob, LongRunningBatchJob, etc.
Enqueuing should not be done with just LongRunningBatchJob.perform_later(batch_id). We should not call #perform_later() on the job class. If we do, we don’t take advantage of the enqueued_jobs table.
We should call the enqueue() method on the model at hand, on the queueable.
For example: batch.enqueue(…) or user.enqueue(…).
In order to support this interface, we introduce the Queueable module concern.
The Queueable API. I have developed the module Queueable:
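A sketch of the Queueable concern, assuming the AdvisoryLock module from earlier and a hypothetical :queueable entry in the ADVISORY_LOCK_NAMESPACES hash; the repository implementation may differ:

```ruby
# app/models/concerns/queueable.rb
module Queueable
  extend ActiveSupport::Concern

  included do
    has_many :enqueued_jobs, as: :queueable, dependent: :destroy
  end

  def enqueue(job_class_name:, scheduling_options: nil)
    ActiveRecord::Base.transaction do
      # Serialize concurrent enqueue attempts for the same queueable
      # and job class behind a Postgres advisory lock.
      AdvisoryLock.lock(
        lock_namespace: ADVISORY_LOCK_NAMESPACES[:queueable],
        lock_key: "#{job_class_name}-#{self.class.name}-#{id}",
        lock_comment: "Queueable#enqueue: #{job_class_name}"
      )

      # Atomic check-then-enqueue: nobody else can pass the lock above
      # until this transaction finishes.
      return if enqueued_jobs.exists?(job_class_name: job_class_name)

      enqueued_jobs.create!(job_class_name: job_class_name)

      job_class = job_class_name.constantize
      if scheduling_options.present?
        job_class.set(**scheduling_options).perform_later(id)
      else
        job_class.perform_later(id)
      end
    end
  end
end
```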
The main method is enqueue. If I include this module into the User or Batch or another model, then I can call it like this:
user = User.find_by(email: "panagiotis@matsinopoulos.gr")
user.enqueue(job_class_name: LongRunningBatchJob.name)
This will use the Postgres advisory lock to create an entry in enqueued_jobs only if there is none. If it creates the entry, it also does a perform_later(). Otherwise, it doesn’t.
The Queueable API supports enqueuing jobs in the future with the scheduling_options too.
I will not go into the details of the implementation, but they should be fairly clear if you have read the whole blog post from the beginning.
But how do I implement a job to work together with the enqueued_jobs table?
Here is how:
First, I include a concern called EnqueueableJob. Then, I wrap the actual work inside a call such as:
process_enqueued_job(queueable: batch) do
  ...
end
As simple as that. The interaction with the enqueued_jobs table happens inside process_enqueued_job, which is implemented in the EnqueueableJob concern as follows:
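A sketch of the concern; the repository implementation may differ:

```ruby
# app/jobs/concerns/enqueueable_job.rb
module EnqueueableJob
  extend ActiveSupport::Concern

  private

  def process_enqueued_job(queueable:)
    enqueued_job = EnqueuedJob.find_by(
      queueable: queueable,
      job_class_name: self.class.name
    )
    # Run only if the job is still registered in enqueued_jobs.
    return if enqueued_job.nil?

    yield

    # The work finished; allow this job to be enqueued again.
    enqueued_job.destroy!
  end
end
```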
This is very simple. I run the job using yield and then I delete it from the enqueued_jobs table.
In summary, for the long-running jobs:
I synchronize the access to the enqueued_jobs table and the subsequent call to #perform_later().
The job itself runs only if it still exists in enqueued_jobs, and removes itself after finishing.
This is a very interesting problem. When I have a long-running job that takes, for example, 1 or 2 hours to run, Sidekiq might kill it as part of a restart, e.g. every time I do a deployment.
My job needs to be able to continue from the point where it was killed, not do the whole work from the beginning. Otherwise, if deployments are more frequent than the duration of a single run, this job will never finish.
Sidekiq::Shutdown, Save State, Raise
Inside the job, one can rescue the Sidekiq::Shutdown exception, which signals the killing of a job by Sidekiq.
Then, inside the rescue block, the job saves its state, so that the next time it starts it will continue from the point where it was stopped.
Then it re-raises the exception, because this is the proper thing to do as a Sidekiq job.
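A sketch of the pattern. The batch, its items and the processed flag are hypothetical; the point is that progress is persisted incrementally and the exception is re-raised:

```ruby
class LongRunningBatchJob < ApplicationJob
  def perform(batch_id)
    batch = Batch.find(batch_id)

    # Process only what has not been processed yet, so that a killed
    # job continues from where it stopped on the next run.
    batch.items.where(processed: false).find_each do |item|
      process_item(item)
      item.update!(processed: true) # save state incrementally
    end
  rescue Sidekiq::Shutdown
    # State is already saved above. Re-raise so that Sidekiq pushes
    # the job back to the queue to be picked up after the restart.
    raise
  end

  private

  def process_item(item)
    # ... the actual unit of work ...
  end
end
```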
There are some gems that try to solve the above problems (Sidekiq Enterprise, Sidekiq Unique Jobs, Sidekiq Iteration), but, to be honest, I have not tried them out. Maybe I will in another post.
gm! I am Panos Matsinopoulos, humble software reader, writer, classical music and V8 engine lover. I work for Talent Protocol, where we help builders get the recognition they deserve.