The C++ and Rust clients use dynamic "Tokens" to track asynchronous operations. … These are accessed via reference-counted smart pointers, but we can think of the behavior as something similar to using a heap-allocated struct as the `context` pointer that gets passed to the onSuccess/onFailure callback. The memory is then freed in the callback.
So the assumption is that a callback will be called exactly once for each operation; either indicating success or failure.
The value of the _context_ pointer is different for each operation. It would be a disaster if the callback had the wrong context value (segfault), or if the callback were fired twice (double free).
Both of those problems seem to be happening. Consider a slightly modified _MQTTAsync_subscribe_ example in which the connect options' `context` is a malloc'ed struct.
```
/*******************************************************************************
* Copyright (c) 2012, 2022 IBM Corp., Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
*******************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
#if !defined(_WIN32)
#include <unistd.h>
#else
#include <windows.h>
#endif
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
// Uncomment to make onConnectFailure() sleep for a second and call reconnect()
// instead of setting `finished`.
// #define TRY_RECONNECT
// Server URI handed to MQTTAsync_create().
#define ADDRESS "mqtt://localhost:1883"
// Client identifier handed to MQTTAsync_create().
#define CLIENTID "ExampleClientSub"
// Topic that onConnect() subscribes to.
#define TOPIC "MQTTExamples"
// QoS value passed to MQTTAsync_subscribe().
#define QOS 1
// Not referenced anywhere in the code shown here.
#define TIMEOUT 10000L
// Completion flags: written by the callbacks below and polled by main().
// NOTE(review): if the library runs callbacks on a thread other than main()'s,
// these plain ints are accessed without synchronization — confirm and consider atomics.
// Set by onDisconnect()/onDisconnectFailure() once the disconnect request has completed.
int disc_finished = 0;
// Set by onSubscribe() once the subscription has succeeded.
int subscribed = 0;
// Set when a connect or subscribe attempt fails or cannot be started; main() stops waiting.
int finished = 0;
// Forward declarations: reconnect() installs these callbacks before they are defined.
void onConnect(void* context, MQTTAsync_successData* response);
void onConnectFailure(void* context, MQTTAsync_failureData* response);
/*
 * Per-attempt context object handed to the library through conn_opts.context.
 * One is allocated before each MQTTAsync_connect() call and released by
 * whichever of onConnect()/onConnectFailure() receives it.
 */
typedef struct tagConnectToken ConnectToken;

struct tagConnectToken {
    MQTTAsync client;   /* client handle the connect attempt was started on */
    int otherStuff;     /* filler payload; the callers store 42 here */
};
/*
 * Allocate an uninitialized ConnectToken at an address that differs from the
 * block released most recently.
 *
 * A throwaway block is allocated first and freed only after the real token has
 * been obtained. Allocating straight after a free() often returns the very
 * same block, which would make consecutive tokens indistinguishable by address.
 *
 * Returns whatever malloc() returned for the real token, so NULL is possible.
 * The caller owns the token and is responsible for free()ing it.
 */
ConnectToken* newConnectToken(void)
{
    ConnectToken* decoy = malloc(sizeof *decoy);
    ConnectToken* fresh = malloc(sizeof *fresh);

    /* Release the decoy last so `fresh` cannot reuse its slot. */
    free(decoy);

    return fresh;
}
/*
 * Block the calling thread for roughly `ms` milliseconds.
 * Uses Sleep() on Windows and usleep() (microseconds) everywhere else.
 */
void pahoSleep(long ms)
{
#if !defined(_WIN32)
    /* usleep() takes microseconds. */
    usleep(ms * 1000L);
#else
    Sleep(ms);
#endif
}
/*
 * Start a new asynchronous connect attempt on `client`.
 *
 * A fresh ConnectToken is allocated and installed as the connect context.
 * Once the request has been accepted, ownership of the token passes to
 * onConnect()/onConnectFailure(), which free it.
 *
 * If the token cannot be allocated, or MQTTAsync_connect() refuses the
 * request, the callbacks are not expected to run for this attempt, so the
 * token is released here and `finished` is set to stop the main loop.
 */
void reconnect(MQTTAsync client)
{
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;
    ConnectToken* tok = newConnectToken();

    if (tok == NULL)
    {
        /* malloc() failed inside newConnectToken(); nothing to hand to the library. */
        printf("Failed to allocate connect token\n");
        finished = 1;
        return;
    }

    tok->client = client;
    tok->otherStuff = 42;

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = tok;

    /* %p requires a void* argument. */
    printf("Reconnecting w/ token 0x%p\n", (void*)tok);

    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        /* The request was rejected, so no callback will free the token for us. */
        free(tok);
        finished = 1;
    }
}
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
printf("\nConnection lost\n");
if (cause)
printf(" cause: %s\n", cause);
reconnect(client);
}
/*
 * Message-arrived callback registered through MQTTAsync_setCallbacks().
 *
 * Prints the topic and the payload (bounded by the payload length), then
 * releases both the message and the topic name with the library's free
 * functions. `context` and `topicLen` are not used.
 *
 * Always returns 1.
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    const int payload_len = message->payloadlen;
    const char *payload_text = (char*)message->payload;

    (void)context;
    (void)topicLen;

    printf("Message arrived\n");
    printf(" topic: %s\n", topicName);
    printf(" message: %.*s\n", payload_len, payload_text);

    /* Both buffers are handed back through the library's own free functions. */
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);

    return 1;
}
/*
 * Failure callback for MQTTAsync_disconnect().
 *
 * Logs the failure code and sets `disc_finished` so main() stops waiting.
 * `response` is checked before use because a failure callback may be handed
 * a NULL response; in that case -1 (the value of MQTTASYNC_FAILURE) is
 * reported instead of dereferencing it. `context` is not used.
 */
void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    printf("Disconnect failed, rc %d\n", response ? response->code : -1);
    disc_finished = 1;
}
/*
 * Success callback for MQTTAsync_disconnect().
 * Logs the event and raises `disc_finished` so main() can leave its wait loop.
 * Neither argument is used.
 */
void onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Successful disconnection\n");
    disc_finished = 1;
}
/*
 * Success callback for MQTTAsync_subscribe().
 * Logs the event and raises `subscribed`, which main() polls before it starts
 * reading keyboard input. Neither argument is used.
 */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Subscribe succeeded\n");
    subscribed = 1;
}
/*
 * Failure callback for MQTTAsync_subscribe().
 *
 * Logs the failure code and sets `finished` so main() stops waiting.
 * `response` is checked before use because a failure callback may be handed
 * a NULL response; in that case -1 (the value of MQTTASYNC_FAILURE) is
 * reported instead of dereferencing it. `context` is not used.
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    printf("Subscribe failed, rc %d\n", response ? response->code : -1);
    finished = 1;
}
/*
 * Failure callback for MQTTAsync_connect().
 *
 * `context` is the ConnectToken allocated for this attempt; it is freed here.
 * With TRY_RECONNECT defined, the callback then waits one second and starts
 * another attempt on the same client; otherwise it sets `finished`.
 *
 * `response` is checked before use because a failure callback may be handed
 * a NULL response; -1 (the value of MQTTASYNC_FAILURE) is reported in that case.
 */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    ConnectToken* tok = (ConnectToken*)context;

    printf("Connect failed w/ token 0x%p, rc %d\n", context, response ? response->code : -1);

#if defined(TRY_RECONNECT)
    {
        /* Copy the handle out before the token is released. */
        MQTTAsync client = tok->client;

        free(tok);
        pahoSleep(1000);
        reconnect(client);
    }
#else
    /* The client handle is only needed for a retry, so it is not read here. */
    free(tok);
    finished = 1;
#endif
}
/*
 * Success callback for MQTTAsync_connect().
 *
 * `context` is the ConnectToken allocated for this attempt. The client handle
 * is copied out of it, the token is freed, and a subscription to TOPIC at QOS
 * is requested with onSubscribe()/onSubscribeFailure() as its callbacks.
 * If the subscribe request cannot be started, `finished` is set.
 * `response` is not used.
 */
void onConnect(void* context, MQTTAsync_successData* response)
{
    ConnectToken* token;
    MQTTAsync client;
    int status;

    (void)response;

    printf("Successful connection w/ token 0x%p\n", context);

    /* Take what we need from the token, then release it. */
    token = (ConnectToken*)context;
    client = token->client;
    free(token);

    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;

    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);

    sub_opts.onSuccess = onSubscribe;
    sub_opts.onFailure = onSubscribeFailure;
    sub_opts.context = client;

    status = MQTTAsync_subscribe(client, TOPIC, QOS, &sub_opts);
    if (status != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start subscribe, return code %d\n", status);
        finished = 1;
    }
}
/*
 * Program entry point.
 *
 * Creates the client, registers the connection-lost and message-arrived
 * callbacks, starts an asynchronous connect with a heap-allocated
 * ConnectToken as context, waits until the subscription is in place (or a
 * failure is flagged), then reads stdin until 'Q'/'q' or end of input and
 * disconnects.
 *
 * Returns the result of the last library call on the normal path (0 after a
 * successful disconnect request) and EXIT_FAILURE on any failure.
 * The command-line arguments are not used.
 */
int main(int argc, char* argv[])
{
    MQTTAsync client;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    ConnectToken* tok;
    int rc;
    int ch;

    (void)argc;
    (void)argv;

    if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
            != MQTTASYNC_SUCCESS)
    {
        printf("Failed to create client, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto exit;
    }

    if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to set callbacks, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }

    tok = newConnectToken();
    if (tok == NULL)
    {
        /* malloc() failed inside newConnectToken(); do not dereference it. */
        printf("Failed to allocate connect token\n");
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
    tok->client = client;
    tok->otherStuff = 42;

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = tok;

    /* %p requires a void* argument. */
    printf("Attempting connect w/ token 0x%p\n", (void*)tok);

    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        /* The request was rejected, so no callback will free the token for us. */
        free(tok);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }

    while (!subscribed && !finished)
    {
        pahoSleep(100);
    }

    if (finished)
    {
        /* A callback flagged a failure: report it and still destroy the client. */
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }

    /* Stop on EOF as well, otherwise a closed stdin would spin here forever. */
    do
    {
        ch = getchar();
    } while (ch != 'Q' && ch != 'q' && ch != EOF);

    disc_opts.onSuccess = onDisconnect;
    disc_opts.onFailure = onDisconnectFailure;
    if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start disconnect, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }

    while (!disc_finished)
    {
        pahoSleep(100);
    }

destroy_exit:
    MQTTAsync_destroy(&client);
exit:
    return rc;
}
```
If this connects properly, and then the broker goes down causing us to lose the connection, we get this:
```
Connection lost
Reconnecting w/ token 0x0x7f5128000ce0
20230315 095628.641 Connect failed, more to try
20230315 095628.641 0 inflight messages deleted for client ExampleClientSub
20230315 095628.641 0 responses removed for client ExampleClientSub
20230315 095628.641 Calling connect failure for client ExampleClientSub
Connect failed w/ token 0x0x7f5128000ce0, rc -1 <- 1st callback
20230315 095628.641 Connecting to serverURI localhost:1883 with MQTT version 4
20230315 095628.641 New socket 3 for localhost:1883, port 1883
20230315 095628.641 Connect pending
20230315 095628.641 m->c->connect_state = 1
20230315 095628.641 Removed socket 3
20230315 095628.641 Removed socket 3
20230315 095628.641 Connect failed, more to try
20230315 095628.641 Could not find client corresponding to socket 3
20230315 095628.641 Connecting to serverURI localhost:1883 with MQTT version 3
20230315 095628.641 New socket 3 for localhost:1883, port 1883
20230315 095628.641 Connect pending
20230315 095628.641 m->c->connect_state = 1
20230315 095628.641 Removed socket 3
20230315 095628.841 Removed socket 3
20230315 095628.841 0 inflight messages deleted for client ExampleClientSub
20230315 095628.841 0 responses removed for client ExampleClientSub
20230315 095628.841 Calling connect failure for client ExampleClientSub
Connect failed w/ token 0x0x7f5128000ce0, rc -1 <- 2nd callback
free(): double free detected in tcache 2
Aborted (core dumped)
```
The onFailure callback is fired twice for one call to `MQTTAsync_connect()`. The first one doesn't make sense to me. It appears that it's being fired before a connection is even attempted.
It then gets even weirder if you `#define TRY_RECONNECT`. In that case, it seems that when a connection is re-established after a few failed attempts, `onSuccess` is called with the context of the previous failed attempt! That segfaults.