Package CyberRail :: Package common :: Module communicationmanager :: Class CommunicationManager
[hide private]
[frames] | no frames]

Class CommunicationManager

source code


Manage communication between differents parts of processes, via IO.

The communication manager is responsible for communication between different processes, via sockets or other IO devices. It will emit various events given what data is received on the created sockets.

The manager can be used in two modes:

In order to interact with the communication manager, you have to register some callback functions for the various emmited events.

Low level events are events focused on inputs and outputs operations. You won't need most of them since they are only usefull for debugging purposes. Included are low-level TCP connection process like "CONNECTING", "CONNECTION CLOSED", and detailed errors.

High level events are more application level events. They notify you for packets received formated by the protoIn callback (see below).

Moreover, you should provide a "protoIn" callback used for parsing the stream looking for higher level messages, and a "protoOut" callback that will receive these messages that have to be serialized to be carried by the stream. For example, if you use Json/Yaml messages, you can cut the stream based on various bounds and call the yaml/json parser. Respectively, you will call the json/yaml formatter for your high level messages.

Instance Methods [hide private]
 
__init__(self, blocking=True, protoIn=None, protoOut=None)
Create a communication manager.
source code
 
_wakeup(self, reason)
Wake up select in case we have work.
source code
 
_managePollList(self)
Manage all the connections in the polling object.
source code
 
_releaseSocket(self, ch)
Remove a socket from the ConnectionManager.
source code
 
_manageInData(self, ch)
Manage incoming data.
source code
 
_readSocket(self, ch)
Try reading socket.
source code
 
_writeSocket(self, ch)
Try writing the socket.
source code
 
send(self, cid, data)
Send some data over the network.
source code
 
sendRaw(self, cid, data)
Send bytes on the wire.
source code
 
_createConnectedSocket(self, ch)
Given a connection handle which should have a pending connection, create a new socket.
source code
 
_completeConnectedSocket(self, ch)
An outcoming connection to another peer is completed.
source code
 
_manageErroneousConnection(self, ch, error=None)
This connection return us an error.
source code
 
_managePollReturn(self, descs)
Analyze what poll tries to explain us...
source code
 
_manage_peer_eos(self, ch)
Some objets signals end of readable stream with different methods - POLLIN / read of 0 bytes (classical EOF) - POLLHUP (pipes, mainly)
source code
 
loop(self)
Do one loop.
source code
 
main(self)
Call me if you want that I manage the main execution loop.
source code
 
stop(self)
Stop the connection manager, and close each socket if any.
source code
 
stopAtLastSocketClosed(self, value=True)
Ask the communication to auto-stop if it doesn't manage connection anymore.
source code
 
stopOnException(self, value=True)
When an exception raise during a high level event dispatch, stop the manager.
source code
 
stopOnKeyboardInterrupt(self, value=True)
Stop on ctrl + c
source code
 
listen(self, port=8417, ipV6=True, protoIn=None, protoOut=None, ssl=False)
Create a listening socket.
source code
 
listenUnix(self, address, protoIn=None, protoOut=None)
Create a listening socket, via unix socket.
source code
 
_addListeningSocket(self, ch, portOrAddress) source code
 
_cidToCh(self, cid) source code
 
disconnect(self, cid)
Disconnect a socket.
source code
 
close(self, cid)
Disconnect a socket.
source code
 
hold(self, cid, after=None)
Avoid pooling and reading from this socket.
source code
 
unhold(self, cid)
Resume hold state.
source code
 
_translateAddress(self, address, port, ipV6)
Return the right network address given the address, port and familyt address
source code
 
connect(self, address='ip6-localhost', port=8417, ipV6=True, protoIn=None, protoOut=None, ssl=False)
Connect to a peer.
source code
 
connectUnix(self, address='/tmp', protoIn=None, protoOut=None)
Connect to a peer, via an unix socket.
source code
 
_addSocket(self, ch, address) source code
 
_throwLowLevelEvent(self, event) source code
 
_throwHighLevelEvent(self, event) source code
 
registerLowLevelListener(self, listener)
All things that refers to socket connection
source code
 
registerHighLevelListener(self, listener)
All things that refers to application
source code
 
