libcommonc++  0.7
SocketSelector.h++
Go to the documentation of this file.
1 /* ---------------------------------------------------------------------------
2  commonc++ - A C++ Common Class Library
3  Copyright (C) 2005-2014 Mark A Lindner
4 
5  This file is part of commonc++.
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Library General Public
9  License as published by the Free Software Foundation; either
10  version 2 of the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Library General Public License for more details.
16 
17  You should have received a copy of the GNU Library General Public
18  License along with this library; if not, write to the Free
19  Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  ---------------------------------------------------------------------------
21 */
22 
23 #ifndef __ccxx_SocketSelector_hxx
24 #define __ccxx_SocketSelector_hxx
25 
26 #include <commonc++/Common.h++>
30 #include <commonc++/Iterator.h++>
34 #include <commonc++/Thread.h++>
35 #include <commonc++/Mutex.h++>
36 
37 #ifdef CCXX_OS_POSIX
38 #include <sys/select.h>
39 #endif
40 
41 namespace ccxx {
42 
43 class SocketSelector;
44 
54 {
55  friend class SocketSelector;
56 
57  public:
58 
60  static const size_t DEFAULT_BUFFER_SIZE;
61 
63  virtual ~Connection();
64 
74  bool writeData(ByteBuffer& buffer);
75 
86  bool writeData(const byte_t* buf, size_t count);
87 
97  bool writeLine(const String& text);
98 
99  void beginWrite();
100 
101  void endWrite();
102 
103  void cancelWrite();
104 
114  size_t readData(ByteBuffer& buffer, bool fully = true);
115 
126  size_t readData(byte_t* buf, size_t count, bool fully = true);
127 
137  size_t readLine(String& text, size_t maxLen);
138 
146  uint32_t readUInt32();
147 
155  uint64_t readUInt64();
156 
159  { return(_socket); }
160 
169  void setReadLowWaterMark(size_t count);
170 
180  void setReadHighWaterMark(size_t count);
181 
187  inline size_t getReadLowWaterMark() const
188  { return(_readLoMark); }
189 
194  bool isReadLow() const;
195 
200  bool isReadHigh() const;
201 
212  void setWriteLowWaterMark(size_t count);
213 
221  void setWriteHighWaterMark(size_t count);
222 
228  inline size_t getWriteLowWaterMark() const
229  { return(_writeLoMark); }
230 
236  inline size_t getWriteHighWaterMark() const
237  { return(_writeHiMark); }
238 
243  bool isWriteLow() const;
244 
250  bool isWriteHigh() const;
251 
255  size_t getBytesAvailableToRead() const;
256 
260  size_t getBytesAvailableToWrite() const;
261 
269  void close(bool immediate = false);
270 
277  { _oobFlag = false; return(_oobData); }
278 
285  inline bool getOOBFlag() const
286  { return(_oobFlag); }
287 
289  inline time_ms_t getTimestamp() const
290  { return(_lastRecv); }
291 
293  bool isClosePending() const;
294 
295  protected:
296 
302  Connection(size_t bufferSize = DEFAULT_BUFFER_SIZE);
303 
304  private:
305 
306  void read();
307  void readOOB();
308  void write();
309 
310  void attach(SocketSelector* selector, StreamSocket* socket);
311 
312  inline void setOOBFlag(bool flag)
313  { _oobFlag = flag; }
314 
315  inline void setTimestamp(time_ms_t stamp)
316  { _lastRecv = stamp; }
317 
318  StreamSocket* _socket;
319 
320  protected:
321 
324 
325  private:
326 
327  SocketSelector* _selector;
328  size_t _readLoMark;
329  size_t _readHiMark;
330  size_t _writeLoMark;
331  size_t _writeHiMark;
332  bool _oobFlag;
333  bool _closePending;
334  byte_t _oobData;
335  time_ms_t _lastRecv;
336  mutable CriticalSection _readLock;
337  mutable CriticalSection _writeLock;
338  static const bool _isSameEndianness;
339 
341 };
342 
352 {
353  public:
354 
364  SocketSelector(uint_t maxConnections = 64,
365  timespan_ms_t defaultIdleLimit = 0);
366 
368  virtual ~SocketSelector();
369 
370  void run();
371  void cleanup();
372 
377  void wakeup();
378 
380  size_t getConnectionCount() const;
381 
392  virtual bool init(ServerSocket* socket);
393 
402  uint_t writeAll(const byte_t* buf, size_t count);
403 
404  protected:
405 
415  virtual Connection* connectionReady(const SocketAddress& address) = 0;
416 
425  virtual void dataReceived(Connection* connection) = 0;
426 
435  virtual void dataSent(Connection* connection);
436 
443  virtual void dataReceivedOOB(Connection* connection);
444 
452  virtual void connectionClosed(Connection* connection) = 0;
453 
461  virtual void connectionTimedOut(Connection* connection) = 0;
462 
470  virtual void exceptionOccurred(Connection* connection,
471  const IOException& ex);
472 
473  class ConnectionList; // fwd decl
474 
476  ConnectionList* _connections;
477 
478  private:
479 
480  void _connectionTimedOut(Connection* connection);
481  void _connectionClosed(Connection* connection);
482 
483  Mutex _mutex;
485  timespan_ms_t _idleLimit;
486  ServerSocket* _ssock;
487 #ifndef CCXX_OS_WINDOWS
488  int _wakePipe[2];
489  AtomicCounter _wakeFlag;
490 #endif
491 
493 };
494 
495 } // namespace ccxx
496 
497 #endif // __ccxx_SocketSelector_hxx
byte_t getOOBData()
Get any pending out-of-band data byte, and clear the OOB flag.
Definition: SocketSelector.h++:276
A critical section, a synchronization primitive that is typically more efficient than but roughly sem...
Definition: CriticalSection.h++:51
time_ms_t getTimestamp() const
Get the time at which data was last received on this connection.
Definition: SocketSelector.h++:289
int64_t time_ms_t
A time expressed in milliseconds since the epoch (00:00:00, UTC, January 1, 1970).
Definition: Integers.h++:98
ConnectionList * _connections
The list of active connections.
Definition: SocketSelector.h++:473
CircularByteBuffer writeBuffer
Definition: SocketSelector.h++:323
A server (listening) socket for StreamSocket (TCP) connections.
Definition: ServerSocket.h++:40
A reliable, connection-oriented stream (TCP) socket.
Definition: StreamSocket.h++:38
An exception indicating an I/O error.
Definition: IOException.h++:36
size_t getReadLowWaterMark() const
Get the current value of the read low-water mark.
Definition: SocketSelector.h++:187
unsigned int uint_t
An alias for unsigned int.
Definition: Integers.h++:74
A mutual-exclusion lock.
Definition: Mutex.h++:49
A socket endpoint network address.
Definition: SocketAddress.h++:46
static const size_t DEFAULT_BUFFER_SIZE
The default I/O buffer size.
Definition: SocketSelector.h++:60
size_t getWriteHighWaterMark() const
Get the current value of the write high-water mark.
Definition: SocketSelector.h++:236
An integer counter whose value is modified in an atomic fashion.
Definition: AtomicCounter.h++:36
A socket I/O selector.
Definition: SocketSelector.h++:351
#define COMMONCPP_API
Definition: Common.h++:126
CircularByteBuffer readBuffer
Definition: SocketSelector.h++:322
An abstract object representing a network connection.
Definition: SocketSelector.h++:53
bool getOOBFlag() const
Test the out-of-band flag.
Definition: SocketSelector.h++:285
A buffer for storing a contiguous sequence of elements.
Definition: Buffer.h++:44
size_t getWriteLowWaterMark() const
Get the current value of the write low-water mark.
Definition: SocketSelector.h++:228
A flexible, reference counted, copy-on-write, thread-safe, nullable string.
Definition: String.h++:50
An object pool that allocates (presumably fixed-size) objects in a contiguous memory buffer...
Definition: StaticObjectPool.h++:45
#define CCXX_COPY_DECLS(CLASS)
Inlines declarations of a copy constructor and assignment operator for the class CLASS.
Definition: Common.h++:295
int timespan_ms_t
A timespan expressed in milliseconds.
Definition: Integers.h++:104
StreamSocket * getSocket()
Get the socket for this connection.
Definition: SocketSelector.h++:158
A thread of execution.
Definition: Thread.h++:55
Definition: AllocationMap.c++:25
unsigned char byte_t
An unsigned 8-bit value.
Definition: Integers.h++:68