Package CyberRail :: Package common :: Module communicationmanager
[hide private]
[frames] | no frames]

Source Code for Module CyberRail.common.communicationmanager

   1  #!/usr/bin/env python 
   2  # -*- coding: utf-8 -*- 
   3  """ 
   4      CommunicationManager module is a module wrapping the select / poll system calls 
   5      enabling you to have a single (or double if you adopt the non blocking mode) 
   6      threaded multiple IO management. 
   7   
   8      Via the CommunicationManager class, you will add and remove IO streams (mainly 
   9      socket, even if pipes and files also work) to the manager. This one will 
  10      dispatch your data and alerts you via callbacks when something is received. 
  11   
  12      Usage (non blocking): 
  13   
  14          >>> ## Creating it 
  15          >>> com = CommunicationManager(blocking = False) 
  16          >>> com.registerLowLevelListener(allLevelListener) 
  17          >>> com.registerHighLevelListener(allLevelListener) 
  18          >>> 
  19          >>> ## Enable it to listen on one port 
  20          >>> lsid = com.listen(port=5555) 
  21          >>> 
  22          >>> ## Connect to another place 
  23          >>> csid = com.connect("ip6-localhost", 5555) 
  24          >>> 
  25          >>> ## send something 
  26          >>> com.sendRaw(csid, "some data") 
  27          >>> 
  28          >>> ## add some File Descriptors 
  29          >>> (pin, pout) = os.pipe() 
  30          >>> com.addFDescriptor(pout) 
  31          >>> com.sendRaw(pout, "plop 1") 
  32          >>> print "----->", os.read(pin, 255) ## "plop 1" 
  33          >>> com.removeFDescriptor(pout) 
  34          >>> 
  35          >>>     ## Stop and close everything 
  36          >>> com.stop() 
  37   
  38      Usage (blocking): 
  39   
  40          >>> ## Creating it 
  41          >>> com = CommunicationManager() 
  42          >>> com.registerLowLevelListener(allLevelListener) 
  43          >>> com.registerHighLevelListener(allLevelListener) 
  44          >>> 
  45          >>> ## Enable it to listen on one port 
  46          >>> lsid = com.listen(port=5555) 
  47          >>> 
  48          >>> ## Connect to another place 
  49          >>> csid = com.connect("ip6-localhost", 5555) 
  50          >>> 
  51          >>> ## Let's enter the main loop 
  52          >>> com.main() 
  53          >>> 
  54          >>> ## Stop and close everything (in a callback since main is blocking) 
  55          >>> com.stop() 
  56   
  57          Advanced usage : 
  58   
  59          This module supports half closed connections. If you want to half close a 
  60          connection, use the appropriate communication manager method. 
  61   
  62          How to manage a remote read half closed connection? 
  63          You will receive a high level "end of stream" event. In this case, 
  64          you must hold the connection until you want to close it. As soon as you 
  65          unhold the connection, it will be fully closed. 
  66          You can also track the low level "PEER HALF CLOSED". 
  67   
  68          How to manage a remote write half closed connection? 
  69          You will receive a high level "broken pipe" event. In this case, you 
  70          must either hold or unhold the connection (even if it is not hold). 
  71          If you don't call one of the two, the connection will be fully closed. 
  72   
  73          Why? 
  74   
  75          The communication manager will cleanup all connections which are inactive. 
  76          Connection should always poll for in data. You can disable this behavior 
  77          by holding a connection. In this case, the connection will not be 
  78          polled for in data but CM will keep it. 
  79   
  80          In the case of a read half close, you will never receive data anymore 
  81          from this connection, so POLLIN has to be cleared (keeping POLLIN set would 
  82          make poll or select return immediately, over and over, which is wrong). 
  83          But you can still write to this connection. So holding it will indicate 
  84          to the CM that it should keep it. 
  85   
  86          In the case of a write half close, the default is to close the connection. 
  87          You can either choose to continue to listen (unhold choice) or hold the 
  88          connection for a later use (depending on the behavior of your program). 
  89          Keeping a connection hold is a right choice only if you plan to unhold 
  90          it later in your program. 
  91   
  92   
  93   
  94      @author: Emmanuel Coirier 
  95      @copyright: 2005-2010 Emmanuel Coirier 
  96      @license: GPL v3 
  97  """ 
  98   
  99  import threading 
 100  import select 
 101  import socket 
 102  import time 
 103  import os 
 104  import os.path 
 105  import fcntl 
 106  import sys 
 107  import errno 
 108  import bdb 
 109   
 110  import pprint 
 111   
