Overview
We’ve covered all of Kombu’s incoming and outgoing messages in the previous articles, and it seems like there’s not much left to cover. But, in fact, in our quick walk-through of the code, we missed some important things, one of which is that when fetching a message, we said that Consumer’s consume
is non-blocking, and then proceeded to a loop where we waited for a buffer to be filled and then returned that buffer, but. We don’t know how this works, so let’s take a look at the rest of the implementation.
Consumer
The first thing we need to do is to look at the non-blocking implementation of consume
, after a few code walks we should be able to easily find the Consumer code: kombu/messaging.py, and then look at the consume
operation: kombu/messaging.py.
As you can see, there doesn’t seem to be anything particularly bright, and if there is, it should be in two places.
Queue that cannot be unlisted unless cancel_by_queue
is called.
no_wait=True
for all queues, leaving only one set tono_wait=False
, what’s the point? Keep looking.
There is an additional concept of tag here, as we can see by looking at Kombu’s documentation.
The consumer tag is local to a connection, so two clients can use the same If this field is empty the server will generate a unique tag.
Okay, that’s how it works. Then look at the actual consume code, where you pass a couple of arguments.
tag
: unique identifier of the consumerself._receive_callbak
: a member functionno_ack
: need for acknowledgement?nowait
: whether or not to wait for results
Let’s not look at the specific callback is what, continue to look deeper into the consume, where the design to the queue, this structure we have not seen before, here is a look in passing: kombu/entity.py
The code is simple, it goes directly to the channel, which is already an old friend, so follow it directly.
A very simple paragraph, just set the various values, and then the key is the last sentence: _reset_cycle
, what’s inside, look at it and you’ll see.
Then it seems to be over, call is over, nothing happens, go home.
Message Callback
Wait, wait, wait, there seems to be more to it than that… Here we’re just declaring the mission consumers out, but we don’t know where to put the message after we’ve received it or how to put it in. So go ahead and look at the code behind our sample.
self.channel.connection.client.drain_events(timeout=remaining)
That’s the important one, so let’s find out what the code is here in kombu/connection.py
.
And then into Transport, that can not be helped, can not be avoided.
The key to this implementation is the get in Line 961, which corresponds to kombu/utils/scheduling.py*.
The actual value should be kombu/transport/virtual/base.py: kombu/transport/virtual/base.py.
And then it goes to kombu/transport/redis.py
That’s really the end of it, and at this point we can realize that the so-called asynchrony is actually a training rotation, where Kombu queries all the queues that need to be listened to again, until the query is complete or a queue is encountered that can be used, and then it gets the content and calls back the corresponding callback!
Recall
In this tracking process, we can see that the circles go around, but all that has gone through are these documents.
kombu/transport/virtual/base.py
- It’s the core file. Most of our
Channel
andTransport
operations are implemented here, so it needs a lot of attention.
- It’s the core file. Most of our
kombu/messaging.py
- It’s an important file that implements
Consumer
andProducer
in it.
- It’s an important file that implements
kombu/entity.py
- It implements
Exchange
andQueue
, which is the entity hosting class ofExchange
and theExchangeType
we talked about earlier.
- It implements
kombu/connection.py
.- Our
Connection
class is implemented here, important!
- Our
kombu/utils/scheduling.py
- It’s a bit of a mess, just a few cycle implementations, but it’s still important.
kombu/transport/redis.py
.- Needless to say, the redis implementation implements Redis’
Transport
andChannel
- Needless to say, the redis implementation implements Redis’
Reference
- [Kombu Code] (https://github.com/celery/kombu)