Overview
In the last post, we talked from a Consumer Sample, looked at some of Kombu’s internal implementations, and learned a bit about how messages are taken back. Now let’s move on to look at how the message was sent, and again, let’s start with the Sample we talked about in the first article and see what the real story is.
The first step is to create a Connection, no doubt about it, because there must be a connection to send the message. We’ve already seen this code in the last article, so let’s leave it at this stage and move on to the next one.
Line 29 We’re explicitly declaring a Producer, which is different from what we saw in the second article, where we simply constructed a SimpleQueue that creates a Consumer and a Producer, but here we have explicitly declares a Producer, so we can explore what’s going on with the code located at kombu/messageing.py Line 61:
You can see a lot of familiar terms here, channel/exchange and so on, but we see that we’re not passing any of these parameters, so by default it’s None, only channel we’re passing connection, ok, so with that in mind, let’s go ahead and see what happens when we call publish
.
The publish comment is very long, so I ignored it, and look at the code we call, we also pass a lot of arguments, let’s see what our arguments do.
- body:
{'hello': 'world'}
- exchange:
Exchange('kombu_demo', type='direct')
- routing_key:
`kombu_demo'
‘’ - serializer:
json
- compression:
zlib
The only useful things are in Line 138-140, which compresses the data, and Line 148, which is also useful for setting retry policy. Finally, I’ll just call the internal _publish
method on Line 149.
The _publish
method is called directly on Line 149.
Here’s where it gets interesting, where Line 188 encapsulates the message as Message, and then Line 194 declares the queue for mq, and then finally sends the message through Channel.
By the way, at this point, we still haven’t seen how Exchange and Routing Key work, so we’re hoping that basic_publish
will give us some insight, and following the base class of Channel, we see virtual/base.py
Line 600.
As you can see, the messaging here was originally left to Exchange, so it’s time to take a look at the Exchange implementation. The code jumps to komu/transport/virtual/exchange.py
and we find a lot of interesting stuff, such as in Line 155.
We’ll see that Kombu supports 3 different Exchanges by default, and defines its Exchange implementation, so let’s take a look at DirectExchange’s specific deliver
.
So the key is how _lookup
is implemented, in fact, _lookup
we do not need to care about the specific implementation, let’s look up, look at lookup
in your mind will have some understanding.
The problem here is what the table is. It’s much clearer if we find out how to implement the table, but if we trace the code, you’ll find that the table is more complicated to get, so here are some typical pieces of code to document where the table comes from. virtual/base.py**.
From these code snippets, we have a general idea of how Exchange routing works, the key point should be prepare_bind
for each Exchange, and then let’s see how the put
operation works for Direct Exchange. kombu/transport/redis.py` Line 762 This location.
No highlights, since we all already know which queue is which, so just put it in there keep an eye out, there’s also a q_for_pri
operation here, which we may see later.
Summary
OK, that’s a simple process for producing a message that we simply had to walk through and learned quite a bit more about.
- Producer contains a lot of things, including Exchange, routing_key, channel, and more!
- Message is just an encapsulation of some elements, not doing anything.
- Exchange just converts the sent
routing_key
to the name ofqueue
. - The actual sending has to be done by the channels, and each different Transport has its own corresponding channel.
Reference
- [Kombu Code] (https://github.com/celery/kombu)