class ConnectionHandle(object):
    """
    Hold the IO buffers and the polling state of one managed socket or fd.

    For inputs, it stores all data until a complete protocol packet is
    received. For outputs, it stores all data until the socket is ready
    to send.

    This class is only needed for the default communication manager;
    the Qt one has its own mechanism.

    Operations on buffers run as FIFO.
    """

    # Result codes returned by protoIn callbacks.
    # GARBAGE: stream is corrupted or doesn't contain any valid data. The
    # stream will be flushed by the system.
    GARBAGE = 0
    # OK: stream format is ok, a packet has been read successfully.
    OK = 1
    # UNDEFINED: stream format is ok, but nothing has to be reported.
    UNDEFINED = 2

    def __init__(self, cm, socket, protoIn=None, protoOut=None, ssl=False):
        """
        Create a new Connection Handle. The specific protocol callbacks will
        be used instead of the CommunicationManager ones if they are set.

        @param cm: an instance of a CommunicationManager
        @param socket: the associated socket (or a raw file descriptor)
        @param protoIn: specific protocol callback (see CommunicationManager.__init__ doc)
        @param protoOut: specific protocol callback (see CommunicationManager.__init__ doc)
        @param ssl: enable ssl if True
        """
        self.cm = cm
        self.socket = socket
        self.isInPollList = False
        # We must poll for something, else the connection will be considered finished
        self.pollFor = select.POLLIN
        self.listening = False
        self.connecting = False
        # Set it to True if the socket should live after a disconnection.
        # It won't be managed anymore by the connection manager.
        self.dontClose = False
        self.inData = ""
        self.outData = ""
        self.protoIn = protoIn
        self.protoOut = protoOut
        self.semaOut = threading.Semaphore()
        self.ssl = ssl
        self.hold = False
        self.readUntil = 0
        self.halfClose = False  ## close write stream after last bytes are written
        self.brokenPipe = False  ## write stream closed, write forbidden
        self.fcntl_blocking = 0  ## default fcntl state

    def getOutData(self):
        """
        @return: a copy of all data in the out buffer. Data has not been
        sent and is not cleared at the end of the call.

        Thread-safe.
        """
        with self.semaOut:
            return str(self.outData)

    def removeOutData(self, howMany):
        """
        Clear part of the out buffer. If no more data is present, the pollout
        flag is removed.

        Thread-safe.

        @param howMany: how many bytes to remove from the beginning of the buffer
        """
        with self.semaOut:
            self.outData = self.outData[howMany:]
            if not self.outData:
                self.pollFor &= ~select.POLLOUT

    def addOutData(self, data):
        """
        Add some data at the end of the out buffer. The pollout flag is set.
        If the socket is ready, data is sent at the next loop turn.

        Thread-safe.

        @param data: a string of bytes
        @return: the number of bytes in the buffer
        @raise IOError: if the write side of the connection is already broken
        """
        if self.brokenPipe:
            raise IOError("Broken pipe (CID: %d)" % self.sid())
        # The "with" block guarantees the semaphore is released even when the
        # concatenation fails (e.g. wrong data type); otherwise every later
        # buffer access would block forever.
        with self.semaOut:
            self.outData += data
            self.pollFor |= select.POLLOUT
            result = len(self.outData)
        self.cm._wakeup("Alerts poll that new data are ready to be sent on socket %d" % self.sid())
        return result

    ## No synchro is needed for inputData since we are the only one to access this object
    def getInData(self):
        """
        Get input data. Data stays in the buffer.
        """
        return self.inData

    def clearInData(self, howMany=None):
        """
        Clear input buffer. Used by protoIn callback.

        @param howMany: How many bytes to remove. Default to None which means
        all data.
        """
        if howMany is None:
            self.inData = ""
        else:
            self.inData = self.inData[howMany:]

    def addInData(self, data):
        """
        Add data which has just been read. Only used by CommunicationManager.
        """
        self.inData += data

    def sid(self):
        """
        @return: the socket id.
        @note: this is the file descriptor.
        """
        try:
            return self.socket.fileno()
        except AttributeError:
            ## in case the socket is in fact a fd...
            return self.socket

    def enableSSL(self, serverSide):
        """
        Enable the ssl wrapper around this socket.

        @param serverSide: True if this socket is a server socket
        """
        import ssl

        # self.ssl doubles as the certificate file path when it is a string.
        self.socket = ssl.wrap_socket(
            self.socket,
            server_side=serverSide,
            certfile=self.ssl if isinstance(self.ssl, str) else None,
        )
        self.ssl = "server" if serverSide else "client"

    def disableSSL(self):
        """
        Disable the ssl wrapper.
        """
        self.socket = self.socket.unwrap()
        self.ssl = False
266 267
268 -class CommunicationManager(object):
    """
    Manage communication between different parts of processes, via IO.

    The communication manager is responsible for communication between
    different processes, via sockets or other IO devices. It will emit various
    events depending on what data is received on the created sockets.

    The manager can be used in two modes:
        - blocking: in this case, once created, you should call the main
        method in order to enable it to work. Or you can call the loop
        method repeatedly until it returns False. In either case, these
        methods will block.

        - non blocking: you have nothing to do: once constructed, the
        communication manager will do its work.

    In order to interact with the communication manager, you have to
    register some callback functions for the various emitted events.

    Low level events are events focused on input and output operations.
    You won't need most of them since they are only useful for debugging
    purposes. Included are low-level TCP connection steps like "CONNECTING",
    "CONNECTION CLOSED", and detailed errors.

    High level events are more application level events. They notify you
    of packets received, formatted by the protoIn callback (see below).

    Moreover, you should provide a "protoIn" callback used for parsing the
    stream looking for higher level messages, and a "protoOut" callback that
    will receive these messages that have to be serialized to be carried by
    the stream. For example, if you use Json/Yaml messages, you can cut the
    stream based on various bounds and call the yaml/json parser.
    Respectively, you will call the json/yaml formatter for your high level
    messages.
    """

def __init__(self, blocking=True, protoIn=None, protoOut=None):
    """
    Build a communication manager.

    @param blocking: True when the caller will drive the manager through
    main() / loop(); False to let the manager run main() in its own thread.

    @param protoIn: a callback that cuts the stream into packets. It takes
    one parameter, an instance of ConnectionHandle, and should return a
    tuple of two values:
        - the constant OK, GARBAGE or UNDEFINED
        - a list of messages, thrown as high level events (one event per
        message).
    This callback has to manage the in buffer of the connection handle
    itself; the buffer is not cleared until the callback does it.
    When not given, the module-level returnRaw callback is used.

    @param protoOut: a callback that formats what is passed to send() into
    the string put on the wire.
    When not given, the module-level returnIdentity callback is used.
    """
    self.lowLevelListeners = []
    self.highLevelListeners = []

    # Keep the handle table locked for the whole construction so that the
    # worker thread (non blocking mode) cannot use a half-built manager.
    self.semaChs = threading.Semaphore()
    self.semaChs.acquire()
    self.chs = {}

    self.stopOnExceptionFlag = False
    self.stopOnKeyboardInterruptFlag = True
    self.poll = select.poll()
    self.running = True
    self.connectionCount = 0

    # Self-pipe used by _wakeup() to interrupt a pending poll() call.
    self.wakeupPipe = os.pipe()
    self.timeouts = TimeoutsManagement()
    self.poll.register(self.wakeupPipe[0], select.POLLIN)

    self.protoIn = protoIn if protoIn else returnRaw
    self.protoOut = protoOut if protoOut else returnIdentity

    if not blocking:
        self.comThread = threading.Thread(target=self.main)
        self.comThread.start()
    self.semaChs.release()
358
def _wakeup(self, reason):
    """
    Wake up the poll call in case we have work.

    A low level "WAKE UP" event is thrown first, then one byte is written
    in the wake-up pipe whose read end is registered in the poll object.

    @param reason: Why should we wake up? This is a string used only for
    debug.
    """
    self._throwLowLevelEvent((self.wakeupPipe[1], "WAKE UP", reason))
    # os.write() needs a byte string; b"!" is the same object type as "!" on
    # Python 2.6+ and keeps working where text and bytes are distinct types.
    os.write(self.wakeupPipe[1], b"!")
367
def _managePollList(self):
    """
    Synchronize the polling object with the state of every connection.

    A handle that polls for nothing is unregistered and, unless it is
    hold, released. Every other handle is (re)registered with its current
    event mask.
    """
    chToRelease = []
    # "with" releases the semaphore even if register/unregister raises;
    # a leaked semaphore would dead-lock every later operation.
    with self.semaChs:
        for ch in list(self.chs.values()):
            if ch.pollFor == 0:
                if ch.isInPollList:
                    self.poll.unregister(ch.socket)
                    ch.isInPollList = False
                if not ch.hold:
                    chToRelease.append(ch)
            else:
                self.poll.register(ch.socket, ch.pollFor)
                ch.isInPollList = True
    # _releaseSocket takes the semaphore itself, so call it outside.
    for ch in chToRelease:
        self._releaseSocket(ch)
