Communicating microservices with RabbitMQ

Posted By : Utkarsh Audichya | 29-Sep-2021

Microservices python


Microservices need a way to exchange data with one another. We are going to use RabbitMQ for this: it is a message broker that lets services communicate asynchronously by passing messages.

It offers the following advantages:

  • Order and delivery guarantee
  • Redundancy
  • Scalability

RabbitMQ works by having publishers (producers) create messages and send them to an exchange with a routing key. The exchange routes each message to a queue, and consumers receive the messages from that queue and process them.
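
To make the flow concrete, here is a toy, in-memory model of it. It is only a sketch and not the RabbitMQ or pika API: the ToyBroker class and its methods are invented for illustration, and unlike a real broker it creates queues on demand.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for a broker's default exchange (not the RabbitMQ API)."""

    def __init__(self):
        self.queues = defaultdict(deque)  # queue name -> pending messages

    def publish(self, routing_key, body):
        # The default exchange delivers to the queue named by the routing key.
        self.queues[routing_key].append(body)

    def consume(self, queue):
        # Hand out messages in the order they were published (FIFO).
        while self.queues[queue]:
            yield self.queues[queue].popleft()


broker = ToyBroker()
broker.publish("likes", "blog_created")
broker.publish("likes", "blog_updated")
broker.publish("blogs", "blog_liked")

likes_messages = list(broker.consume("likes"))
blogs_messages = list(broker.consume("blogs"))
print(likes_messages)
print(blogs_messages)
```

Each consumer only sees the messages that were routed to its own queue, in publication order.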

We are using the pika library to connect to RabbitMQ (just run `pip3 install pika`).

Consider a scenario where an API contains details about different blogs and their associated number of likes. We also have another API that handles the liking of a blog, such that a user can like a blog only once, and when they do, the blog’s number of likes is incremented.

We will keep both projects in one directory. First we create a project called blogger that will contain our blogs, then a second project called Likes that handles liking.

cd into your projects directory and run the following commands:

django-admin startproject blogger
django-admin startproject Likes

Your directory structure should now look as follows:

Blogger project
Let’s create an app called blogs that we will build our blogs API with. cd into blogger and run python3 manage.py startapp blogs.

Then add the following to INSTALLED_APPS in the settings.py file:

blogger/settings.py

'blogs',
'rest_framework',
'corsheaders'

You need to have the Django REST framework and django-cors-headers installed. You can run:
pip3 install djangorestframework
pip3 install django-cors-headers

Add 'corsheaders.middleware.CorsMiddleware' to MIDDLEWARE and set CORS_ORIGIN_ALLOW_ALL = True to allow the other sites we are going to use to make cross-origin requests to our application. This is not recommended outside development, because it lets any website make cross-origin requests to your API.
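
A stricter alternative to allowing every origin is an explicit allow-list. The fragment below is a sketch of how the relevant parts of blogger/settings.py might look; it assumes that the only other origin is the Likes project on port 8001 (the port used later in this article) and it leaves out most of Django's default entries for brevity. CORS_ALLOWED_ORIGINS is the django-cors-headers setting for such a list.

```python
# blogger/settings.py (fragment; most default entries omitted for brevity)
INSTALLED_APPS = [
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "rest_framework",
    "corsheaders",
    "blogs",
]

MIDDLEWARE = [
    # Placed before middleware that can generate responses, such as CommonMiddleware.
    "corsheaders.middleware.CorsMiddleware",
    "django.middleware.common.CommonMiddleware",
]

# Assumption: only the Likes project (served on port 8001) needs cross-origin access.
CORS_ALLOWED_ORIGINS = ["http://127.0.0.1:8001"]
```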

We can proceed to create our model.

Change your models.py file to look like this:

blogs/models.py

from django.db import models
from django.contrib.auth.models import User

# Create your models here.
class Blogs(models.Model):
    author = models.ForeignKey(User, on_delete=models.CASCADE, related_name='blogs')
    title = models.CharField(max_length=255)
    body = models.TextField()
    status = models.CharField(max_length=255, choices=(('publish', 'Publish'), ('draft', 'Draft')))
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    # Filled in by save(), so it may be left blank in requests.
    slug = models.SlugField(blank=True)

    likes = models.PositiveIntegerField(default=0)

    def __str__(self):
        return self.title

    def save(self, *args, **kwargs):
        # Derive the slug from the title before saving.
        self.slug = '-'.join(self.title.split(' '))
        super().save(*args, **kwargs)
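
Joining the words of the title with hyphens keeps punctuation and capital letters in the slug. The sketch below compares that approach with a stricter helper; make_slug is a hypothetical function written for this illustration, not part of the project. Django also ships django.utils.text.slugify for this purpose.

```python
import re


def make_slug(title):
    """Hypothetical helper: lowercase, drop punctuation, join words with hyphens."""
    words = re.findall(r"[a-z0-9]+", title.lower())
    return "-".join(words)


naive = "-".join("Hello, RabbitMQ World!".split(" "))
clean = make_slug("Hello, RabbitMQ World!")
print(naive)  # Hello,-RabbitMQ-World!
print(clean)  # hello-rabbitmq-world
```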

Create a serializers.py file to serialize and deserialize the blog instances.

Add the following code to it:

blogs/serializers.py

from rest_framework import serializers
from .models import Blogs
from django.contrib.auth.models import User

class BlogsSerializer(serializers.ModelSerializer):
    class Meta:
        model = Blogs
        fields = '__all__'

class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        # Never expose password hashes through the API.
        exclude = ('password', )


Let’s create some views to expose the data through the API. There are several ways of creating views, and our views.py file will use two of them for a start: a ViewSet and APIView classes.

Edit your views.py file to this:

blogs/views.py

from django.contrib.auth.models import User
from django.http import Http404
from django.shortcuts import render
import random
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from .models import Blogs
from .serializers import UserSerializer, BlogsSerializer

# Create your views here.
class BlogsViewset(viewsets.ViewSet):
    def list(self, request):
        blogs = Blogs.objects.all()
        serializer = BlogsSerializer(blogs, many=True)
        return Response(serializer.data)

    def create(self, request):
        serializer = BlogsSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def retrieve(self, request, pk=None):
        blog = Blogs.objects.get(pk=pk)
        serializer = BlogsSerializer(blog)
        return Response(serializer.data)

    def update(self, request, pk=None):
        blog = Blogs.objects.get(pk=pk)
        serializer = BlogsSerializer(instance=blog, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data)

    def destroy(self, request, pk=None):
        blog = Blogs.objects.get(pk=pk)
        blog.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)


class UserAPIView(APIView):
    def get(self, _):
        users = User.objects.all()
        # many=True because we serialize a queryset, not a single user.
        return Response(UserSerializer(users, many=True).data)

class UserDetailAPIView(APIView):
    def get_user(self, pk):
        try:
            return User.objects.get(pk=pk)
        except User.DoesNotExist:
            raise Http404

    def get(self, request, pk, format=None):
        user = self.get_user(pk)
        serializer = UserSerializer(user)
        return Response(serializer.data)

We then create a url.py file for routing our views.

Edit the file to be similar to this:

blogs/url.py

from django.urls import path, include
from rest_framework.routers import DefaultRouter

from . import views

router = DefaultRouter()
router.register('blogs', views.BlogsViewset, basename='blogs')

urlpatterns = [
    path('', include(router.urls)),
    path('users', views.UserAPIView.as_view(), name='users'),
    path('users/<int:pk>/', views.UserDetailAPIView.as_view(), name='user-details')
]

We then configure our urls.py file in blogger folder to this:

blogger/urls.py

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('blogs.url'))
]

We can now run python3 manage.py makemigrations and then python3 manage.py migrate. Let’s also create a superuser by running python3 manage.py createsuperuser.

When we run python3 manage.py runserver, we should have a page like this.

http://127.0.0.1:8000/blogs

Blogger blogs home page

http://127.0.0.1:8000/users

Blogger user home page

Likes project
Let’s create an app called likes. cd into Likes, run python3 manage.py startapp likes, and add the following to INSTALLED_APPS in your settings.py file.

Likes/settings.py

'rest_framework',
'corsheaders',
'likes'


Similarly add the code snippet 'corsheaders.middleware.CorsMiddleware' in MIDDLEWARE and set CORS_ORIGIN_ALLOW_ALL = True.

We can now proceed to create our models as follows:

likes/models.py

from django.db import models

# Create your models here.
class Blogs(models.Model):
    # The id is copied from the blogger project, so it is not auto-generated here.
    id = models.IntegerField(unique=True, primary_key=True)
    title = models.CharField(max_length=200)

    def __str__(self):
        return self.title

class BloggerUser(models.Model):
    user_id = models.IntegerField(blank=True)
    blog_id = models.IntegerField(blank=True)

    class Meta:
        # A given user can like a given blog only once.
        unique_together = ('user_id', 'blog_id')

    def __str__(self):
        return f"User id {self.user_id} Blog id {self.blog_id}"

I have decided to create a similar model class Blogs to show how we can pass data using RabbitMQ and create model instances using the data.

We will also have an extra model class BloggerUser. We will create the application for now and see how the communication and processing occur in a short while.

Let’s now create a serializers.py file that handles the serialization of our model instances.

Edit the file to contain the following lines of code:

likes/serializers.py

from rest_framework import serializers
from .models import *

class BlogSerializer(serializers.ModelSerializer):
    class Meta:
        model = Blogs
        fields = '__all__'

class BloggerUserSerializer(serializers.ModelSerializer):
    class Meta:
        model = BloggerUser
        fields = '__all__'

We then create some views in the views.py file.

likes/views.py

import requests
from django.shortcuts import render

# Create your views here.
from rest_framework import viewsets, status
from rest_framework import mixins
from rest_framework.decorators import api_view
from rest_framework.response import Response

from .models import Blogs, BloggerUser
from .serializers import BlogSerializer, BloggerUserSerializer

class BlogViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, mixins.CreateModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin):
    serializer_class = BlogSerializer
    queryset = Blogs.objects.all()

class BloggerUserViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, mixins.CreateModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin):
    serializer_class = BloggerUserSerializer
    queryset = BloggerUser.objects.all()

We also need to create a url.py file for routing our views.

likes/url.py

from django.urls import path, include
from . import views
from rest_framework.routers import DefaultRouter

router = DefaultRouter()
router.register('blogs', views.BlogViewSet, basename='blogs')
router.register('blogger-users', views.BloggerUserViewSet, basename='blogger_user')

urlpatterns = [
    path('', include(router.urls))
]

We can now run python3 manage.py makemigrations and then python3 manage.py migrate. Let’s also create a superuser with the same credentials as the one above by running python3 manage.py createsuperuser.

When we run python3 manage.py runserver 8001, we should have a page similar to this.

http://127.0.0.1:8001/blogs

Likes blogs page

http://127.0.0.1:8001/blogger-users/

Likes bloggeruser page

For communication
Now that we have created our APIs, it’s time to make them communicate. RabbitMQ lets applications send and receive messages asynchronously.

We would first like to create a Blogs instance in the likes app whenever a Blogs instance is created in the blogger project.

To do this, the blogger project must act as a producer and the Likes project as a consumer. To get started, let’s create a producer.py file inside the blogs app of the blogger project.

Add the following lines of code to it.

blogs/producer.py

import json
import pika


# Open one connection and one channel when the module is first imported.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600, blocked_connection_timeout=300))
channel = connection.channel()

def publish(method, body):
    # The event name travels in the message's content_type property.
    properties = pika.BasicProperties(content_type=method)
    channel.basic_publish(exchange='', routing_key='likes', body=json.dumps(body), properties=properties)


Here is what this code does.

The following two lines establish a connection with the RabbitMQ server and open a channel on it:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600, blocked_connection_timeout=300))

channel = connection.channel()

We then define a function publish that sends a message. The method parameter names the event (for example blog_created) and body is the data to send, which is serialized to JSON.
The event name is stored in the message properties as content_type and passed to channel.basic_publish. We use the default exchange, denoted by exchange='', which routes each message to the queue whose name matches the routing_key.

We have set the routing key to likes, so a queue named likes must be declared by the application that receives the messages.

Note: RabbitMQ closes connections that it considers dead. The heartbeat parameter sets the heartbeat timeout in seconds, that is, how long the connection may go without traffic or heartbeat frames before it is treated as unreachable. It does not keep the connection open by itself: a BlockingConnection only sends heartbeats while it is processing I/O, so a publisher that stays idle for longer than the timeout may find its connection closed.
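
Because the connection is opened once and then reused, a publisher that has been idle for a long time may find it closed when it next publishes. One common mitigation is to reconnect and retry once. The sketch below shows only the control flow, using stand-ins: FlakyChannel, publish_with_retry and reconnect are hypothetical names, and the built-in ConnectionError takes the place of whatever exception the real client library raises.

```python
import json


class FlakyChannel:
    """Stand-in for a channel whose connection may have gone stale."""

    def __init__(self, fail_first=True):
        self.fail_next = fail_first
        self.sent = []

    def send(self, routing_key, payload):
        if self.fail_next:
            self.fail_next = False
            raise ConnectionError("connection closed while idle")
        self.sent.append((routing_key, payload))


def publish_with_retry(get_channel, reconnect, routing_key, body):
    """Try to publish; on a connection failure, reconnect and try once more."""
    payload = json.dumps(body)
    try:
        get_channel().send(routing_key, payload)
    except ConnectionError:
        reconnect()
        get_channel().send(routing_key, payload)


state = {"channel": FlakyChannel(fail_first=True)}


def reconnect():
    # Replace the stale channel with a fresh one.
    state["channel"] = FlakyChannel(fail_first=False)


publish_with_retry(lambda: state["channel"], reconnect, "likes", {"id": 1})
print(state["channel"].sent)
```

The first attempt fails, the channel is replaced, and the second attempt delivers the message exactly once.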

To publish messages, we will use the publish function in our views.py file.

Edit the file to look as follows:

blogs/views.py

from django.contrib.auth.models import User
from django.http import Http404
from django.shortcuts import render
import random
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from .models import *
from .serializers import *
from .producer import publish


# Create your views here.
class BlogsViewset(viewsets.ViewSet):
    def list(self, request):
        blogs = Blogs.objects.all()
        serializer = BlogsSerializer(blogs, many=True)
        return Response(serializer.data)

    def create(self, request):
        serializer = BlogsSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        # Tell the likes app that a new blog exists.
        publish('blog_created', serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def retrieve(self, request, pk=None):
        blog = Blogs.objects.get(pk=pk)
        serializer = BlogsSerializer(blog)
        return Response(serializer.data)

    def update(self, request, pk=None):
        blog = Blogs.objects.get(pk=pk)
        serializer = BlogsSerializer(instance=blog, data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        publish('blog_updated', serializer.data)
        return Response(serializer.data)

    def destroy(self, request, pk=None):
        blog = Blogs.objects.get(pk=pk)
        blog.delete()
        # Only the primary key is needed to delete the copy in the likes app.
        publish('blog_deleted', pk)
        return Response(status=status.HTTP_204_NO_CONTENT)


class UserAPIView(APIView):
    def get(self, _):
        users = User.objects.all()
        return Response(UserSerializer(users, many=True).data)

class UserDetailAPIView(APIView):
    def get_user(self, pk):
        try:
            return User.objects.get(pk=pk)
        except User.DoesNotExist:
            raise Http404

    def get(self, request, pk, format=None):
        user = self.get_user(pk)
        serializer = UserSerializer(user)
        return Response(serializer.data)

Note the use of the publish function in the views file. Whenever we create, edit or delete a Blogs instance, a message containing the data for that operation is published.
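
As an illustration of what travels over the wire, the sketch below builds the two kinds of message without contacting a broker. build_message is a hypothetical helper and the field values are made up; it assumes, as in the views above, that create and update send the serialized blog while delete sends only the primary key, which arrives from the URL as a string.

```python
import json


def build_message(method, body):
    """Return the (content_type, payload) pair that publish() would send."""
    return method, json.dumps(body)


created = build_message("blog_created", {"id": 7, "title": "Hello", "likes": 0})
deleted = build_message("blog_deleted", "7")

for content_type, payload in (created, deleted):
    # The consumer reverses the encoding with json.loads.
    print(content_type, json.loads(payload))
```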

Now that we can publish messages, we create a consumer in our Likes project. Create a consumer.py file in the Likes directory, at the same level as manage.py.

Edit it to appear as follows:

Likes/consumer.py

import json
import pika
import django
from sys import path
from os import environ

# Adjust this path to your own machine. Python already puts the directory
# that contains consumer.py on sys.path when the script is run directly.
path.append('/home/utkarsh/projects/likes/likes/settings.py')
environ.setdefault('DJANGO_SETTINGS_MODULE', 'Likes.settings')
django.setup()
# Models can only be imported after django.setup() has run.
from likes.models import Blogs

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600, blocked_connection_timeout=300))
channel = connection.channel()
channel.queue_declare(queue='likes')

def callback(ch, method, properties, body):
    print("Received in likes...")
    data = json.loads(body)

    # The producer stores the event name in content_type.
    if properties.content_type == 'blog_created':
        Blogs.objects.create(id=data['id'], title=data['title'])
        print("Blog created")
    elif properties.content_type == 'blog_updated':
        blog = Blogs.objects.get(id=data['id'])
        blog.title = data['title']
        blog.save()
        print("Blog updated")
    elif properties.content_type == 'blog_deleted':
        # For deletions the body is just the primary key.
        blog = Blogs.objects.get(id=data)
        blog.delete()
        print("Blog deleted")

channel.basic_consume(queue='likes', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()

Here is what is happening in our code:
Like our producer application, we set up a connection with the RabbitMQ server.

We also need to set DJANGO_SETTINGS_MODULE and call django.setup(), because the script runs outside manage.py and Django is therefore not configured for us before we import a model.

path.append('/home/utkarsh/projects/likes/likes/settings.py')
environ.setdefault('DJANGO_SETTINGS_MODULE', 'Likes.settings')
django.setup()
from likes.models import Blogs
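
Entries on sys.path are meant to be directories, so a portable variant derives the project directory from the location of the script instead of hard-coding a path. The following is a sketch of that idea only: project_dir_for and configure are hypothetical helpers, the example path is invented, and plain dict and list objects stand in for os.environ and sys.path so the snippet can run anywhere.

```python
import os


def project_dir_for(script_path):
    """Return the directory containing the script, suitable for sys.path."""
    return os.path.dirname(os.path.abspath(script_path))


def configure(environ, sys_path, script_path, settings_module):
    # setdefault leaves an existing DJANGO_SETTINGS_MODULE untouched.
    environ.setdefault("DJANGO_SETTINGS_MODULE", settings_module)
    directory = project_dir_for(script_path)
    if directory not in sys_path:
        sys_path.append(directory)
    return directory


env, search_path = {}, []
configure(env, search_path, "/srv/projects/Likes/consumer.py", "Likes.settings")
print(env["DJANGO_SETTINGS_MODULE"], search_path)
```

In a real consumer.py the same calls would receive os.environ, sys.path and __file__.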

We also need to declare a queue that will receive the messages. Its name must match the routing_key passed to channel.basic_publish in the producer.py file of the blogger project.

We create a function callback that is called whenever a message is received. ch is the channel on which the message arrived, method carries information about the delivery, properties holds the message properties set by the publisher, and body is the message payload.

In our case, the callback function handles the creation, updating, and deletion of Blogs instances. properties.content_type holds the event name we set in our producer.py file.
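
The same dispatch can also be written as a lookup table, which keeps the callback short as more event types are added. This is a sketch rather than the code used in this project: a plain dictionary stands in for the Blogs table, and the handler names are invented for the example.

```python
import json

blogs = {}  # stand-in for the Blogs table: id -> title


def on_created(data):
    blogs[data["id"]] = data["title"]


def on_updated(data):
    blogs[data["id"]] = data["title"]


def on_deleted(data):
    # blog_deleted carries only the primary key, which arrives as a string.
    blogs.pop(int(data), None)


HANDLERS = {
    "blog_created": on_created,
    "blog_updated": on_updated,
    "blog_deleted": on_deleted,
}


def callback(content_type, body):
    handler = HANDLERS.get(content_type)
    if handler is None:
        return False  # unknown event: ignore it rather than crash the consumer
    handler(json.loads(body))
    return True


callback("blog_created", json.dumps({"id": 1, "title": "First"}))
callback("blog_updated", json.dumps({"id": 1, "title": "First, edited"}))
print(blogs)
callback("blog_deleted", json.dumps("1"))
print(blogs)
```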

We also instruct RabbitMQ to deliver messages from the likes queue to our callback function:
channel.basic_consume(queue='likes', on_message_callback=callback, auto_ack=True)

Finally, we tell our channel to start receiving messages:
channel.start_consuming()

To start sending messages, the RabbitMQ server must be running. I am on Ubuntu and will run sudo service rabbitmq-server start. We also need to start our consumer by running python3 consumer.py.

I will run the following commands to get started:

Starting RabbitMQ server

Consuming a message at likes

Starting server at likes project

starting server at blogger project

When we create a blog instance as follows:

Creating a blog at blogger

We also have a blog instance in the likes app created as follows:

Blog created at likes project

We should also be able to do the same when updating and deleting blog instances.

Editing a blog at blogger project

Blog edited at likes project

Deleting a blog at blogger project

Blog also deleted at likes project

We can now handle the liking of blogs. We begin by creating a producer.py file in our likes app. It is similar to the one in the blogs app and works the same way.

Make it look as follows:

likes/producer.py

import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600, blocked_connection_timeout=300))
channel = connection.channel()

def publish(method, body):
    # The event name travels in the message's content_type property.
    properties = pika.BasicProperties(content_type=method)
    channel.basic_publish(exchange='', routing_key='blogs', body=json.dumps(body), properties=properties)

This time we publish our messages to a queue called blogs. This queue also needs to be declared in the application that will receive the messages, i.e. blogger.

We will be using the publish function in the views.py file.

Edit the file to look as follows:

likes/views.py

import requests
from django.db import IntegrityError
from django.shortcuts import render

# Create your views here.
from rest_framework import viewsets, status
from rest_framework import mixins
from rest_framework.decorators import api_view
from rest_framework.response import Response

from .models import *
from .producer import publish
from .serializers import *

class BlogViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, mixins.CreateModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin):
    serializer_class = BlogSerializer
    queryset = Blogs.objects.all()

class BloggerUserViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, mixins.CreateModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.DestroyModelMixin):
    serializer_class = BloggerUserSerializer
    queryset = BloggerUser.objects.all()

@api_view(['GET'])
def like(request, pk, format=None):
    # Ask the blogger project for its users.
    query = {'username': 'UTKARSH'}
    req = requests.get('http://127.0.0.1:8000/users', params=query)
    data = req.json()

    try:
        for user in data:
            if user['id']:
                # Raises IntegrityError if this like was already recorded.
                BloggerUser.objects.create(user_id=user['id'], blog_id=pk)
                publish('blog_liked', pk)
                print('Bloggeruser created')
                return Response('Blog liked...', status=status.HTTP_201_CREATED)
    except IntegrityError:
        return Response('Blog already liked...', status=status.HTTP_400_BAD_REQUEST)
    # No user was returned by the blogger project.
    return Response('No user found...', status=status.HTTP_404_NOT_FOUND)

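The like view first asks the blogger project for its users. The users endpoint shown earlier returns every user and does not filter on the username parameter, but for now we only have one user in each application, the superuser. We would like a user to be able to like a blog only once.

We create a BloggerUser record when a blog is liked, and that record can only be created once. When a user likes a blog, the number of likes is incremented by one in the blogger project.

For the view to be reachable at a URL such as /blogs/2/like, it also needs a route, for example path('blogs/<int:pk>/like', views.like) added to urlpatterns in likes/url.py.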
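
It is a database uniqueness constraint that makes the second attempt fail. The sketch below demonstrates the mechanism with the standard-library sqlite3 module instead of Django; the table layout and the constraint over the (user_id, blog_id) pair are assumptions made for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE blogger_user ("
    " user_id INTEGER NOT NULL,"
    " blog_id INTEGER NOT NULL,"
    " UNIQUE (user_id, blog_id))"
)


def like(user_id, blog_id):
    """Record a like; return False if this user already liked this blog."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO blogger_user (user_id, blog_id) VALUES (?, ?)",
                (user_id, blog_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False


results = [like(1, 2), like(1, 2), like(1, 3)]
print(results)  # [True, False, True]
```

The same user can still like a different blog, but repeating a like is rejected by the database itself.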

We will create a consumer.py file in our blogger project, the same way as we did before. The file should look as follows:

blogger/consumer.py

import json
import pika
from sys import path
from os import environ
import django

path.append('/blogger/blogger/settings.py')
environ.setdefault('DJANGO_SETTINGS_MODULE', 'blogger.settings')
django.setup()
# Models can only be imported after django.setup() has run.
from blogs.models import Blogs

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600, blocked_connection_timeout=300))
channel = connection.channel()
channel.queue_declare(queue='blogs', durable=True)

def callback(ch, method, properties, body):
    data = json.loads(body)

    if properties.content_type == 'blog_liked':
        # The body is the primary key of the liked blog.
        blog = Blogs.objects.get(id=data)
        blog.likes += 1
        blog.save()
        print("Blog likes increased.")

# auto_ack=True acknowledges each message on delivery; without any
# acknowledgement the broker would redeliver the messages later.
channel.basic_consume(queue='blogs', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
channel.close()

As with the previous consumer, we set up a connection and declared a queue called blogs. Our callback function gets a liked blog and increases its number of likes by 1.
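
Reading the counter, adding one and saving it back can lose an update if two consumers handle likes for the same blog at the same moment. Letting the database perform the increment in a single statement avoids that; Django offers F() expressions for this. The sketch below shows the idea with the standard-library sqlite3 module; the table and the increment_likes function are stand-ins invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE blogs (id INTEGER PRIMARY KEY, likes INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO blogs (id) VALUES (2)")


def increment_likes(blog_id):
    """Add one like in a single UPDATE; return how many rows changed."""
    with conn:
        cursor = conn.execute(
            "UPDATE blogs SET likes = likes + 1 WHERE id = ?", (blog_id,)
        )
    return cursor.rowcount


increment_likes(2)
increment_likes(2)
missing = increment_likes(99)  # unknown id: nothing to update
likes = conn.execute("SELECT likes FROM blogs WHERE id = 2").fetchone()[0]
print(likes, missing)  # 2 0
```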

I will also run python3 consumer.py to receive messages when a blog is liked.

Consumer command for blogger

Visit a page such as http://127.0.0.1:8001/blogs/2/like, then visit http://127.0.0.1:8000/blogs/. The number of likes for the Blogs instance with id 2 has now been incremented by 1.

http://127.0.0.1:8001/blogs/2/like

Liking a blog at Likes

http://127.0.0.1:8000/blogs/

Likes increased at blogger

Notice when one tries to like a blog again by refreshing the page, a message is returned saying the blog has already been liked.

http://127.0.0.1:8001/blogs/2/like

Liking a blog more than once
