replicode
networking.h
Go to the documentation of this file.
1 /*
2 * HUMANOBS - mBrane
3 *
4 * Eric Nivel
5 * Center for Analysis and Design of Intelligent Agents
6 * Reykjavik University, Menntavegur 1, 101 Reykjavik, Iceland
7 * http://cadia.ru.is
8 * Copyright(c)2012
9 *
10 * This software was developed by the above copyright holder as part of
11 * the HUMANOBS EU research project, in collaboration with the
12 * following parties:
13 *
14 * Autonomous Systems Laboratory
15 * Technical University of Madrid, Spain
16 * http://www.aslab.org/
17 *
18 * Communicative Machines
19 * Edinburgh, United Kingdom
20 * http://www.cmlabs.com/
21 *
22 * Istituto Dalle Molle di Studi sull'Intelligenza Artificiale
23 * University of Lugano and SUPSI, Switzerland
24 * http://www.idsia.ch/
25 *
26 * Institute of Cognitive Sciences and Technologies
27 * Consiglio Nazionale delle Ricerche, Italy
28 * http://www.istc.cnr.it/
29 *
30 * Dipartimento di Ingegneria Informatica
31 * University of Palermo, Italy
32 * http://roboticslab.dinfo.unipa.it/index.php/Main/HomePage
33 *
34 *
35 * --- HUMANOBS Open-Source BSD License, with CADIA Clause v 1.0 ---
36 *
37 * Redistribution and use in source and binary forms, with or without
38 * modification, is permitted provided that the following conditions
39 * are met:
40 *
41 * - Redistributions of source code must retain the above copyright
42 * and collaboration notice, this list of conditions and the
43 * following disclaimer.
44 *
45 * - Redistributions in binary form must reproduce the above copyright
46 * notice, this list of conditions and the following
47 * disclaimer in the documentation and/or other materials provided
48 * with the distribution.
49 *
50 * - Neither the name of its copyright holders nor the names of its
51 * contributors may be used to endorse or promote products
52 * derived from this software without specific prior written permission.
53 *
54 * - CADIA Clause: The license granted in and to the software under this
55 * agreement is a limited-use license. The software may not be used in
56 * furtherance of:
57 * (i) intentionally causing bodily injury or severe emotional distress
58 * to any person;
59 * (ii) invading the personal privacy or violating the human rights of
60 * any person; or
61 * (iii) committing or preparing for any act of war.
62 *
63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
64 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
65 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
66 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
67 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
68 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
69 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
70 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
71 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
72 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
73 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
74 */
75 
76 #ifndef mBrane_networking_h
77 #define mBrane_networking_h
78 
79 #include <Core/network_interface.h>
80 #include "network_id.h"
81 
82 #include <CoreLibrary/pipe.h>
83 #include <Core/list.h>
84 #include <Core/control_messages.h>
85 #include "messaging.h"
86 #include <thread>
87 
88 using namespace mBrane::sdk;
89 using namespace mBrane::sdk::mdaemon;
90 
91 namespace mBrane
92 {
93 
94 #define CONTROL_PRIMARY_INITIALISED 0x0001
95 #define DATA_PRIMARY_INITIALISED 0x0002
96 #define STREAM_PRIMARY_INITIALISED 0x0004
97 #define CONTROL_SECONDARY_INITIALISED 0x0008
98 #define DATA_SECONDARY_INITIALISED 0x0010
99 #define STREAM_SECONDARY_INITIALISED 0x0020
100 #define CONTROL_PRIMARY_CONNECTED 0x0040
101 #define DATA_PRIMARY_CONNECTED 0x0080
102 #define STREAM_PRIMARY_CONNECTED 0x0100
103 #define CONTROL_SECONDARY_CONNECTED 0x0200
104 #define DATA_SECONDARY_CONNECTED 0x0400
105 #define STREAM_SECONDARY_CONNECTED 0x0800
106 
107 class NodeCon;
111 };
112 
113 class Networking;
114 class NodeCon
115 {
116 public:
118  virtual ~NodeCon();
119 
120  bool setSourceNID(uint8_t sourceNID);
121  bool setName(const char *name);
122 
123  bool isInUse();
124  uint32_t getConnectionStatus();
125  bool isConnected(module::Node::Network network = module::Node::EITHER);
126  bool disconnect();
127 
128  bool startNetworkChannel(CommChannel *c, uint8_t type, bool isCopy = false);
129  CommChannel *getNetworkChannel(uint8_t type);
130 
134  char *name;
135  bool joined;
136  bool ready;
137 
141 
142  Pipe11<P<_Payload>, MESSAGE_INPUT_BLOCK_SIZE> buffer; // incoming messages from remote nodes
143  static void ReceiveMessages(ReceiveThreadInfo *info);
144  static void PushJobs(NodeCon *_this);
145 };
146 
147 class Messaging;
148 // Handles network initialization and connection.
149 // Handles two isolated networks: primary (ex: core computation) and secondary (ex: I/O, signal processing)
150 // Network IDs carry the primary, secondary or both identifications
151 // When receiving a bcast id bearing two, connect to the primary only
152 // When sending to a node, use the primary only if two are available
153 // Receiving is agnostic
154 //
155 // Reference nodes must be on the primary network
156 //
157 // Node boot sequence:
158 //
159 // 1 boot one single node with a timeout (if it times out, it's the ref node)
160 // 2 when ready (callback), boot all the other nodes
161 //
162 // Algorithm for node connection:
163 //
164 // bcast its net ID on discovery channel
165 // accept connections:
166 // if timedout this is ref node, scan IDs on discovery channel
167 // else
168 // the ref node sends (on data channel if control channel is bcast, on control channel otherwise): its own net ID, an assigned NID and the net map (i.e. the list of ready nodes net ID)
169 // connect to each node in the list excepted the sender
170 // if(ref node) send time sync periodically on control channel
171 // start messages sending and receiving threads
172 //
173 // When at least one connection to a remote node dies, the node in question is considred dead and the other connections to it are terminated
174 // if the ref node dies, the node with the lowest NID is the new ref node
175 class mbrane_dll Networking:
176  public mdaemon::Node, public Messaging
177 {
178  friend class Messaging;
179  friend class RecvThread;
180 // friend class Messaging;
181  friend class NodeCon;
182 protected:
183  Host::host_name hostName;
185 
186  typedef void (*BootCallback)();
188  BootCallback bootCallback;
189 
190  DynamicClassLoader<NetworkInterface> *networkInterfaceLoaders[7];
191  NetworkInterface *networkInterfaces[7];
192 
195 
196  bool startInterfaces();
197  void stopInterfaces();
198 
199  int32_t bcastTimeout; // in ms
200 
202 
204 
205  //class DataCommChannel{
206  //public:
207  // DataCommChannel();
208  // ~DataCommChannel();
209  // typedef struct{
210  // CommChannel *data;
211  // CommChannel *stream;
212  // }CommChannels;
213  // CommChannels channels[2]; // 1 for each network
214  // NetworkID *networkID;
215  //};
217  CommChannel *broadcastChannel[2]; // bcast
218 // Array<CommChannel *,32> controlChannels[2]; // for each network: 1 (bcast capable) or many (connected)
219 // Array<DataCommChannel *,32,ArrayManaged> dataChannels;
220  std::mutex channelsMutex; // protects controlChannels and dataChannels
221  UNORDERED_MAP<uint8_t, NodeCon *> nodes;
222 
225  void setNewReference();
226 
227  virtual void startReceivingThreads(uint8_t NID) = 0;
228  virtual void notifyNodeJoined(uint8_t NID, NetworkID *networkID) = 0;
229  virtual void notifyNodeLeft(uint8_t NID) = 0;
230  virtual void shutdown();
231 
233 
234  bool checkSyncProbe(uint8_t syncNodeID);
235  void systemReady();
236 
238  bool addNodeName(const char *name, bool myself = false);
239  uint8_t getNodeID(const char *name);
240  bool allNodesJoined();
241  bool allNodesReady();
242 
243  static void ScanIDs(Networking *node);
244  typedef struct {
246  int32_t timeout;
250  static void AcceptConnections(AcceptConnectionArgs *acargs);
251  static void Sync(Networking *node);
252  int64_t timeDrift; // in ms
253  int64_t syncPeriod; // in ms
254 
255  uint16_t sendID(CommChannel *c, NetworkID *networkID);
256  uint16_t recvID(CommChannel *c, NetworkID *&networkID, bool expectToken = true);
257  uint16_t sendMap(CommChannel *c);
258  uint16_t recvMap(CommChannel *c, NetworkID *fromNetworkID);
259  uint16_t connect(NetworkID *networkID);
260  uint16_t connect(Network network, NetworkID *networkID);
261  void _broadcastControlMessage(_Payload *p, Network network);
262  void broadcastControlMessage(_Payload *p, Network network);
263  void _sendControlMessage(_Payload *p, uint8_t destinationNID, Network network);
264  void sendControlMessage(_Payload *p, uint8_t destinationNID, Network network);
265  void sendData(uint8_t NID, _Payload *p, Network network);
266  void sendStreamData(uint8_t NID, _Payload *p, Network network);
267  void processError(uint8_t NID); // upon send/recv error. Disconnect the node on both networks
268  uint8_t addNodeEntry();
269 
270  bool init();
271  virtual void start(uint8_t assignedNID, NetworkID *networkNID, bool isTimeReference);
272  bool startSync();
273 
274  Networking();
275  ~Networking();
276  bool loadInterface(XMLNode &interfaces, XMLNode &config, const char *name, InterfaceType type);
277  bool loadConfig(XMLNode &n);
278 };
279 }
280 
281 
282 #endif
int64_t timeDrift
Definition: networking.h:252
Definition: messaging.h:143
CommChannel * discoveryChannel
Definition: networking.h:216
UNORDERED_MAP< uint8_t, NodeCon * > nodes
Definition: networking.h:221
uint8_t connectedNodeCount
Definition: networking.h:201
SharedLibrary * callbackLibrary
Definition: networking.h:187
Definition: messaging.h:107
char * name
Definition: networking.h:134
Networking * node
Definition: networking.h:245
Definition: networking.h:108
std::thread thread
Definition: RR.h:320
uint8_t referenceNID
Definition: networking.h:224
Network network
Definition: networking.h:193
Definition: array.h:84
CommChannel * channel
Definition: networking.h:110
Definition: network_id.h:101
Network
Definition: module_node.h:104
Network network
Definition: networking.h:247
Array< std::thread, 32 > commThreads
Definition: networking.h:232
Category
Definition: payload.h:126
std::thread pushThread
Definition: networking.h:140
int64_t syncPeriod
Definition: networking.h:253
#define MESSAGE_INPUT_BLOCK_SIZE
Definition: messaging.h:79
Array< CommChannel *, 6 > commChannels
Definition: networking.h:138
Definition: dynamic_class_loader.h:89
Host::host_name hostName
Definition: networking.h:183
Definition: array.h:86
int32_t timeout
Definition: networking.h:246
Definition: payload.h:122
Definition: utils.h:138
std::mutex acceptConnectionMutex
Definition: networking.h:194
Definition: network_interface.h:92
Definition: xml_parser.h:172
uint8_t nodeCount
Definition: networking.h:237
void start()
Definition: Perf_modules.h:96
uint8_t sourceNID
Definition: networking.h:133
Definition: mdaemon_node.h:100
Pipe11< P< _Payload >, MESSAGE_INPUT_BLOCK_SIZE > buffer
Definition: networking.h:142
uint8_t hostNameSize
Definition: networking.h:184
int32_t bcastTimeout
Definition: networking.h:199
NodeCon * con
Definition: networking.h:109
InterfaceType
Definition: network_id.h:89
Networking * node
Definition: networking.h:131
Definition: networking.h:175
Definition: networking.h:244
bool ready
Definition: networking.h:136
_Payload::Category category
Definition: networking.h:248
NetworkID * networkID
Definition: networking.h:132
NetworkID * networkID
Definition: networking.h:203
Definition: module_node.h:109
mBrane::Node * node
Definition: main.cpp:90
bool joined
Definition: networking.h:135
Definition: pipe.h:93
Array< std::thread, 6 > commThreads
Definition: networking.h:139
std::mutex channelsMutex
Definition: networking.h:220
BootCallback bootCallback
Definition: networking.h:188
Definition: network_interface.h:120
bool isTimeReference
Definition: networking.h:223
Definition: networking.h:114