389
def _releaseSocket(self, ch):
    """
    Remove a socket from the ConnectionManager. After that, there is
    no more trace of it here.

    If ch.dontClose is set, the descriptor is only unmanaged and its
    O_NONBLOCK flag is put back to the state recorded in
    ch.fcntl_blocking. Otherwise the socket (or fd) is closed, and the
    file of a listening unix socket is unlinked.

    @param ch: the connection handle to forget
    @raise KeyError: if the handle is not managed
    """
    # Read the id first: fileno() is not usable anymore once closed.
    sid = ch.sid()
    with self.semaChs:
        del self.chs[sid]

    if ch.dontClose:
        # ch.fcntl_blocking holds the O_NONBLOCK bit the descriptor had
        # before being managed: restore exactly that state. (The previous
        # code cleared the bit only when it was originally set.)
        fstat = fcntl.fcntl(ch.socket, fcntl.F_GETFL)
        restored = (fstat & ~os.O_NONBLOCK) | (ch.fcntl_blocking & os.O_NONBLOCK)
        if restored != fstat:
            fcntl.fcntl(ch.socket, fcntl.F_SETFL, restored)

        self._throwLowLevelEvent((sid, "FD REMOVED"))
        self._throwHighLevelEvent(("file descriptor unmanaged", sid))
    else:
        try:
            if ch.listening:
                # A unix listening socket is named by a path: remove it.
                name = ch.socket.getsockname()
                if isinstance(name, str) and os.path.exists(name):
                    os.unlink(name)

            ch.socket.close()
        except AttributeError:
            ## socket is a FD
            os.close(ch.socket)

        self._throwLowLevelEvent((sid, "CONNECTION CLOSED"))
        self._throwHighLevelEvent(("connection closed", sid))

    self.connectionCount -= 1
421 422
def _manageInData(self, ch):
    """
    Run the protocol parser on the in buffer of ch and throw the matching
    high level events.

    The handle's own protoIn callback is preferred over the manager's one.
    A GARBAGE result throws a "protocol error" event and flushes the in
    buffer; an OK result throws one "packet" event per parsed packet; any
    other result throws nothing.
    """
    parser = self.protoIn if ch.protoIn is None else ch.protoIn
    status, packets = parser(ch)

    if status == ch.GARBAGE:
        self._throwHighLevelEvent(("protocol error", ch.sid(), "packet malformed (%s)" % packets))
        ch.clearInData()
        return

    if status == ch.OK:
        for item in packets:
            self._throwHighLevelEvent(("packet", ch.sid(), item))
437
def _readSocket(self, ch):
    """
    Try reading the socket (or fd) of ch. Close it properly if needed.

    Read data is appended to the in buffer and handed to _manageInData.
    An empty read is treated as an end of stream. Read errors are routed
    to _manageErroneousConnection.
    """
    sizeToRead = ch.readUntil if ch.readUntil != 0 else 4096
    try:
        if ch.ssl:
            # Drain the SSL layer: decrypted data may be pending without the
            # underlying socket being readable again.
            data = ch.socket.read(sizeToRead)
            chunk = data
            try:
                # Stop on an empty chunk (EOF); looping on the accumulated
                # data would spin forever once the peer has closed.
                while chunk:
                    chunk = ch.socket.read(sizeToRead)
                    data += chunk
            except socket.error as error:
                if error.args[0] != errno.ENOENT:  #EWOULDBLOCK
                    raise
        else:
            data = ch.socket.recv(sizeToRead)
    except socket.error as error:
        self._manageErroneousConnection(ch, error.args[0])
        return
    except AttributeError:
        ## socket is a fd
        try:
            data = os.read(ch.socket, sizeToRead)
        except OSError as e:
            if e.errno in (errno.EBADF, errno.EAGAIN):
                # Bad File descriptor ?
                # When a fd is write only, POLLIN is kept up by the system...
                # so hold the fd in order to not wait for POLLIN events...
                self.hold(ch.socket)
                # wake up only to trace why we hold this fd
                self._wakeup("Hold fd %d, because it is write only and POLLIN should'nt be activated" % ch.sid())
                return
            raise
    except IOError as e:
        # Nothing has been read: report the error and stop here instead of
        # going on with an unbound "data".
        self._manageErroneousConnection(ch, e.errno)
        return

    if ch.readUntil != 0 and len(data) == ch.readUntil and not ch.ssl:
        self.hold(ch.sid())

    if not data:  ## socket has been properly closed
        self._manage_peer_eos(ch)
    else:
        self._throwLowLevelEvent((ch.sid(), "READ", str(data)))
        ch.addInData(data)
        self._manageInData(ch)
486
def _writeSocket(self, ch):
    """
    Try writing the pending out data of ch on its socket (or fd).

    What has been written is removed from the out buffer and reported by
    a low level "WRITE" event. Write errors are routed to
    _manageErroneousConnection. If a half close was requested, it is
    performed as soon as the out buffer is empty.
    """
    data = ch.getOutData()
    try:
        if ch.ssl:
            sentLen = ch.socket.write(data)
        else:
            sentLen = ch.socket.send(data)
    except AttributeError:
        ## socket is a FD
        try:
            sentLen = os.write(ch.socket, data)
        except OSError as error:
            if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Not writable right now: data stays in the buffer and
                # will be retried at the next POLLOUT.
                return
            # e.g. EPIPE when the reader has gone: handle it like a socket
            # error instead of letting it escape to the main loop.
            self._manageErroneousConnection(ch, error.errno)
            return
    except socket.error as error:
        self._manageErroneousConnection(ch, error.args[0])
        return
    ch.removeOutData(sentLen)
    self._throwLowLevelEvent((ch.sid(), "WRITE", str(data[:sentLen])))

    if ch.halfClose and len(ch.getOutData()) == 0:
        self.halfClose(ch.sid(), self.SENDING, deffered=False)
508
def send(self, cid, data):
    """
    Serialize data with the protoOut callback and queue it for sending.

    The protoOut callback of the connection handle is used when it has
    one, else the manager's one.

    @param cid: the connection id
    @param data: the data to send, will be converted with protoOut callback
    @return: the number of bytes in the out buffer
    """
    handle = self._cidToCh(cid)
    if handle.protoOut is None:
        serializer = self.protoOut
    else:
        serializer = handle.protoOut
    wire = serializer(data)
    return self.sendRaw(cid, wire)
