{"id":27,"date":"2014-06-28T00:23:10","date_gmt":"2014-06-28T00:23:10","guid":{"rendered":"http:\/\/looselycoupledlabs.com\/?p=27"},"modified":"2015-07-04T23:09:43","modified_gmt":"2015-07-04T23:09:43","slug":"masstransit-publish-subscribe-example","status":"publish","type":"post","link":"https:\/\/looselycoupledlabs.com\/2014\/06\/masstransit-publish-subscribe-example\/","title":{"rendered":"A Simple MassTransit Publish\/Subscribe Example"},"content":{"rendered":"
When I first sat down to learn how to use MassTransit<\/a>, I had trouble finding a simple working example in which one process publishes a message onto the bus and another process subscribes to messages of the same type. Hopefully, this primer will get you on the bus quicker.<\/p>\n The first thing you need is a message queuing framework. MassTransit supports MSMQ, RabbitMQ, and others, but I find that RabbitMQ is really the way to go. That\u2019s especially true when using the publish\/subscribe pattern. The reason is that RabbitMQ has a complete routing framework built in, and MassTransit leverages it when persisting your subscriptions. When you create a cluster of RabbitMQ servers for availability, this routing information is replicated to all the nodes.<\/p>\n In this article, you\u2019re going to run RabbitMQ on your local Windows development box. Both our publisher and subscriber will connect to the same RabbitMQ instance. In a future post, I\u2019ll detail how to set up multiple RabbitMQ instances in a cluster.<\/p>\n RabbitMQ requires the Erlang runtime, so that\u2019s the first thing you need to download and install. Head over to Erlang.org\u2019s download page<\/a> and get the latest binary release for Windows (it\u2019s likely you\u2019ll want the 64-bit version). It\u2019s a simple setup wizard, so you\u2019ll have Erlang installed on your machine in short order.<\/p>\n Next, download the latest version of RabbitMQ<\/a> for Windows. Again, it\u2019s an easy setup wizard that you can quickly fly through. Just accept the defaults.<\/p>\n One RabbitMQ feature that I found extremely useful (but which isn\u2019t enabled by default) is the web-based management interface. With this, you can see the exchanges and queues that MassTransit sets up in RabbitMQ. To enable it, find the \u201cRabbitMQ Command Prompt (sbin dir)\u201d item that the RabbitMQ installer added to your Start menu and launch it. 
From the command line, run the following command:<\/p>\n It will confirm that the plugin and its dependencies have been enabled and instruct you to restart RabbitMQ. When installed on Windows, RabbitMQ runs as a Windows service. You can use the Services MMC snap-in to restart it or just run the following commands:<\/p>\n Now go to http:\/\/localhost:15672\/<\/a> to open the management console. The default credentials to log in are guest\/guest (you can change the credentials from the Admin tab).<\/p>\n There\u2019s not much to see yet, but we\u2019ll set the stage. Go to the Exchanges tab. You\u2019ll see the following default RabbitMQ exchanges:<\/p>\n An exchange is something you can send messages to. It cannot hold messages. It\u2019s merely a set of routing instructions that tell RabbitMQ where to deliver the message. We\u2019ll come back here in a little while.<\/p>\n Now click on the Queues tab. Nothing here yet. Unlike exchanges, queues hold messages, and they are where applications pick messages up.<\/p>\n So, to use a real-world analogy, an Exchange is like the local Post Office, and a Queue is like your mailbox. The one thing an Exchange can do that most traditional Post Offices don\u2019t is make multiple copies of a message and deliver them to multiple mailboxes.<\/p>\n I used Visual Studio 2013 to create this sample, but it should work in 2012 as well. You can get the entire source from: https:\/\/github.com\/dprothero\/MtPubSubExample<\/a><\/p>\n I like to use the concept of a \u201ccontract\u201d for the messages I want to put onto the service bus. This is an interface definition that both the publisher and subscriber have to agree upon. They don\u2019t need to know anything about the implementation of this interface on either side. 
To keep the publisher and subscriber as loosely coupled as possible, I like to put my contracts in their own assembly so that this is the only shared dependency.<\/p>\n So, the first step is to create a new solution called MtPubSubExample and a new class library called \u201cContracts\u201d. To the class library, add a single interface called \u201cSomethingHappened.\u201d<\/p>\n SomethingHappened will be the message interface we use for our sample message. Our publisher will create an instance of a class implementing SomethingHappened, set the What and When properties, and publish it onto the service bus.<\/p>\n Our subscriber will then set up a subscription (aka Consumer) to listen for all messages of type SomethingHappened. MassTransit will call our Consumer class whenever a SomethingHappened message is received, and we can handle it as we wish, presumably inspecting the What and the When properties.<\/p>\n When you\u2019re writing a new project from scratch, you go through many permutations and refactor as you go. Initially, this example had the service bus setup code duplicated in both the publisher and subscriber projects. This is fine, particularly if you really aren\u2019t in a position to share much code between the two sides (except the contracts of course). However, in my case, I preferred to use a common class, which I\u2019ll call \u201cBusInitializer,\u201d to create and configure my MassTransit bus instance.<\/p>\n So, add another class library to the MtPubSubExample solution and name it \u201cConfiguration\u201d. Before creating our class, it\u2019s time to head to NuGet and pull in MassTransit. The quickest way to get everything you need is to find the MassTransit.RabbitMq package and install that. Doing so will install all of MassTransit and its dependencies.<\/p>\n You still need one more package. I found that MassTransit doesn\u2019t work unless you install one of the logging integration packages that are designed for it. 
I chose the Log4Net integration package (MassTransit.Log4Net).<\/p>\n Now, create a new class called \u201cBusInitializer.\u201d<\/p>\n We\u2019re creating a static method called \u201cCreateBus,\u201d which both our publisher and subscriber can use to set up an instance of a bus, using the Log4NetLogger, and connect to a local RabbitMQ instance. Because the publisher or subscriber may want to do additional custom setup, we allow passing in a lambda expression to perform it.<\/p>\n We\u2019ll make the publisher a very simple console application that just prompts the user for some text and then publishes that text as part of a SomethingHappened message. Add a new Console Application project called \u201cTestPublisher\u201d to the solution and add a new class called \u201cSomethingHappenedMessage.\u201d This will be our concrete implementation of the SomethingHappened interface. You\u2019ll need to add a project reference to the Contracts project (and add one to the Configuration project too, while you\u2019re at it).<\/p>\n Now, in the Main method of the Program.cs file in your Console Application, you can put in the code to set up the bus, prompt the user for text, and publish that text onto the bus. Real quick first, however, add a NuGet reference to the MassTransit package.<\/p>\n Pretty simple, huh? We put the input capture and message publishing in a loop to make it easy to send multiple messages. We check for the string \u201cquit\u201d so we can exit the publisher when we\u2019d like.<\/p>\n If you make TestPublisher the startup project of the solution and run it, right now you can publish messages all you like\u2026 However, nobody is listening yet!<\/p>\n If you go back into the RabbitMQ web interface and jump over to the Exchanges tab, you\u2019ll see we have a couple of new arrivals.<\/p>\n Contracts:SomethingHappened is a new exchange created for the SomethingHappened message type. 
When we published this message, MassTransit automatically created this exchange. Click on it and scroll down to the Bindings section, and you\u2019ll see there are no bindings yet:<\/p>\n That\u2019s because nobody has subscribed to SomethingHappened messages yet. They go to the exchange and then die because there\u2019s no queue to route them to.<\/p>\n The MtPubSubExample_TestPublisher exchange (and the corresponding queue on the Queues tab) was set up by our BusInitializer code. Our publisher isn\u2019t listening for messages sent to it, so this isn\u2019t really being used.<\/p>\n The final piece of the puzzle! Add another Console Application project to your solution and call it TestSubscriber. Again, add project references to Contracts and Configuration and then add the MassTransit NuGet package.<\/p>\n The first thing we need is a Consumer class to consume the SomethingHappened messages. Add a new class to the console app and call it \u201cSomethingHappenedConsumer.\u201d<\/p>\n This consumer class implements a specific MassTransit interface whose Consume method will be called with the message context and SomethingHappened message each time a message is received. Here we are simply writing the message out to the console.<\/p>\n Finally, in the Main method of Program.cs, we can initialize the bus and, as part of the initialization, instruct MassTransit that we wish to subscribe to messages of type SomethingHappened.<\/p>\n Now right-click on the MtPubSubExample solution in the solution explorer and choose \u201cSet Startup Projects\u2026.\u201d From here, choose the Multiple startup projects option and set the Action for both TestPublisher and TestSubscriber to Start. Now when you run your solution, both the publisher and subscriber will run.<\/p>\n Type some messages into the publisher. 
You should see them show up immediately in the subscriber window!<\/p>\n Now close just<\/em> the Subscriber sample window and publish three more messages in the Publisher window.<\/p>\n Go ahead and close the Publisher window for now. Let\u2019s take a deeper look at where those three messages went.<\/p>\n Go back into the RabbitMQ web interface and go back to the Exchanges tab. You\u2019ll see a new exchange called MtPubSubExample_TestSubscriber, but first click on the Contracts:SomethingHappened exchange and scroll down to the Bindings section. You\u2019ll see we now have a binding.<\/p>\n So, by creating a subscription from our TestSubscriber, MassTransit automatically set up this binding for us. Click on MtPubSubExample_TestSubscriber here, and you\u2019ll see you\u2019re taken to the setup page for an exchange called MtPubSubExample_TestSubscriber. Scroll down to Bindings, and you\u2019ll see we\u2019re bound to a queue named the same as the exchange (in the binding diagrams, exchanges show up as rectangles with rounded corners, whereas queues have straight corners).<\/p>\n The web interface helpfully shows both the predecessor and the successor in the path. Click the MtPubSubExample_TestSubscriber queue here, and you\u2019ll be taken to the queue setup page for that queue. If you haven\u2019t fired up the TestSubscriber app since we published those last three messages, you should see that there are three messages in the queue:<\/p>\n Fire up the TestSubscriber app, and you should see it process the three messages left in the queue.<\/p>\n Notice the timestamp from the message versus the timestamp of when the subscriber actually processed the message. In this case, there was a 6-minute lag (the 6 minutes we were poking around in RabbitMQ before starting up the subscriber again).<\/p>\n Hopefully, this post was helpful in getting you off the ground with MassTransit. There\u2019s more to come. 
In upcoming posts, I\u2019ll dig into how to put this together in the real world. First, your publisher and subscriber are likely to live on separate machines, so we\u2019ll look at how to set up a RabbitMQ cluster to make that work. We\u2019ll set up an ASP.NET application that publishes event messages and then a Windows Service that will subscribe to the messages and log them to a data store. Let me know if you have other examples you would like to see.<\/p>\n Until then\u2026<\/p>\n","protected":false},"excerpt":{"rendered":" When I first sat down to learn how to use MassTransit, I found it difficult to just get a simple example that published a message onto the bus with another process that subscribed to messages of the same type working. Hopefully, this primer will get you on the bus quicker. Setting Up Your Environment The… Continue reading Setting Up Your Environment<\/h1>\n
Installing RabbitMQ<\/h2>\n
Enabling the RabbitMQ Web Management Interface<\/h2>\n
> rabbitmq-plugins enable rabbitmq_management<\/pre>\n
> net stop RabbitMQ\r\n...\r\n> net start RabbitMQ<\/pre>\n 
Creating the Sample Applications<\/h1>\n
Creating a Contract<\/h2>\n
using System;\r\n\r\nnamespace Contracts\r\n{\r\n    public interface SomethingHappened\r\n    {\r\n        string What { get; }\r\n        DateTime When { get; }\r\n    }\r\n}<\/pre>\n 
Shared Configuration Setup Code<\/h2>\n
using MassTransit;\r\nusing MassTransit.BusConfigurators;\r\nusing MassTransit.Log4NetIntegration.Logging;\r\nusing System;\r\n\r\nnamespace Configuration\r\n{\r\n    public class BusInitializer\r\n    {\r\n        \/\/ Creates a bus that logs through Log4Net and listens on a local RabbitMQ queue.\r\n        \/\/ moreInitialization lets the caller add its own setup, such as subscriptions.\r\n        public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)\r\n        {\r\n            Log4NetLogger.Use();\r\n            var bus = ServiceBusFactory.New(x =>\r\n            {\r\n                x.UseRabbitMq();\r\n                x.ReceiveFrom(\"rabbitmq:\/\/localhost\/MtPubSubExample_\" + queueName);\r\n                if (moreInitialization != null) moreInitialization(x);\r\n            });\r\n\r\n            return bus;\r\n        }\r\n    }\r\n}<\/pre>\n 
Creating the Publisher<\/h2>\n
using Contracts;\r\nusing System;\r\n\r\nnamespace TestPublisher\r\n{\r\n    class SomethingHappenedMessage : SomethingHappened\r\n    {\r\n        public string What { get; set; }\r\n        public DateTime When { get; set; }\r\n    }\r\n}<\/pre>\n 
using Configuration;\r\nusing Contracts;\r\nusing System;\r\n\r\nnamespace TestPublisher\r\n{\r\n    class Program\r\n    {\r\n        static void Main(string[] args)\r\n        {\r\n            var bus = BusInitializer.CreateBus(\"TestPublisher\", x => { });\r\n\r\n            while (true)\r\n            {\r\n                Console.Write(\"Enter a message: \");\r\n                string text = Console.ReadLine();\r\n\r\n                \/\/ Stop on \"quit\" (or end of input) before publishing, so the word itself is never sent.\r\n                if (text == null || text == \"quit\") break;\r\n\r\n                var message = new SomethingHappenedMessage() { What = text, When = DateTime.Now };\r\n                bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });\r\n            }\r\n\r\n            bus.Dispose();\r\n        }\r\n    }\r\n}<\/pre>\n 
What\u2019s Going on in RabbitMQ So Far?<\/h2>\n
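The post describes an exchange as a set of routing instructions that cannot hold messages, and a queue as the mailbox that can. To make that concrete, here is a toy in-memory model in plain C#. It is only a sketch: ToyExchange and ToyQueue are made-up names, not RabbitMQ or MassTransit types, and the model assumes the simplest routing rule, where every bound queue gets a copy of each message.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model only; this is not the RabbitMQ or MassTransit API.
class ToyQueue
{
    private readonly Queue<string> _messages = new Queue<string>();

    public string Name { get; private set; }
    public int Count { get { return _messages.Count; } }

    public ToyQueue(string name) { Name = name; }

    // A queue actually stores messages until a consumer picks them up.
    public void Enqueue(string message) { _messages.Enqueue(message); }
}

class ToyExchange
{
    // An exchange stores bindings (routing instructions), never messages.
    private readonly List<ToyQueue> _bindings = new List<ToyQueue>();

    public void Bind(ToyQueue queue) { _bindings.Add(queue); }

    // Copies the message to every bound queue and returns how many copies were made.
    public int Publish(string message)
    {
        foreach (var queue in _bindings)
            queue.Enqueue(message);
        return _bindings.Count;
    }
}

static class ToyProgram
{
    static void Main()
    {
        var exchange = new ToyExchange();

        // No bindings yet, so the message has nowhere to go and is lost.
        Console.WriteLine(exchange.Publish("nobody is listening")); // 0

        var a = new ToyQueue("mailbox-a");
        var b = new ToyQueue("mailbox-b");
        exchange.Bind(a);
        exchange.Bind(b);

        // Two bindings, so each queue receives its own copy.
        Console.WriteLine(exchange.Publish("hello")); // 2
        Console.WriteLine(a.Count + " " + b.Count);   // 1 1
    }
}
```

The first Publish call mirrors the situation at this point in the walkthrough: the Contracts:SomethingHappened exchange exists but has no bindings, so nothing is kept.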
Creating the Subscriber<\/h2>\n
using Contracts;\r\nusing MassTransit;\r\nusing System;\r\n\r\nnamespace TestSubscriber\r\n{\r\n    class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context\r\n    {\r\n        public void Consume(IConsumeContext<SomethingHappened> message)\r\n        {\r\n            Console.Write(\"TXT: \" + message.Message.What);\r\n            Console.Write(\"  SENT: \" + message.Message.When.ToString());\r\n            Console.Write(\"  PROCESSED: \" + DateTime.Now.ToString());\r\n            Console.WriteLine(\" (\" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + \")\");\r\n        }\r\n    }\r\n}<\/pre>\n 
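The consumer above prints the sent and processed times side by side and leaves the subtraction to the reader. If you would rather see the delay directly, a small helper can compute it. This is a self-contained sketch with fixed example dates; LagExample and Lag are names made up for illustration and are not part of the sample solution or of MassTransit.

```csharp
using System;

static class LagExample
{
    // Hypothetical helper: how long a message waited between publish and consume.
    static TimeSpan Lag(DateTime sent, DateTime processed)
    {
        return processed - sent;
    }

    static void Main()
    {
        // Fixed example values standing in for message.When and DateTime.Now.
        var sent = new DateTime(2014, 6, 27, 17, 0, 0);
        var processed = sent.AddMinutes(6);

        Console.WriteLine("LAG: " + Lag(sent, processed)); // LAG: 00:06:00
    }
}
```

One design note: the sample stamps messages with DateTime.Now, which is local time. That is fine while both processes run on one machine, but if the publisher and subscriber ever run in different time zones, using DateTime.UtcNow on both sides would keep the subtraction meaningful.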
using Configuration;\r\nusing MassTransit;\r\nusing System;\r\n\r\nnamespace TestSubscriber\r\n{\r\n    class Program\r\n    {\r\n        static void Main(string[] args)\r\n        {\r\n            var bus = BusInitializer.CreateBus(\"TestSubscriber\", x =>\r\n            {\r\n                x.Subscribe(subs =>\r\n                {\r\n                    subs.Consumer<SomethingHappenedConsumer>().Permanent();\r\n                });\r\n            });\r\n\r\n            Console.ReadKey();\r\n\r\n            bus.Dispose();\r\n        }\r\n    }\r\n}<\/pre>\n 
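For a quick experiment, a separate consumer class may be more ceremony than you need. As far as I know, the MassTransit 2.x subscription configurator also offers a Handler method that takes a delegate, so the subscriber could be sketched as below. Treat this as an assumption to check against the MassTransit version you have installed. It relies on the Contracts and Configuration projects from this post, HandlerProgram is a made-up class name, and I have left off the Permanent() call used in the consumer version above, so the subscription lifetime may differ.

```csharp
using Configuration;
using Contracts;
using MassTransit;
using System;

namespace TestSubscriber
{
    class HandlerProgram
    {
        static void Main(string[] args)
        {
            var bus = BusInitializer.CreateBus("TestSubscriber", x =>
            {
                x.Subscribe(subs =>
                {
                    // Handler<T> takes a delegate instead of a consumer class
                    // (assumed MassTransit 2.x API; verify against your version).
                    subs.Handler<SomethingHappened>(msg =>
                        Console.WriteLine("TXT: " + msg.What + "  SENT: " + msg.When));
                });
            });

            Console.WriteLine("Listening. Press any key to exit.");
            Console.ReadKey();

            bus.Dispose();
        }
    }
}
```

The consumer class remains the better fit once handling logic grows or needs the message context, which the delegate form shown here does not receive.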
What\u2019s Going on in RabbitMQ Now?<\/h2>\n
Wrap Up<\/h1>\n