
Conversation

karelv

@karelv karelv commented Jul 18, 2025

…ernet client has enough space in the write buffer to send the entire MQTT message (publish & subscribe)

As described in issue #57

@TD-er
Collaborator

TD-er commented Jul 18, 2025

Nope, this is using beginPublish and endPublish, which are intended for writing streaming large messages.

This will be the code (with small modifications) for these.

I haven't had the time yet to make a PR for it, but will do so now.

@hmueller01
Owner

In #57 (comment) you wrote that it fails to work after several (long?) messages. Can you please provide a small example that I can test on an ESP8266 (the only device I have / use) to see if it fails there as well?

But ... this looks to me more like a bug in the QNEthernet lib, as it does not flush its buffer when it's full. There was a discussion regarding performance and write behavior in ssilverman/QNEthernet#93 ...
We could simply do a _client->flush(); in endPublish() (which is a good idea in my eyes anyway) ... Can you please try this?
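To make the suggestion concrete, here is a minimal sketch of where such a flush() call could go. The MockClient below is purely illustrative (it is not the Arduino Client class, and endPublishSketch is a hypothetical stand-in for PubSubClient::endPublish()); it only shows the intended effect: the streamed bytes stay buffered until flush() pushes them to the network.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical sketch only: a minimal stand-in for the Arduino Client
// interface, used to illustrate where a flush() call could go in
// endPublish(). The class below is a mock, not the actual library.
struct MockClient {
    std::string sent;      // bytes "on the wire" after a flush
    std::string buffered;  // bytes still sitting in the write buffer

    size_t write(const uint8_t* buf, size_t len) {
        buffered.append(reinterpret_cast<const char*>(buf), len);
        return len;
    }
    void flush() {          // here flush() means "push to the network",
        sent += buffered;   // NOT "discard the buffer" -- see the caveat
        buffered.clear();   // about some Stream implementations below
    }
};

// Sketch of the proposed change: finish the streamed publish, then
// flush so a client like QNEthernet actually sends the buffered bytes.
bool endPublishSketch(MockClient* client) {
    client->flush();  // the proposed one-line addition
    return true;
}
```

This only helps if the client's flush() really transmits the buffer; as noted in the next comment, that is not guaranteed for every Stream implementation.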

@TD-er
Collaborator

TD-er commented Jul 19, 2025

Just be a bit careful when calling flush() as this doesn't always do what you would expect.
Some implementations of classes inheriting from Stream may actually do a clear.
And depending on the SDK version, it may also differ among ESP32/ESP8266.

@hmueller01
Owner

@TD-er Thanks for the hint. You may be absolutely right. There are so many different client implementations, and each may behave differently. This is the reason why I want an example. Maybe we really only need it for QNEthernet (or we can discuss with the folks there what would be best).

@karelv
Author

karelv commented Jul 19, 2025

Thanks for the lively discussion here!

I can only imagine that some implementations will indeed erase the current write buffer... (which is what you would expect for an input stream). Let me try to answer below, but I want to stress that if you do not define MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE, everything stays as before; the additional code is not even compiled into the library.

It is a bunch of small messages, but I was also subscribing, so while I fixed the publish, the subscribe action was still suffering from the same issue. (So some of my comments originate from the time before I understood this.)
My topics are like my_board/relays/001/state, and the value is very short: OFF.
And I subscribe to my_board/relays/001/set.
But then I have a loop over each of the 128 relays: first subscribe, then publish...
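The setup described above could look roughly like the sketch below. This is a reconstruction under assumptions: buildTopic() is a hypothetical helper, and the vector of operation strings stands in for the real PubSubClient::subscribe()/publish() calls, just to show the volume of small messages (256 operations) fired off in quick succession.

```cpp
#include <cassert>
#include <cstdio>
#include <string>
#include <vector>

// Hypothetical helper: format a relay topic like "my_board/relays/001/state".
std::string buildTopic(int relay, const char* leaf) {
    char topic[64];
    std::snprintf(topic, sizeof(topic),
                  "my_board/relays/%03d/%s", relay, leaf);
    return topic;
}

// Illustrative reconstruction of the loop described above: for each of
// the 128 relays, subscribe to .../set, then publish the state "OFF"
// to .../state. Strings stand in for real subscribe()/publish() calls.
std::vector<std::string> enqueueAllRelays() {
    std::vector<std::string> ops;
    for (int relay = 1; relay <= 128; ++relay) {
        ops.push_back("SUB " + buildTopic(relay, "set"));
        ops.push_back("PUB " + buildTopic(relay, "state") + " OFF");
    }
    return ops;  // 256 small MQTT operations back to back
}
```

With this many back-to-back small writes, a transport whose TX buffer never drains between calls will eventually report no space available, which matches the failure mode discussed in this thread.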

As discussed above, it depends on the underlying ethernet client implementation how things really work.
The client->write function might actually block in your case until it has enough bytes available (client->availableForWrite), while QNEthernet might return an error or cut the message short.

I believe you are right to question what the right way is. For now I can live with what I have currently, so I'm good for now. Nevertheless, I would love to see a release, or some recommendation on how to handle this in a clean way.

@TD-er
Copy link
Collaborator

TD-er commented Jul 20, 2025

Well there are several possible ways to go and fix your issue, but I do think that (nearly?) all end up with a big question mark on what's going on with your networking device.

PubSubClient does typically handle every message (in/out) as a single transaction.
And for both incoming and outgoing messages the same buffer is used.
Then there is also the need (sometimes, depending on QoS and maybe other settings I can't think of right now) to receive an ack.
Add to this that the broker typically has only one way of letting you know an error has occurred, and that's by disconnecting your client.

An example of an error is when your message header states there will be a message of N bytes and your message is of different size.
Then the next bytes will not match a message header, or keep-alive ping or result in a timeout.

These are the main limitations of the MQTT protocol (at least 3.x version, I have not yet looked into version 5.x).
So doing 'funky' things with this buffer, like pipelining messages, is for sure something that will bite you later when hard-to-reproduce issues occur. (I know that in my pending PR I do exactly this...)

What I do think has to be done (and what you also try to do and I also with my PR) is that the buffer should be made more flexible.
Like there might be messages which are longer than the pubsubclient buffer, and there might be network implementations which have a buffer smaller than the pubsubclient buffer. There might even be implementations where the data isn't sent if their buffer isn't filled up to X bytes unless explicitly told to flush (not clear), e.g. USB bulk transfer.

However when we need to take several buffer sizes into account, we might need to consider using a circular buffer or else you keep moving data in the pubsubclient buffer to the front.
Also some implementations take a significant amount of time to return availableForWrite, meaning it is better to take the std::min(_client->availableForWrite, bytesReadyToWrite) and just copy that amount and then check if there's already more room to write.
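The chunked-write idea above can be sketched as follows. The MockClient is an assumption-laden toy (an 8-byte TX buffer that conveniently drains before every availableForWrite() call); the method names mirror the Arduino Client API, but the internals are invented for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

// Toy transport with a small (8-byte) TX buffer. For illustration we
// pretend the buffer fully drains every time availableForWrite() is
// polled; real hardware drains asynchronously.
struct MockClient {
    std::string wire;       // everything that made it onto the network
    size_t capacity = 8;
    size_t used = 0;

    size_t availableForWrite() {
        used = 0;                  // pretend the buffer drained meanwhile
        return capacity - used;
    }
    size_t write(const uint8_t* buf, size_t len) {
        size_t n = std::min(len, capacity - used);
        wire.append(reinterpret_cast<const char*>(buf), n);
        used += n;
        return n;                  // may be a partial write
    }
};

// Sketch of the idea from the comment above: copy only
// std::min(availableForWrite, bytesReadyToWrite) per iteration instead
// of issuing one large, possibly blocking write().
size_t writeChunked(MockClient& client, const uint8_t* buf, size_t len) {
    size_t written = 0;
    while (written < len) {
        size_t room = client.availableForWrite();
        if (room == 0) continue;   // real code would yield or time out here
        size_t n = std::min(room, len - written);
        written += client.write(buf + written, n);
    }
    return written;
}
```

Real code would of course need a timeout instead of the bare retry loop, which is effectively what the MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE proposal in this PR bounds.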

A drawback of using a circular buffer is that you don't have a guaranteed continuous array of bytes when feeding the topic and message to a callback function.
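The contiguity drawback can be made concrete with a toy ring buffer (sizes and names below are invented for illustration, not taken from PubSubClient): once the write position wraps, a message that straddles the wrap point is split into two segments and cannot be handed to a callback as one continuous byte array without copying.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Toy circular buffer illustrating the contiguity problem described
// above. N is deliberately tiny so the wrap is easy to see.
struct RingBuffer {
    static const size_t N = 16;
    char buf[N];
    size_t head = 0;  // total bytes ever written; write index is head % N

    void put(const std::string& s) {
        for (char c : s) { buf[head % N] = c; ++head; }
    }
    // A message starting at logical offset `start` with length `len` is
    // one continuous array only if it does not cross the wrap point.
    bool contiguous(size_t start, size_t len) const {
        return (start % N) + len <= N;
    }
};
```

A linear buffer avoids this at the cost of the "keep moving data to the front" copying mentioned above; the ring buffer avoids the copying but forces either a two-segment callback API or a copy-out before invoking the callback.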

TL;DR

  • We must know whether the network interface can just accept data and will flush on its own or needs some explicit flush calls
  • We must make sure the flush() isn't actually a clear().