519
def sendRaw(self, cid, data):
    """
    Queue bytes for the wire. Nothing is written here: data is only
    appended to the out buffer of the connection.

    @param cid: the connection id
    @param data: the bytes to append to the out buffer
    @return: the number of bytes in the out buffer
    """
    handle = self._cidToCh(cid)
    pending = handle.addOutData(data)
    return pending
527
def _createConnectedSocket(self, ch):
    """
    Given a listening connection handle which should have a pending
    connection, accept it and manage the new socket.

    Called when someone is connecting to us.

    @param ch: the listening connection handle
    """
    # accept() returns (socket, address). Count the connection only once
    # it really exists, else a failing accept() would leave the counter
    # too high for ever.
    peerSocket, peerAddress = ch.socket.accept()
    self.connectionCount += 1
    nch = ConnectionHandle(self, socket=peerSocket, protoIn=ch.protoIn, protoOut=ch.protoOut, ssl=ch.ssl)
    nch.socket.setblocking(False)
    self.chs[nch.sid()] = nch
    self._throwLowLevelEvent((nch.sid(), "NEW CONNECTION", peerAddress, nch.socket.getsockname()))
    self._throwHighLevelEvent(("incoming connection", nch.sid()))
    # we are now ready to accept incoming data
    nch.pollFor |= select.POLLIN
    if ch.ssl:
        # A failed handshake concerns the NEW connection only: the
        # listening handle must stay untouched and keep accepting peers.
        try:
            nch.enableSSL(serverSide=True)
        except socket.error as error:
            nch.ssl = False
            self._manageErroneousConnection(nch, error.args[0])
            return
        except:
            nch.ssl = False
            nch.pollFor = 0
            raise
558 559
def _completeConnectedSocket(self, ch):
    """
    Finish an outgoing connection whose completion has just been signaled.

    The handle starts polling for input, stops polling for output when
    nothing is waiting to be sent, gets its ssl wrapper if asked, and the
    "CONNECTED" / "outcoming connection" events are thrown.
    """
    ch.pollFor |= select.POLLIN
    ## POLLOUT was needed for connection completion, so it is already set:
    ## keep it only when some data is waiting to be sent.
    nothingToSend = len(ch.getOutData()) == 0
    if nothingToSend:
        ch.pollFor &= ~select.POLLOUT

    if ch.ssl:
        try:
            ch.enableSSL(serverSide=False)
        except socket.error as error:
            ch.ssl = False
            self._manageErroneousConnection(ch, error.args[0])
            return
        except:
            ch.ssl = False
            ch.pollFor = 0
            raise

    ch.connecting = False
    peerName = ch.socket.getpeername()
    localName = ch.socket.getsockname()
    self._throwLowLevelEvent((ch.sid(), "CONNECTED", peerName, localName))
    self._throwHighLevelEvent(("outcoming connection", ch.sid()))
583 584
def _manageErroneousConnection(self, ch, error=None):
    """
    This connection returned us an error. Try to manage it.

    The handle stops polling (so it will be closed and forgotten), then a
    low level "ERROR" event and a high level "broken pipe" or
    "connection error" event are thrown.

    @param ch: the faulty connection handle
    @param error: the errno value if known; when None it is asked to the
    socket (SO_ERROR), or assumed to be EPIPE for a plain fd
    @raise RuntimeError: if no error can be found
    """
    if ch.pollFor == 0:  # already managed
        return
    ## we ask the connection to be closed and forgotten
    ch.pollFor = 0

    if error is None:
        try:
            error = ch.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        except AttributeError:
            ## error on a fd (which is not a socket)...
            ## broken pipe ?
            ## XXX this code has to be tested
            error = errno.EPIPE

    if error == 0:
        ## we should never be here...
        sys.stderr.write("Error : POLLERR and no error.\n")
        raise RuntimeError("Error : POLLERR and no error.")

    # Codes outside of the errno table (e.g. library specific ones) must
    # not break error reporting with a KeyError.
    symbol = errno.errorcode.get(error, "UNKNOWN")
    try:
        message = os.strerror(error)
    except (TypeError, ValueError, OverflowError):
        message = str(error)

    self._throwLowLevelEvent((ch.sid(), "ERROR", error, symbol, message))
    if error == errno.EPIPE:
        ch.brokenPipe = True
        self._throwHighLevelEvent(("broken pipe", ch.sid()))
    else:
        self._throwHighLevelEvent(("connection error", ch.sid(), message))
613
def _managePollReturn(self, descs):
    """
    Dispatch what poll reported.

    @param descs: the list of (file descriptor, event mask) pairs returned
    by poll
    """
    for fd, flags in descs:
        if fd == self.wakeupPipe[0]:
            ## only a wake-up request: flush the pipe and go on
            self._throwLowLevelEvent((self.wakeupPipe[0], "WAKE UP CLEARED"))
            os.read(self.wakeupPipe[0], 255)
            continue

        handle = self.chs[fd]

        if flags & select.POLLERR:
            self._manageErroneousConnection(handle)
            continue

        if flags & select.POLLOUT:
            # Writable means "connected" for a socket still connecting.
            if handle.connecting:
                self._completeConnectedSocket(handle)
            else:
                self._writeSocket(handle)

        readable = (flags & select.POLLIN) == select.POLLIN
        if readable:
            # Readable means "pending connection" for a listening socket.
            if handle.listening:
                self._createConnectedSocket(handle)
            else:
                self._readSocket(handle)

        if flags & select.POLLHUP and not readable:
            ## Read/Write POLLHUP The other end has shut down one direction. (man:socket(7))
            ## or FD like pipe has been closed on the other end.
            ## (POLLERR cannot be set here: it is handled above.)
            self._manage_peer_eos(handle)
645
def _manage_peer_eos(self, ch):
    """
    Handle the end of the readable stream of ch.

    Some objects signal the end of the readable stream in different ways:
        - POLLIN / read of 0 bytes (classical EOF)
        - POLLHUP (pipes, mainly)

    This is the code to call in both situations. It removes the POLLIN
    flag, which closes the connection if nothing else is polled for.

    The events are then thrown, in order to let the main application hold
    this connection for future use. The unhold operation is also available
    (e.g. for managing ^D on terms).
    """
    ## nothing more to read.
    ch.pollFor = ch.pollFor & ~select.POLLIN
    lowEvent = (ch.sid(), "PEER HALF CLOSED")
    self._throwLowLevelEvent(lowEvent)
    highEvent = ("end of stream", ch.sid())
    self._throwHighLevelEvent(highEvent)
