{"id":150,"date":"2014-08-04T00:41:33","date_gmt":"2014-08-04T00:41:33","guid":{"rendered":"http:\/\/looselycoupledlabs.com\/?p=150"},"modified":"2014-08-04T02:34:55","modified_gmt":"2014-08-04T02:34:55","slug":"scaling-out-subscribers-with-masstransit","status":"publish","type":"post","link":"https:\/\/looselycoupledlabs.com\/2014\/08\/scaling-out-subscribers-with-masstransit\/","title":{"rendered":"Scaling Out Subscribers With MassTransit"},"content":{"rendered":"
So far on this blog, we\u2019ve been looking at the publish\/subscribe<\/a> messaging pattern using MassTransit<\/a> and RabbitMQ<\/a>. Up to now, we\u2019ve dealt with a single publisher and a single subscriber. We looked at how we can have those two roles live on separate servers<\/a>. Finally, we looked at how to handle errors<\/a> in the subscriber.<\/p>\n What happens, now, when your subscriber can\u2019t process messages as fast as the messages are being published on the bus? This is a special situation, to be sure, but it certainly is possible in high-message-volume environments. It can be compounded if the messages themselves are fairly expensive to process.<\/p>\n Let\u2019s take our simple publish\/subscribe example<\/a> and tweak it so we can have it dump a large number of messages onto the bus extremely quickly. Originally, the example prompted for a string and published that string as a single SomethingHappened message. Instead, let\u2019s prompt for a number of messages that should be put onto the bus.<\/p>\n We\u2019re using the System.Threading.Tasks.Parallel.For method to publish multiple messages onto the bus simultaneously.<\/p>\n Now in our subscriber, let\u2019s have it simulate 250 milliseconds of processing time with a call to System.Threading.Thread.Sleep. Also, because MassTransit will run 4 threads per CPU for our consumer and the messages will be flying in, we\u2019ll condense our Console output to a single WriteLine call instead of multiple calls to Write, so that the output from multiple messages doesn\u2019t get jumbled together.<\/p>\n The test code for this example can be found on GitHub<\/a>.<\/p>\n Make sure both TestPublisher and TestSubscriber are set up as startup projects and run the project in Visual Studio. Try publishing 1,000 messages:<\/p>\n As you can probably see, the prompt on the publisher returned well before the subscriber finished processing the messages. 
This means we were able to publish messages to the bus faster than we could process them. This could be a problem if you don\u2019t expect any \u201clulls\u201d in publishing that would allow the subscriber to catch up.<\/p>\n We can further illustrate the backlog by looking at the graph for the MtPubSubExample_TestSubscriber queue in the RabbitMQ management interface (found at http:\/\/localhost:15672\/<\/a> \u2013 see this post<\/a> for details). You need to have the interface open and be watching the graph while your publisher\/subscriber test is actually running:<\/p>\n Here you can see that the publisher hit a peak of 200 messages per second, while the subscriber sustained a steady rate of about 40 messages per second.<\/p>\n With a spike and then nothing for a time, perhaps slow and steady wins the race for our subscriber. Try 10,000 messages and watch the RabbitMQ graphs:<\/p>\n This more dramatically illustrates the problem. The number of queued messages (top graph) keeps climbing with no relief in sight. And the bottom graph shows we\u2019re publishing messages at a rate of 259 per second, but we only process them at a rate of around 40 per second. Again, since the publish storm eventually passes, the subscriber does eventually<\/em> catch up.<\/p>\n Let\u2019s look at a couple of ways we can increase the throughput of our subscriber.<\/p>\n If you do the math on the rate of 40 messages per second that we observe, you will arrive at what appears to be 10 simultaneous threads processing messages (each message takes a quarter of a second). However, the default number of threads that MassTransit can use for consumers is actually the number of processors in your machine multiplied by 4. So, on my 8-core machine, that would be 32 threads. Why are we only observing 10?<\/p>\n The reason is the number of messages the RabbitMQ transport will \u201cprefetch\u201d from the queue. 
The default for this is 10, so we can only process 10 messages simultaneously. To increase this, you include a \u201cprefetch=X\u201d parameter in the query string of the queue URL. For example:<\/p>\n Now that this is set to 32 to match the maximum thread count of 32, each of the 32 threads can handle 4 messages per second, so we should observe a processing rate of 128 messages per second.<\/p>\n We can also tell MassTransit to allow more threads to be used to consume messages. To do so, add a call to SetConcurrentConsumerLimit in your bus initialization code. Below we bump the thread count to 64 (doubling the number of threads):<\/p>\n Don\u2019t forget to also increase your prefetch setting (see option 1 above) to match. Now we\u2019re processing 256 messages per second! That\u2019s pretty close to the 259 per second we observed being published onto the bus.<\/p>\n However, at some point, your machine is going to run out of processing power. Perhaps it already has. We\u2019re just sleeping the thread here for 250ms, so the ceiling is pretty high on how many threads we could run, but if there were real processing happening, we might be maxing out the CPU on the machine. As any good architect knows, don\u2019t scale up, scale out!<\/p>\n Try dumping another 10,000 messages onto the bus. While you\u2019ve got one subscriber running, you can simply execute another instance of the TestSubscriber executable and it will start processing messages too, effectively doubling your processing rate!<\/p>\n Having multiple subscribers connected to the same<\/em> RabbitMQ queue is what\u2019s called the \u201ccompeting consumer\u201d pattern. RabbitMQ will make sure each consumer gets unique messages in essentially a round-robin fashion.<\/p>\n Again, however, if we\u2019re already maxing out the CPU on the subscriber machine, what we really need is to run another subscriber on another<\/em> machine with its own available resources. 
If we use our current example code, however, we will<\/em> get duplicate messages because each machine is connecting to localhost for its RabbitMQ instance. (Don\u2019t forget to make sure all the RabbitMQ instances are in a cluster<\/a>.) Since each instance will have its own queue, MassTransit will treat each queue as a unique consumer as opposed to competing consumers. Each queue will get a copy of the same message routed to it. Remember this diagram from our clustering article illustrating how each machine has its own RabbitMQ instance:<\/p>\n Clearly, this is not what we want. In order to be competing consumers, the two subscriber processes must be connected to the same<\/em> RabbitMQ instance and queue. What we need is an architecture more like the following:<\/p>\n Obviously, this makes the RabbitMQ server a single point of failure and a dependency for the two subscriber machines. If high availability is a requirement, then you would need to look into some type of virtual IP address-based clustering (like keepalived<\/a> on Linux or NLB<\/a> on Windows). You will also need to implement highly available queues<\/a> in RabbitMQ so that the queues are replicated across your multiple instances.<\/p>\n The first step is to install RabbitMQ on a new server. We\u2019ll call this machine \u201csubmaster\u201d (for subscription master). Instructions for installing RabbitMQ can be found in this blog post<\/a>. Then, join the RabbitMQ instance on submaster to a cluster with the RabbitMQ instance on your publisher machine. Instructions for creating a RabbitMQ cluster can be found in this blog post<\/a>. We should have these nodes in our RabbitMQ cluster:<\/p>\n In order to connect to a remote RabbitMQ instance, we need to do some security housekeeping. First, we need to modify the Windows Firewall on the submaster machine to allow inbound traffic on the default RabbitMQ port, 5672. Next, we need to create a new user in RabbitMQ that the subscriber can use to log in. 
We\u2019ll call it \u201ctestsubscriber\u201d and give it a password of \u201ctest\u201d. On the Admin tab of the RabbitMQ management interface, you can begin adding a new user:<\/p>\n Type in the username and password, set the administrator tag, and click Add user. Initially, the user won\u2019t have any permissions:<\/p>\n Click on the testsubscriber user and then click \u201cSet permission\u201d as seen here:<\/p>\n Now we need to modify our Configuration.BusInitializer class so that it can connect to a specific machine name instead of hard-coding localhost, and so that it uses our username and password. We\u2019ll have it read these items from our App.config. Remember, our publisher can still use localhost (which doesn\u2019t require username\/password), but our subscriber needs to connect to the submaster machine with some credentials.<\/p>\n First, add a reference to System.Configuration to the Configuration project. Then modify the BusInitializer class to allow reading the machine name, username, and password from configuration:<\/p>\n Since we\u2019re only going to deviate from the default of localhost on our subscriber, open the TestSubscriber project and add the following lines to the App.config:<\/p>\n On my dev machine, I fired up one instance of TestSubscriber with the above configuration. Then, on the publisher machine, I started up both TestPublisher and TestSubscriber. Here it is running after pushing 100,000 messages onto the bus:<\/p>\n Lots of blinking lights. The more interesting thing is to observe the messages processed per second in RabbitMQ:<\/p>\n So now you can see how it would be possible to scale out your message processing. Perhaps in a future post, we\u2019ll take a look at leveraging the cloud. It should be possible to monitor the number of messages in your queue and spin up new cloud workers to pick up the slack and then shut them down when the queue quiets back down. 
Until then\u2026<\/p>\n","protected":false},"excerpt":{"rendered":" So far on this blog, we\u2019ve been looking at the publish\/subscribe messaging pattern using MassTransit and RabbitMQ. So far, we\u2019ve dealt with a single publisher and a single subscriber. We looked at how we can have those two roles live on separate servers. Finally, we looked at how to handle errors in the subscriber. What… Continue reading Open the Flood Gates<\/h1>\n
using Configuration;\r\nusing Contracts;\r\nusing System;\r\nusing System.Threading.Tasks;\r\n\r\nnamespace TestPublisher\r\n{\r\n class Program\r\n {\r\n static void Main(string[] args)\r\n {\r\n var bus = BusInitializer.CreateBus(\"TestPublisher\", x => { });\r\n string text = \"\";\r\n\r\n while (text != \"quit\")\r\n {\r\n Console.Write(\"Enter number of messages to generate (quit to exit): \");\r\n text = Console.ReadLine();\r\n\r\n int numMessages = 0;\r\n if (int.TryParse(text, out numMessages) && numMessages > 0)\r\n {\r\n Parallel.For(0, numMessages, i =>\r\n {\r\n var message = new SomethingHappenedMessage() { What = \"message \" + i.ToString(), When = DateTime.Now };\r\n bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });\r\n });\r\n }\r\n else if(text != \"quit\")\r\n {\r\n Console.WriteLine(\"\\\"\" + text + \"\\\" is not a number.\");\r\n }\r\n }\r\n\r\n bus.Dispose();\r\n }\r\n }\r\n}\r\n<\/pre>\n
using Contracts;\r\nusing MassTransit;\r\nusing System;\r\nusing System.Threading;\r\n\r\nnamespace TestSubscriber\r\n{\r\n class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context\r\n {\r\n public void Consume(IConsumeContext<SomethingHappened> message)\r\n {\r\n Console.WriteLine(\"TXT: \" + message.Message.What +\r\n \" SENT: \" + message.Message.When.ToString() +\r\n \" PROCESSED: \" + DateTime.Now.ToString() + \r\n \" (\" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + \")\");\r\n\r\n \/\/ Simulate processing time\r\n Thread.Sleep(250);\r\n }\r\n }\r\n}\r\n<\/pre>\n
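To get a feel for why a consumer like the one above tops out where it does, here is a minimal stand-alone sketch. It is not part of the sample repository and does not touch MassTransit or RabbitMQ. The ThroughputSimulation class and its Measure method are hypothetical names, and Task.Delay stands in for the 250 ms Thread.Sleep. The only thing it models is a fixed number of messages in flight at once.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-alone simulation; it does not use MassTransit or RabbitMQ.
public static class ThroughputSimulation
{
    // Processes messageCount simulated messages with at most maxConcurrent
    // in flight at once, and returns the observed messages per second.
    public static double Measure(int messageCount, int maxConcurrent, int workMilliseconds)
    {
        var gate = new SemaphoreSlim(maxConcurrent);
        var stopwatch = Stopwatch.StartNew();

        var tasks = Enumerable.Range(0, messageCount).Select(async i =>
        {
            await gate.WaitAsync();
            try
            {
                // Stand-in for the simulated processing time in the consumer.
                await Task.Delay(workMilliseconds);
            }
            finally
            {
                gate.Release();
            }
        }).ToArray();

        Task.WaitAll(tasks);
        stopwatch.Stop();
        return messageCount / stopwatch.Elapsed.TotalSeconds;
    }

    public static void Main()
    {
        // 10 in flight at 250 ms each should land somewhere near 40 per second.
        Console.WriteLine("10 workers: {0:F0} msg/s", Measure(200, 10, 250));
        // 32 in flight should land somewhere near 128 per second.
        Console.WriteLine("32 workers: {0:F0} msg/s", Measure(640, 32, 250));
    }
}
```

The measured numbers will come in a little under the ideal figures because of timer resolution and scheduling overhead, so treat them as a rough check rather than a benchmark.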
Running the Test<\/h2>\n
Option 1: Increase the Prefetch Count<\/h2>\n
x.UseRabbitMq();\r\nx.ReceiveFrom(\"rabbitmq:\/\/localhost\/MtPubSubExample_\" + queueName + \"?prefetch=32\");\r\n<\/pre>\n
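The arithmetic behind the prefetch setting can be written out as a small worked formula. This is an idealized sketch: it assumes every message takes exactly 0.25 seconds and that the number of messages in flight is the smaller of the prefetch count and the thread limit. The symbols are introduced here for illustration only and are not MassTransit terminology.

```latex
\[
  \text{rate} \approx \frac{\min(\text{prefetch},\ \text{threads})}{t_{\text{msg}}},
  \qquad t_{\text{msg}} = 0.25\ \text{s}
\]
\[
  \text{default: } \frac{\min(10,\ 32)}{0.25} = 40\ \text{msg/s},
  \qquad
  \text{prefetch}=32\text{: } \frac{\min(32,\ 32)}{0.25} = 128\ \text{msg/s}
\]
```

Under these assumptions, raising the prefetch count beyond the thread limit would not be expected to help on its own, which is why the two settings are adjusted together.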
Option 2: Increase the Thread Count<\/h2>\n
using Configuration;\r\nusing MassTransit;\r\nusing System;\r\n\r\nnamespace TestSubscriber\r\n{\r\n class Program\r\n {\r\n static void Main(string[] args)\r\n {\r\n var bus = BusInitializer.CreateBus(\"TestSubscriber\", x =>\r\n {\r\n x.SetConcurrentConsumerLimit(64);\r\n x.Subscribe(subs =>\r\n {\r\n subs.Consumer<SomethingHappenedConsumer>().Permanent();\r\n });\r\n });\r\n\r\n Console.ReadKey();\r\n\r\n bus.Dispose();\r\n }\r\n }\r\n}\r\n<\/pre>\n
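Because the prefetch count and the consumer limit need to move together, one way to keep them in sync is to derive the queue URI from the same number. The sketch below is a hypothetical helper, not part of MassTransit or the sample repository. SubscriberSettings, DefaultConsumerLimit and BuildQueueUri are made-up names, and the URI format is simply the one used in the code above.

```csharp
using System;

// Hypothetical helper, not part of MassTransit or the sample repository.
public static class SubscriberSettings
{
    // Default consumer thread count as described in the post: 4 per processor.
    public static int DefaultConsumerLimit()
    {
        return Environment.ProcessorCount * 4;
    }

    // Builds the receive URI with prefetch set equal to the consumer limit.
    public static string BuildQueueUri(string serverName, string queueName, int consumerLimit)
    {
        if (consumerLimit <= 0)
            throw new ArgumentOutOfRangeException("consumerLimit");

        return "rabbitmq://" + serverName + "/MtPubSubExample_" + queueName
            + "?prefetch=" + consumerLimit;
    }

    public static void Main()
    {
        int limit = 64;
        Console.WriteLine(BuildQueueUri("localhost", "TestSubscriber", limit));
        // prints: rabbitmq://localhost/MtPubSubExample_TestSubscriber?prefetch=64
    }
}
```

In the subscriber, the same limit value would then be handed to SetConcurrentConsumerLimit and the resulting string to ReceiveFrom, so the two settings cannot drift apart.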
Option 3: Run More Subscribers<\/h2>\n
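The difference between competing consumers on one queue and subscribers that each own a queue can be sketched with a small in-memory model. It does not talk to RabbitMQ. DeliveryModel and its methods are hypothetical names, and strict round-robin is a simplification of the "essentially round-robin" behavior described in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model; it does not talk to RabbitMQ.
public static class DeliveryModel
{
    // Competing consumers: one shared queue, each message goes to exactly one consumer.
    public static List<int>[] SharedQueue(int messageCount, int consumerCount)
    {
        var received = NewLists(consumerCount);
        for (int m = 0; m < messageCount; m++)
            received[m % consumerCount].Add(m); // simplified round-robin hand-off
        return received;
    }

    // One queue per subscriber: every queue gets its own copy of each message.
    public static List<int>[] QueuePerSubscriber(int messageCount, int consumerCount)
    {
        var received = NewLists(consumerCount);
        for (int m = 0; m < messageCount; m++)
            foreach (var queue in received)
                queue.Add(m);
        return received;
    }

    private static List<int>[] NewLists(int count)
    {
        var lists = new List<int>[count];
        for (int i = 0; i < count; i++)
            lists[i] = new List<int>();
        return lists;
    }

    public static void Main()
    {
        var shared = SharedQueue(10, 2);
        var separate = QueuePerSubscriber(10, 2);
        // prints: shared queue: 5 and 5 messages
        Console.WriteLine("shared queue: {0} and {1} messages", shared[0].Count, shared[1].Count);
        // prints: queue per subscriber: 10 and 10 messages
        Console.WriteLine("queue per subscriber: {0} and {1} messages", separate[0].Count, separate[1].Count);
    }
}
```

With a shared queue the work is split between the two consumers. With a queue per subscriber each one sees every message, which is the duplicate processing we want to avoid when scaling out.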
Implementing the Centralized RabbitMQ Server<\/h3>\n
using MassTransit;\r\nusing MassTransit.BusConfigurators;\r\nusing MassTransit.Log4NetIntegration.Logging;\r\nusing System;\r\nusing System.Configuration;\r\n\r\nnamespace Configuration\r\n{\r\n public class BusInitializer\r\n {\r\n public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)\r\n {\r\n Log4NetLogger.Use();\r\n var bus = ServiceBusFactory.New(x =>\r\n {\r\n var serverName = GetConfigValue(\"rabbitmq-server-name\", \"localhost\");\r\n var userName = GetConfigValue(\"rabbitmq-username\", \"\");\r\n var password = GetConfigValue(\"rabbitmq-password\", \"\");\r\n var queueUri = \"rabbitmq:\/\/\" + serverName + \"\/MtPubSubExample_\" + queueName + \"?prefetch=64\";\r\n\r\n if (userName != \"\")\r\n {\r\n x.UseRabbitMq(r =>\r\n {\r\n r.ConfigureHost(new Uri(queueUri), h =>\r\n {\r\n h.SetUsername(userName);\r\n h.SetPassword(password);\r\n });\r\n });\r\n }\r\n else\r\n x.UseRabbitMq();\r\n\r\n x.ReceiveFrom(queueUri);\r\n moreInitialization(x);\r\n });\r\n\r\n return bus;\r\n }\r\n\r\n private static string GetConfigValue(string key, string defaultValue)\r\n {\r\n string value = ConfigurationManager.AppSettings[key];\r\n return string.IsNullOrEmpty(value) ? defaultValue : value;\r\n }\r\n }\r\n}\r\n<\/pre>\n
<?xml version=\"1.0\" encoding=\"utf-8\" ?>\r\n<configuration>\r\n <startup>\r\n <supportedRuntime version=\"v4.0\" sku=\".NETFramework,Version=v4.5\" \/>\r\n <\/startup>\r\n\r\n <appSettings>\r\n <add key=\"rabbitmq-server-name\" value=\"submaster\" \/>\r\n <add key=\"rabbitmq-username\" value=\"testsubscriber\" \/>\r\n <add key=\"rabbitmq-password\" value=\"test\" \/>\r\n <\/appSettings>\r\n<\/configuration>\r\n<\/pre>\n
Running the Test<\/h3>\n
Wrap Up<\/h1>\n