unregisterLowLevelListener(self, listener)
Remove listener.
source code
 
unregisterHighLevelListener(self, listener)
Remove listener.
source code
 
addFDescriptor(self, fd, protoIn=None, protoOut=None, dontClose=True, hold=False)
Add a file descriptor in the system.
source code
 
removeFDescriptor(self, fd)
Remove a fd.
source code
 
setTimeout(self, timeout, payload=None)
Throw a Timeout event when timeout ms has passed.
source code
 
cancelTimeout(self, handle)
Cancel a timeout
source code
 
halfClose(self, cid, how, deffered=True)
Close one half of the connection (the sending or the receiving part).
source code
 
changeInProtocolHandler(self, cid, newProtoIn)
Allows you to change the in protocol handler.
source code

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Class Variables [hide private]
  SENDING = 1
  RECEIVING = 2
Properties [hide private]

Inherited from object: __class__

Method Details [hide private]

__init__(self, blocking=True, protoIn=None, protoOut=None)
(Constructor)

source code 

Create a communication manager.

Parameters:
  • blocking - a boolean indicating if the Communication Manager should be running on its own thread (False), or if it will be called via its main function (True)
  • protoIn - a callback which will cuts the stream into packets. It take one parameter which will be an instance of ConnectionHandle, and should return a tuple of two values:
    • the constant OK, GARBAGE or UNDEFINED (see their docstrings)
    • a list of messages, that will be thrown as high events (one event per message).

    This callback has to manage the in buffer of the connection handle itself. It won't be clear until you do. Default is a callback where each packet is the bytes received at the last read call as a string.

  • protoOut - a callback wich will format a list of packet into a string. It take one parameter : a list of packet to send and return one string with the value converted. Default is a callback calling str() on each object, concatenating them.
Overrides: object.__init__

_wakeup(self, reason)

source code 

Wake up select in case we have work.

Parameters:
  • reason - Whe should we wake-up? This is a string used only for debug.

_managePollList(self)

source code 

Manage all the connections in the polling object. If an object polls for nothing, it is removed.

_releaseSocket(self, ch)

source code 

Remove a socket from the ConnectionManager. After then, there is no more trace of it here.

_manageInData(self, ch)

source code 

Manage incoming data. Throws events.

_readSocket(self, ch)

source code 

Try reading socket. Close it properly if needed.

send(self, cid, data)

source code 

Send some data over the network.

Parameters:
  • cid - the connection id
  • data - the data to send, will be converted with protoOut callback
Returns:
the number of bytes in the out buffer

sendRaw(self, cid, data)

source code 

Send bytes on the wire. In fact, only add data in the out buffer.

Returns:
the number of bytes in the out buffer

_createConnectedSocket(self, ch)

source code 

Given a connection handle which should have a pending connection, create a new socket.

Called when someone is connecting to us.

Parameters:
  • ch - the listening connection handle

_completeConnectedSocket(self, ch)

source code 

An outcoming connection to another peer is completed. Do the final work.

_manageErroneousConnection(self, ch, error=None)

source code 

This connection return us an error. Try to manage it.

_manage_peer_eos(self, ch)

source code 

Some objets signals end of readable stream with different methods
- POLLIN / read of 0 bytes (classical EOF)
- POLLHUP (pipes, mainly)

Here is the code that shoudl be called in this situation. It removes
the POLLIN flag, which should close the connection if no more polled.

The events are then thrown, in order to let the main application
hold this connection for future use. Unhold operation is also
available (Eg for managing ^D on terms)

loop(self)

source code 

Do one loop. You can set a timemout if you want to avoid to be
    blocked.
return True if loop should be called again
return False when cm wants to stop.

main(self)

source code 

Call me if you want that I manage the main execution loop. In this case, low lelvel events will be thrown at the start and at the stop.

listen(self, port=8417, ipV6=True, protoIn=None, protoOut=None, ssl=False)

source code 

Create a listening socket.

Parameters:
  • protoIn - specific protocol callback (see CommunicationManager.__init__ doc)
  • protoOut - specific protocol callback (see CommunicationManager.__init__ doc)
  • ssl - Enable SSL
Returns:
the id of the listening socket (only used to close it)