662 663
def loop(self):
    """
    Run one turn of the event loop: refresh the poll list, wait for
    events (no longer than the next timeout), then dispatch them.

    Exceptions raised during the turn are reported and swallowed, except
    the debugger quit request.

    @return: True if loop should be called again, False when the manager
    wants to stop.
    """
    self._managePollList()
    if not self.running and self.connectionCount == 0:
        return False

    try:
        ready = self.poll.poll(self.timeouts.getNextTimeout())
        for expired in self.timeouts.popPastEvents():
            self._throwLowLevelEvent((None, "TIMEOUT", expired["handle"]))
            self._throwHighLevelEvent(("timeout", expired["payload"]))

        if len(ready) == 0:
            self._throwLowLevelEvent((None, "LOOP"))
        else:
            for readyState in ready:
                self._throwLowLevelEvent((readyState[0], convertPollState(readyState[1])))
        self._managePollReturn(ready)
    except bdb.BdbQuit:
        # never hide a debugger quit
        raise
    except select.error as error:
        # an interrupted poll is not worth reporting
        interrupted = error.args[0] == errno.EINTR
        if not interrupted:
            sys.excepthook(*sys.exc_info())
            self._throwLowLevelEvent((None, "EXCEPTION", sys.exc_info()))
    except:
        caught = sys.exc_info()
        if caught[0] == KeyboardInterrupt:
            if self.stopOnKeyboardInterruptFlag:
                self.stop()
            else:
                self._throwHighLevelEvent(("keyboard interrupt",))
                self._throwLowLevelEvent((None, "EXCEPTION", sys.exc_info()))
        else:
            sys.excepthook(*caught)
            self._throwLowLevelEvent((None, "EXCEPTION", sys.exc_info()))
    return True
703
def main(self):
    """
    Run the main execution loop until the manager wants to stop.

    A low level event is thrown when the loop starts and another one when
    it stops.
    """
    self._throwLowLevelEvent((None, "MAIN LOOP STARTED"))
    keepGoing = self.loop()
    while keepGoing:
        keepGoing = self.loop()
    self._throwLowLevelEvent((None, "MAIN LOOP STOPPED"))
713
def stop(self):
    """
    Stop the connection manager, and ask each socket, if any, to be
    disconnected.

    Nothing is done when the manager is already stopped and manages no
    connection anymore.
    """
    if self.connectionCount == 0 and not self.running:
        return
    self.running = False
    # "with" guarantees the semaphore is released whatever happens below;
    # it used to stay locked for ever if the wake-up failed.
    with self.semaChs:
        for cid in list(self.chs):
            try:
                self.disconnect(cid)
            except (ValueError, socket.error):
                ## When we stop a socket, another one can be closed at the same
                ## time returning us a ValueError
                pass
        # Debug trace: how many connections remain and what each polls for.
        states = [str(cid) + " " + convertPollState(ch.pollFor) for cid, ch in self.chs.items()]
        self._wakeup("Alerts poll that manager is shutting down " + str(self.connectionCount) + " " + str(states))
731
def stopAtLastSocketClosed(self, value=True):
    """
    Ask the communication manager to stop by itself as soon as it does
    not manage any connection anymore.

    @param value: True to enable the auto-stop, False to keep running
    """
    keepRunning = not value
    self.running = keepRunning
738
def stopOnException(self, value=True):
    """
    Choose whether the manager stops when a listener raises an exception
    during a high level event dispatch.

    @param value: True to stop on such an exception
    """
    # read by _throwHighLevelEvent
    self.stopOnExceptionFlag = value
744
def stopOnKeyboardInterrupt(self, value=True):
    """
    Choose whether the manager stops on ctrl + c.

    @param value: True to stop on a keyboard interrupt; with False a
    "keyboard interrupt" high level event is thrown by loop() instead
    """
    # read by loop()
    self.stopOnKeyboardInterruptFlag = value
750
def listen(self, port=8417, ipV6=True, protoIn=None, protoOut=None, ssl=False):
    """
    Create a listening TCP socket bound to all local addresses.

    @param port: the port to listen on
    @param ipV6: use an IPv6 socket if True, an IPv4 one otherwise
    @param protoIn: specific protocol callback (see CommunicationManager.__init__ doc)
    @param protoOut: specific protocol callback (see CommunicationManager.__init__ doc)
    @param ssl: Enable SSL
    @return: the id of the listening socket (only used to close it)
    """
    family = socket.AF_INET6 if ipV6 else socket.AF_INET
    listening_socket = socket.socket(family, socket.SOCK_STREAM)
    try:
        # SO_REUSEADDR only has an effect when it is set BEFORE bind():
        # this is what allows the port to be reused right after a close.
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        listening_socket.bind(("", port))
    except Exception:
        # Don't leak the descriptor when the port cannot be bound.
        listening_socket.close()
        raise
    ch = ConnectionHandle(self, listening_socket, protoIn, protoOut, ssl)
    return self._addListeningSocket(ch, port)
767
def listenUnix(self, address, protoIn=None, protoOut=None):
    """
    Create a listening socket, via unix socket.

    @param address: the path of the unix socket to bind
    @param protoIn: specific protocol callback (see CommunicationManager.__init__ doc)
    @param protoOut: specific protocol callback (see CommunicationManager.__init__ doc)
    @return: the id of the listening socket (only used to close it)
    """
    listening_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        listening_socket.bind(address)
    except Exception:
        # Don't leak the descriptor when the address cannot be bound.
        listening_socket.close()
        raise
    ch = ConnectionHandle(self, listening_socket, protoIn, protoOut, False)
    return self._addListeningSocket(ch, address)
780 781
def _addListeningSocket(self, ch, portOrAddress):
    """
    Put an already bound socket in listening mode and manage it.

    @param ch: the connection handle of the bound socket
    @param portOrAddress: what the socket is bound to, only reported in
    the "LISTENING" low level event
    @return: the id of the listening socket
    """
    # Socket can be reused immediately after closing
    ch.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    ch.socket.listen(5)
    sid = ch.sid()
    # "with" releases the semaphore even if a step below raises.
    with self.semaChs:
        self.chs[sid] = ch
        self._throwLowLevelEvent((sid, "LISTENING", portOrAddress))
        self._throwHighLevelEvent(("listening", sid))
        ch.pollFor = select.POLLIN
        ch.listening = True
        self.connectionCount += 1
        self._wakeup("Registering new listening socket %d" % sid)
    return sid
796
def _cidToCh(self, cid):
    """
    Find the connection handle of a connection id.

    @param cid: the connection id
    @return: the matching ConnectionHandle
    @raise ValueError: if no managed connection has this id
    """
    try:
        return self.chs[cid]
    except KeyError:
        # "%s" (and not "%d") so that an id which is not a number, like the
        # None returned by a failed connect(), still ends up as the
        # documented ValueError instead of a formatting TypeError.
        raise ValueError("CID not in use: %s" % (cid,))
