replicode
messaging.h
Go to the documentation of this file.
1 /*
2 * HUMANOBS - mBrane
3 *
4 * Eric Nivel
5 * Center for Analysis and Design of Intelligent Agents
6 * Reykjavik University, Menntavegur 1, 101 Reykjavik, Iceland
7 * http://cadia.ru.is
8 * Copyright(c)2012
9 *
10 * This software was developed by the above copyright holder as part of
11 * the HUMANOBS EU research project, in collaboration with the
12 * following parties:
13 *
14 * Autonomous Systems Laboratory
15 * Technical University of Madrid, Spain
16 * http://www.aslab.org/
17 *
18 * Communicative Machines
19 * Edinburgh, United Kingdom
20 * http://www.cmlabs.com/
21 *
22 * Istituto Dalle Molle di Studi sull'Intelligenza Artificiale
23 * University of Lugano and SUPSI, Switzerland
24 * http://www.idsia.ch/
25 *
26 * Institute of Cognitive Sciences and Technologies
27 * Consiglio Nazionale delle Ricerche, Italy
28 * http://www.istc.cnr.it/
29 *
30 * Dipartimento di Ingegneria Informatica
31 * University of Palermo, Italy
32 * http://roboticslab.dinfo.unipa.it/index.php/Main/HomePage
33 *
34 *
35 * --- HUMANOBS Open-Source BSD License, with CADIA Clause v 1.0 ---
36 *
37 * Redistribution and use in source and binary forms, with or without
38 * modification, is permitted provided that the following conditions
39 * are met:
40 *
41 * - Redistributions of source code must retain the above copyright
42 * and collaboration notice, this list of conditions and the
43 * following disclaimer.
44 *
45 * - Redistributions in binary form must reproduce the above copyright
46 * notice, this list of conditions and the following
47 * disclaimer in the documentation and/or other materials provided
48 * with the distribution.
49 *
50 * - Neither the name of its copyright holders nor the names of its
51 * contributors may be used to endorse or promote products
52 * derived from this software without specific prior written permission.
53 *
54 * - CADIA Clause: The license granted in and to the software under this
55 * agreement is a limited-use license. The software may not be used in
56 * furtherance of:
57 * (i) intentionally causing bodily injury or severe emotional distress
58 * to any person;
59 * (ii) invading the personal privacy or violating the human rights of
60 * any person; or
61 * (iii) committing or preparing for any act of war.
62 *
63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
64 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
65 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
66 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
67 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
68 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
69 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
70 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
71 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
72 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
73 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
74 */
75 
76 #ifndef mBrane_messaging_h
77 #define mBrane_messaging_h
78 
79 #define MESSAGE_INPUT_BLOCK_SIZE 64
80 #define MESSAGE_OUTPUT_BLOCK_SIZE 64
81 #define JOBS_BLOCK_SIZE 128
82 
83 #include "mBrane.h"
84 #include <CoreLibrary/pipe.h>
85 #include <Core/list.h>
86 #include <Core/control_messages.h>
87 #include <thread>
88 #include "module_descriptor.h"
89 
90 using namespace mBrane::sdk;
91 using namespace mBrane::sdk::module;
92 
93 namespace mBrane
94 {
95 
96 typedef struct _Job {
99  _Job(_Payload *p = NULL, _Module *m = NULL): m(m)
100  {
101  this->p = p;
102  }
103 } Job;
104 
105 class Node;
106 
108 {
109 public:
110  static void ReceiveMessages(RecvThread *_this);
111  Pipe11<P<_Payload>, MESSAGE_INPUT_BLOCK_SIZE> buffer; // incoming messages from remote nodes
115  RecvThread(Node *node, CommChannel *channel, uint8_t sourceNID);
116  ~RecvThread();
118 };
119 
121 {
122 public:
123  static void PushJobs(PushThread *_this);
124  Node *const node;
127  ~PushThread();
129 };
130 
132 {
133 public:
134  static void Run(GarbageCollector *_this);
135  Node *const node;
136  GarbageCollector(Node *node);
137  ~GarbageCollector();
139 };
140 
141 class Executing;
142 class XThread;
143 class mbrane_dll Messaging
144 {
145  friend class RecvThread;
146  friend class PushThread;
147  friend class XThread;
148  friend class Executing;
149  friend class GarbageCollector;
150 private:
152  std::mutex moduleMutex;
153  std::mutex spaceMutex;
154  std::mutex projectionMutex;
155 protected:
156  typedef struct {
158  uint8_t destinationNode; // 0xFF means unspecified.
160  } MessageSlot;
163 
165 
168  static void SendMessages(Node *node);
169  Array<PushThread *, MESSAGE_INPUT_BLOCK_SIZE> pushThreads; // one for each message source (recvThread->buffer plus input queue); push jobs in the job pipe
170 
171  //TODO: add a pointer to processControlMessage plugin here.
172 
173  // Cache.
175  {
176  public:
177  std::string name;
180  ConstantEntry(std::string name, __Payload *object): name(name), object(object) {}
181  };
182  std::vector<ConstantEntry> constants; // indexed by the IDs.
183  UNORDERED_MAP<uint32_t, P<_Payload>> cache; // shared objects residing on the node (local if they have been sent at least once, and foreign); indexed by full IDs.
184  std::vector<UNORDERED_SET<uint32_t>> lookup; // shared objects's full IDs known as being held by remote nodes; vector indexed by NIDs.
185  std::mutex cacheMutex; // concurrency: Messaging::processControlMessage, CommChannel::send, CommChannel::recv.
186 
187  // Deletion handling.
188  uint8_t pendingAck; // number of ack to wait for.
189  UNORDERED_SET<_Payload *> pendingDeletions[2]; // 2-buffered list of objects to be deleted. Any object in here is smart pointed by the cache, and its ref count is 1.
190  uint8_t pendingDeletions_GC; // index for access from GC::Run.
191  uint8_t pendingDeletions_SO; // index for access from SharedObject::decRef.
192  std::mutex pendingDeletionsMutex; // concurrency GarbageCollector::Run, CommChannel::recv.
193  uint32_t GCPeriod; // at which the GC kicks in; in ms.
195 
196  Messaging();
197  ~Messaging();
198 
199  bool loadConfig(XMLNode &n);
200 
201  void start();
202  void shutdown();
203  void send(_Payload *message, module::Node::Network network);
204  void send(_Payload *message, uint8_t nodeID, module::Node::Network network);
205  void pushJobs(_Payload *p, NodeEntry &e);
206  void pushJobs(_Payload *p);
207  void processControlMessage(_Payload *p);
208 };
209 }
210 
211 #include "networking.h"
212 
213 #endif
Definition: messaging.h:131
_Job(_Payload *p=NULL, _Module *m=NULL)
Definition: messaging.h:99
Definition: messaging.h:143
Array< RecvThread *, MESSAGE_INPUT_BLOCK_SIZE > recvThreads
Definition: messaging.h:166
Definition: executing.h:93
std::vector< ConstantEntry > constants
Definition: messaging.h:182
P< _Payload > p
Definition: messaging.h:159
_Module * m
Definition: messaging.h:98
GarbageCollector * GC
Definition: messaging.h:194
uint8_t pendingDeletions_GC
Definition: messaging.h:190
Definition: payload.h:95
module::Node::Network network
Definition: messaging.h:157
uint8_t pendingDeletions_SO
Definition: messaging.h:191
Definition: messaging.h:107
uint8_t sourceNID
Definition: messaging.h:114
std::mutex spaceMutex
Definition: messaging.h:153
Node *const node
Definition: messaging.h:124
Node *const node
Definition: messaging.h:135
std::thread thread
Definition: RR.h:320
#define MESSAGE_OUTPUT_BLOCK_SIZE
Definition: messaging.h:80
Node * node
Definition: messaging.h:112
P< _Payload > p
Definition: messaging.h:97
std::string name
Definition: messaging.h:177
Definition: pipe.h:144
Definition: array.h:84
std::mutex moduleMutex
control access to spaces, modules descriptors and projections in processControlMessages.
Definition: messaging.h:152
Pipe11< P< _Payload >, MESSAGE_INPUT_BLOCK_SIZE > messageInputQueue
Definition: messaging.h:161
std::thread thread
Definition: messaging.h:128
uint32_t GCPeriod
Definition: messaging.h:193
Definition: module_descriptor.h:103
ConstantEntry(std::string name, __Payload *object)
Definition: messaging.h:180
Array< PushThread *, MESSAGE_INPUT_BLOCK_SIZE > pushThreads
Definition: messaging.h:169
Network
Definition: module_node.h:104
std::mutex cacheMutex
Definition: messaging.h:185
uint8_t pendingAck
Definition: messaging.h:188
struct mBrane::_Job Job
Definition: messaging.h:120
std::mutex pendingDeletionsMutex
Definition: messaging.h:192
#define MESSAGE_INPUT_BLOCK_SIZE
Definition: messaging.h:79
Definition: messaging.h:156
PipeN1< MessageSlot, MESSAGE_OUTPUT_BLOCK_SIZE > messageOutputQueue
Definition: messaging.h:162
P< __Payload > object
Definition: messaging.h:178
Definition: module.cpp:85
Definition: array.h:86
Definition: payload.h:122
std::mutex projectionMutex
Definition: messaging.h:154
std::thread thread
Definition: messaging.h:138
Pipe11< P< _Payload >, MESSAGE_OUTPUT_BLOCK_SIZE > * source
Definition: messaging.h:125
Definition: xml_parser.h:172
Definition: executing.h:110
void start()
Definition: Perf_modules.h:96
Definition: mdaemon_node.h:100
std::thread sendThread
Definition: messaging.h:167
Definition: messaging.h:174
Definition: node.h:96
std::thread thread
Definition: messaging.h:117
PipeNN< Job, JOBS_BLOCK_SIZE > jobs
Definition: messaging.h:164
ConstantEntry()
Definition: messaging.h:179
Pipe11< P< _Payload >, MESSAGE_INPUT_BLOCK_SIZE > buffer
Definition: messaging.h:111
Definition: array.h:162
Definition: pipe.h:156
UNORDERED_MAP< uint32_t, P< _Payload > > cache
Definition: messaging.h:183
CommChannel * channel
Definition: messaging.h:113
Definition: messaging.h:96
Definition: module.h:99
mBrane::Node * node
Definition: main.cpp:90
std::vector< UNORDERED_SET< uint32_t > > lookup
Definition: messaging.h:184
Definition: pipe.h:93
Definition: network_interface.h:120
uint8_t destinationNode
Definition: messaging.h:158