paho-mqtt-cpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
token.h
Go to the documentation of this file.
1
7
8/*******************************************************************************
9 * Copyright (c) 2013-2019 Frank Pagliughi <fpagliughi@mindspring.com>
10 *
11 * All rights reserved. This program and the accompanying materials
12 * are made available under the terms of the Eclipse Public License v2.0
13 * and Eclipse Distribution License v1.0 which accompany this distribution.
14 *
15 * The Eclipse Public License is available at
16 * http://www.eclipse.org/legal/epl-v20.html
17 * and the Eclipse Distribution License is available at
18 * http://www.eclipse.org/org/documents/edl-v10.php.
19 *
20 * Contributors:
21 * Frank Pagliughi - initial implementation and documentation
22 * Frank Pagliughi - MQTT v5 support & server responses
23 *******************************************************************************/
24
25#ifndef __mqtt_token_h
26#define __mqtt_token_h
27
28#include "MQTTAsync.h"
30#include "mqtt/exception.h"
31#include "mqtt/types.h"
32#include "mqtt/properties.h"
33#include "mqtt/buffer_ref.h"
36#include <vector>
37#include <thread>
38#include <mutex>
39#include <condition_variable>
40#include <chrono>
41
42namespace mqtt {
43
44class iasync_client;
45
47
52class token
53{
54public:
56 using ptr_t = std::shared_ptr<token>;
58 using const_ptr_t = std::shared_ptr<const token>;
60 using weak_ptr_t = std::weak_ptr<token>;
61
70
71private:
73 using guard = std::lock_guard<std::mutex>;
75 using unique_lock = std::unique_lock<std::mutex>;
76
78 mutable std::mutex lock_;
80 mutable std::condition_variable cond_;
81
83 Type type_;
85 iasync_client* cli_;
87 int rc_;
89 ReasonCode reasonCode_;
91 string errMsg_;
93 MQTTAsync_token msgId_;
97 void* userContext_;
98
104 iaction_listener* listener_;
106 size_t nExpected_;
108 bool complete_;
109
111 //properties props_;
113 std::unique_ptr<connect_response> connRsp_;
115 std::unique_ptr<subscribe_response> subRsp_;
117 std::unique_ptr<unsubscribe_response> unsubRsp_;
118
120 friend class async_client;
121 friend class mock_async_client;
122
123 friend class connect_options;
124 friend class response_options;
126 friend class disconnect_options;
127
131 void reset();
137 void set_message_id(MQTTAsync_token msgId) {
138 guard g(lock_);
139 msgId_ = msgId;
140 }
150 static void on_success(void* tokObj, MQTTAsync_successData* rsp);
151 static void on_success5(void* tokObj, MQTTAsync_successData5* rsp);
161 static void on_failure(void* tokObj, MQTTAsync_failureData* rsp);
162 static void on_failure5(void* tokObj, MQTTAsync_failureData5* rsp);
169 static void on_connected(void* tokObj, char* /*cause*/);
174 void on_success(MQTTAsync_successData* rsp);
175 void on_success5(MQTTAsync_successData5* rsp);
180 void on_failure(MQTTAsync_failureData* rsp);
181 void on_failure5(MQTTAsync_failureData5* rsp);
182
187 void check_ret() const {
188 if (rc_ != MQTTASYNC_SUCCESS || reasonCode_ > ReasonCode::GRANTED_QOS_2)
189 throw exception(rc_, reasonCode_, errMsg_);
190 }
191
192public:
199 : token(typ, cli, MQTTAsync_token(0)) {}
209 token(Type typ, iasync_client& cli, void* userContext, iaction_listener& cb)
210 : token(typ, cli, const_string_collection_ptr(), userContext, cb) {}
211
218 token(Type typ, iasync_client& cli, const string& topic)
219 : token(typ, cli, string_collection::create(topic)) {}
230 token(Type typ, iasync_client& cli, const string& topic,
231 void* userContext, iaction_listener& cb)
232 : token(typ, cli, string_collection::create(topic), userContext, cb) {}
233
252 void* userContext, iaction_listener& cb);
259 token(Type typ, iasync_client& cli, MQTTAsync_token tok);
263 virtual ~token() {}
270 static ptr_t create(Type typ, iasync_client& cli) {
271 return std::make_shared<token>(typ, cli);
272 }
282 static ptr_t create(Type typ, iasync_client& cli, void* userContext,
283 iaction_listener& cb) {
284 return std::make_shared<token>(typ, cli, userContext, cb);
285 }
292 static ptr_t create(Type typ, iasync_client& cli, const string& topic) {
293 return std::make_shared<token>(typ, cli, topic);
294 }
305 static ptr_t create(Type typ, iasync_client& cli, const string& topic,
306 void* userContext, iaction_listener& cb) {
307 return std::make_shared<token>(typ, cli, topic, userContext, cb);
308 }
316 return std::make_shared<token>(typ, cli, topics);
317 }
329 void* userContext, iaction_listener& cb) {
330 return std::make_shared<token>(typ, cli, topics, userContext, cb);
331 }
337 Type get_type() const { return type_; }
343 guard g(lock_);
344 return listener_;
345 }
351 virtual iasync_client* get_client() const { return cli_; }
356 virtual int get_message_id() const {
357 static_assert(sizeof(msgId_) <= sizeof(int), "MQTTAsync_token must fit into int");
358 return int(msgId_);
359 }
367 return topics_;
368 }
373 virtual void* get_user_context() const {
374 guard g(lock_);
375 return userContext_;
376 }
381 virtual bool is_complete() const { return complete_; }
388 virtual int get_return_code() const { return rc_; }
393 virtual void set_action_callback(iaction_listener& listener) {
394 guard g(lock_);
395 listener_ = &listener;
396 }
402 virtual void set_user_context(void* userContext) {
403 guard g(lock_);
404 userContext_ = userContext;
405 }
411 void set_num_expected(size_t n) { nExpected_ = n; }
412
417 //const properties& get_properties() const { return props_; }
422 ReasonCode get_reason_code() const { return reasonCode_; }
427 virtual void wait();
433 virtual bool try_wait() {
434 guard g(lock_);
435 if (complete_)
436 check_ret();
437 return complete_;
438 }
446 virtual bool wait_for(long timeout) {
447 return wait_for(std::chrono::milliseconds(timeout));
448 }
455 template <class Rep, class Period>
456 bool wait_for(const std::chrono::duration<Rep, Period>& relTime) {
457 unique_lock g(lock_);
458 if (!cond_.wait_for(g, std::chrono::milliseconds(relTime),
459 [this]{return complete_;}))
460 return false;
461 check_ret();
462 return true;
463 }
470 template <class Clock, class Duration>
471 bool wait_until( const std::chrono::time_point<Clock, Duration>& absTime) {
472 unique_lock g(lock_);
473 if (!cond_.wait_until(g, absTime, [this]{return complete_;}))
474 return false;
475 check_ret();
476 return true;
477 }
478
503};
504
507
510
511
513// end namespace mqtt
514}
515
516#endif // __mqtt_token_h
517
Definition async_client.h:108
Definition connect_options.h:49
Definition server_response.h:75
Definition response_options.h:204
Definition disconnect_options.h:40
Definition exception.h:47
Definition iaction_listener.h:49
Definition iasync_client.h:59
Definition response_options.h:35
Definition string_collection.h:43
Definition token.h:53
token(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition token.h:209
static ptr_t create(Type typ, iasync_client &cli, const string &topic)
Definition token.h:292
static ptr_t create(Type typ, iasync_client &cli)
Definition token.h:270
virtual int get_message_id() const
Definition token.h:356
virtual void * get_user_context() const
Definition token.h:373
virtual int get_return_code() const
Definition token.h:388
virtual void wait()
token(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition token.h:230
static ptr_t create(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition token.h:305
Type get_type() const
Definition token.h:337
ReasonCode get_reason_code() const
Definition token.h:422
std::shared_ptr< token > ptr_t
Definition token.h:56
unsubscribe_response get_unsubscribe_response() const
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics)
Definition token.h:315
subscribe_response get_subscribe_response() const
std::shared_ptr< const token > const_ptr_t
Definition token.h:58
virtual bool wait_for(long timeout)
Definition token.h:446
virtual const_string_collection_ptr get_topics() const
Definition token.h:366
virtual bool is_complete() const
Definition token.h:381
virtual void set_user_context(void *userContext)
Definition token.h:402
token(Type typ, iasync_client &cli)
Definition token.h:198
Type
Definition token.h:63
@ SUBSCRIBE
Definition token.h:65
@ CONNECT
Definition token.h:64
@ UNSUBSCRIBE
Definition token.h:67
@ PUBLISH
Definition token.h:66
@ DISCONNECT
Definition token.h:68
token(Type typ, iasync_client &cli, MQTTAsync_token tok)
bool wait_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition token.h:471
token(Type typ, iasync_client &cli, const_string_collection_ptr topics)
connect_response get_connect_response() const
virtual iaction_listener * get_action_callback() const
Definition token.h:342
virtual iasync_client * get_client() const
Definition token.h:351
virtual bool try_wait()
Definition token.h:433
virtual ~token()
Definition token.h:263
std::weak_ptr< token > weak_ptr_t
Definition token.h:60
token(Type typ, iasync_client &cli, const string &topic)
Definition token.h:218
void set_num_expected(size_t n)
Definition token.h:411
token(Type typ, iasync_client &cli, const_string_collection_ptr topics, void *userContext, iaction_listener &cb)
virtual void set_action_callback(iaction_listener &listener)
Definition token.h:393
friend class mock_async_client
Definition token.h:121
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics, void *userContext, iaction_listener &cb)
Definition token.h:328
static ptr_t create(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition token.h:282
bool wait_for(const std::chrono::duration< Rep, Period > &relTime)
Definition token.h:456
Definition topic.h:44
Definition server_response.h:177
Definition async_client.h:49
ReasonCode
Definition types.h:57
@ GRANTED_QOS_2
Definition types.h:62
token::ptr_t token_ptr
Definition token.h:506
string_collection::const_ptr_t const_string_collection_ptr
Definition string_collection.h:234
token::const_ptr_t const_token_ptr
Definition token.h:509
Definition server_response.h:123