replicode
mem.h
Go to the documentation of this file.
1 // mem.h
2 //
3 // Author: Eric Nivel
4 //
5 // BSD license:
6 // Copyright (c) 2010, Eric Nivel
7 // All rights reserved.
8 // Redistribution and use in source and binary forms, with or without
9 // modification, are permitted provided that the following conditions are met:
10 //
11 // - Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // - Redistributions in binary form must reproduce the above copyright
14 // notice, this list of conditions and the following disclaimer in the
15 // documentation and/or other materials provided with the distribution.
16 // - Neither the name of Eric Nivel nor the
17 // names of their contributors may be used to endorse or promote products
18 // derived from this software without specific prior written permission.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
21 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 // DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
24 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
25 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 #ifndef mem_h
32 #define mem_h
33 
34 #include "reduction_job.h"
35 #include "time_job.h"
36 #include "pgm_overlay.h"
37 #include "binding_map.h"
38 #include "CoreLibrary/dll.h"
39 
40 #include <list>
41 #include <atomic>
42 #include <thread>
43 #include <queue>
44 #include <condition_variable>
45 #include "../r_code/list.h"
46 #include "../r_comp/segments.h"
47 
48 namespace r_exec {
49 
50 // The rMem.
51 // Maintains 2 pipes of jobs (injection, update, etc.). each job is processed asynchronously by instances of ReductionCore and TimeCore.
52 // Pipes and threads are created at starting time and deleted at stopping time.
53 // Groups and IPGMControllers are cleared up when only held by jobs;
54 // - when a group is not projected anywhere anymore, it is invalidated (it releases all its views) and when a job attempts an update, the latter is cancelled.
55 // - when a reduction core attempts to perform a reduction for an ipgm-controller that is not projected anywhere anymore, the reduction is cancelled.
56 // In addition:
57 // - when an object is scheduled for injection and the target group does not exist anymore (or is invalidated), the injection is cancelled.
58 // - when an object is scheduled for propagation of sln changes and has no view anymore, the operation is cancelled.
59 // Main processing in _Mem::update().
61  public r_code::Mem {
62 public:
63  typedef enum {
64  NOT_STARTED = 0,
65  RUNNING = 1,
66  STOPPED = 2
67  } State;
68 protected:
69 // Parameters::Init.
73 
74 // Parameters::System.
77  double tpx_dsr_thr;
87 
88 // Parameters::Debug.
89  bool debug;
92 
93 // Parameters::Run.
95 
96  template <class Type> struct JobQueue {
97  void pushJob(Type *job) {
98  std::unique_lock<std::mutex> lock(m_pushMutex);
99  m_mutex.lock();
100  while (m_jobs.size() > 1024) { // while, because spurious wakeups
101  m_mutex.unlock();
102  m_canPushCondition.wait(lock);
103  m_mutex.lock();
104  }
105  m_jobs.push(job);
106  if (m_jobs.size() == 1) {
107  m_canPopCondition.notify_all();
108  }
109  m_mutex.unlock();
110  }
111 
112  Type *popJob() {
113  std::unique_lock<std::mutex> lock(m_popMutex);
114  m_mutex.lock();
115  while (m_jobs.size() < 1) { // while, because of spurious wakeups
116  m_mutex.unlock();
117  m_canPopCondition.wait(lock);
118  m_mutex.lock();
119  }
120  Type *r = m_jobs.front();
121  m_jobs.pop();
122  if (m_jobs.size() < 1024) {
123  m_canPushCondition.notify_all();
124  }
125  m_mutex.unlock();
126 
127  return r;
128  }
129 
130  private:
131  std::mutex m_mutex;
132  std::queue<Type*> m_jobs;
133 
134  std::mutex m_pushMutex;
135  std::condition_variable m_canPushCondition;
136  std::mutex m_popMutex;
137  std::condition_variable m_canPopCondition;
138  };
139 
142  std::mutex m_timeJobMutex;
144 
145  std::vector<std::thread> m_coreThreads;
146 
147 // Performance stats.
149  uint64_t reduction_job_avg_latency; // latency: popping time.-pushing time; the lower the better.
152  uint64_t time_job_avg_latency; // latency: deadline-the time the job is popped from the pipe; if <0, not registered (as it is too late for action); the higher the better.
153  uint64_t _time_job_avg_latency; // previous value.
154 
155  std::atomic<uint64_t> m_coreCount;
156  std::condition_variable m_coresRunning;
157  std::mutex m_coreCountMutex; // blocks the rMem until all cores terminate.
158 
160  std::mutex m_stateMutex;
161 
162 
163  r_code::list<P<Code> > objects; // store objects in order of injection: holds the initial objects (and dynamically created ones if MemStatic is used).
164 
165  P<Group> _root; // holds everything.
169 
170  std::vector<Group *> initial_groups; // convenience; cleared after start();
171 
172  void init_timings(uint64_t now) const;
173 
174  void store(Code *object);
175  virtual void set_last_oid(int64_t oid) = 0;
176  virtual void bind(View *view) = 0;
177 
178  bool deleted;
179 
180  static const uint64_t DebugStreamCount = 8;
181  ostream *debug_streams[8];
182 
183  _Mem();
184 
185  void _unpack_code(Code *hlp, uint16_t fact_object_index, Code *fact_object, uint16_t read_index) const;
186 public:
187  static _Mem *Get() {
188  return (_Mem *)Mem::Get();
189  }
190 
191  typedef enum {
192  STDIN = 0,
193  STDOUT = 1
194  } STDGroupID;
195 
196  virtual ~_Mem();
197 
198  void init(uint64_t base_period,
199  uint64_t reduction_core_count,
200  uint64_t time_core_count,
201  double mdl_inertia_sr_thr,
202  uint64_t mdl_inertia_cnt_thr,
203  double tpx_dsr_thr,
204  uint64_t min_sim_time_horizon,
205  uint64_t max_sim_time_horizon,
206  double sim_time_horizon,
207  uint64_t tpx_time_horizon,
208  uint64_t perf_sampling_period,
209  double float_tolerance,
210  uint64_t time_tolerance,
211  uint64_t primary_thz,
212  uint64_t secondary_thz,
213  bool debug,
214  uint64_t ntf_mk_res,
215  uint64_t goal_pred_success_res,
216  uint64_t probe_level,
217  uint64_t traces);
218 
220  return probe_level;
221  }
222  double get_mdl_inertia_sr_thr() const {
223  return mdl_inertia_sr_thr;
224  }
226  return mdl_inertia_cnt_thr;
227  }
228  double get_tpx_dsr_thr() const {
229  return tpx_dsr_thr;
230  }
232  return min_sim_time_horizon;
233  }
235  return max_sim_time_horizon;
236  }
238  return horizon * sim_time_horizon;
239  }
241  return tpx_time_horizon;
242  }
244  return primary_thz;
245  }
247  return secondary_thz;
248  }
249 
250  bool get_debug() const {
251  return debug;
252  }
254  return ntf_mk_res;
255  }
257 
258  if (debug)
259  return goal_pred_success_res;
260  if (time_to_live == 0)
261  return 1;
262  return Utils::GetResilience(now, time_to_live, host->get_upr());
263  }
264 
265  Code *get_root() const;
266  Code *get_stdin() const;
267  Code *get_stdout() const;
268  Code *get_self() const;
269 
270  State check_state(); // called by delegates after waiting in case stop() is called in the meantime.
271  void start_core(); // called upon creation of a delegate.
272  void shutdown_core(); // called upon completion of a delegate's task.
273 
276  bool load(std::vector<r_code::Code *> *objects, uint64_t stdin_oid, uint64_t stdout_oid, uint64_t self_oid);
277 // return false on error.
278  uint64_t start(); // return the starting time.
279  void stop(); // after stop() the content is cleared and one has to call load() and start() again.
280 
281 // Internal core processing ////////////////////////////////////////////////////////////////
282 
283  _ReductionJob *popReductionJob();
284  void pushReductionJob(_ReductionJob *j);
285  TimeJob *popTimeJob();
286  void pushTimeJob(TimeJob *j);
287 
288 // Called upon successful reduction.
289  void inject(View *view);
290  void inject_async(View *view);
291  void inject_new_object(View *view);
292  void inject_existing_object(View *view, Code *object, Group *host);
293  void inject_null_program(Controller *c, Group *group, uint64_t time_to_live, bool take_past_inputs); // build a view v (ijt=now, act=1, sln=0, res according to time_to_live in the group), attach c to v, inject v in the group.
294  void inject_hlps(std::vector<View *> views, Group *destination);
295  void inject_notification(View *view, bool lock);
296  virtual Code *check_existence(Code *object) = 0; // returns the existing object if any, or object otherwise: in the latter case, packing may occur.
297 
298  void propagate_sln(Code *object, double change, double source_sln_thr);
299 
300 // Called by groups.
301  void inject_copy(View *view, Group *destination); // for cov; NB: no cov for groups, r-groups, models, pgm or notifications.
302 
303 // Called by cores.
304  void register_reduction_job_latency(uint64_t latency);
305  void register_time_job_latency(uint64_t latency);
306  void inject_perf_stats();
307 
308 // rMem to rMem.
309 // The view must contain the destination group (either stdin or stdout) as its grp member.
310 // To be redefined by object transport aware subcalsses.
311  virtual void eject(View *view, uint16_t nodeID);
312 
313 // From rMem to I/O device.
314 // To be redefined by object transport aware subcalsses.
315  virtual void eject(Code *command);
316 
317  virtual r_code::Code *_build_object(Atom head) const = 0;
318  virtual r_code::Code *build_object(Atom head) const = 0;
319 
320 // unpacking of high-level patterns: upon loading or reception.
321  void unpack_hlp(Code *hlp) const;
322  Code *unpack_fact(Code *hlp, uint16_t fact_index) const;
323  Code *unpack_fact_object(Code *hlp, uint16_t fact_object_index) const;
324 
325 // packing of high-level patterns: upon dynamic generation or transmission.
326  void pack_hlp(Code *hlp) const;
327  void pack_fact(Code *fact, Code *hlp, uint16_t &write_index, std::vector<P<Code> > *references) const;
328  void pack_fact_object(Code *fact_object, Code *hlp, uint16_t &write_index, std::vector<P<Code> > *references) const;
329 
330  Code *clone(Code *original) const; // shallow copy.
331 
332 // External device I/O ////////////////////////////////////////////////////////////////
333  virtual r_comp::Image *get_objects() = 0; // create an image; fill with all objects; call only when stopped.
334  r_comp::Image *get_models(); // create an image; fill with all models; call only when stopped.
335 
336 //std::vector<uint64> timings_report; // debug facility.
337  typedef enum {
338  CST_IN = 0,
339  CST_OUT = 1,
340  MDL_IN = 2,
341  MDL_OUT = 3,
342  PRED_MON = 4,
343  GOAL_MON = 5,
344  MDL_REV = 6,
345  HLP_INJ = 7
346  } TraceLevel;
347  static std::ostream &Output(TraceLevel l);
348 };
349 
350 
351 #define OUTPUT(c) _Mem::Output(_Mem::c)
352 
353 // _Mem that stores the objects as long as they are not invalidated.
355  public _Mem {
356 private:
357  std::mutex m_objectsMutex; // protects last_oid and objects.
359  void bind(View *view); // assigns an oid, stores view->object in objects if needed.
360  void set_last_oid(int64_t oid);
361 protected:
362  MemStatic();
363 public:
364  virtual ~MemStatic();
365 
366  void delete_object(r_code::Code *object); // erase the object from objects if needed.
367 
368  r_comp::Image *get_objects(); // return an image containing valid objects.
369 };
370 
371 // _Mem that does not store objects.
373  public _Mem {
374 private:
375  std::atomic_int_fast64_t last_oid;
376  uint64_t get_oid();
377  void bind(View *view); // assigns an oid (atomic operation).
378  void set_last_oid(int64_t oid);
379 protected:
380  MemVolatile();
381 public:
382  virtual ~MemVolatile();
383 
384  void delete_object(r_code::Code *object) {}
385 
387  return NULL;
388  }
389 };
390 
391 // O is the class of the objects held by the rMem (except groups and notifications):
392 // r_exec::LObject if non distributed, or
393 // RObject (see the integration project) when network-aware.
394 // Notification objects and groups are instances of r_exec::LObject (they are not network-aware).
395 // Objects are built at reduction time as r_exec:LObjects and packed into instances of O when O is network-aware.
396 // S is the super-class.
397 template<class O, class S> class Mem:
398  public S {
399 public:
400  Mem();
401  virtual ~Mem();
402 
403 // Called at load time.
405 
406 // Called at runtime.
407  r_code::Code *_build_object(Atom head) const;
408  r_code::Code *build_object(Atom head) const;
409 
410 // Executive device functions ////////////////////////////////////////////////////////
411 
412  Code *check_existence(Code *object);
413 
414 // Called by the communication device (I/O).
415  void inject(O *object, View *view);
416 };
417 
418 }
419 
420 
421 #include "mem.tpl.cpp"
422 
423 
424 #endif
uint64_t perf_sampling_period
Definition: mem.h:82
Definition: segments.h:119
uint64_t goal_pred_success_res
Definition: mem.h:91
std::mutex m_stateMutex
Definition: mem.h:160
uint64_t ntf_mk_res
Definition: mem.h:90
double get_mdl_inertia_sr_thr() const
Definition: mem.h:222
static bool Output
Definition: compiler.cpp:39
P< Group > _root
Definition: mem.h:165
Code * check_existence(Code *object)
Definition: mem.tpl.cpp:146
uint64_t get_probe_level() const
Definition: mem.h:219
uint64_t get_sim_time_horizon(uint64_t horizon) const
Definition: mem.h:237
std::condition_variable m_canPopCondition
Definition: mem.h:137
bool now(const Context &context, uint16_t &index)
Definition: operator.cpp:55
JobQueue< _ReductionJob > m_reductionJobQueue
Definition: mem.h:140
#define dll_export
Definition: dll.h:44
uint64_t time_job_count
Definition: mem.h:151
uint64_t min_sim_time_horizon
Definition: mem.h:78
Definition: object.h:328
bool debug
Definition: mem.h:89
Code * _stdout
Definition: mem.h:167
std::condition_variable m_canPushCondition
Definition: mem.h:135
double mdl_inertia_sr_thr
Definition: mem.h:75
uint64_t _reduction_job_avg_latency
Definition: mem.h:150
uint64_t probe_level
Definition: mem.h:94
Definition: _context.cpp:34
uint64_t last_oid
Definition: mem.h:358
uint64_t base_period
Definition: mem.h:70
std::atomic< uint64_t > m_coreCount
Definition: mem.h:155
uint64_t secondary_thz
Definition: mem.h:86
Definition: mem.h:354
double sim_time_horizon
Definition: mem.h:80
std::vector< Group * > initial_groups
Definition: mem.h:170
static _Mem * Get()
Definition: mem.h:187
State
Definition: mem.h:63
Definition: base.h:47
uint64_t primary_thz
Definition: mem.h:85
Definition: group.h:49
uint64_t tpx_time_horizon
Definition: mem.h:81
uint64_t get_secondary_thz() const
Definition: mem.h:246
uint64_t get_tpx_time_horizon() const
Definition: mem.h:240
std::queue< Type * > m_jobs
Definition: mem.h:132
uint64_t _time_job_avg_latency
Definition: mem.h:153
r_code::Code * _build_object(Atom head) const
Definition: mem.tpl.cpp:93
uint64_t get_ntf_mk_res() const
Definition: mem.h:253
uint64_t get_max_sim_time_horizon() const
Definition: mem.h:234
Definition: view.h:47
uint64_t mdl_inertia_cnt_thr
Definition: mem.h:76
Definition: mem.h:60
std::mutex m_popMutex
Definition: mem.h:136
r_code::list< P< Code > > objects
Definition: mem.h:163
double float_tolerance
Definition: mem.h:83
std::mutex m_timeJobMutex
Definition: mem.h:142
std::condition_variable m_coresRunning
Definition: mem.h:156
r_code::Code * build_object(r_code::SysObject *source) const
Definition: mem.tpl.cpp:56
bool get_debug() const
Definition: mem.h:250
uint64_t max_sim_time_horizon
Definition: mem.h:79
std::mutex m_reductionJobMutex
Definition: mem.h:143
std::mutex m_pushMutex
Definition: mem.h:134
uint64_t get_min_sim_time_horizon() const
Definition: mem.h:231
Definition: atom.h:45
uint64_t time_job_avg_latency
Definition: mem.h:152
uint64_t time_tolerance
Definition: mem.h:84
void inject(O *object, View *view)
Definition: mem.tpl.cpp:160
Definition: time_job.h:40
std::vector< std::thread > m_coreThreads
Definition: mem.h:145
bool deleted
Definition: mem.h:178
uint64_t get_mdl_inertia_cnt_thr() const
Definition: mem.h:225
Definition: object.h:76
std::atomic_int_fast64_t last_oid
Definition: mem.h:375
void pushJob(Type *job)
Definition: mem.h:97
Definition: reduction_job.h:40
std::mutex m_mutex
Definition: mem.h:131
uint64_t reduction_core_count
Definition: mem.h:71
uint64_t time_core_count
Definition: mem.h:72
double tpx_dsr_thr
Definition: mem.h:77
Definition: list.h:42
virtual ~Mem()
Definition: mem.tpl.cpp:46
Definition: mem.h:372
Definition: object.h:172
Definition: overlay.h:50
std::mutex m_coreCountMutex
Definition: mem.h:157
void delete_object(r_code::Code *object)
Definition: mem.h:384
Type * popJob()
Definition: mem.h:112
uint64_t get_goal_pred_success_res(Group *host, uint64_t now, uint64_t time_to_live) const
Definition: mem.h:256
double get_tpx_dsr_thr() const
Definition: mem.h:228
r_comp::Image * get_objects()
Definition: mem.h:386
uint64_t reduction_job_avg_latency
Definition: mem.h:149
static const DebugStream debug(const std::string area)
Definition: debug.h:51
uint64_t get_primary_thz() const
Definition: mem.h:243
Definition: mem.h:397
Mem()
Definition: mem.tpl.cpp:43
JobQueue< TimeJob > m_timeJobQueue
Definition: mem.h:141
uint32_t get_upr() const
Definition: group.cpp:93
Definition: mem.h:96
uint64_t reduction_job_count
Definition: mem.h:148
State state
Definition: mem.h:159
Code * _self
Definition: mem.h:168
std::mutex m_objectsMutex
Definition: mem.h:357
Code * _stdin
Definition: mem.h:166