Concurrent reading and writing in an Oracle database

I’m working on a job scheduler. I have an IMPORTJOBS table in an Oracle database, into which thousands of import jobs are inserted. Initially they all have the status ‘JOBEXTRACTED’.

Then I have a Quartz scheduler with 10 Quartz jobs executing in parallel. Each Quartz job retrieves the first import job from the IMPORTJOBS table where status is ‘JOBEXTRACTED’ and changes that status to ‘EXECUTIONINPROGRESS’ so that other Quartz jobs don’t retrieve this import job again.

Because multiple Quartz jobs run in parallel, I ran into a concurrency problem: different Quartz jobs retrieved the same IMPORTJOBS row, so some import jobs were executed more than once.

The solution was a locking strategy that follows a READ/LOCK/UPDATE/UNLOCK pattern, so that each import job is retrieved only once. This is known as pessimistic locking: the resource is locked from its first access in a transaction until the end of that transaction, so other transactions cannot lock or modify it in the meantime (as opposed to optimistic locking).

To do that, I created the following operation:

// Requires System, System.Data, System.Transactions and the ODP.NET client namespace.
public Guid? PutNextImportJobToExecuting(string connectionString)
{
    Guid? id = null;

    // Select only ID: ExecuteScalar returns the first column of the first row.
    const string sqlQuery = "SELECT ID FROM IMPORTJOBS WHERE STATUSCODE = 'JOBEXTRACTED' AND ROWNUM <= 1 FOR UPDATE SKIP LOCKED";
    const string sqlUpdate = "UPDATE IMPORTJOBS SET STATUSCODE = 'EXECUTIONINPROGRESS' WHERE ID = :id";

    // Both statements run in one transaction so the row lock is held until the status is updated.
    using (var transaction = new TransactionScope())
    {
        using (var oracleConnection = new OracleConnection(connectionString))
        {
            oracleConnection.Open();

            // Lock one unclaimed row; rows locked by other sessions are skipped.
            using (var command = oracleConnection.CreateCommand())
            {
                command.CommandText = sqlQuery;
                var returnedImportJobId = command.ExecuteScalar() as byte[];
                if (returnedImportJobId != null) id = new Guid(returnedImportJobId);
            }

            if (id.HasValue)
            {
                using (var command = oracleConnection.CreateCommand())
                {
                    command.CommandText = sqlUpdate;
                    // A RAW parameter expects a byte[], so convert the Guid back to bytes.
                    command.Parameters.Add(":id", OracleDbType.Raw, id.Value.ToByteArray(), ParameterDirection.Input);
                    command.ExecuteNonQuery();
                }
            }
        }

        // Commit; this also releases the row lock.
        transaction.Complete();
    }

    return id;
}

First we select one import job with status ‘JOBEXTRACTED’, using the ‘FOR UPDATE SKIP LOCKED’ clause. FOR UPDATE locks the returned row for the current transaction, and SKIP LOCKED makes other sessions that run the same query skip that row instead of waiting for it.

Next, we update the status of this import job to ‘EXECUTIONINPROGRESS’ and commit the transaction. It is therefore important to run the SELECT and the UPDATE in the same transaction: the row lock is held only until that transaction ends.

If you now call this in parallel from different threads, each call gets back a different import job ID.
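One way to check that claim is a small harness that calls the claim operation from several threads and counts duplicates. The sketch below is only an illustration: `ClaimCheck` and `CountDuplicateClaims` are hypothetical names that are not part of the original code, and the `claimNext` delegate stands in for a call such as `() => PutNextImportJobToExecuting(connectionString)`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original scheduler code.
public static class ClaimCheck
{
    // Runs `workers` loop bodies (possibly in parallel), each calling claimNext
    // until it returns null, then reports how many IDs were handed out more than once.
    public static int CountDuplicateClaims(Func<Guid?> claimNext, int workers)
    {
        var claimed = new ConcurrentBag<Guid>();

        Parallel.For(0, workers, _ =>
        {
            Guid? id;
            while ((id = claimNext()).HasValue)
            {
                claimed.Add(id.Value);
            }
        });

        // A correct claim operation never returns the same ID twice.
        return claimed.Count - claimed.Distinct().Count();
    }
}
```

A result of 0 means no import job was claimed twice during the run. Because the harness drains every available job, it is best pointed at a test table rather than at live data.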

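Later on I noticed that under heavy load the query sometimes returned no import job ID although import jobs were still available. A likely explanation is that Oracle applies the ROWNUM filter before it skips locked rows, so if the one candidate row is already locked by another session, the query returns nothing. So I added some logic to retry a few times: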

private Guid? PutNextImportJobToExecuting(string connectionString, short retries)
{
    short retryCounter = retries;
    while (retryCounter > 0)
    {
        Guid? id = PutNextImportJobToExecuting(connectionString);
        if (id.HasValue) return id;
        retryCounter--;
    }
    return null;
}
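The retry loop above calls the database again immediately after an empty result. A possible variation is to pause briefly between attempts so that competing sessions have time to commit. The following is a minimal sketch of that idea, not the code used in the scheduler: `ClaimRetry`, `ClaimWithBackoff`, and the `baseDelayMs` parameter are hypothetical, and `claimNext` stands in for `() => PutNextImportJobToExecuting(connectionString)`.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the original scheduler code.
public static class ClaimRetry
{
    // Calls claimNext up to maxAttempts times, sleeping a little longer
    // after each empty result, and returns the first ID found (or null).
    public static Guid? ClaimWithBackoff(Func<Guid?> claimNext, int maxAttempts, int baseDelayMs)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            Guid? id = claimNext();
            if (id.HasValue) return id;

            // No sleep after the final attempt; otherwise wait attempt * baseDelayMs.
            if (attempt < maxAttempts) Thread.Sleep(attempt * baseDelayMs);
        }
        return null;
    }
}
```

Like the original loop, this cannot tell "the candidate row was locked" apart from "no jobs are left", so a null result only means that nothing could be claimed within the given number of attempts.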
