Open WorkerRoleA.cs and examine the code.
The OnStart method initializes the context objects that you need in order to work with Windows Azure Storage entities. It also makes sure that all of the tables, queues, and blob containers that you'll be using in the Run method exist. The code that performs these tasks is similar to what you saw earlier in the MVC controller constructors. You'll configure the connection string that this method uses later.
public override bool OnStart()
{
ServicePointManager.DefaultConnectionLimit = Environment.ProcessorCount;
ConfigureDiagnostics();
Trace.TraceInformation("Initializing storage account in WorkerA");
var storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageConnectionString"));
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
sendEmailQueue = queueClient.GetQueueReference("azuremailqueue");
var tableClient = storageAccount.CreateCloudTableClient();
mailingListTable = tableClient.GetTableReference("mailinglist");
messageTable = tableClient.GetTableReference("message");
messagearchiveTable = tableClient.GetTableReference("messagearchive");
// Create the queue and the three tables if they don't already exist.
sendEmailQueue.CreateIfNotExists();
messageTable.CreateIfNotExists();
mailingListTable.CreateIfNotExists();
messagearchiveTable.CreateIfNotExists();
return base.OnStart();
}
You may have seen earlier documentation on working with Windows Azure Storage that shows the initialization code in a loop that checks for transport errors. This is no longer necessary because the API now has a built-in retry mechanism that absorbs transient network failures for up to 3 additional attempts.
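For context, the kind of hand-written retry loop that older samples used can be sketched as follows. This is only an illustration of the idea, not the storage client library's retry implementation: the class and method names (RetrySketch, WithRetries) are hypothetical, and the failing operation is simulated rather than a real storage call.

```csharp
using System;
using System.Threading;

public static class RetrySketch
{
    // Runs the action and retries it up to maxRetries additional times.
    // Hypothetical helper for illustration; the storage client library
    // applies its own retry policy internally.
    public static T WithRetries<T>(Func<T> action, int maxRetries, TimeSpan delay)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return action();
            }
            catch (Exception)
            {
                // Give up once the allowed number of retries is used.
                if (attempt >= maxRetries) throw;
                // Otherwise treat the failure as transient and wait.
                Thread.Sleep(delay);
            }
        }
    }

    public static void Main()
    {
        int calls = 0;
        // Simulated operation that fails twice and then succeeds.
        string result = WithRetries(() =>
        {
            calls++;
            if (calls < 3) throw new InvalidOperationException("simulated transient failure");
            return "ok";
        }, 3, TimeSpan.FromMilliseconds(10));

        Console.WriteLine(result + " after " + calls + " calls"); // ok after 3 calls
    }
}
```

With the built-in retry mechanism, the OnStart code above can call CreateIfNotExists directly without a wrapper like this.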
The ConfigureDiagnostics method that the OnStart method calls sets up tracing so that you will be able to see the output from the Trace.TraceInformation and Trace.TraceError methods. This method is explained in the second tutorial.
The OnStop method sets the global variable onStopCalled to true, then it waits for the Run method to set the global variable returnedFromRunMethod to true, which signals it is ready to do a clean shutdown.
public override void OnStop()
{
onStopCalled = true;
while (returnedFromRunMethod == false)
{
System.Threading.Thread.Sleep(1000);
}
}
The OnStop method is called when the worker role is shutting down for one of the following reasons:
- Windows Azure needs to reboot the virtual machine (the web role or worker role instance) or the physical machine that hosts the virtual machine.
- You stopped your cloud service by using the Stop button on the Windows Azure Management Portal.
- You deployed an update to your cloud service project.
The Run method monitors the variable onStopCalled and stops pulling any new work items to process when that variable changes to true. This coordination between the OnStop and Run methods enables a graceful shutdown of the worker process.
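The handshake between the two methods can be tried out in isolation with a small console sketch. It assumes the two flags are declared as volatile fields so that a write on one thread is visible to the other; the field declarations are not shown in this section, so treat that as an assumption. The class name and the Iterations counter are hypothetical stand-ins.

```csharp
using System;
using System.Threading;

public class ShutdownHandshakeSketch
{
    // Assumed to be volatile so each thread sees the other's writes.
    private volatile bool onStopCalled;
    private volatile bool returnedFromRunMethod;

    public int Iterations;

    public void Run()
    {
        while (true)
        {
            // Check the flag before starting a new pass of work.
            if (onStopCalled)
            {
                returnedFromRunMethod = true;
                return;
            }
            Iterations++;      // stand-in for one pass of real work
            Thread.Sleep(10);  // stand-in for the polling delay
        }
    }

    public void OnStop()
    {
        onStopCalled = true;
        // Block until Run has acknowledged the request and returned.
        while (!returnedFromRunMethod)
        {
            Thread.Sleep(10);
        }
    }

    public static void Main()
    {
        var worker = new ShutdownHandshakeSketch();
        var runThread = new Thread(worker.Run);
        runThread.Start();

        Thread.Sleep(100);   // let the loop make a few passes
        worker.OnStop();     // returns only after Run has finished its pass
        runThread.Join();

        Console.WriteLine("Run stopped cleanly");
    }
}
```

The important property is that OnStop does not return until Run has finished its current pass, so no work item is abandoned halfway through.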
Windows Azure periodically installs operating system updates in order to ensure that the platform is secure, reliable, and performs well. These updates typically require the machines that host your cloud service to shut down and reboot. For more information, see Role Instance Restarts Due to OS Upgrades.
The Run method performs two functions:
- Scans the message table looking for messages scheduled to be sent today or earlier, for which queue work items haven't been created yet.
- Scans the message table looking for messages that have a status indicating that all of the queue work items were created but not all of the emails have been sent yet. If it finds one, it scans SendEmail rows for that message to see if all emails were sent, and if they were, it updates the status to Complete and archives the message row.
The method also checks the global variable onStopCalled. When the variable is true, the method stops pulling new work items to process, and it returns when already-started tasks are completed.
public override void Run()
{
Trace.TraceInformation("WorkerRoleA entering Run()");
while (true)
{
try
{
var tomorrow = DateTime.Today.AddDays(1.0).ToString("yyyy-MM-dd");
// If OnStop has been called, return to do a graceful shutdown.
if (onStopCalled == true)
{
Trace.TraceInformation("onStopCalled WorkerRoleA");
returnedFromRunMethod = true;
return;
}
// Retrieve all message rows that are scheduled for today or earlier
// (partition key before tomorrow), whatever their current status.
string typeAndDateFilter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.GreaterThan, "message"),
TableOperators.And,
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.LessThan, tomorrow));
var query = (new TableQuery<Message>().Where(typeAndDateFilter));
var messagesToProcess = messageTable.ExecuteQuery(query).ToList();
TableOperation replaceOperation;
// Process each message (queue emails to be sent).
foreach (Message messageToProcess in messagesToProcess)
{
string restartFlag = "0";
// If the message is already in Queuing status,
// set flag to indicate this is a restart.
if (messageToProcess.Status == "Queuing")
{
restartFlag = "1";
}
// If the message is in Pending status, change
// it to Queuing.
if (messageToProcess.Status == "Pending")
{
messageToProcess.Status = "Queuing";
replaceOperation = TableOperation.Replace(messageToProcess);
messageTable.Execute(replaceOperation);
}
// If the message is in Queuing status,
// process it and change it to Processing status;
// otherwise it's already in processing status, and
// in that case check if processing is complete.
if (messageToProcess.Status == "Queuing")
{
ProcessMessage(messageToProcess, restartFlag);
messageToProcess.Status = "Processing";
replaceOperation = TableOperation.Replace(messageToProcess);
messageTable.Execute(replaceOperation);
}
else
{
CheckAndArchiveIfComplete(messageToProcess);
}
}
// Sleep for one minute to minimize query costs.
System.Threading.Thread.Sleep(1000 * 60);
}
catch (Exception ex)
{
string err = ex.Message;
if (ex.InnerException != null)
{
err += " Inner Exception: " + ex.InnerException.Message;
}
Trace.TraceError(err);
// Don't fill up Trace storage if we have a bug in queue process loop.
System.Threading.Thread.Sleep(1000 * 60);
}
}
}
Notice that all of the work is done in an infinite while loop, and all of the code in the loop is wrapped in a try-catch block to prevent an unhandled exception. If an unhandled exception occurs, Windows Azure raises the UnhandledException event, the worker process is terminated, and the role is taken offline. Windows Azure restarts the worker role, but this takes several minutes. The catch block calls TraceError to record the error and then sleeps for 60 seconds so that a persistent error doesn't flood the trace log with repeated messages. In a production application you might also send an email to an administrator in the catch block.
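The error text built in the catch block includes only the first inner exception. If you wanted the whole chain in one log line, a small helper along these lines is one option. This is a sketch, not part of the tutorial's code, and the names ErrorTextSketch and Describe are hypothetical.

```csharp
using System;
using System.Text;

public static class ErrorTextSketch
{
    // Builds one log line from an exception and all of its nested
    // inner exceptions, using the same separator as the Run method.
    public static string Describe(Exception ex)
    {
        var text = new StringBuilder(ex.Message);
        for (Exception inner = ex.InnerException; inner != null; inner = inner.InnerException)
        {
            text.Append(" Inner Exception: ").Append(inner.Message);
        }
        return text.ToString();
    }

    public static void Main()
    {
        var ex = new InvalidOperationException("outer", new ArgumentException("inner"));
        Console.WriteLine(Describe(ex)); // outer Inner Exception: inner
    }
}
```

The result could be passed to Trace.TraceError in place of the hand-built err string.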
The Run method processes a query for message rows in the message table that have a scheduled date before tomorrow:
// Retrieve all message rows that are scheduled for today or earlier
// (partition key before tomorrow), whatever their current status.
string typeAndDateFilter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.GreaterThan, "message"),
TableOperators.And,
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.LessThan, tomorrow));
var query = (new TableQuery<Message>().Where(typeAndDateFilter));
var messagesToProcess = messageTable.ExecuteQuery(query).ToList();
Note: One of the benefits of moving message rows to the messagearchive table after they are processed is that this query only needs to specify PartitionKey and RowKey as search criteria. If we did not archive processed rows, the query would also have to specify a non-key field (Status) and would have to search through more rows. The table size would increase, and the query would take longer and could start returning continuation tokens.
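The filter works because both keys are compared as strings. Dates formatted as yyyy-MM-dd sort in calendar order, and the RowKey condition separates message rows from SendEmail rows stored in the same partition. The sketch below mirrors the two conditions in memory. It assumes ordinal string comparison, that message rows have a RowKey beginning with "message", and that SendEmail row keys begin with a digit (they are built from MessageRef plus the email address). The sample keys are made up for illustration.

```csharp
using System;

public static class KeyFilterSketch
{
    // Mirrors the two filter conditions using ordinal string comparison:
    // RowKey > "message" and PartitionKey < tomorrow.
    public static bool MatchesFilter(string partitionKey, string rowKey, string tomorrow)
    {
        return string.CompareOrdinal(rowKey, "message") > 0
            && string.CompareOrdinal(partitionKey, tomorrow) < 0;
    }

    public static void Main()
    {
        string tomorrow = "2013-05-16";

        // Message row scheduled for today: selected.
        Console.WriteLine(MatchesFilter("2013-05-15", "message123", tomorrow));             // True

        // SendEmail row (hypothetical key starting with a digit): excluded.
        Console.WriteLine(MatchesFilter("2013-05-15", "123someone@example.com", tomorrow)); // False

        // Message row scheduled for a later date: excluded.
        Console.WriteLine(MatchesFilter("2013-06-01", "message124", tomorrow));             // False
    }
}
```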
If a message is in Pending status, processing has not yet begun; if it is in Queuing status, processing did begin earlier but was interrupted before all queue messages were created. In that case an additional check has to be done in worker role B when it is sending each email to make sure the email hasn't already been sent. That is the purpose of the restartFlag variable.
string restartFlag = "0";
if (messageToProcess.Status == "Queuing")
{
restartFlag = "1";
}
Next, the code sets message rows that are in Pending status to Queuing. Then, for those rows plus any that were already in Queuing status, it calls the ProcessMessage method to create the queue work items to send emails for the message.
if (messageToProcess.Status == "Pending")
{
messageToProcess.Status = "Queuing";
replaceOperation = TableOperation.Replace(messageToProcess);
messageTable.Execute(replaceOperation);
}
if (messageToProcess.Status == "Queuing")
{
ProcessMessage(messageToProcess, restartFlag);
messageToProcess.Status = "Processing";
replaceOperation = TableOperation.Replace(messageToProcess);
messageTable.Execute(replaceOperation);
}
else
{
CheckAndArchiveIfComplete(messageToProcess);
}
After processing a message in Queuing status the code sets the Message row status to Processing. Rows in the message table that are not in Pending or Queuing status are already in Processing status, and for those rows the code calls a method that checks if all of the emails for the message were sent. If all emails have been sent, the message row is archived.
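The status handling amounts to a small state machine, which the following sketch restates as pure functions so that the transitions are easy to check. The names MessageStatusSketch, NextStatus, and RestartFlag are hypothetical; the status strings are the ones used in the Run and CheckAndArchiveIfComplete code.

```csharp
using System;

public static class MessageStatusSketch
{
    // Restart flag passed to ProcessMessage: "1" if the row was already
    // in Queuing status when the pass started, otherwise "0".
    public static string RestartFlag(string statusAtStartOfPass)
    {
        return statusAtStartOfPass == "Queuing" ? "1" : "0";
    }

    // Status of a message row after one pass of the loop.
    // allEmailsSent only matters for rows already in Processing status.
    public static string NextStatus(string status, bool allEmailsSent)
    {
        switch (status)
        {
            case "Pending":
            case "Queuing":
                // A Pending row is first marked Queuing; work items are then
                // created and the row is marked Processing in the same pass.
                return "Processing";
            case "Processing":
                return allEmailsSent ? "Complete" : "Processing";
            default:
                return status;
        }
    }

    public static void Main()
    {
        Console.WriteLine(NextStatus("Pending", false));    // Processing
        Console.WriteLine(NextStatus("Processing", false)); // Processing
        Console.WriteLine(NextStatus("Processing", true));  // Complete
        Console.WriteLine(RestartFlag("Queuing"));          // 1
    }
}
```

A row is only ever observed in Queuing status at the start of a pass if an earlier pass was interrupted, which is why that case sets the restart flag.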
After processing all records retrieved by the query, the code sleeps for one minute.
// Sleep for one minute to minimize query costs.
System.Threading.Thread.Sleep(1000*60);
There is a minimal charge for every Windows Azure Storage query, even if it doesn't return any data, so continuously re-scanning would unnecessarily add to your Windows Azure expenses. As this tutorial is being written, the cost is $0.10 per million transactions (a query counts as a transaction), so the sleep time could be made much less than a minute and the cost of scanning the tables for messages to be sent would still be minimal. For more information about pricing, see the first tutorial.
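To put rough numbers on that claim, the following sketch estimates the polling cost for a given sleep interval at the quoted price of $0.10 per million transactions. It assumes a 30-day month and a single query per pass, and it ignores the writes and follow-up queries that happen when there is work to do, so treat the result as a lower bound. The names are hypothetical.

```csharp
using System;

public static class PollingCostSketch
{
    // Price quoted in the tutorial: $0.10 per million storage transactions.
    private const decimal PricePerMillion = 0.10m;

    // Estimated cost in dollars of polling for the given number of days.
    public static decimal PollingCost(int sleepSeconds, int queriesPerPass, int days)
    {
        decimal passes = (decimal)days * 24 * 60 * 60 / sleepSeconds;
        return passes * queriesPerPass * PricePerMillion / 1000000m;
    }

    public static void Main()
    {
        // One query per minute: 43,200 passes in 30 days.
        Console.WriteLine(PollingCost(60, 1, 30));

        // One query per second: 2,592,000 passes in 30 days.
        Console.WriteLine(PollingCost(1, 1, 30));
    }
}
```

Under these assumptions, polling once a minute costs well under a cent per month (about $0.004), and polling once a second costs about $0.26.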
Note on threading and optimal CPU utilization: There are two tasks in the Run method (queuing emails and checking for completed messages), and they run sequentially in a single thread. A small virtual machine (VM) has 1.75 GB RAM and only one CPU, so it's probably OK to run these tasks sequentially with a single thread. Suppose your application needed more memory than the small VM provided to run efficiently. A medium VM provides 3.5 GB RAM and 2 CPUs, but this application would only use one CPU, because it's single threaded. To take advantage of all the CPUs, you would need to create a worker thread for each CPU. Even so, a single CPU is not fully utilized by one thread. When a thread makes network or I/O calls, the thread must wait for the I/O or network call to complete, and while it waits, it's not doing useful work. If the Run method were implemented using two threads, when one thread was waiting for a network or I/O operation to complete, the other thread could be doing useful work.
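As a rough illustration of the two-thread idea, the sketch below runs two stand-in jobs concurrently with the Task Parallel Library. The method names are hypothetical, and Thread.Sleep simulates the time spent waiting on storage calls. This is not how the tutorial's Run method is written.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class TwoTaskSketch
{
    // Stand-ins for the two jobs; Thread.Sleep simulates waiting on I/O.
    public static int QueuePendingMessages() { Thread.Sleep(200); return 1; }
    public static int ArchiveCompletedMessages() { Thread.Sleep(200); return 2; }

    // Starts both jobs, waits for both, and returns their results.
    public static int[] RunBoth()
    {
        Task<int> queuing = Task.Run(() => QueuePendingMessages());
        Task<int> archiving = Task.Run(() => ArchiveCompletedMessages());
        Task.WaitAll(queuing, archiving);
        return new[] { queuing.Result, archiving.Result };
    }

    public static void Main()
    {
        var timer = Stopwatch.StartNew();
        int[] results = RunBoth();
        timer.Stop();

        // Elapsed time should be close to one job's wait, not the sum of both.
        Console.WriteLine("Results: " + results[0] + ", " + results[1]);
        Console.WriteLine("Elapsed ms: " + timer.ElapsedMilliseconds);
    }
}
```

In the real worker role the two jobs read and update rows in the same table, so splitting them across threads would also require deciding which thread owns which rows.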
The ProcessMessage method gets all of the email addresses for the destination email list, and creates a queue work item for each email address. As it creates queue work items, it also creates SendEmail rows in the message table. These rows provide worker role B with the information it needs to send emails and include an EmailSent property that tracks whether each email has been sent.
private void ProcessMessage(Message messageToProcess, string restartFlag)
{
// Get Mailing List info to get the "From" email address.
var retrieveOperation = TableOperation.Retrieve<MailingList>(messageToProcess.ListName, "mailinglist");
var retrievedResult = mailingListTable.Execute(retrieveOperation);
var mailingList = retrievedResult.Result as MailingList;
if (mailingList == null)
{
Trace.TraceError("Mailing list not found: " + messageToProcess.ListName + " for message: " + messageToProcess.MessageRef);
return;
}
// Get email addresses for this Mailing List.
string filter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, messageToProcess.ListName),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.NotEqual, "mailinglist"));
var query = new TableQuery<Subscriber>().Where(filter);
var subscribers = mailingListTable.ExecuteQuery(query).ToList();
foreach (Subscriber subscriber in subscribers)
{
// Verify that the subscriber email address has been verified.
if (subscriber.Verified == false)
{
Trace.TraceInformation("Subscriber " + subscriber.EmailAddress + " not Verified, so not queuing ");
continue;
}
// Create a SendEmail entity for this email.
var sendEmailRow = new SendEmail
{
PartitionKey = messageToProcess.PartitionKey,
RowKey = messageToProcess.MessageRef.ToString() + subscriber.EmailAddress,
EmailAddress = subscriber.EmailAddress,
EmailSent = false,
MessageRef = messageToProcess.MessageRef,
ScheduledDate = messageToProcess.ScheduledDate,
FromEmailAddress = mailingList.FromEmailAddress,
SubjectLine = messageToProcess.SubjectLine,
SubscriberGUID = subscriber.SubscriberGUID,
ListName = mailingList.ListName
};
// When we try to add the entity to the SendEmail table,
// an exception might happen if this worker role went
// down after processing some of the email addresses and then restarted.
// In that case the row might already be present, so we do an Upsert operation.
try
{
var upsertOperation = TableOperation.InsertOrReplace(sendEmailRow);
messageTable.Execute(upsertOperation);
}
catch (Exception ex)
{
string err = "Error creating SendEmail row: " + ex.Message;
if (ex.InnerException != null)
{
err += " Inner Exception: " + ex.InnerException;
}
Trace.TraceError(err);
}
// Create the queue message.
string queueMessageString =
sendEmailRow.PartitionKey + "," +
sendEmailRow.RowKey + "," +
restartFlag;
var queueMessage = new CloudQueueMessage(queueMessageString);
sendEmailQueue.AddMessage(queueMessage);
}
Trace.TraceInformation("ProcessMessage end PK: "
+ messageToProcess.PartitionKey);
}
The code first gets the mailing list row from the mailinglist table for the destination mailing list. This row has the "from" email address which needs to be provided to worker role B for sending emails.
// Get Mailing List info to get the "From" email address.
var retrieveOperation = TableOperation.Retrieve<MailingList>(messageToProcess.ListName, "mailinglist");
var retrievedResult = mailingListTable.Execute(retrieveOperation);
var mailingList = retrievedResult.Result as MailingList;
if (mailingList == null)
{
Trace.TraceError("Mailing list not found: " + messageToProcess.ListName + " for message: " + messageToProcess.MessageRef);
return;
}
Then it queries the mailinglist table for all of the subscriber rows for the destination mailing list.
// Get email addresses for this Mailing List.
string filter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, messageToProcess.ListName),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.NotEqual, "mailinglist"));
var query = new TableQuery<Subscriber>().Where(filter);
var subscribers = mailingListTable.ExecuteQuery(query).ToList();
In the loop that processes the query results, the code begins by checking whether the subscriber's email address has been verified. If it hasn't, no email is queued for that subscriber.
// Verify that the subscriber email address has been verified.
if (subscriber.Verified == false)
{
Trace.TraceInformation("Subscriber " + subscriber.EmailAddress + " not Verified, so not queuing ");
continue;
}
Next, the code creates a SendEmail row in the message table. This row contains the information that worker role B will use to send an email. The row is created with the EmailSent property set to false.
// Create a SendEmail entity for this email.
var sendEmailRow = new SendEmail
{
PartitionKey = messageToProcess.PartitionKey,
RowKey = messageToProcess.MessageRef.ToString() + subscriber.EmailAddress,
EmailAddress = subscriber.EmailAddress,
EmailSent = false,
MessageRef = messageToProcess.MessageRef,
ScheduledDate = messageToProcess.ScheduledDate,
FromEmailAddress = mailingList.FromEmailAddress,
SubjectLine = messageToProcess.SubjectLine,
SubscriberGUID = subscriber.SubscriberGUID,
ListName = mailingList.ListName
};
try
{
var upsertOperation = TableOperation.InsertOrReplace(sendEmailRow);
messageTable.Execute(upsertOperation);
}
catch (Exception ex)
{
string err = "Error creating SendEmail row: " + ex.Message;
if (ex.InnerException != null)
{
err += " Inner Exception: " + ex.InnerException;
}
Trace.TraceError(err);
}
The code uses an "upsert" operation because the row might already exist if worker role A is restarting after a failure.
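The difference between a plain insert and an upsert on a restart can be seen with an in-memory stand-in for the table. The sketch below uses a Dictionary instead of Windows Azure Storage, so the class and the exception type are illustrative only. A real insert of an existing entity fails with a conflict error from the storage service rather than an ArgumentException.

```csharp
using System;
using System.Collections.Generic;

public class InMemoryTableSketch
{
    // Rows keyed by "PartitionKey|RowKey"; a stand-in for a storage table.
    private readonly Dictionary<string, string> rows = new Dictionary<string, string>();

    // Like a plain insert: fails if the key is already present.
    public void Insert(string key, string value)
    {
        rows.Add(key, value); // throws ArgumentException on a duplicate key
    }

    // Like InsertOrReplace: succeeds whether or not the key exists.
    public void InsertOrReplace(string key, string value)
    {
        rows[key] = value;
    }

    public int Count { get { return rows.Count; } }

    public static void Main()
    {
        var table = new InMemoryTableSketch();
        string key = "2013-05-15|123someone@example.com"; // made-up key

        // First run creates the row; a restarted run repeats the same write.
        table.InsertOrReplace(key, "EmailSent=false");
        table.InsertOrReplace(key, "EmailSent=false");
        Console.WriteLine(table.Count); // 1

        try
        {
            table.Insert(key, "EmailSent=false");
        }
        catch (ArgumentException)
        {
            Console.WriteLine("Plain insert failed: row already exists");
        }
    }
}
```

Because repeating the upsert leaves the table in the same state, worker role A can safely reprocess a message from the beginning after an interruption.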
The last task to be done for each email address is to create the queue work item that will trigger worker role B to send an email. The queue work item contains the partition key and row key value of the SendEmail row that was just created, plus the restart flag that was set earlier. The SendEmail row contains all of the information that worker role B needs in order to send an email.
// Create the queue message.
string queueMessageString =
sendEmailRow.PartitionKey + "," +
sendEmailRow.RowKey + "," +
restartFlag;
var queueMessage = new CloudQueueMessage(queueMessageString);
sendEmailQueue.AddMessage(queueMessage);
The CheckAndArchiveIfComplete method checks messages that are in Processing status to see if all emails have been sent. If it finds no unsent emails, it sets the row status to Complete and archives the row.
private void CheckAndArchiveIfComplete(Message messageToCheck)
{
// Get the list of emails to be sent for this message: all SendEmail rows
// for this message.
string pkrkFilter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, messageToCheck.PartitionKey),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.LessThan, "message"));
var query = new TableQuery<SendEmail>().Where(pkrkFilter);
var emailToBeSent = messageTable.ExecuteQuery(query).FirstOrDefault();
if (emailToBeSent != null)
{
return;
}
// All emails have been sent; copy the message row to the archive table.
// Insert the message row in the messagearchive table
var messageToDelete = new Message { PartitionKey = messageToCheck.PartitionKey, RowKey = messageToCheck.RowKey, ETag = "*" };
messageToCheck.Status = "Complete";
var insertOrReplaceOperation = TableOperation.InsertOrReplace(messageToCheck);
messagearchiveTable.Execute(insertOrReplaceOperation);
// Delete the message row from the message table.
var deleteOperation = TableOperation.Delete(messageToDelete);
messageTable.Execute(deleteOperation);
}
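The order of the last two operations matters: the row is written to the archive before it is deleted from the message table, so an interruption between the two steps leaves a duplicate rather than a lost row. Setting ETag to "*" on the row to delete makes the delete unconditional. The following sketch replays that sequence with two dictionaries standing in for the tables. The class name and the failAfterCopy parameter are hypothetical and exist only to simulate a failure between the steps.

```csharp
using System;
using System.Collections.Generic;

public class ArchiveSketch
{
    // In-memory stand-ins for the message and messagearchive tables.
    public readonly Dictionary<string, string> Live = new Dictionary<string, string>();
    public readonly Dictionary<string, string> Archive = new Dictionary<string, string>();

    // Copies the row to the archive, then deletes it from the live table.
    // failAfterCopy simulates the role going down between the two steps.
    public void ArchiveRow(string key, bool failAfterCopy)
    {
        Archive[key] = Live[key]; // upsert, so repeating the copy is harmless
        if (failAfterCopy)
        {
            throw new InvalidOperationException("simulated crash");
        }
        Live.Remove(key);
    }

    public static void Main()
    {
        var tables = new ArchiveSketch();
        tables.Live["message123"] = "Complete";

        try
        {
            tables.ArchiveRow("message123", true);
        }
        catch (InvalidOperationException)
        {
            // The row now exists in both tables; nothing has been lost.
        }

        // The next pass finds the row still in the live table and retries.
        tables.ArchiveRow("message123", false);

        Console.WriteLine(tables.Live.Count);    // 0
        Console.WriteLine(tables.Archive.Count); // 1
    }
}
```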