listenUnix(self, address, protoIn=None, protoOut=None)

source code 

Create a listening socket, via unix socket.

Parameters:
  • protoIn - specific protocol callback (see CommunicationManager.__init__ doc)
  • protoOut - specific protocol callback (see CommunicationManager.__init__ doc)
Returns:
the id of the listening socket (only used to close it)

disconnect(self, cid)

source code 

Disconnect a socket. Even when the connection is hold. The socket is completly disconnected once the event is thrown, and not before!

Note: If ch.dontClose is set, just unmanage it.

close(self, cid)

source code 

Disconnect a socket. Even when the connection is hold. The socket is completly disconnected once the event is thrown, and not before!

Note: If ch.dontClose is set, just unmanage it.

hold(self, cid, after=None)

source code 

Avoid pooling and reading from this socket. The in buffer is then no more filled.

In this case, if data are coming from the network (or the other part of the pipe /whathever fd), it will stay in the kernel buffer, or will block on the other side, until you unhold this connection.

Parameters:
  • after - Read "after" bytes before holding

unhold(self, cid)

source code 

Resume hold state.

All data that are arrived from the network will be read.

connect(self, address='ip6-localhost', port=8417, ipV6=True, protoIn=None, protoOut=None, ssl=False)

source code 

Connect to a peer.

Parameters:
  • protoIn - specific protocol callback (see CommunicationManager.__init__ doc)
  • protoOut - specific protocol callback (see CommunicationManager.__init__ doc)
Returns:
the socket number, which is used as id or None if the connection fails at this step (a low level event is sent).

connectUnix(self, address='/tmp', protoIn=None, protoOut=None)

source code 

Connect to a peer, via an unix socket.

Parameters:
  • protoIn - specific protocol callback (see CommunicationManager.__init__ doc)
  • protoOut - specific protocol callback (see CommunicationManager.__init__ doc)
Returns:
the socket number, which is used as id or None if the connection fails at this step (a low level event is sent).

addFDescriptor(self, fd, protoIn=None, protoOut=None, dontClose=True, hold=False)

source code 

Add a file descriptor in the system. The connection manager has been designed to play with socket, so events labels will be a bit strange for simple FDs but the meaning will be the same...

Parameters:
  • fd - the file descriptor to listen to. Be sure not to call read or write operations on fds who are not able to do that. (write a read only file, for example)
  • protoIn - specific protocol callback (see CommunicationManager.__init__ doc)
  • protoOut - specific protocol callback (see CommunicationManager.__init__ doc)
  • dontClose - disconnect acts like removeFDescriptor if set to True, True by default since we don't know how to close it properly (Eg Python file objects)
  • hold - this FD is added hold: no data will be read from it.
Returns:
the fd, used as a cid

removeFDescriptor(self, fd)

source code 

Remove a fd.

The fd is completly removed once the out buffer is empty and the corresponding event thrown.

Parameters:
  • fd - the file descriptor
Returns:
the file descriptor, as seen by the system (IE, the real socket object).

setTimeout(self, timeout, payload=None)

source code 

Throw a Timeout event when timeout ms has passed.

Parameters:
  • timeout - timeout in seconds (as a float)
Returns:
a timeout handle (as a int)

cancelTimeout(self, handle)

source code 

Cancel a timeout

Parameters:
  • handle - the previously given handle

halfClose(self, cid, how, deffered=True)

source code 

Close one half of the connection (the sending or the receiving part).

It works only for sockets. For other objects, just use the
close/disconnect usual methods.


how can be :
SENDING: you won't be able to send anything else. Your peer will be
    noticed of an end of stream.
    Close will happen now or after the out buffer is sent if it is not
    empty and deffered is True (default).
    If deffered is False, close happens immediatly whatever the
    consequences.

RECEIVING: you won't be able to receive anything. Your peer won't be
    noticed until it tries to send something to you. It will get
    a broken pipe error. You should hold the connection. If you
    don't you will get an "end of stream" event. If you don't manage
    it, the connection will then be fully closed (unless there is
    still something in the out buffer waiting to be sent)

Theses are two class constants.

changeInProtocolHandler(self, cid, newProtoIn)

source code 

Allows you to change the in protocol handler. In buffer is not cleared.