802
def disconnect(self, cid):
    """
    Ask for the disconnection of a socket, even when the connection is
    hold. The socket is completely disconnected once the event is thrown,
    and not before!

    @param cid: the connection id
    @note: If ch.dontClose is set, just unmanage it.
    """
    handle = self._cidToCh(cid)
    sid = handle.sid()
    # Neither hold nor polling for input: the poll list management will
    # release the handle as soon as it polls for nothing.
    handle.hold = False
    handle.pollFor = handle.pollFor & ~select.POLLIN
    self._throwLowLevelEvent((sid, "DISCONNECTING"))
    self._wakeup("Alerts poll that socket %d is no more active" % sid)


close = disconnect
def hold(self, cid, after=None):
    """
    Stop polling and reading from this socket. The in buffer is then
    no more filled.

    In this case, if data are coming from the network (or the other part
    of the pipe / whatever fd), they will stay in the kernel buffer, or
    will block on the other side, until you unhold this connection.

    @param cid: the connection id
    @param after: Read "after" bytes before holding. When given, the
    connection is not hold right now: only the amount is recorded.
    """
    handle = self._cidToCh(cid)
    if after is None:
        handle.hold = True
        handle.pollFor &= ~select.POLLIN
        self._throwLowLevelEvent((handle.sid(), "HOLD"))
        self._wakeup("Alerts poll that socket %d is now being hold" % handle.sid())
    else:
        handle.readUntil = after
837
def unhold(self, cid):
    """
    Leave the hold state: the connection is polled for input again.

    All data that arrived from the network in the meantime will be read.

    @param cid: the connection id
    """
    handle = self._cidToCh(cid)
    handle.hold = False
    handle.pollFor = handle.pollFor | select.POLLIN
    sid = handle.sid()
    self._throwLowLevelEvent((sid, "UNHOLD"))
    self._wakeup("Alerts poll that socket %d is no more being hold" % sid)
849
def _translateAddress(self, address, port, ipV6):
    """
    Resolve address and port into the socket address to connect to.

    @param address: the host name or ip address
    @param port: the port number
    @param ipV6: resolve for IPv6 if True, for IPv4 otherwise
    @return: the socket address of the first result of getaddrinfo
    """
    family = socket.AF_INET6 if ipV6 else socket.AF_INET
    candidates = socket.getaddrinfo(address, port, family, socket.SOCK_STREAM)
    # each result is (family, type, proto, canonname, sockaddr)
    firstCandidate = candidates[0]
    return firstCandidate[4]
856
def connect(self, address="ip6-localhost", port=8417, ipV6=True, protoIn=None, protoOut=None, ssl=False):
    """
    Connect to a peer. The connection is completed asynchronously.

    @param address: the host name or ip address of the peer
    @param port: the port of the peer
    @param ipV6: use an IPv6 socket if True, an IPv4 one otherwise
    @param protoIn: specific protocol callback (see CommunicationManager.__init__ doc)
    @param protoOut: specific protocol callback (see CommunicationManager.__init__ doc)
    @param ssl: Enable SSL
    @return: the socket number, which is used as id or None if the
    connection fails at this step (a low level event is sent).
    """
    family = socket.AF_INET6 if ipV6 else socket.AF_INET
    local_socket = socket.socket(family, socket.SOCK_STREAM)
    ch = ConnectionHandle(self, local_socket, protoIn, protoOut, ssl)
    ch.socket.setblocking(False)
    try:
        self._throwLowLevelEvent((None, "WILL CONNECT TO", address, port, "IPV6 == " + str(ipV6)))
        translatedAddress = self._translateAddress(address, port, ipV6)
        ch.socket.connect(translatedAddress)
    except socket.error as error:
        # EINPROGRESS is the normal answer of a non blocking connect.
        if error.args[0] != errno.EINPROGRESS:
            self._throwLowLevelEvent((None, "EXCEPTION", sys.exc_info()))
            # The socket will never be managed: close it, don't leak it.
            ch.socket.close()
            return None
    else:
        sys.stderr.write("Error : EINPROGRESS not returned.\n")
    return self._addSocket(ch, translatedAddress)
884
def connectUnix(self, address="/tmp", protoIn=None, protoOut=None):
    """
    Connect to a peer, via an unix socket.

    @param address: the path of the unix socket to connect to
    @param protoIn: specific protocol callback (see CommunicationManager.__init__ doc)
    @param protoOut: specific protocol callback (see CommunicationManager.__init__ doc)
    @return: the socket number, which is used as id or None if the
    connection fails at this step (a low level event is sent).
    """
    local_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    ch = ConnectionHandle(self, local_socket, protoIn, protoOut, False)
    ch.socket.setblocking(False)
    try:
        self._throwLowLevelEvent((None, "WILL CONNECT TO", address))
        ch.socket.connect(address)
    except socket.error as error:
        # EINPROGRESS is the normal answer of a non blocking connect.
        if error.args[0] != errno.EINPROGRESS:
            self._throwLowLevelEvent((None, "EXCEPTION", sys.exc_info()))
            # The socket will never be managed: close it, don't leak it.
            ch.socket.close()
            return None
    return self._addSocket(ch, address)
906 907
def _addSocket(self, ch, address):
    """
    Register a connection handle whose connection is being established.

    The handle is stored in self.chs under its sid, a "CONNECTING" low
    level event is sent, the handle is flagged as connecting and
    POLLOUT is added to the states it is polled for. self._wakeup() is
    then called (presumably to interrupt the poll loop; see its
    definition).

    @param ch: the connection handle to register
    @param address: the peer address, only reported in the event
    @return: the sid of the handle
    """
    self.semaChs.acquire()
    try:
        sid = ch.sid()
        self.chs[sid] = ch
        self._throwLowLevelEvent((sid, "CONNECTING", address, ch.socket.getsockname()))
        ch.connecting = True
        ch.pollFor |= select.POLLOUT
        self.connectionCount += 1
        self._wakeup("Alerts poll that a new socket %d is waiting for connection completition" % sid)
    finally:
        # Always give the semaphore back, even if something above raised,
        # otherwise every later user of self.semaChs would block forever.
        self.semaChs.release()
    return sid
918
def _throwLowLevelEvent(self, event):
    """
    Dispatch an event to every registered low level listener.

    A failing listener does not prevent the following ones from being
    called: its exception is reported through sys.excepthook and the
    dispatch goes on. Only bdb.BdbQuit is re-raised, so that quitting a
    debugger session is not swallowed.

    @param event: the event, handed over unchanged to each listener
    """
    # Iterate over a snapshot: a listener may (un)register listeners
    # while being called, and mutating the list being iterated would make
    # the loop skip the next listener.
    for listener in list(self.lowLevelListeners):
        try:
            listener(event)
        except bdb.BdbQuit:
            raise
        except Exception:
            sys.excepthook(*sys.exc_info())
