Concurrent reading and writing in an Oracle database

I’m working on a job scheduler. I have an IMPORTJOBS table in an Oracle database, into which thousands of import jobs are inserted. Initially they all have the status ‘JOBEXTRACTED’.

Then I have a Quartz scheduler with 10 Quartz jobs executing in parallel. Each Quartz job retrieves the first import job from the IMPORTJOBS table where status is ‘JOBEXTRACTED’ and changes that status to ‘EXECUTIONINPROGRESS’ so that other Quartz jobs don’t retrieve this import job again.

Of course, because this happens from multiple Quartz jobs executing in parallel, I had a concurrency problem: the same IMPORTJOBS row was sometimes retrieved by different Quartz jobs, so import jobs were executed more than once.

The solution was a locking strategy that enables a READ/LOCK/UPDATE/UNLOCK pattern, so that each import job is retrieved only once. This is also known as pessimistic locking: the resource is locked from its first access in a transaction until the end of that transaction, so it cannot be accessed by other transactions in the meantime (as opposed to optimistic locking).

To do that, I created the following operation:

public Guid? PutNextImportJobToExecuting(string connectionString)
{
    Guid? id = null;

    // SKIP LOCKED: skip rows that are already locked by other transactions.
    const string sqlQuery = "SELECT * FROM IMPORTJOBS WHERE STATUSCODE = 'JOBEXTRACTED' AND ROWNUM <= 1 FOR UPDATE SKIP LOCKED";
    const string sqlUpdate = "UPDATE IMPORTJOBS SET STATUSCODE = 'EXECUTIONINPROGRESS' WHERE ID = :id";

    // The SELECT ... FOR UPDATE and the UPDATE must run in the same transaction,
    // otherwise the row lock is released before the status is changed.
    using (var transaction = new TransactionScope())
    {
        using (var oracleConnection = new OracleConnection(connectionString))
        {
            oracleConnection.Open();
            using (var command = oracleConnection.CreateCommand())
            {
                command.CommandText = sqlQuery;
                // ExecuteScalar returns the first column (ID, a RAW(16)) of the first row.
                var returnedImportJobId = command.ExecuteScalar() as byte[];
                if (returnedImportJobId != null) id = new Guid(returnedImportJobId);
            }
            if (id.HasValue)
            {
                using (var command = oracleConnection.CreateCommand())
                {
                    command.CommandText = sqlUpdate;
                    // Pass the Guid as a 16-byte array to match the RAW(16) column.
                    command.Parameters.Add(":id", OracleDbType.Raw, id.Value.ToByteArray(), ParameterDirection.Input);
                    command.ExecuteNonQuery();
                }
            }
        }
        transaction.Complete();
    }

    return id;
}

You see that we first select an import job with status ‘JOBEXTRACTED’, but we also add the ‘FOR UPDATE SKIP LOCKED’ clause. FOR UPDATE keeps the returned row locked for the current transaction, and SKIP LOCKED makes the query skip rows that other transactions have already locked instead of waiting for them.

Next, we update the status of this import job to ‘EXECUTIONINPROGRESS’ and commit the transaction. It is important that the SELECT and the UPDATE happen in the same transaction!

If you now call this in parallel from different threads, you always get back unique import job IDs.
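
To see the effect, here is a minimal sketch of such a parallel test (hypothetical code, assuming a populated IMPORTJOBS table and the operation above; it needs System.Collections.Concurrent, System.Linq and System.Threading.Tasks):

var claimedIds = new ConcurrentBag<Guid>();
Parallel.For(0, 100, _ =>
{
    Guid? id = PutNextImportJobToExecuting(connectionString);
    if (id.HasValue) claimedIds.Add(id.Value);
});
// Without the locking, duplicate IDs would show up here.
Console.WriteLine(claimedIds.Count == claimedIds.Distinct().Count()
    ? "all IDs unique"
    : "duplicates found!");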

Later on I noticed that during heavy load sometimes no new import job ID was found, although there were still import jobs available. Most likely this is because ROWNUM is applied before the locking: Oracle first picks one candidate row, and if exactly that row is locked, SKIP LOCKED skips it and the query returns nothing rather than moving on to the next unlocked row. So I added some logic to retry this a few times:

private Guid? PutNextImportJobToExecuting(string connectionString, short retries)
{
    short retryCounter = retries;
    while (retryCounter > 0)
    {
        Guid? id = PutNextImportJobToExecuting(connectionString);
        if (id.HasValue) return id;
        retryCounter--;
    }
    return null;
}
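
To give an idea of how this fits in, a Quartz job could use it roughly like this (a hypothetical sketch, not the actual scheduler code; ImportJobExecutor, ExecuteImportJob and the retry count are my own placeholders):

public class ImportJobExecutor : IJob
{
    private readonly string connectionString = "..."; // assumed to come from configuration

    public void Execute(IJobExecutionContext context)
    {
        // Try to claim the next available import job, retrying a few times under heavy load.
        Guid? id = PutNextImportJobToExecuting(connectionString, 3);
        if (!id.HasValue) return; // nothing left to claim

        ExecuteImportJob(id.Value); // placeholder for the actual import work
    }
}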

Bulk insert into an Oracle database

One of the things I worked on today was importing thousands of records (originating from an XML file) into an Oracle database from a .NET component. Normally the data access layer uses Entity Framework and Devart to handle database operations, but that was obviously not going to be a good solution here, as it would be way too slow. A quick test proved this: inserting 25,000 records took more than half an hour and locked up the database server during that time.

So I needed a better way to bulk import data into my Oracle database.

After some research I found two options:

- ODP.NET (Oracle Data Provider for .NET)
- Devart dotConnect for Oracle

Both allow you to do bulk inserts using something called array binding. This basically means that you can insert multiple records at once in one database call, thereby avoiding unnecessary round trips.

Because we were already using Devart in the company and have a license for it, I decided to use that one (the way of working is about the same with ODP.NET).

The table that I had to fill was called IMPORTJOBS:

CREATE TABLE IMPORTJOBS
(
    ID            RAW(16) NOT NULL,
    IMPORTBATCHID RAW(16) NOT NULL,
    DATA          VARCHAR2(4000 BYTE) NOT NULL,
    STATUSCODE    VARCHAR2(20 BYTE) NOT NULL,
    RESULTID      RAW(16)
)

The component that has to call the bulk insert is an operation in the business layer. Because I like abstracting out components to reduce coupling and dependencies, I created an OracleBulkInsertProvider that contains the actual data logic to insert the records, so that my operations in the business layer do not have to worry about the technical details.

public interface IOracleBulkInsertProvider
{
    void BulkInsertImportBatches(string connectionString, List<ImportJob> importJobs);
}

public class OracleBulkInsertProvider : IOracleBulkInsertProvider
{
    public void BulkInsertImportBatches(string connectionString, List<ImportJob> importJobs)
    {
        // One array per column; element i of each array belongs to row i.
        var ids = importJobs.Select(ib => ib.Id.ToByteArray()).ToArray();
        var importBatchIds = importJobs.Select(ib => ib.ImportBatchId.ToByteArray()).ToArray();
        var datas = importJobs.Select(ib => ib.Data).ToArray();
        var statusCodes = importJobs.Select(ib => ib.ImportJobStatus.Code).ToArray();

        const string sql = "insert into IMPORTJOBS (ID, IMPORTBATCHID, DATA, STATUSCODE) " +
                           "values (:id, :importBatchId, :data, :statusCode)";

        using (var oracleConnection = new OracleConnection(connectionString))
        {
            oracleConnection.Open();
            using (var command = oracleConnection.CreateCommand())
            {
                command.CommandText = sql;
                command.CommandType = CommandType.Text;
                // Bind an array of values to each parameter (array binding).
                command.Parameters.Add(":id", OracleDbType.Raw, ids, ParameterDirection.Input);
                command.Parameters.Add(":importBatchId", OracleDbType.Raw, importBatchIds, ParameterDirection.Input);
                command.Parameters.Add(":data", OracleDbType.VarChar, datas, ParameterDirection.Input);
                command.Parameters.Add(":statusCode", OracleDbType.VarChar, statusCodes, ParameterDirection.Input);
                // Execute the insert once for the whole batch.
                command.ExecuteArray(importJobs.Count);
            }
        }
    }
}

I think the code is straightforward: we create a command and, for each input parameter, bind an array of the actual values to insert for that column. At the end we simply call the ExecuteArray operation, which runs the insert for the whole batch in one call.
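
Calling the provider from the business layer then looks roughly like this (a hypothetical sketch; the ImportJob and ImportJobStatus property names are inferred from the code above, and the values are made up):

IOracleBulkInsertProvider provider = new OracleBulkInsertProvider();

var batchId = Guid.NewGuid();
var importJobs = Enumerable.Range(0, 25000)
    .Select(i => new ImportJob
    {
        Id = Guid.NewGuid(),
        ImportBatchId = batchId,
        Data = "<data for job " + i + ">",
        ImportJobStatus = new ImportJobStatus { Code = "JOBEXTRACTED" }
    })
    .ToList();

// One database call for the whole batch instead of 25,000 single inserts.
provider.BulkInsertImportBatches(connectionString, importJobs);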

This way, inserting 25,000 records takes about 2 seconds instead of more than half an hour.