UniSet  2.6.0
UNetReceiver.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetReceiver_H_
18 #define UNetReceiver_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <memory>
22 #include <string>
23 #include <queue>
24 #include <unordered_map>
25 #include <sigc++/sigc++.h>
26 #include <ev++.h>
27 #include "UniSetObject.h"
28 #include "Trigger.h"
29 #include "Mutex.h"
30 #include "SMInterface.h"
31 #include "SharedMemory.h"
32 #include "UDPPacket.h"
33 #include "CommonEventLoop.h"
34 #include "UDPCore.h"
35 // --------------------------------------------------------------------------
36 namespace uniset
37 {
38  // -----------------------------------------------------------------------------
39  /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
40  * ===============
41  * Собственно реализация сделана так:
42  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
43  * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
44  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
45  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
46  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
47  * Всё это реализовано в функции UNetReceiver::real_update()
48  *
49  * КЭШ
50  * ===
51  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
52  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
53  * Порядковый номер данных в пакете является индексом в кэше.
54  * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
55  * ID который пришёл в пакете - элемент кэша обновляется.
56  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
57  *
58  * КЭШ (ДОПОЛНЕНИЕ)
59  * ===
60  * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
61  * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
62  * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
63  * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и расчитан на статичность пакетов,
64  * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
65  *
66  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
67  * =========================================================================
68  * Для защиты от сбоя счётика сделана следующая логика:
69  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
70  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
71  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
72  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
73  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
74  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
75  * затирают старые, если их не успели вынуть и обработать.
76  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
77  * =========================================================================
78  * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
79  *
80  * Создание соединения (открытие сокета)
81  * ======================================
82  * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
83  * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
84  * открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
85  * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
86  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
87  * Если такая логика не требуется, то можно задать в конструкторе
88  * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
89  * выкинуто исключение при неудачной попытке создания соединения.
90  *
91  * Стратегия обновления данных в SM
92  * ==================================
93  * При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
94  * Поддерживается два варианта:
95  * 'thread' - отдельный поток обновления
96  * 'evloop' - использование общего с приёмом event loop (libev)
97  */
98  // -----------------------------------------------------------------------------
99  class UNetReceiver:
100  protected EvWatcher,
101  public std::enable_shared_from_this<UNetReceiver>
102  {
103  public:
104  UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection = false );
105  virtual ~UNetReceiver();
106 
107  void start();
108  void stop();
109 
110  inline const std::string getName() const
111  {
112  return myname;
113  }
114 
115  // блокировать сохранение данных в SM
116  void setLockUpdate( bool st ) noexcept;
117  inline bool isLockUpdate() const noexcept
118  {
119  return lockUpdate;
120  }
121 
122  void resetTimeout() noexcept;
123 
124  inline bool isRecvOK() const noexcept
125  {
126  return !ptRecvTimeout.checkTime();
127  }
128  inline size_t getLostPacketsNum() const noexcept
129  {
130  return lostPackets;
131  }
132 
133  void setReceiveTimeout( timeout_t msec ) noexcept;
134  void setReceivePause( timeout_t msec ) noexcept;
135  void setUpdatePause( timeout_t msec ) noexcept;
136  void setLostTimeout( timeout_t msec ) noexcept;
137  void setPrepareTime( timeout_t msec ) noexcept;
138  void setCheckConnectionPause( timeout_t msec ) noexcept;
139  void setMaxDifferens( unsigned long set ) noexcept;
140 
141  void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
142  void setLostPacketsID( uniset::ObjectId id ) noexcept;
143 
144  void setMaxProcessingCount( int set ) noexcept;
145 
146  void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
147 
148  inline std::string getAddress() const noexcept
149  {
150  return addr;
151  }
152  inline int getPort() const noexcept
153  {
154  return port;
155  }
156 
158  enum Event
159  {
162  };
163 
164  typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
165  void connectEvent( EventSlot sl ) noexcept;
166 
167  // --------------------------------------------------------------------
170  {
171  useUpdateUnknown,
174  };
175 
176  static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
177  static std::string to_string( UpdateStrategy s) noexcept;
178 
180  void setUpdateStrategy( UpdateStrategy set );
181 
182  // специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
183  // (т.к. при evloop mutex захватытвать не нужно)
185  {
186  public:
187  pack_guard( std::mutex& m, UpdateStrategy s );
188  ~pack_guard();
189 
190  protected:
191  std::mutex& m;
192  UpdateStrategy s;
193  };
194 
195  // --------------------------------------------------------------------
196 
197  inline std::shared_ptr<DebugStream> getLog()
198  {
199  return unetlog;
200  }
201 
202  virtual const std::string getShortInfo() const noexcept;
203 
204  protected:
205 
206  const std::shared_ptr<SMInterface> shm;
207  std::shared_ptr<DebugStream> unetlog;
208 
209  bool receive() noexcept;
210  void step() noexcept;
211  void update() noexcept;
212  void updateThread() noexcept;
213  void callback( ev::io& watcher, int revents ) noexcept;
214  void readEvent( ev::io& watcher ) noexcept;
215  void updateEvent( ev::periodic& watcher, int revents ) noexcept;
216  void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
217  void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
218  virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
219  virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
220  virtual std::string wname() const noexcept override
221  {
222  return myname;
223  }
224 
225  void initIterators() noexcept;
226  bool createConnection( bool throwEx = false );
227  void checkConnection();
228 
229  public:
230 
231  // функция определения приоритетного сообщения для обработки
233  public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
234  {
235  inline bool operator()(const UniSetUDP::UDPMessage& lhs,
236  const UniSetUDP::UDPMessage& rhs) const
237  {
238  return lhs.num > rhs.num;
239  }
240  };
241 
242  typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
243 
244  private:
245  UNetReceiver();
246 
247  timeout_t recvpause = { 10 };
248  timeout_t updatepause = { 100 };
250  std::shared_ptr<UDPReceiveU> udp;
251  std::string addr;
252  int port = { 0 };
253  Poco::Net::SocketAddress saddr;
254  std::string myname;
255  ev::io evReceive;
256  ev::periodic evCheckConnection;
257  ev::periodic evStatistic;
258  ev::periodic evUpdate;
259 
260  UpdateStrategy upStrategy = { useUpdateEventLoop };
261 
262  // счётчики для подсчёта статистики
263  size_t recvCount = { 0 };
264  size_t upCount = { 0 };
265 
266  // текущая статистик
267  size_t statRecvPerSec = { 0 };
268  size_t statUpPerSec = { 0 };
270  std::shared_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
271 
272  // делаем loop общим.. одним на всех!
273  static CommonEventLoop loop;
274 
275  double checkConnectionTime = { 10.0 }; // sec
276  std::mutex checkConnMutex;
277 
278  PassiveTimer ptRecvTimeout;
279  PassiveTimer ptPrepare;
280  timeout_t recvTimeout = { 5000 }; // msec
281  timeout_t prepareTime = { 2000 };
282  timeout_t lostTimeout = { 200 };
283  PassiveTimer ptLostTimeout;
284  size_t lostPackets = { 0 };
286  uniset::ObjectId sidRespond = { uniset::DefaultObjectId };
287  IOController::IOStateList::iterator itRespond;
288  bool respondInvert = { false };
289  uniset::ObjectId sidLostPackets;
290  IOController::IOStateList::iterator itLostPackets;
291 
292  std::atomic_bool activated = { false };
293 
294  PacketQueue qpack;
295  UniSetUDP::UDPMessage pack;
296  UniSetUDP::UDPPacket r_buf;
297  std::mutex packMutex;
298  size_t pnum = { 0 };
303  size_t maxDifferens = { 20 };
304 
305  PacketQueue qtmp;
306  bool waitClean = { false };
307  size_t rnum = { 0 };
309  size_t maxProcessingCount = { 100 };
311  std::atomic_bool lockUpdate = { false };
313  EventSlot slEvent;
314  Trigger trTimeout;
315  std::mutex tmMutex;
316 
317  struct CacheItem
318  {
319  long id = { uniset::DefaultObjectId };
320  IOController::IOStateList::iterator ioit;
321 
322  CacheItem():
323  id(uniset::DefaultObjectId) {}
324  };
325 
326  typedef std::vector<CacheItem> CacheVec;
327  struct CacheInfo
328  {
329  CacheInfo():
330  cache_init_ok(false) {}
331 
332  bool cache_init_ok = { false };
333  CacheVec cache;
334  };
335 
336  // ключом является UDPMessage::getDataID()
337  typedef std::unordered_map<long, CacheInfo> CacheMap;
338  CacheMap d_icache_map;
339  CacheMap a_icache_map;
341  bool d_cache_init_ok = { false };
342  bool a_cache_init_ok = { false };
343 
344  void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
345  void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
346  };
347  // --------------------------------------------------------------------------
348 } // end of namespace uniset
349 // -----------------------------------------------------------------------------
350 #endif // UNetReceiver_H_
351 // -----------------------------------------------------------------------------
Definition: DebugStream.h:91
Definition: SMInterface.h:30
Definition: CallbackTimer.h:29
Definition: UNetReceiver.h:172
Definition: CommonEventLoop.h:16
STL namespace.
Definition: UNetReceiver.h:161
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:56
virtual bool checkTime() const noexceptoverride
Definition: PassiveTimer.cc:46
Definition: UDPPacket.h:93
Definition: UNetReceiver.h:232
Event
Definition: UNetReceiver.h:158
void setUpdateStrategy(UpdateStrategy set)
функция должна вызываться до первого вызова start()
Definition: UNetReceiver.cc:903
Definition: UNetReceiver.h:184
Definition: UNetReceiver.h:160
Definition: UNetReceiver.h:99
UpdateStrategy
Definition: UNetReceiver.h:169
Definition: UNetReceiver.h:173
long ObjectId
Definition: UniSetTypes_i.idl:30