Time-limited locks for Azure Functions
We start with some existing logic. This logic can be triggered for a record in CRM (further – Trigger Event).
Our logic is a long running operation, and the challenge is that new Trigger Event can come while first Event is still being processed.
Usually, a new Trigger Event means that there is new data available, which the logic should consider, i. e. we cannot just throw away the second Event. Instead, we need to somehow “queue” the processing of the second Event to run after the processing of the first one has finished.
For example: We have Process A that started first and Process B that started while A had already been running.
There are several options how Processes A and B could interact:
- Process B notifies A that a new Event occurred.
Process A terminates in the middle and informs B that the ‘Terminate Signal’ was received and processed.
Process B starts a long running operation. - Process B notifies A that a new Event occurred and terminates.
Process A starts from scratch without finishing the current operation. - Process B notices that Process A is already running, sets up ‘Retrigger Required’ flag and exits.
Process A finishes processing, checks if flag is set and if yes, sends a new Trigger Event.
Process X is started on the new Trigger Event, checks if it can be considered A or B and acts accordingly.
Downsides of Option 1:
- Process A should constantly check if the notification from Process B arrived.
- Process B should wait for Process A to terminate, which is in fact another signal.
- Prolonged data inconsistency (Data is inconsistent after Trigger Event and until logic has finished. With restarting/terminating the logic in the middle though, we prolong the time frame where data is inconsistent).
Downsides of Option 2:
- Process A should constantly check if notification from Process B arrived.
- Restarting means that the lifespan of a single process can exceed the allowed execution timeframe of underlying platform.
- Prolonged data inconsistency.
Upsides of option 3:
- Does not require constant checking if there has been a new Trigger Event. Instead, the process checks for a flag once, right after the long running operation (logic) has finished.
- Minimal number of inter-process signals (“AlreadyProcessing”, “RetriggerRequired”).
- Predictable execution timeframe.
- Shortened data inconsistency.
Taking all the pros and cons listed above into account, we decided to proceed with Option 3.
Here is a more detailed description of what should happen when the new Trigger Event comes:
var isAlreadyProcessing = CheckIfThereIsAnotherProcess(recordId);
if (isAlreadyProcessing)
{
SetRetriggerRequiredFlag(recordId);
return;
}
else
{
SetAlreadyProcessing(recordId);
Logic(recordId);
ClearAlreadyProcessing(recordId);
var isRetriggerRequired = CheckIfRetriggerRequired(recordId);
if (isRetriggerRequired)
{
ClearRetriggerRequired(recordId);
RetriggerEvent(recordId);
}
}
Listing 1: high-level handling of a single trigger event
The algorithm above is a little bit oversimplified: CheckIfThereIsAnotherProcess() and SetAlreadyProcessing() should be implemented in such a way (atomic) so if two processes start at the same time – only one will see that there is no other process yet and will be able to go ahead and set the flag, effectively marking themselves as “Process A”.
The “RetriggerRequired” flag can be set multiple times during the runtime of “Process A”, but there will be only one retrigger (no need to process each trigger event, only last one).
And another consideration: if “AlreadyProcessing” flag is being up for too long we would like to have a mechanism to recognize such situation and to forcefully reset “AlreadyProcessing” flag (Expired Flag case).
Next, let us add even more technical details.
- The event happens in the CRM, so we need a plugin to intercept it.
- Plugin will send record ID to the Azure Function conveniently named “Start<%FunctionalityName%>”.
- The sole purpose of Azure Function “Start<%FunctionalityName%>” is to put recordId into the queue so the second Azure Function named “<%FunctionalityName%>” can pick it up and do all the heavy lifting (long-running operation).
- Such approach (two Azure Functions and a queue) detaches the CRM plugin from a long-running operation (so plugin can finish before long running operation has completed).
- Another benefit of having a queue-triggered Azure Function is an Automatic Retry if a Function fails unexpectedly (for example, due to a short-lived problem with the network connection).
- Considering that we usually have multiple CRM instances (DEV, TEST, PROD), we should also have independent instances of Azure Functions, one per CRM instance. And we need to have a way to tell the CRM plugin which Azure Function Host it should be talking to, which can be configured via Environment Variables in the CRM.
That was a high-level boiler plate. Now, how do we implement Azure Function “<%FunctionalityName%>” with regards to code in Listing 1?
We have implemented a helper class – LockHelper with a single public method RunUnderLockAsync() which transparently for us handles all details of algorithm from Listing 1. All we need to do is to define what is our “Core logic” and how to “Retrigger Event”:
await new LockHelper(
scope, // can be logical name of CRM entity
recordId,
logger)
.RunUnderLockAsync(
async () =>
{
// this callback defines Core Logic
},
async () =>
{
// this callback defines Retrigger Event
}
);
Listing 2: How to use LockHelper class
Please note that LockHelper has no dependency on IOrganizationService (CRM connection object). Instead, this dependency is incapsulated in the core logic callback.
Internally, LockHelper is built using new BlobLock class.
BlobLock class gives us means to define a Flag and to synchronize/queue access to that Flag. It means that only one process will be able to work with given Flag at given moment (or span) of time.
This is built on top of Azure BLOB Storage Acquire Lease functionality – multiple processes can request Lease, but only one will get it. Other processes would have to execute random delay and to try get Lease again later.
Flag here is a Blob with name built after “recordId” and located in BlobContainer named after “scope” parameter. The content of the Blob is up to us, so we store a Timestamp of when Blob was created. This gives us a possibility to detect when Blob (effectively – Flag, Lock) should be considered as expired.
using (var blobLock = new BlobLock(scope, key, logger))
{
// only one Process at a time (for given scope and key)
// will be able to execute code here
}
Listing 3: How to use BlobLock class
The LockHelper class, implementing “Option (c) approach”, will be working with two Flags:
- “AlreadyProcessing”, with internal blob-name “{recordId}-lock” and
- “RetriggerRequired”, with internal blob-name “{recordId}-flag”.
The internal logic of LockHelper.RunUnderLockAsync() method can be described as follows:
public async Task RunUnderLockAsync(
Func<Task> coreLogic,
Func<Task> reTriggerCallback)
{
var canRunCoreLogic = await CheckIfCanRunCoreLogic(
scope, key, reTriggerCallback);
if (canRunCoreLogic)
{
await coreLogic();
var isReTriggerRequired = false;
using (var blobLock = new BlobLock(scope, GetLockKey(key)))
{
isReTriggerRequired = await IsFlagSet(scope, key);
ResetFlag(scope, key);
blobLock.DeleteBlobOnRelease();
}
if (isReTriggerRequired)
{
await reTriggerCallback();
}
}
}
static private string GetLockKey(string key) => $"{key}-lock";
static private string GetFlagKey(string key) => $"{key}-flag";
private async Task<bool> CheckIfCanRunCoreLogic(
string scope,
string key,
Func<Task> reTriggerCallback)
{
var canRunCoreLogic = false;
using (var blobLock = new BlobLock(scope, GetLockKey(key))
{
var timestamp = await blobLock.GetTimestampAsync();
if (false == timestamp.HasValue)
{
await blobLock.SetTimestampAsync();
canRunCoreLogic = true;
}
else
{
var isLockTooOld = (DateTime.UtcNow - timestamp.Value)
.TotalMinutes > CTooOldInMinutes;
if (isLockTooOld)
{
blobLock.DeleteBlobOnRelease();
await reTriggerCallback();
}
else
{
await SetFlag(scope, key);
}
}
}
return canRunCoreLogic;
}
private async Task SetFlag(string scope, string key)
{
using (var blobLock = new BlobLock(scope, GetFlagKey(key)))
{
await blobLock.SetTimestampAsync();
}
}
private async Task<bool> IsFlagSet(string scope, string key)
{
var isFlagSet = false;
using (var blobLock = new BlobLock(scope, GetFlagKey(key)))
{
var timestamp = await blobLock.GetTimestampAsync();
isFlagSet = timestamp.HasValue;
}
return isFlagSet;
}
private void ResetFlag(string scope, string key)
{
using (var blobLock = new BlobLock(scope, GetFlagKey(key)))
{
blobLock.DeleteBlobOnRelease();
}
}
Listing 4: simplified version of LockHelper.RunUnderLockAsync()
All in all, it has always been a challenge to synchronize individual instances of queue-triggered Azure Functions. We have managed to create a Locking Mechanism that not only guarantees a single instance of Long Running Logic for any given record, but also has a built-in recovery mechanism to release expired Locks. The solution has been successfully applied in real life scenarios and does neither have a negative impact on performance nor overhead costs.
The full version can be found here at GitHub.