Open WorkerRoleB.cs and examine the code.
As you already saw in worker role A, the OnStart method initializes the context classes that you need in order to work with Windows Azure storage entities. It also makes sure that all of the tables, queues, and blob containers you need in the Run method exist.
The difference compared to worker role A is the addition of the blob container and the subscribe queue among the resources to create if they don't already exist. You'll use the blob container to get the files that contain the HTML and plain text for the email body. The subscribe queue is used for sending subscription confirmation emails.
public override bool OnStart()
{
ServicePointManager.DefaultConnectionLimit = Environment.ProcessorCount;
ConfigureDiagnostics();
// Read storage account configuration settings
Trace.TraceInformation("Initializing storage account in worker role B");
var storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageConnectionString"));
// Initialize queue storage
Trace.TraceInformation("Creating queue client.");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
this.sendEmailQueue = queueClient.GetQueueReference("azuremailqueue");
this.subscribeQueue = queueClient.GetQueueReference("azuremailsubscribequeue");
// Initialize blob storage
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
this.blobContainer = blobClient.GetContainerReference("azuremailblobcontainer");
// Initialize table storage
var tableClient = storageAccount.CreateCloudTableClient();
tableServiceContext = tableClient.GetDataServiceContext();
Trace.TraceInformation("WorkerB: Creating blob container, queue, tables, if they don't exist.");
this.blobContainer.CreateIfNotExists();
this.sendEmailQueue.CreateIfNotExists();
this.subscribeQueue.CreateIfNotExists();
var messageTable = tableClient.GetTableReference("Message");
messageTable.CreateIfNotExists();
var mailingListTable = tableClient.GetTableReference("MailingList");
mailingListTable.CreateIfNotExists();
return base.OnStart();
}
The Run method processes work items from two queues: the queue used for messages sent to email lists (work items created by worker role A), and the queue used for subscription confirmation emails (work items created by the subscribe API method in the MvcWebRole project).
public override void Run()
{
CloudQueueMessage msg = null;
Trace.TraceInformation("WorkerRoleB start of Run()");
while (true)
{
try
{
bool messageFound = false;
// If OnStop has been called, return to do a graceful shutdown.
if (onStopCalled == true)
{
Trace.TraceInformation("onStopCalled WorkerRoleB");
returnedFromRunMethod = true;
return;
}
// Retrieve and process a new message from the send-email-to-list queue.
msg = sendEmailQueue.GetMessage();
if (msg != null)
{
ProcessQueueMessage(msg);
messageFound = true;
}
// Retrieve and process a new message from the subscribe queue.
msg = subscribeQueue.GetMessage();
if (msg != null)
{
ProcessSubscribeQueueMessage(msg);
messageFound = true;
}
if (messageFound == false)
{
System.Threading.Thread.Sleep(1000 * 60);
}
}
catch (Exception ex)
{
string err = ex.Message;
if (ex.InnerException != null)
{
err += " Inner Exception: " + ex.InnerException.Message;
}
if (msg != null)
{
err += " Last queue message retrieved: " + msg.AsString;
}
Trace.TraceError(err);
// Don't fill up Trace storage if we have a bug in either process loop.
System.Threading.Thread.Sleep(1000 * 60);
}
}
}
This code runs in an infinite loop until the worker role is shut down. If a work item is found in the main queue, the code processes it and then checks the subscribe queue.
// Retrieve and process a new message from the send-email-to-list queue.
msg = this.sendEmailQueue.GetMessage();
if (msg != null)
{
ProcessQueueMessage(msg);
messageFound = true;
}
// Retrieve and process a new message from the subscribe queue.
msg = this.subscribeQueue.GetMessage();
if (msg != null)
{
ProcessSubscribeQueueMessage(msg);
messageFound = true;
}
If nothing is waiting in either queue, the code sleeps 60 seconds before continuing with the loop.
if (messageFound == false)
{
System.Threading.Thread.Sleep(1000 * 60);
}
The purpose of the sleep time is to minimize Windows Azure Storage transaction costs, as explained in the previous tutorial.
When a queue item is pulled from the queue by the GetMessage method, that queue item becomes invisible for 30 seconds to all other worker and web roles accessing the queue. This is what ensures that only one worker role instance will pick up any given queue message for processing. You can explicitly set this exclusive lease time (the time the queue item is invisible) by passing a visibility timeout parameter to the GetMessage method. If the worker role could take more than 30 seconds to process a queue message, you should increase the exclusive lease time to prevent other role instances from processing the same message.
On the other hand, you don't want to set the exclusive lease time to an excessively large value. For example, if the exclusive lease time is set to 48 hours and your worker role unexpectedly shuts down after dequeuing a message, another worker role would not be able to process the message for 48 hours. The exclusive lease maximum is 7 days.
The GetMessages method (notice the "s" at the end of the name) can be used to pull up to 32 messages from the queue in one call. Each queue access incurs a small transaction cost, and the transaction cost is the same whether 32 messages are returned or zero messages are returned. The following code fetches up to 32 messages in one call and then processes them.
foreach (CloudQueueMessage msg in sendEmailQueue.GetMessages(32))
{
ProcessQueueMessage(msg);
messageFound = true;
}
When you use GetMessages to retrieve multiple messages, be sure the visibility timeout gives your application enough time to process all of them. Once the visibility timeout expires, other role instances can retrieve a message, and once one of them does, the first instance will no longer be able to delete that message when it finishes processing the work item.
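The visibility-timeout guidance above can be sketched as a small helper that budgets the lease for a single message or for a batch. This is only an illustration: the class name VisibilityTimeoutBudget, the For method, and the safety factor of 2 are hypothetical and are not part of the tutorial project. The 30-second default, the 7-day maximum, and the 32-message batch limit come from the text; never going below the default is a choice made for this sketch.

```csharp
using System;

public static class VisibilityTimeoutBudget
{
    // Limits taken from the text: 30-second default, 7-day maximum.
    public static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
    public static readonly TimeSpan MaxTimeout = TimeSpan.FromDays(7);

    // Hypothetical helper: worst-case time per message, multiplied by the
    // batch size and a safety factor, then clamped to the range above.
    public static TimeSpan For(TimeSpan worstCasePerMessage, int batchSize = 1, double safetyFactor = 2.0)
    {
        if (batchSize < 1 || batchSize > 32)
        {
            throw new ArgumentOutOfRangeException("batchSize", "A batch holds between 1 and 32 messages.");
        }

        var budget = TimeSpan.FromSeconds(worstCasePerMessage.TotalSeconds * batchSize * safetyFactor);

        // Assumption for this sketch: never request less than the default lease.
        if (budget < DefaultTimeout) return DefaultTimeout;
        if (budget > MaxTimeout) return MaxTimeout;
        return budget;
    }
}
```

Assuming the storage client library version you use accepts an optional TimeSpan visibility timeout, the result could be passed as sendEmailQueue.GetMessage(VisibilityTimeoutBudget.For(TimeSpan.FromSeconds(45))) for a single message, or as sendEmailQueue.GetMessages(32, VisibilityTimeoutBudget.For(TimeSpan.FromSeconds(10), 32)) for a full batch. Sizing the lease from a worst-case estimate keeps it long enough to finish the work but short enough that a crashed instance does not hide the message for long.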
The Run method calls ProcessQueueMessage when it finds a work item in the main queue:
private void ProcessQueueMessage(CloudQueueMessage msg)
{
// Log and delete if this is a "poison" queue message (repeatedly processed
// and always causes an error that prevents processing from completing).
// Production applications should move the "poison" message to a "dead message"
// queue for analysis rather than deleting the message.
if (msg.DequeueCount > 5)
{
Trace.TraceError("Deleting poison message: message {0} Role Instance {1}.",
msg.AsString, GetRoleInstance());
sendEmailQueue.DeleteMessage(msg);
return;
}
// Parse message retrieved from queue.
// Example: 2012-01-01,0123456789email@domain.com,0
var messageParts = msg.AsString.Split(new char[] { ',' });
var partitionKey = messageParts[0];
var rowKey = messageParts[1];
var restartFlag = messageParts[2];
Trace.TraceInformation("ProcessQueueMessage start: partitionKey {0} rowKey {1} Role Instance {2}.",
partitionKey, rowKey, GetRoleInstance());
// If this is a restart, verify that the email hasn't already been sent.
if (restartFlag == "1")
{
var retrieveOperationForRestart = TableOperation.Retrieve<SendEmail>(partitionKey, rowKey);
var retrievedResultForRestart = messagearchiveTable.Execute(retrieveOperationForRestart);
var messagearchiveRow = retrievedResultForRestart.Result as SendEmail;
if (messagearchiveRow != null)
{
// SendEmail row is in archive, so email is already sent.
// If there's a SendEmail Row in message table, delete it,
// and delete the queue message.
Trace.TraceInformation("Email already sent: partitionKey=" + partitionKey + " rowKey= " + rowKey);
var deleteOperation = TableOperation.Delete(new SendEmail { PartitionKey = partitionKey, RowKey = rowKey, ETag = "*" });
try
{
messageTable.Execute(deleteOperation);
}
catch
{
// The row may already have been deleted from the message table; nothing to do.
}
sendEmailQueue.DeleteMessage(msg);
return;
}
}
// Get the row in the Message table that has data we need to send the email.
var retrieveOperation = TableOperation.Retrieve<SendEmail>(partitionKey, rowKey);
var retrievedResult = messageTable.Execute(retrieveOperation);
var emailRowInMessageTable = retrievedResult.Result as SendEmail;
if (emailRowInMessageTable == null)
{
Trace.TraceError("SendEmail row not found: partitionKey {0} rowKey {1} Role Instance {2}.",
partitionKey, rowKey, GetRoleInstance());
return;
}
// Derive blob names from the MessageRef.
var htmlMessageBodyRef = emailRowInMessageTable.MessageRef + ".htm";
var textMessageBodyRef = emailRowInMessageTable.MessageRef + ".txt";
// If the email hasn't already been sent, send email and archive the table row.
if (emailRowInMessageTable.EmailSent != true)
{
SendEmailToList(emailRowInMessageTable, htmlMessageBodyRef, textMessageBodyRef);
var emailRowToDelete = new SendEmail { PartitionKey = partitionKey, RowKey = rowKey, ETag = "*" };
emailRowInMessageTable.EmailSent = true;
var upsertOperation = TableOperation.InsertOrReplace(emailRowInMessageTable);
messagearchiveTable.Execute(upsertOperation);
var deleteOperation = TableOperation.Delete(emailRowToDelete);
messageTable.Execute(deleteOperation);
}
// Delete the queue message.
sendEmailQueue.DeleteMessage(msg);
Trace.TraceInformation("ProcessQueueMessage complete: partitionKey {0} rowKey {1} Role Instance {2}.",
partitionKey, rowKey, GetRoleInstance());
}
Poison messages are those that cause the application to throw an exception when they are processed. If a message has been pulled from the queue more than five times, we assume that it cannot be processed and remove it from the queue so that we don't keep trying to process it. Production applications should consider moving the poison message to a "dead message" queue for analysis rather than deleting the message.
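The "dead message" queue suggestion above could look something like the following sketch. It is not part of the tutorial project: the class PoisonMessageHandler and the method TryQuarantine are hypothetical, and the two delegates stand in for the real queue calls so that the logic can be read and tested without a storage account.

```csharp
using System;

public static class PoisonMessageHandler
{
    // Same threshold as the tutorial code: more than five dequeues means poison.
    public const int MaxDequeueCount = 5;

    // Returns true if the message was treated as poison and must not be processed.
    // copyToDeadMessageQueue and deleteFromSourceQueue are hypothetical stand-ins
    // for calls such as AddMessage on a dead-message queue and DeleteMessage on
    // the queue the message came from.
    public static bool TryQuarantine(string body, int dequeueCount,
        Action<string> copyToDeadMessageQueue, Action deleteFromSourceQueue)
    {
        if (dequeueCount <= MaxDequeueCount)
        {
            return false;
        }

        // Copy first, delete second: if the role stops between the two steps,
        // the worst case is a duplicate in the dead-message queue, not a lost message.
        copyToDeadMessageQueue(body);
        deleteFromSourceQueue();
        return true;
    }
}
```

In ProcessQueueMessage, the first delegate might wrap a call like deadMessageQueue.AddMessage(new CloudQueueMessage(body)), where deadMessageQueue is an additional CloudQueue that you would have to create in OnStart (an assumption, since the tutorial project has no such queue), and the second delegate might wrap sendEmailQueue.DeleteMessage(msg).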
The code parses the queue message into the partition key and row key needed to retrieve the SendEmail row, and a restart flag.
var messageParts = msg.AsString.Split(new char[] { ',' });
var partitionKey = messageParts[0];
var rowKey = messageParts[1];
var restartFlag = messageParts[2];
If processing for this message has been restarted after an unexpected shut down, the code checks the messagearchive table to determine if this email has already been sent. If it has already been sent, the code deletes the SendEmail row if it exists and deletes the queue message.
if (restartFlag == "1")
{
var retrieveOperationForRestart = TableOperation.Retrieve<SendEmail>(partitionKey, rowKey);
var retrievedResultForRestart = messagearchiveTable.Execute(retrieveOperationForRestart);
var messagearchiveRow = retrievedResultForRestart.Result as SendEmail;
if (messagearchiveRow != null)
{
Trace.TraceInformation("Email already sent: partitionKey=" + partitionKey + " rowKey= " + rowKey);
var deleteOperation = TableOperation.Delete(new SendEmail { PartitionKey = partitionKey, RowKey = rowKey, ETag = "*" });
try
{
messageTable.Execute(deleteOperation);
}
catch
{
// The row may already have been deleted from the message table; nothing to do.
}
sendEmailQueue.DeleteMessage(msg);
return;
}
}
Next, we get the SendEmail row from the message table. This row has all of the information needed to send the email, except for the blobs that contain the HTML and plain text body of the email.
var retrieveOperation = TableOperation.Retrieve<SendEmail>(partitionKey, rowKey);
var retrievedResult = messageTable.Execute(retrieveOperation);
var emailRowInMessageTable = retrievedResult.Result as SendEmail;
if (emailRowInMessageTable == null)
{
Trace.TraceError("SendEmail row not found: partitionKey {0} rowKey {1} Role Instance {2}.",
partitionKey, rowKey, GetRoleInstance());
return;
}
Then the code sends the email and archives the SendEmail row.
if (emailRowInMessageTable.EmailSent != true)
{
SendEmailToList(emailRowInMessageTable, htmlMessageBodyRef, textMessageBodyRef);
var emailRowToDelete = new SendEmail { PartitionKey = partitionKey, RowKey = rowKey, ETag = "*" };
emailRowInMessageTable.EmailSent = true;
var upsertOperation = TableOperation.InsertOrReplace(emailRowInMessageTable);
messagearchiveTable.Execute(upsertOperation);
var deleteOperation = TableOperation.Delete(emailRowToDelete);
messageTable.Execute(deleteOperation);
}
Moving the row to the messagearchive table can't be done in a transaction because it affects multiple tables.
Finally, if everything else is successful, the queue message is deleted.
sendEmailQueue.DeleteMessage(msg);
The actual work of sending the email by using SendGrid is done by the SendEmailToList method. If you want to use a different service than SendGrid, all you have to do is change the code in this method.
Note: If you have invalid credentials in the project settings, the call to SendGrid will fail but the application will not get any indication of the failure. If you use SendGrid in a production application, consider setting up separate credentials for the web API in order to avoid causing silent failures when an administrator changes his or her SendGrid user account password. For more information, see SendGrid MultiAuth - Multiple Account Credentials. You can set up credentials at https://sendgrid.com/credentials.
private void SendEmailToList(string emailAddress, string fromEmailAddress, string subjectLine,
string htmlMessageBodyRef, string textMessageBodyRef)
{
var email = SendGrid.GenerateInstance();
email.From = new MailAddress(fromEmailAddress);
email.AddTo(emailAddress);
email.Html = GetBlobText(htmlMessageBodyRef);
email.Text = GetBlobText(textMessageBodyRef);
email.Subject = subjectLine;
var credentials = new NetworkCredential(RoleEnvironment.GetConfigurationSettingValue("SendGridUserName"),
RoleEnvironment.GetConfigurationSettingValue("SendGridPassword"));
var transportREST = REST.GetInstance(credentials);
transportREST.Deliver(email);
}
private string GetBlobText(string blobRef)
{
var blob = blobContainer.GetBlockBlobReference(blobRef);
blob.FetchAttributes();
var blobSize = blob.Properties.Length;
using (var memoryStream = new MemoryStream((int)blobSize))
{
blob.DownloadToStream(memoryStream);
return System.Text.Encoding.UTF8.GetString(memoryStream.ToArray());
}
}
In the GetBlobText method, the code gets the blob size and uses that value as the initial capacity of the MemoryStream object for performance reasons. If you don't provide the size, the MemoryStream starts with a small buffer (256 bytes) and, each time the download exceeds the current capacity, allocates a buffer twice as large and copies the existing contents into it. For a large blob, this repeated growth is inefficient compared to allocating the correct amount at the start of the download.
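To see the effect of presizing described above, the following sketch counts how often a MemoryStream has to enlarge its buffer while data is written to it in chunks, much as a download would. The class MemoryStreamGrowth and its method are hypothetical names used only for this illustration, and the 128-byte chunk size is an arbitrary assumption.

```csharp
using System;
using System.IO;

public static class MemoryStreamGrowth
{
    // Writes totalBytes to the stream in fixed-size chunks and counts how many
    // times the stream's capacity changed, that is, how many times it had to
    // allocate a larger internal buffer.
    public static int CountReallocations(MemoryStream stream, int totalBytes, int chunkSize = 128)
    {
        if (chunkSize < 1)
        {
            throw new ArgumentOutOfRangeException("chunkSize");
        }

        var chunk = new byte[chunkSize];
        int reallocations = 0;
        int lastCapacity = stream.Capacity;

        for (int written = 0; written < totalBytes; written += chunkSize)
        {
            stream.Write(chunk, 0, Math.Min(chunkSize, totalBytes - written));
            if (stream.Capacity != lastCapacity)
            {
                reallocations++;
                lastCapacity = stream.Capacity;
            }
        }
        return reallocations;
    }
}
```

Calling CountReallocations(new MemoryStream(1 << 20), 1 << 20) should report no reallocations because the buffer already has room for the whole payload, while CountReallocations(new MemoryStream(), 1 << 20) reports several, one for each time the buffer had to grow.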
The Run method calls ProcessSubscribeQueueMessage when it finds a work item in the subscribe queue:
private void ProcessSubscribeQueueMessage(CloudQueueMessage msg)
{
// Log and delete if this is a "poison" queue message (repeatedly processed
// and always causes an error that prevents processing from completing).
// Production applications should move the "poison" message to a "dead message"
// queue for analysis rather than deleting the message.
if (msg.DequeueCount > 5)
{
Trace.TraceError("Deleting poison subscribe message: message {0} Role Instance {1}.",
msg.AsString, GetRoleInstance());
subscribeQueue.DeleteMessage(msg);
return;
}
// Parse message retrieved from queue. Message consists of
// subscriber GUID and list name.
// Example: 57ab4c4b-d564-40e3-9a3f-81835b3e102e,contoso1
var messageParts = msg.AsString.Split(new char[] { ',' });
var subscriberGUID = messageParts[0];
var listName = messageParts[1];
Trace.TraceInformation("ProcessSubscribeQueueMessage start: subscriber GUID {0} listName {1} Role Instance {2}.",
subscriberGUID, listName, GetRoleInstance());
// Get subscriber info.
string filter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, listName),
TableOperators.And,
TableQuery.GenerateFilterCondition("SubscriberGUID", QueryComparisons.Equal, subscriberGUID));
var query = new TableQuery<Subscriber>().Where(filter);
var subscriber = mailingListTable.ExecuteQuery(query).ToList().Single();
// Get mailing list info.
var retrieveOperation = TableOperation.Retrieve<MailingList>(subscriber.ListName, "mailinglist");
var retrievedResult = mailingListTable.Execute(retrieveOperation);
var mailingList = retrievedResult.Result as MailingList;
SendSubscribeEmail(subscriberGUID, subscriber, mailingList);
subscribeQueue.DeleteMessage(msg);
Trace.TraceInformation("ProcessSubscribeQueueMessage complete: subscriber GUID {0} Role Instance {1}.",
subscriberGUID, GetRoleInstance());
}
This method performs the following tasks:
- If the message is a "poison" message, logs and deletes it.
- Gets the subscriber GUID and the list name from the queue message.
- Uses the list name and the GUID to get subscriber and mailing list information from the MailingList table.
- Sends a confirmation email to the new subscriber.
- Deletes the queue message.
As with emails sent to lists, the actual sending of the email is in a separate method, making it easy for you to change to a different email service if you want to do that.
private static void SendSubscribeEmail(string subscriberGUID, Subscriber subscriber, MailingList mailingList)
{
var email = SendGrid.GenerateInstance();
email.From = new MailAddress(mailingList.FromEmailAddress);
email.AddTo(subscriber.EmailAddress);
string subscribeURL = RoleEnvironment.GetConfigurationSettingValue("AzureMailServiceURL") +
"/subscribe?id=" + subscriberGUID + "&listName=" + subscriber.ListName;
email.Html = String.Format("<p>Click the link below to subscribe to {0}. " +
"If you don't confirm your subscription, you won't be subscribed to the list.</p>" +
"<a href=\"{1}\">Confirm Subscription</a>", mailingList.Description, subscribeURL);
email.Text = String.Format("Copy and paste the following URL into your browser in order to subscribe to {0}. " +
"If you don't confirm your subscription, you won't be subscribed to the list.\n" +
"{1}", mailingList.Description, subscribeURL);
email.Subject = "Subscribe to " + mailingList.Description;
var credentials = new NetworkCredential(RoleEnvironment.GetConfigurationSettingValue("SendGridUserName"), RoleEnvironment.GetConfigurationSettingValue("SendGridPassword"));
var transportREST = REST.GetInstance(credentials);
transportREST.Deliver(email);
}