927
def _throwHighLevelEvent(self, event):
    """
    Dispatch an event to every registered high level listener.

    A failing listener does not prevent the following ones from being
    called: its exception is reported through sys.excepthook and, when
    self.stopOnExceptionFlag is set, self.stop() is called before the
    dispatch goes on. Only bdb.BdbQuit is re-raised, so that quitting a
    debugger session is not swallowed.

    @param event: the event, handed over unchanged to each listener
    """
    # Iterate over a snapshot: a listener may (un)register listeners
    # while being called, and mutating the list being iterated would make
    # the loop skip the next listener.
    for listener in list(self.highLevelListeners):
        try:
            listener(event)
        except bdb.BdbQuit:
            raise
        except Exception:
            sys.excepthook(*sys.exc_info())
            if self.stopOnExceptionFlag:
                self.stop()
938 939
def registerLowLevelListener(self, listener):
    """
    Subscribe a callback to low level events (all things that refer to
    socket connection).

    @param listener: a callable taking the event as its only argument
    """
    self.lowLevelListeners.append(listener)
944
def registerHighLevelListener(self, listener):
    """
    Subscribe a callback to high level events (all things that refer to
    the application).

    @param listener: a callable taking the event as its only argument
    """
    self.highLevelListeners.append(listener)
949
def unregisterLowLevelListener(self, listener):
    """
    Unsubscribe a callback from low level events.

    Only the first registration of the callback is removed. A
    ValueError is raised if the callback is not registered.

    @param listener: the previously registered callable
    """
    registered = self.lowLevelListeners
    registered.remove(listener)
955
def unregisterHighLevelListener(self, listener):
    """
    Unsubscribe a callback from high level events.

    Only the first registration of the callback is removed. A
    ValueError is raised if the callback is not registered.

    @param listener: the previously registered callable
    """
    registered = self.highLevelListeners
    registered.remove(listener)
961 962
def addFDescriptor(self, fd, protoIn=None, protoOut=None, dontClose=True, hold=False):
    """
    Add a file descriptor in the system.
    The connection manager has been designed to play with sockets, so
    event labels will be a bit strange for simple FDs but the
    meaning will be the same...

    The descriptor is switched to non blocking mode; the previous state
    of its O_NONBLOCK flag is saved on the handle (fcntl_blocking).

    @param fd: the file descriptor to listen to. Be sure not to call
        read or write operations on fds which are not able to do that
        (writing a read only file, for example)
    @param protoIn: specific protocol callback (see CommunicationManager.__init__ doc)
    @param protoOut: specific protocol callback (see CommunicationManager.__init__ doc)
    @param dontClose: disconnect acts like removeFDescriptor if set to True,
        True by default since we don't know how to close it properly
        (Eg Python file objects)
    @param hold: if True, self.hold(fd) is called right after the
        registration (this FD is added hold: no data will be read from it)

    @return: the sid of the new handle (the fd, used as a cid)
    @raise ValueError: if fd is already managed
    """
    if fd in self.chs:
        raise ValueError("FD already managed: %i" % fd)

    ch = ConnectionHandle(self, fd, protoIn, protoOut)
    ch.dontClose = dontClose

    # Remember the current O_NONBLOCK bit, then force non blocking mode.
    fstat = fcntl.fcntl(fd, fcntl.F_GETFL)
    ch.fcntl_blocking = fstat & os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, fstat | os.O_NONBLOCK)

    self.semaChs.acquire()
    try:
        sid = ch.sid()
        self.chs[sid] = ch
        self._throwLowLevelEvent((sid, "FD ADDED"))
        self._throwHighLevelEvent(("file descriptor managed", sid))
        self.connectionCount += 1
        self._wakeup("Alerts poll that a new FD %d has been added" % sid)
        if hold:
            self.hold(fd)
    finally:
        # Always give the semaphore back, even if something above raised,
        # otherwise every later user of self.semaChs would block forever.
        self.semaChs.release()
    return sid
1002
def removeFDescriptor(self, fd):
    """
    Stop managing a file descriptor without closing it.

    The handle is flagged dontClose and then disconnected. As
    originally documented, the fd is completely removed once the out
    buffer is empty and the corresponding event thrown.

    @param fd: the file descriptor
    @return: the file descriptor, as seen by the system (IE, the real
        socket object).
    """
    handle = self._cidToCh(fd)
    # Must be set before disconnecting so that the descriptor stays open.
    handle.dontClose = True
    self.disconnect(fd)
    return handle.socket
1018
def setTimeout(self, timeout, payload=None):
    """
    Schedule a new timeout.

    The timeout is recorded in self.timeouts, a "TIMEOUT ADDED" low
    level event is sent and self._wakeup() is called.

    @param timeout: timeout in seconds (as a float)
    @param payload: some data stored along with the timeout
    @return: a timeout handle (as a int)
    """
    handle = self.timeouts.addTimeout(timeout, payload)
    added_event = (None, "TIMEOUT ADDED", handle, timeout)
    self._throwLowLevelEvent(added_event)
    self._wakeup("New timout is set (%f s) %d" % (timeout, handle))
    return handle
1029
def cancelTimeout(self, handle):
    """
    Cancel a timeout.

    A "TIMEOUT CANCELED" low level event is sent only if the handle was
    actually cancelled by self.timeouts.

    @param handle: the previously given handle
    """
    if not self.timeouts.cancelTimeout(handle):
        return
    self._throwLowLevelEvent((None, "TIMEOUT CANCELED", handle))
1037
def halfClose(self, cid, how, deffered=True):
    """
    Close one half of the connection (the sending or the receiving part).

    It works only for sockets. For other objects, just use the usual
    close/disconnect methods.

    how is a combination of two class constants:
        SENDING: you won't be able to send anything else. Your peer will
            be noticed of an end of stream.
            If deffered is True (default), the handle is only flagged
            (halfClose = True); as originally documented, the close then
            happens once the out buffer has been sent.
            If deffered is False, the close happens immediately whatever
            the consequences.

        RECEIVING: you won't be able to receive anything. Your peer won't
            be noticed until it tries to send something to you. It will
            get a broken pipe error. You should hold the connection. If
            you don't, you will get an "end of stream" event. If you
            don't manage it, the connection will then be fully closed
            (unless there is still something in the out buffer waiting
            to be sent).

    @param cid: the id of the connection
    @param how: SENDING, RECEIVING or both ORed together
    @param deffered: only used for SENDING, see above
    """
    handle = self._cidToCh(cid)

    if (how & self.SENDING) == self.SENDING:
        if deffered:
            handle.halfClose = True
        else:
            handle.socket.shutdown(socket.SHUT_WR)
            self._throwLowLevelEvent((cid, "WRITE CLOSE REQUESTED"))

    if (how & self.RECEIVING) == self.RECEIVING:
        handle.socket.shutdown(socket.SHUT_RD)
        self._throwLowLevelEvent((cid, "READ CLOSE REQUESTED"))
