paho-mqtt-cpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
async_client.h
Go to the documentation of this file.
1
7
8/*******************************************************************************
9 * Copyright (c) 2013-2022 Frank Pagliughi <fpagliughi@mindspring.com>
10 *
11 * All rights reserved. This program and the accompanying materials
12 * are made available under the terms of the Eclipse Public License v2.0
13 * and Eclipse Distribution License v1.0 which accompany this distribution.
14 *
15 * The Eclipse Public License is available at
16 * http://www.eclipse.org/legal/epl-v20.html
17 * and the Eclipse Distribution License is available at
18 * http://www.eclipse.org/org/documents/edl-v10.php.
19 *
20 * Contributors:
21 * Frank Pagliughi - initial implementation and documentation
22 * Frank Pagliughi - MQTT v5 support
23 *******************************************************************************/
24
25#ifndef __mqtt_async_client_h
26#define __mqtt_async_client_h
27
28#include "MQTTAsync.h"
29#include "mqtt/types.h"
30#include "mqtt/token.h"
31#include "mqtt/create_options.h"
33#include "mqtt/delivery_token.h"
36#include "mqtt/properties.h"
37#include "mqtt/exception.h"
38#include "mqtt/message.h"
39#include "mqtt/callback.h"
40#include "mqtt/thread_queue.h"
41#include "mqtt/iasync_client.h"
42#include <vector>
43#include <list>
44#include <memory>
45#include <tuple>
46#include <functional>
47#include <stdexcept>
48
49namespace mqtt {
50
51// OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
52// clashed with #define's from other libraries and will be removed at the
53// next major version upgrade.
54
// Library version constants. When PAHO_MQTTPP_VERSIONS is defined the
// "PAHO_MQTTPP_"-prefixed names are declared; otherwise the legacy,
// unprefixed names are declared instead. Only one set exists per build.
55#if defined(PAHO_MQTTPP_VERSIONS)
// Numeric version. The value 0x01030002 lines up with the "1.3.2" in the
// version string below (presumably major/minor/patch packed into one
// 32-bit word -- confirm the exact field widths before relying on them).
57 const uint32_t PAHO_MQTTPP_VERSION = 0x01030002;
// Human-readable version string.
59 const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.2");
// Copyright notice string.
61 const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi");
62#else
// Legacy names: same values as the prefixed constants above.
64 const uint32_t VERSION = 0x01030002;
66 const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.2");
68 const string COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi");
69#endif
70
72
// Asynchronous MQTT client. Implements the iasync_client interface and holds
// an MQTTAsync handle (cli_) from the C header "MQTTAsync.h".
//
// NOTE(review): this text looks like a rendered source listing: each line is
// prefixed with its original line number and some original lines are absent
// (the embedded numbers skip). For example, `que_` is used by the consume
// methods below but no declaration of it is visible here. Confirm details
// against the real header before acting on anything marked "missing" below.
107class async_client : public virtual iasync_client
108{
109public:
// Shared-ownership pointer to a client object.
111 using ptr_t = std::shared_ptr<async_client>;
// Uniquely-owned thread_queue of const_message_ptr items.
113 using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
114
// Callable that receives a message pointer.
116 using message_handler = std::function<void(const_message_ptr)>;
// Callable that receives a `cause` string; used for both the connected and
// the connection-lost handler members below.
118 using connection_handler = std::function<void(const string& cause)>;
// Callable that receives a properties object and a ReasonCode.
120 using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
// Callable that may modify the given connect_data and returns a bool.
122 using update_connection_handler = std::function<bool(connect_data&)>;
123
124private:
// Note: `guard` and `unique_lock` are aliases for the very same type.
126 using guard = std::unique_lock<std::mutex>;
128 using unique_lock = std::unique_lock<std::mutex>;
129
// Mutex for this object; `mutable` so const member functions can lock it.
// What it protects is decided in the implementation file (not visible here).
131 mutable std::mutex lock_;
// Handle passed to the C API (see is_connected()).
133 MQTTAsync cli_;
// Value returned by get_server_uri().
135 string serverURI_;
// Value returned by get_client_id().
137 string clientId_;
// Value returned by mqtt_version().
139 int mqttVersion_;
// Owned C persistence structure; may be empty -- presumably only set when a
// user persistence object is supplied (TODO confirm in the constructors).
141 std::unique_ptr<MQTTClient_persistence> persist_;
// Non-owning pointer to a callback object -- presumably the one passed to
// set_callback(); confirm in the implementation.
143 callback* userCallback_;
// Stored handler callables, one per event type.
145 connection_handler connHandler_;
147 connection_handler connLostHandler_;
149 disconnected_handler disconnectedHandler_;
151 update_connection_handler updateConnectionHandler_;
153 message_handler msgHandler_;
// Stored connect options and the token for a connect operation.
155 connect_options connOpts_;
157 token_ptr connTok_;
// Lists of outstanding tokens, maintained via add_token()/remove_token().
159 std::list<token_ptr> pendingTokens_;
161 std::list<delivery_token_ptr> pendingDeliveryTokens_;
164
// Static entry points taking an opaque `context` pointer. Their signatures
// use C-library types, so they are presumably registered with the C client
// as callbacks, with `context` being the async_client -- TODO confirm.
166 static void on_connected(void* context, char* cause);
167 static void on_connection_lost(void *context, char *cause);
168 static void on_disconnected(void* context, MQTTProperties* cprops,
169 MQTTReasonCodes reasonCode);
170 static int on_message_arrived(void* context, char* topicName, int topicLen,
171 MQTTAsync_message* msg);
172 static void on_delivery_complete(void* context, MQTTAsync_token tok);
173 static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
174
// Token bookkeeping. `token` is a friend so it can reach these private
// members. The two smart-pointer remove_token() overloads simply forward
// the raw pointer to remove_token(token*).
176 friend class token;
177 virtual void add_token(token_ptr tok);
178 virtual void add_token(delivery_token_ptr tok);
179 virtual void remove_token(token* tok) override;
180 virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
181 void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
182
// Not default-constructible and not copyable.
184 async_client() =delete;
185 async_client(const async_client&) =delete;
186 async_client& operator=(const async_client&) =delete;
187
// Throws an `exception` built from `rc` unless rc equals MQTTASYNC_SUCCESS.
189 static void check_ret(int rc) {
190 if (rc != MQTTASYNC_SUCCESS)
191 throw exception(rc);
192 }
193
194public:
// Constructors. All take the server URI and client ID; the overloads differ
// in (a) how persistence is given -- a directory string or an
// iclient_persistence pointer defaulting to nullptr -- and (b) whether a
// maxBufferedMessages count or a create_options object is also supplied.
206 async_client(const string& serverURI, const string& clientId,
207 const string& persistDir);
221 async_client(const string& serverURI, const string& clientId,
222 iclient_persistence* persistence=nullptr);
236 async_client(const string& serverURI, const string& clientId,
237 int maxBufferedMessages, const string& persistDir);
253 async_client(const string& serverURI, const string& clientId,
254 int maxBufferedMessages,
255 iclient_persistence* persistence=nullptr);
268 async_client(const string& serverURI, const string& clientId,
269 const create_options& opts, const string& persistDir);
284 async_client(const string& serverURI, const string& clientId,
285 const create_options& opts,
286 iclient_persistence* persistence=nullptr);
290 ~async_client() override;
// Installs a user callback object (taken by reference, not owned).
297 void set_callback(callback& cb) override;
303 void disable_callbacks() override;
// Connect overloads; each returns a token for the operation.
340 token_ptr connect() override;
364 token_ptr connect(connect_options options, void* userContext,
365 iaction_listener& cb) override;
// Same as the overload above, using default-constructed connect_options.
377 token_ptr connect(void* userContext, iaction_listener& cb) override {
378 return connect(connect_options{}, userContext, cb);
379 }
// Wraps the integer timeout in a disconnect_options object and forwards to
// the overload that accepts one.
411 token_ptr disconnect(int timeout) override {
412 return disconnect(disconnect_options(timeout));
413 }
// Duration-based convenience overload: converts the duration with
// to_milliseconds_count() and narrows the result to int. As the TODO says,
// the value is not range-checked before narrowing.
424 template <class Rep, class Period>
425 token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
426 // TODO: check range
427 return disconnect((int) to_milliseconds_count(timeout));
428 }
443 token_ptr disconnect(int timeout, void* userContext,
444 iaction_listener& cb) override;
// Duration-based form of the listener overload above; same unchecked
// narrowing to int.
459 template <class Rep, class Period>
460 token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout,
461 void* userContext, iaction_listener& cb) {
462 // TODO: check range
463 return disconnect((int) to_milliseconds_count(timeout), userContext, cb);
464 }
// Forwards to the listener overload with a timeout argument of zero.
476 token_ptr disconnect(void* userContext, iaction_listener& cb) override {
477 return disconnect(0L, userContext, cb);
478 }
488 std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
// Simple accessors returning copies of the stored values.
493 string get_client_id() const override { return clientId_; }
498 string get_server_uri() const override { return serverURI_; }
508 int mqtt_version() const noexcept { return mqttVersion_; }
// Asks the C library (MQTTAsync_isConnected) and converts the int result
// to bool via to_bool().
513 bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
// Publish overloads; each returns a delivery token.
526 delivery_token_ptr publish(string_ref topic, const void* payload, size_t n,
527 int qos, bool retained) override;
// Shorter form that forwards to a longer publish() overload.
// NOTE(review): the trailing arguments of the forwarded call (embedded
// line 538) are missing from this listing.
536 delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
537 return publish(std::move(topic), payload, n,
539 }
// NOTE(review): the lines below are tails of further publish() overloads
// whose opening lines are missing from this listing.
552 int qos, bool retained) override;
561 return publish(std::move(topic), std::move(payload),
563 }
580 const void* payload, size_t n,
581 int qos, bool retained,
582 void* userContext, iaction_listener& cb) override;
604 void* userContext, iaction_listener& cb) override;
// Subscribe overloads; each returns a token. Every visible form accepts an
// optional `props` argument defaulting to an empty properties object.
// NOTE(review): some parameter lines and declaration heads are missing here.
615 token_ptr subscribe(const string& topicFilter, int qos,
617 const properties& props=properties()) override;
635 token_ptr subscribe(const string& topicFilter, int qos,
636 void* userContext, iaction_listener& cb,
638 const properties& props=properties()) override;
653 const qos_collection& qos,
654 const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
655 const properties& props=properties()) override;
673 const qos_collection& qos,
674 void* userContext, iaction_listener& cb,
675 const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
676 const properties& props=properties()) override;
// Unsubscribe overloads; same conventions as subscribe above.
685 token_ptr unsubscribe(const string& topicFilter,
686 const properties& props=properties()) override;
697 const properties& props=properties()) override;
710 void* userContext, iaction_listener& cb,
711 const properties& props=properties()) override;
724 token_ptr unsubscribe(const string& topicFilter,
725 void* userContext, iaction_listener& cb,
726 const properties& props=properties()) override;
// Consumer-style message retrieval. All the inline bodies below delegate to
// the queue object `que_`.
732 void start_consuming() override;
738 void stop_consuming() override;
// Returns whatever que_->get() returns.
// NOTE(review): whether get() blocks depends on thread_queue, which is not
// visible here.
744 const_message_ptr consume_message() override { return que_->get(); }
// NOTE(review): declaration head missing; body forwards to que_->try_get().
752 return que_->try_get(msg);
753 }
// Pointer-out variant with a relative timeout: returns the result of
// que_->try_get_for(). (First line of the declaration is missing.)
761 template <typename Rep, class Period>
763 const std::chrono::duration<Rep, Period>& relTime) {
764 return que_->try_get_for(msg, relTime);
765 }
// Value-returning variant with a relative timeout: the result of
// try_get_for() is ignored and `msg` is returned as-is.
// NOTE(review): the declaration of the local `msg` is missing from this
// listing; presumably a default (null) const_message_ptr -- confirm.
772 template <typename Rep, class Period>
773 const_message_ptr try_consume_message_for(const std::chrono::duration<Rep, Period>& relTime) {
775 que_->try_get_for(&msg, relTime);
776 return msg;
777 }
// Pointer-out variant with an absolute deadline: returns the result of
// que_->try_get_until(). (First line of the declaration is missing.)
785 template <class Clock, class Duration>
787 const std::chrono::time_point<Clock,Duration>& absTime) {
788 return que_->try_get_until(msg, absTime);
789 }
// Value-returning variant with an absolute deadline; mirrors
// try_consume_message_for() above, including the missing `msg` declaration.
795 template <class Clock, class Duration>
796 const_message_ptr try_consume_message_until(const std::chrono::time_point<Clock,Duration>& absTime) {
798 que_->try_get_until(&msg, absTime);
799 return msg;
800 }
801};
802
805
807// end namespace mqtt
808}
809
810#endif // __mqtt_async_client_h
811
Definition async_client.h:108
void set_connection_lost_handler(connection_handler cb)
delivery_token_ptr publish(const_message_ptr msg, void *userContext, iaction_listener &cb) override
void set_disconnected_handler(disconnected_handler cb)
const_message_ptr try_consume_message_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:796
token_ptr subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
token_ptr disconnect(disconnect_options opts) override
bool try_consume_message(const_message_ptr *msg) override
Definition async_client.h:751
token_ptr disconnect() override
Definition async_client.h:392
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, void *userContext, iaction_listener &cb) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Definition async_client.h:536
std::unique_ptr< thread_queue< const_message_ptr > > consumer_queue_type
Definition async_client.h:113
token_ptr subscribe(const string &topicFilter, int qos, void *userContext, iaction_listener &cb, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
std::shared_ptr< async_client > ptr_t
Definition async_client.h:111
void stop_consuming() override
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:762
token_ptr disconnect(int timeout) override
Definition async_client.h:411
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
std::function< bool(connect_data &)> update_connection_handler
Definition async_client.h:122
async_client(const string &serverURI, const string &clientId, const create_options &opts, iclient_persistence *persistence=nullptr)
void set_message_callback(message_handler cb)
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Definition async_client.h:460
token_ptr connect(connect_options options, void *userContext, iaction_listener &cb) override
std::function< void(const properties &, ReasonCode)> disconnected_handler
Definition async_client.h:120
std::function< void(const_message_ptr)> message_handler
Definition async_client.h:116
token_ptr unsubscribe(const string &topicFilter, const properties &props=properties()) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained) override
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Definition async_client.h:560
string get_client_id() const override
Definition async_client.h:493
void set_callback(callback &cb) override
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:786
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, void *userContext, iaction_listener &cb, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
const_message_ptr consume_message() override
Definition async_client.h:744
token_ptr reconnect() override
const_message_ptr try_consume_message_for(const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:773
async_client(const string &serverURI, const string &clientId, iclient_persistence *persistence=nullptr)
void set_connected_handler(connection_handler cb)
async_client(const string &serverURI, const string &clientId, const create_options &opts, const string &persistDir)
token_ptr disconnect(int timeout, void *userContext, iaction_listener &cb) override
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, const properties &props=properties()) override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, void *userContext, iaction_listener &cb, const properties &props=properties()) override
int mqtt_version() const noexcept
Definition async_client.h:508
token_ptr unsubscribe(const string &topicFilter, void *userContext, iaction_listener &cb, const properties &props=properties()) override
string get_server_uri() const override
Definition async_client.h:498
~async_client() override
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Definition async_client.h:425
delivery_token_ptr get_pending_delivery_token(int msgID) const override
token_ptr disconnect(void *userContext, iaction_listener &cb) override
Definition async_client.h:476
async_client(const string &serverURI, const string &clientId, const string &persistDir)
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition async_client.h:377
void disable_callbacks() override
async_client(const string &serverURI, const string &clientId, int maxBufferedMessages, const string &persistDir)
void set_update_connection_handler(update_connection_handler cb)
token_ptr connect() override
bool is_connected() const override
Definition async_client.h:513
delivery_token_ptr publish(const_message_ptr msg) override
void start_consuming() override
token_ptr connect(connect_options options) override
async_client(const string &serverURI, const string &clientId, int maxBufferedMessages, iclient_persistence *persistence=nullptr)
delivery_token_ptr publish(string_ref topic, binary_ref payload, int qos, bool retained) override
std::function< void(const string &cause)> connection_handler
Definition async_client.h:118
Definition callback.h:42
Definition connect_options.h:587
Definition connect_options.h:49
Definition create_options.h:38
Definition disconnect_options.h:40
Definition iaction_listener.h:49
Definition iasync_client.h:59
std::vector< int > qos_collection
Definition iasync_client.h:65
Definition iclient_persistence.h:73
static PAHO_MQTTPP_EXPORT const int DFLT_QOS
Definition message.h:59
static PAHO_MQTTPP_EXPORT const bool DFLT_RETAINED
Definition message.h:61
Definition properties.h:256
Definition subscribe_options.h:42
Definition token.h:53
Definition topic.h:44
Definition async_client.h:49
ReasonCode
Definition types.h:57
delivery_token::ptr_t delivery_token_ptr
Definition delivery_token.h:125
const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.3.2")
bool to_bool(int n)
Definition types.h:161
token::ptr_t token_ptr
Definition token.h:506
async_client::ptr_t async_client_ptr
Definition async_client.h:804
const uint32_t VERSION
Definition async_client.h:64
string_collection::const_ptr_t const_string_collection_ptr
Definition string_collection.h:234
message::const_ptr_t const_message_ptr
Definition message.h:368
const string COPYRIGHT("Copyright (c) 2013-2023 Frank Pagliughi")
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:149