Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
network_loop.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_netio/target_libuv/roc_netio/network_loop.h
10//! @brief Network event loop thread.
11
12#ifndef ROC_NETIO_NETWORK_LOOP_H_
13#define ROC_NETIO_NETWORK_LOOP_H_
14
15#include <uv.h>
16
18#include "roc_core/atomic.h"
20#include "roc_core/iallocator.h"
21#include "roc_core/list.h"
22#include "roc_core/mpsc_queue.h"
24#include "roc_core/optional.h"
25#include "roc_core/semaphore.h"
26#include "roc_core/thread.h"
29#include "roc_netio/iconn.h"
35#include "roc_netio/resolver.h"
40#include "roc_packet/iwriter.h"
42
43namespace roc {
44namespace netio {
45
46//! Network event loop thread.
47//! @remarks
48//! This class is a task-based facade for the whole roc_netio module.
50 private ICloseHandler,
52 private core::Thread {
53public:
54 //! Opaque port handle.
55 typedef struct PortHandle* PortHandle;
56
57 //! Subclasses for specific tasks.
58 class Tasks {
59 public:
60 //! Add UDP datagram receiver port.
62 public:
63 //! Set task parameters.
64 //! @remarks
65 //! - Updates @p config with the actual bind address.
66 //! - Passes received packets to @p writer. The writer is called from the network thread.
67 //! It should not block the caller.
69
70 //! Get created port handle.
71 //! @pre
72 //! Should be called only if success() is true.
74
75 private:
76 friend class NetworkLoop;
77
78 UdpReceiverConfig* config_;
79 packet::IWriter* writer_;
80 };
81
82 //! Add UDP datagram sender port.
84 public:
85 //! Set task parameters.
86 //! @remarks
87 //! Updates @p config with the actual bind address.
89
90 //! Get created port handle.
91 //! @pre
92 //! Should be called only if success() is true.
94
95 //! Get created port writer.
96 //! @remarks
97 //! The writer can be used to send packets from the port. It may be called
98 //! from any thread. It will not block the caller.
99 //! @pre
100 //! Should be called only if success() is true.
102
103 private:
104 friend class NetworkLoop;
105
106 UdpSenderConfig* config_;
107 packet::IWriter* writer_;
108 };
109
110 //! Add TCP server port.
112 public:
113 //! Set task parameters.
114 //! @remarks
115 //! - Updates @p config with the actual bind address.
116 //! - Listens for incoming connections and passes new connections
117 //! to @p conn_acceptor. It should return a handler that will be
118 //! notified when connection state changes.
120
121 //! Get created port handle.
122 //! @pre
123 //! Should be called only if success() is true.
125
126 private:
127 friend class NetworkLoop;
128
129 TcpServerConfig* config_;
130 IConnAcceptor* conn_acceptor_;
131 };
132
133 //! Add TCP client port.
135 public:
136 //! Set task parameters.
137 //! @remarks
138 //! - Updates @p config with the actual bind address.
139 //! - Notifies @p conn_handler when connection state changes.
141
142 //! Get created port handle.
143 //! @pre
144 //! Should be called only if success() is true.
146
147 private:
148 friend class NetworkLoop;
149
150 TcpClientConfig* config_;
151 IConnHandler* conn_handler_;
152 };
153
154 //! Remove port.
155 class RemovePort : public NetworkTask {
156 public:
157 //! Set task parameters.
159
160 private:
161 friend class NetworkLoop;
162 };
163
164 //! Resolve endpoint address.
166 public:
167 //! Set task parameters.
168 //! @remarks
169 //! Gets endpoint hostname, resolves it, and writes the resolved IP address
170 //! and the port from the endpoint to the resulting SocketAddr.
172
173 //! Get resolved address.
174 //! @pre
175 //! Should be called only if success() is true.
177
178 private:
179 friend class NetworkLoop;
180
181 ResolverRequest resolve_req_;
182 };
183 };
184
185 //! Initialize.
186 //! @remarks
187 //! Start background thread if the object was successfully constructed.
189 core::BufferFactory<uint8_t>& buffer_factory,
190 core::IAllocator& allocator);
191
192 //! Destroy. Stop all receivers and senders.
193 //! @remarks
194 //! Wait until background thread finishes.
195 virtual ~NetworkLoop();
196
197 //! Check if the object was successfully constructed.
198 bool valid() const;
199
200 //! Get number of receiver and sender ports.
201 size_t num_ports() const;
202
203 //! Enqueue a task for asynchronous execution and return.
204 //! The task should not be destroyed until the callback is called.
205 //! The @p completer will be invoked on event loop thread after the
206 //! task completes.
208
209 //! Enqueue a task for asynchronous execution and wait for its completion.
210 //! The task should not be destroyed until this method returns.
211 //! Should not be called from schedule() callback.
212 //! @returns
213 //! true if the task succeeded or false if it failed.
215
216private:
217 static void task_sem_cb_(uv_async_t* handle);
218 static void stop_sem_cb_(uv_async_t* handle);
219
220 virtual void handle_terminate_completed(IConn&, void*);
221 virtual void handle_close_completed(BasicPort&, void*);
222 virtual void handle_resolved(ResolverRequest& req);
223
224 virtual void run();
225
226 void process_pending_tasks_();
227 void finish_task_(NetworkTask&);
228
229 void async_terminate_conn_port_(const core::SharedPtr<TcpConnectionPort>& port,
230 NetworkTask* task);
231 AsyncOperationStatus async_close_port_(const core::SharedPtr<BasicPort>& port,
232 NetworkTask* task);
233 void finish_closing_port_(const core::SharedPtr<BasicPort>& port, NetworkTask* task);
234
235 void update_num_ports_();
236
237 void close_all_sems_();
238 void close_all_ports_();
239
240 void task_add_udp_receiver_(NetworkTask&);
241 void task_add_udp_sender_(NetworkTask&);
242 void task_remove_port_(NetworkTask&);
243 void task_add_tcp_server_(NetworkTask&);
244 void task_add_tcp_client_(NetworkTask&);
245 void task_resolve_endpoint_address_(NetworkTask&);
246
247 packet::PacketFactory& packet_factory_;
248 core::BufferFactory<uint8_t>& buffer_factory_;
249 core::IAllocator& allocator_;
250
251 bool started_;
252
253 uv_loop_t loop_;
254 bool loop_initialized_;
255
256 uv_async_t stop_sem_;
257 bool stop_sem_initialized_;
258
259 uv_async_t task_sem_;
260 bool task_sem_initialized_;
261
263
264 Resolver resolver_;
265
266 core::List<BasicPort> open_ports_;
267 core::List<BasicPort> closing_ports_;
268
269 core::Atomic<int> num_open_ports_;
270};
271
272} // namespace netio
273} // namespace roc
274
275#endif // ROC_NETIO_NETWORK_LOOP_H_
Atomic.
Base class for ports.
Buffer factory.
Network endpoint URI.
Definition: endpoint_uri.h:26
Socket address.
Definition: socket_addr.h:25
Atomic integer. Provides sequential consistency. For a fine-grained memory order control,...
Definition: atomic.h:26
Memory allocator interface.
Definition: iallocator.h:23
Intrusive doubly-linked list.
Definition: list.h:35
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition: mpsc_queue.h:40
Shared ownership intrusive pointer.
Definition: shared_ptr.h:32
Base class for thread objects.
Definition: thread.h:26
Base class for ports.
Definition: basic_port.h:40
Close handler interface.
Connection acceptor interface.
Connection event handler interface.
Definition: iconn_handler.h:59
Connection interface.
Definition: iconn.h:30
Network task completion handler.
Resolver request result handler interface.
Termination handler interface.
PortHandle get_handle() const
Get created port handle.
AddTcpClientPort(TcpClientConfig &config, IConnHandler &conn_handler)
Set task parameters.
AddTcpServerPort(TcpServerConfig &config, IConnAcceptor &conn_acceptor)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
AddUdpReceiverPort(UdpReceiverConfig &config, packet::IWriter &writer)
Set task parameters.
PortHandle get_handle() const
Get created port handle.
PortHandle get_handle() const
Get created port handle.
AddUdpSenderPort(UdpSenderConfig &config)
Set task parameters.
packet::IWriter * get_writer() const
Get created port writer.
RemovePort(PortHandle handle)
Set task parameters.
const address::SocketAddr & get_address() const
Get resolved address.
ResolveEndpointAddress(const address::EndpointUri &endpoint_uri)
Set task parameters.
Subclasses for specific tasks.
Definition: network_loop.h:58
Network event loop thread.
Definition: network_loop.h:52
size_t num_ports() const
Get number of receiver and sender ports.
NetworkLoop(packet::PacketFactory &packet_factory, core::BufferFactory< uint8_t > &buffer_factory, core::IAllocator &allocator)
Initialize.
bool valid() const
Check if the object was successfully constructed.
void schedule(NetworkTask &task, INetworkTaskCompleter &completer)
Enqueue a task for asynchronous execution and return. The task should not be destroyed until the call...
bool schedule_and_wait(NetworkTask &task)
Enqueue a task for asynchronous execution and wait for its completion. The task should not be destroy...
virtual ~NetworkLoop()
Destroy. Stop all receivers and senders.
struct PortHandle * PortHandle
Opaque port handle.
Definition: network_loop.h:55
Base class for network loop tasks.
Definition: network_task.h:29
Hostname resolver.
Definition: resolver.h:25
Packet writer interface.
Definition: iwriter.h:21
Memory allocator interface.
Close handler interface.
Connection interface.
Connection acceptor interface.
Connection event handler interface.
Network task completion handler.
Termination handler interface.
Packet writer interface.
Intrusive doubly-linked list.
Multi-producer single-consumer queue.
MpscQueue node.
AsyncOperationStatus
Asynchronous operation status.
Root namespace.
Network task.
Optionally constructed object.
Packet factory.
Hostname resolver.
Socket address.
TCP connection parameters.
TCP server parameters.
UDP receiver parameters.
UDP sender parameters.
TCP connection.
Thread.
UDP receiver.