# Bit flags accepted by the "how" parameter of halfClose(); they can be
# ORed together.
SENDING = 1 << 0
RECEIVING = 1 << 1
def changeInProtocolHandler(self, cid, newProtoIn):
    """
    Replace the in protocol handler of a connection.

    The in buffer is not cleared: once the "PROTOCOL CHANGE" low level
    event has been sent, self._manageInData() is called on the handle.

    @param cid: the id of the connection
    @param newProtoIn: the new in protocol callback
    """
    handle = self._cidToCh(cid)
    handle.protoIn = newProtoIn
    change_event = (cid, "PROTOCOL CHANGE")
    self._throwLowLevelEvent(change_event)
    self._manageInData(handle)
1086 1087
class TimeoutsManagement(object):
    """
    Book-keeping of pending timeouts.

    Every pending timeout is a dict with three keys: "time" (absolute
    expiration date, comparable with time.time()), "payload" and "handle".
    """

    def __init__(self):
        # Pending events, kept sorted by increasing "time" by addTimeout().
        self.pendings = []
        # Handle that will be given to the next added timeout.
        self.nextHandle = 0

    def addTimeout(self, timeout, payload):
        """
        Add a new timeout in the list.
        @param timeout: time to wait, in seconds, as a float
        @param payload: some data which will be given back when event will
            arise.
        @return: a handle to manage the new event
        """
        tt = time.time() + timeout
        event = {
            "time": tt,
            "payload": payload,
            "handle": self.nextHandle,
        }
        self.nextHandle += 1

        # Insert before the first event which does not expire earlier, so
        # that the list stays sorted.
        pos = 0
        while pos < len(self.pendings) and self.pendings[pos]["time"] < tt:
            pos += 1
        self.pendings.insert(pos, event)
        return event["handle"]

    def cancelTimeout(self, handle):
        """
        Remove the handle from the list.
        @param handle: a handle returned by addTimeout
        @return: True if a pending timeout has been removed, False if the
            handle is unknown (or not pending anymore)
        """
        for pos, event in enumerate(self.pendings):
            if event["handle"] == handle:
                del self.pendings[pos]
                return True
        return False

    def getNextTimeout(self):
        """
        Return the next timeout to wait for or None if there is no reason to
        timeout.

        The result is never negative: an overdue timeout gives 0.
        select.poll() treats a negative timeout as "block until an event
        arrives", which is the opposite of what an overdue timeout needs.

        @return: timeout, in milliseconds
        """
        if not self.pendings:
            return None
        remaining = self.pendings[0]["time"] - time.time()
        return max(0.0, remaining * 1000)

    def popPastEvents(self):
        """
        Return, and remove, the events that have to be treated.
        @return: the list of events whose "time" is in the past, oldest
            first
        """
        now = time.time()
        # The list is sorted: past events are a prefix of it.
        count = 0
        for event in self.pendings:
            if not event["time"] < now:
                break
            count += 1
        result = self.pendings[:count]
        self.pendings = self.pendings[count:]
        return result
# Map every poll state value of the select module to its name, e.g.
# select.POLLIN -> "POLLIN". Built with getattr() rather than eval(), from
# a generator expression so that no temporary name is left in the module.
pollMap = dict(  #: @undocumented
    (getattr(select, name), name)
    for name in ("POLLIN", "POLLPRI", "POLLOUT", "POLLERR", "POLLHUP", "POLLNVAL")
)
def convertPollState(state):
    """
    Given a value of ORed select states (POLLIN | POLLERR...), return a
    string with the names of the states merged with spaces ("POLLIN POLLERR").

    Only the 8 low order bits of state are inspected. A bit set in state
    which has no name in pollMap is rendered as its hexadecimal value
    instead of raising a KeyError.

    @param state: the ORed poll states, as an int
    @return: the readable names, in increasing bit order
    """
    names = []
    for shift in range(8):
        mark = 1 << shift
        if mark & state:
            names.append(pollMap.get(mark, hex(mark)))
    return " ".join(names)
1172
def returnRaw(ch):
    """
    Default protoIn callback: hand over everything received so far.

    The in data of the handle is read, then cleared.

    @param ch: the connection handle
    @return: a tuple (ch.OK, [data]) where data is the whole in data
    """
    received = ch.getInData()
    ch.clearInData()
    status = ch.OK
    return status, [received]
1180
def returnIdentity(data):
    """
    Default protoOut callback: return all data as a single string.

    @param data: an iterable; each item is converted with str()
    @return: the concatenation of the converted items
    """
    pieces = map(str, data)
    return "".join(pieces)
1186 1187
def allLevelListener(event):
    """
    Basic debug callback to see what is happening: pretty-print the
    received event on the standard output.

    @param event: the event to display
    """
    printer = pprint.PrettyPrinter()
    printer.pprint(event)
1193 #sys.excepthook(*event[2]) 1194 1195 ## Running the module should'n raise any exception. It should test most features 1196 ## but is not considered a real test case since received events are not 1197 ## checked. 1198 if __name__ == "__main__" : 1199 com = CommunicationManager(blocking=False) 1200 com.registerLowLevelListener(allLevelListener) 1201 com.registerHighLevelListener(allLevelListener) 1202 try : 1203 b = com.listen() 1204 assert com.connect() == 6 1205 1206 (pin, pout) = os.pipe() 1207 com.addFDescriptor(pout) 1208 com.sendRaw(pout, "plop 1") 1209 print "----->", os.read(pin, 255) 1210 com.removeFDescriptor(pout) 1211 com.addFDescriptor(pin) 1212 os.write(pout, "plop 2") 1213 1214 com.sendRaw(6, "plop 3") 1215 com.send(7, "plop 4") 1216 com.addFDescriptor(0) 1217 com.addFDescriptor(1) 1218 com.removeFDescriptor(1) 1219 os.close(pout) 1220 time.sleep(1) 1221 a = com.removeFDescriptor(7) 1222 com.removeFDescriptor(0) 1223 com.close(b) 1224 time.sleep(1) 1225 1226 assert com.listenUnix("/tmp/testComManager") == 5 1227 time.sleep(1) 1228 assert com.connectUnix("/tmp/testComManager") == 9 1229 time.sleep(1) 1230 com.sendRaw(9, "plopretour") 1231 com.sendRaw(10, "plop") 1232 time.sleep(1) 1233 com.close(5) 1234 com.close(10) 1235 1236 except: 1237 sys.excepthook(*sys.exc_info()) 1238 com.stop() 1239