Package CyberRail :: Package common :: Module httpclient
[hide private]
[frames] | [no frames]

Source Code for Module CyberRail.common.httpclient

   1  # -*- coding: utf-8 -*- 
   2  """ 
   3      An asynchronous HTTP 1.1 client, with stream management. 
   4       
   5      This module allows you to send HTTP 1.1 requests asynchronously: this way 
   6      you can, while the request is managed, do other stuffs. In order for this to 
   7      work, you'll have to use the communication manager module, that manage 
   8      asynchronous operations on sockets and file descriptors. 
   9       
  10      See Request class for details. 
  11       
  12      If you don't plan to use a specific instance of ConnectionsPool, please 
  13      register a communication manager before use (see setCommunicationManager()). 
  14       
  15       
  16      Created on 21 juil. 09 
  17      @author: Emmanuel Coirier 
  18  """ 
  19  ## XXX Add missing state "writing response" when a response should ba write 
  20  ## to a file before returning the request. 
  21   
  22  _connectionsPool = None 
  23  """ 
  24      The global connection pool. Instanciated on the fly. 
  25  """ 
  26  _communicationManager = None 
  27  """ 
  28      The communication manager used by the connection pool. 
  29  """ 
  30   
  31  DELAYED=-1 
  32  """ 
  33      The request body will be defined later. 
  34  """ 
  35  CALLBACK=-2 
  36  """ 
  37      The response body will be transmited via callback 
  38  """ 
  39  STORE=-3 
  40  """ 
  41      The response body will be stored in the response 
  42  """ 
  43   
  44  import re 
  45  import os 
  46  from urllib import unquote_plus 
  47  import base64 
  48   
  49   
def _getConnectionPool():
    """
    Return the module-wide connection pool, creating it on first use.

    @raise RuntimeError: if the pool does not exist yet and no communication
        manager has been registered with setCommunicationManager().
    """
    global _connectionsPool

    if _connectionsPool is None:
        if _communicationManager is None:
            raise RuntimeError("Communication Manager has not been defined")
        _connectionsPool = ConnectionsPool(_communicationManager)
    return _connectionsPool

def setCommunicationManager(cm):
    """
    Register the communication manager used to open TCP connections.

    Call this once, before using the rest of the module. Within this module,
    the value is read when the global connection pool is first created.

    @param cm: the communication manager instance.
    """
    global _communicationManager
    _communicationManager = cm

class HTTPError(Exception):
    """
    Error raised by this HTTP client: connection creation failure, invalid
    request configuration, or misuse of the request API.
    """

class Connection(object):
    """
    A connection to a server. You shouldn't use this class directly.

    This class is mainly a storage class for connection information: the
    remote host and port, the communication manager and socket id (cid) it
    belongs to, the callbacks of its current user, and the timeout handle
    armed while the connection is idle.
    """

    def __init__(self, host, port, cm, cid):
        """
        @param host: the host to connect to
        @param port: the port to connect to
        @param cm: reference to the communication manager
        @param cid: the socket to use
        """
        self.host = host
        self.port = port
        self.cm = cm
        self.cid = cid
        self.callback = None
        self.errorCallback = None
        # Handle returned by cm.setTimeout() while the connection is free.
        self.timeoutHandle = None

    def associate(self, callback, errorCallback):
        """
        Associate this connection with the caller, and cancel the idle
        timeout armed by free(), if any.
        @param callback: will be used when normal events are received
        @param errorCallback: will be used when erroneous events are received
        """
        self.callback = callback
        self.errorCallback = errorCallback
        if self.timeoutHandle is not None:
            self.cm.cancelTimeout(self.timeoutHandle)
            # Forget the handle so that it is never cancelled twice.
            self.timeoutHandle = None

    def free(self, newTimeout):
        """
        Release this connection.
        @param newTimeout: time, in seconds, to wait before closing the
            connection
        """
        self.callback = None
        self.errorCallback = None
        if self.timeoutHandle is not None:
            # Releasing twice must not leave an orphan timeout behind.
            self.cm.cancelTimeout(self.timeoutHandle)
        self.timeoutHandle = self.cm.setTimeout(newTimeout, self.cid)

    def isFree(self):
        """
        @return: True if this connection is free.
        """
        return self.callback is None

class Headers(dict):
    """
    A dict of HTTP headers whose keys (header names) are case-insensitive.

    Names are stored in lower case, so "Content-Length", "content-length"
    and "CONTENT-LENGTH" designate the same entry; keys() and items() return
    the lower-case names.
    """

    def __init__(self, *args, **kwargs):
        """
        Accept the same arguments as dict(); every name is normalised.
        """
        dict.__init__(self)
        self.update(*args, **kwargs)

    def __getitem__(self, key):
        return dict.__getitem__(self, key.lower())

    def __setitem__(self, key, value):
        dict.__setitem__(self, key.lower(), value)

    def __delitem__(self, key):
        dict.__delitem__(self, key.lower())

    def __contains__(self, key):
        return dict.__contains__(self, key.lower())

    def get(self, key, default=None):
        return dict.get(self, key.lower(), default)

    def pop(self, key, *default):
        return dict.pop(self, key.lower(), *default)

    def setdefault(self, key, default=None):
        return dict.setdefault(self, key.lower(), default)

    def update(self, *args, **kwargs):
        """
        Like dict.update(), but every name goes through __setitem__ so that
        it is normalised (dict.update() would bypass it).
        """
        if len(args) > 1:
            raise TypeError("update expected at most 1 positional argument, got %d" % len(args))
        if args:
            other = args[0]
            if hasattr(other, "keys"):
                for key in other.keys():
                    self[key] = other[key]
            else:
                for key, value in other:
                    self[key] = value
        for key, value in kwargs.items():
            self[key] = value


class ConnectionsPool(object):
    """
    Manage the connections to the hosts requested by the HTTP client. The
    pool keeps connections alive in order to reuse them when possible.

    It also keeps track of the number of connections per host/port, in order
    to conform to RFC 2616 section 8.1.4 (last paragraph). It is therefore
    able to hold a request until a connection has been made free (either by
    closing it or by releasing it).
    """

    def __init__(self, communicationManager):
        """
        Construct an HTTP connection pool.
        @param communicationManager: the communication manager that will
            give us the TCP connections.
        """
        self.cm = communicationManager

        # (host, port) -> list of Connection objects, free or busy.
        self.consByKey = {}
        # cid -> Connection, for every connection created by this pool.
        self.consByCid = {}
        # (host, port) -> FIFO list of (host, port, ssl, callback, errorCallback).
        self.pendingRequests = {}

        # Maximum number of simultaneous connections per (host, port).
        self.maxRequestPerHost = 2
        # (host, port) -> HTTP version recorded with setHostVersion().
        self.versions = {}
        self.cm.registerHighLevelListener(self._onEvent)

    def _onEvent(self, event):
        """
        Called when an event is received from the communication manager.
        Events about sockets this pool did not create are ignored; the others
        are dispatched to the proper handler given the event type.

        @raise RuntimeError: for an event type this pool does not handle.
        """
        eventType = event[0]
        cid = event[1]

        if cid not in self.consByCid:
            return

        if eventType == "connection error":
            self._manageErroneousConnection(cid, event[2])
        elif eventType == "connection closed":
            self._removeClosedConnection(cid)
        elif eventType == "outcoming connection":
            self._manageReadyConnection(cid)
        elif eventType == "packet":
            # Data is handled by the request currently using the connection.
            pass
        elif eventType == "timeout":
            # Presumably the idle timeout armed by Connection.free(); the
            # "connection closed" event that should follow removes it.
            self.cm.close(cid)
        else:
            raise RuntimeError("Event not managed: " + eventType)

    def _manageErroneousConnection(self, cid, msg):
        """
        Error event management. Call the error callback, if any.
        """
        con = self.consByCid[cid]
        if con.errorCallback is not None:
            con.errorCallback(msg)

    def _manageReadyConnection(self, cid):
        """
        The connection is ready to be used (it has just been opened).
        """
        con = self.consByCid[cid]
        con.callback(con)

    def _removeClosedConnection(self, cid):
        """
        The connection has been closed, so remove it from the pool, then try
        to serve the requests waiting for a connection.
        """
        con = self.consByCid.pop(cid, None)
        if con is None:
            return

        cons = self.consByKey.get(self.getKey(con.host, con.port))
        if cons and con in cons:
            cons.remove(con)

        self._managePendingRequests()

    def getKey(self, host, port):
        """
        Return a unique key, given a host and a port.
        """
        return (host, port)

    def requestConnection(self, host, port, ssl, callback, errorCallback):
        """
        Ask the pool for a connection suitable for an HTTP request. This call
        is asynchronous. A free connection is reused if possible; otherwise
        a new connection is established if the per-host limit allows it;
        otherwise the request is held until a connection becomes available.

        @param host: the host to connect to
        @param port: the port to connect to
        @param ssl: whether a new connection should use SSL
        @param callback: called with the connection when it becomes available
        @param errorCallback: called when IO errors arise (not HTTP errors)
        """
        key = self.getKey(host, port)
        self.pendingRequests.setdefault(key, []).append(
            (host, port, ssl, callback, errorCallback))
        self._managePendingRequests()

    def _managePendingRequests(self):
        """
        Serve as many pending requests as the current state of the pool
        allows.
        """
        # Iterate over a snapshot of the keys: entries are deleted below, and
        # the callbacks we call may re-enter this method.
        for key in list(self.pendingRequests.keys()):
            while self.pendingRequests.get(key):
                if not self._serveNextPendingRequest(key):
                    break
            if key in self.pendingRequests and not self.pendingRequests[key]:
                del self.pendingRequests[key]

    def _serveNextPendingRequest(self, key):
        """
        Try to serve the oldest pending request for key.
        @return: True if the request has been consumed (served or failed),
            False if it has to keep waiting.
        """
        queue = self.pendingRequests[key]
        host, port, ssl, cb, cbe = queue[0]
        cons = self.consByKey.setdefault(key, [])

        # Look for an existing free connection first.
        for con in cons:
            if con.isFree():
                # Dequeue before calling back: the callback may re-enter the
                # pool, which must not see this request again.
                queue.pop(0)
                con.associate(cb, cbe)
                cb(con)
                return True

        # All existing connections are busy; open a new one if allowed.
        if len(cons) < self.maxRequestPerHost:
            queue.pop(0)
            try:
                con = self._createConnection(host, port, ssl)
            except HTTPError as e:
                cbe(e)
                return True
            # cb is called on the "outcoming connection" event.
            con.associate(cb, cbe)
            cons.append(con)
            return True

        return False

    def _createConnection(self, host, port, ssl):
        """
        Create a new TCP connection.
        @raise HTTPError: if the communication manager returns no socket.
        """
        cid = self.cm.connect(host, port, ipV6=False, ssl=ssl)
        if cid is None:
            raise HTTPError("Unable to connect to %s:%s" % (host, port))
        con = Connection(host, port, self.cm, cid)
        self.consByCid[cid] = con
        return con

    def releaseValidConnection(self, con, newTimeout):
        """
        Release the connection.
        Do not call this if the connection has been closed, since the cid
        is then invalid (and has perhaps already been given to another
        connection).

        @note: The connection pool has its own mechanism to handle closed
            connections.
        """
        con.free(newTimeout)
        self._managePendingRequests()

    def setHostVersion(self, host, port, version):
        """
        Set the HTTP version of the server software.
        """
        self.versions[self.getKey(host, port)] = version

    def getHostVersion(self, host, port):
        """
        Get the HTTP version of the server software, or None if unknown.
        """
        return self.versions.get(self.getKey(host, port))

337 -class Request(object):
338 """ 339 A request is all the data needed by an HTTP transaction to complete. 340 341 In order to complete an HTTP transaction, you have to build a Request, 342 configure it with yours needs, send it, eventually feed it with 343 a body if it is needed, and read the response (if any). 344 345 - build a request: call the constructor of this class with the 346 appropriate parameters 347 - configure it: use the various methods to set aspect of the request, 348 like HTTP method, request body method, etc. 349 - send it: call send with a callback, that will be used when request 350 needs your attention 351 - feed it (optionnal): depending of the request body method, see below 352 - read the response: depending of the response body method, see below 353 354 A request contains headers. You can set whatever headers you want, 355 but be aware that HTTP only contains a delimited set of headers. In 356 doubt, do not set anything. Requests without user-set-headers should be 357 valid. 358 359 A request can contain a body if you need it. While preparing the request, 360 you can : 361 - set the body with a string or bytes 362 - set the body with a stream (from the communication manager) 363 - set the body while the request is sent (either with a defined size or no) 364 See setBody() for more details. 365 366 Once your request is ready, call send(). You will receive the answer via 367 a callback. 368 369 @note: A request will pass through these differents steps. Each transition 370 can lead to the error state. 371 - new 372 This is the initial state where you construct your request. At no 373 time network is involved at this step. 374 375 When the request is ready, call send() 376 377 The next state is "constructing". 378 379 - constructing 380 In this state, the request will collect all needed informations to 381 build itself. It can stay a long time in this state if the body 382 of the request is in a stream and we need to buffer it in order 383 to compute its size. 
384 385 If the body is a stream, the server is unknow or know to be 386 something else than HTTP/1.1, and the Content-Length header is not 387 set, the stream is read until EOF, data becomes a string, and the 388 size is now computed and available. 389 390 The next state is "waitingforconnection". 391 392 - waitingforconnection 393 When the request is ready a connection is created or reused, or if 394 not possible, the request is hold until a connection becomes 395 available. 396 397 Once the connection is ready, the start line and headers are sent and 398 the next state is called. 399 400 If there is no body to sent, the next step is "waitingforresponse". 401 402 If a body should be sent, and continue mechanism has not been disabled, 403 the next state is "waitingforpreresponse", else "sending". 404 405 - waitingforpreresponse (optionnal) 406 407 If remote server is know to be an HTTP/1.1 server and the continue 408 mechanism has not been disabled, the request will hold until the 409 server accepts the body of this request. In this case, the next state 410 is "sending". 411 412 In case of error, the next state is either "receiving" or "finish", 413 depending of the pre response. Body is then not sent. 414 415 - sending 416 417 If a body has to be sent, we are now in this step, and given the 418 preparation of the request, we either send the content or wait 419 for content to be sent, from the caller (you). 420 421 If the request body is a string, it is sent. 422 If the request body is a stream, it is read and sent until 423 Content-Length bytes have been sent. 424 If the request body is a DELAYED, the callback will be called and 425 you'll have to furnish Content-Length bytes in order to go to the 426 next step. 427 428 Once the request body has been sent, we enter the "waitingforresponse" 429 state. 430 431 - waitingforresponse 432 433 The request has been sent, we wait for the response from the remote 434 server. 
Waiting for the response means waiting for the response line 435 and all the headers. 436 437 The next state is either "receiving" or "finish", depending of the 438 response (and the inclusion of a response body). 439 440 - receiving 441 442 A body is being sent. The request is waiting for you to read the body. 443 444 If the response body is a stream, the stream will be written with all 445 the data read from the response body. 446 447 If the response body is STORE, the data will be kept in the response 448 449 If the data is CALLBACK, the callback will be called for each 450 received chunk of data. 451 452 Next state is "finish" 453 454 - finish 455 456 The request is finished: the TCP connection has been released (stored 457 back in the pool or closed). This is a final state. 458 459 You don't have to wait for this step to create another request. But 460 each request can be used only one time. 461 462 - error 463 464 An unexpected IO error has occured (IE the TCP connection has been 465 lost in the middle of the transaction). This is a final state. 466 467 You shouldn't try to contact the same host before knowing why there 468 is this error. 469 """ 470 471
472 - def __init__(self, url=None, method="GET", headers=None, credentials=None, body=None):
473 """ 474 Create a request 475 @param url: The url to call (the same as setUrl) 476 @param method: HTTP method to use (the same as setMethod) 477 @param headers: A dict containing specific headers (like setHeaders) 478 @param credentials: credentials that could be used for this request (not used) 479 @param body: body that should be used dor this request (the same as setBody) 480 """ 481 self.headers = Headers({ 482 "User-Agent": "cmhttpclient/1.0" 483 }) 484 485 if headers: 486 self.headers = self.headers.update(headers) 487 self.body = body 488 self.method = method 489 if url: 490 self.setUrl(url) 491 if credentials is not None: 492 self.credentials = credentials 493 self.state = "new" 494 self.version = "HTTP/1.1" 495 self.con = None 496 self.responseBodyMethod = STORE 497 self.freeConTimeout = 5 498 self.hints = { 499 "Connection": "open" 500 }
501 502 503 URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")
504 - def setUrl(self, url):
505 """ 506 Set various elements of the request given the value of the URL: 507 - host 508 - port 509 - path 510 - credentials 511 """ 512 513 groups = self.URI.match(url).groups() 514 scheme, authority, self.path, self.query, self.fragment = groups[1], groups[3], groups[4], groups[6], groups[8] 515 self.ssl = scheme == "https" 516 if len(self.path) == 0: 517 self.path = "/" 518 519 if self.query is not None: 520 self.path += "?" + self.query 521 522 authorityinfo = authority.split("@") 523 if len(authorityinfo) < 2: 524 credentials = None 525 hostinfo = authorityinfo[0] 526 else: 527 credentials, hostinfo = authorityinfo 528 529 if credentials is not None: 530 credentials = credentials.split(":", 1) 531 self.credentials = [unquote_plus(c) for c in credentials] 532 533 hostport = hostinfo.split(":") 534 if len(hostport) < 2: 535 self.host = hostport[0] 536 self.port = 443 if self.ssl else 80 537 else: 538 self.host, self.port = hostport 539 540 self.setHeader("Host", hostinfo) 541 self.url = url
542
543 - def setMethod(self, method):
544 """ 545 Set the HTTP method, like "GET", "POST", "HEAD", etc. 546 547 You could put any method you want, but you should know that HTTP 548 only know a small predefined set of methods. 549 """ 550 self.method = method
551
552 - def setHint(self, key, value):
553 """ 554 Set some optionnal features of the request: 555 556 - connection: 557 - "open": the connection will stay open after the request, if 558 possible 559 - "close": the connection will get closed after the request 560 - preresponse: 561 - True: the 100-continue mechanism will be used if possible 562 - version: 563 - "poll": the server will get polled for its version before the 564 request 565 """ 566 self.hints[key] = value
567
568 - def setBody(self, data):
569 """ 570 There is 4 ways to set the body. 571 572 - data is None (default) 573 No body will be sent. 574 575 - data is a string or bytes. 576 There is no restriction for this. Will be sent without interaction 577 with the caller. 578 579 - data is a stream (actually a tuple with a communication manager 580 and a cid) 581 582 In this case, all the data in the in-buffer will be sent until 583 eof, or if a Content-length header has been sent, until n bytes have 584 been read. 585 586 Depending the version of HTTP, requirements will not be the same : 587 588 If HTTP version is 1.0 and no Content-length is set, the 589 stream will be entirely buffered to know the size of the body. 590 591 If HTTP version is 1.1 and no Content-length is set, the 592 stream will be read until EOF, then the body will finish. 593 594 If Content-length is set, the stream will be read until the body size 595 is reach. If the stream is not large enough, the connection will be 596 aborted, and an exception will rise. You should avoid setting 597 Content-length unless you know what you are doing. If server is 598 know to be HTTP/1.1, the Content-length header will be discarded 599 from the request. 600 601 It is wise to pass the stream in hold state, in order not to fill 602 too much memory. 603 604 - data is DELAYED (a module global variable) 605 606 You will be able to set the data in the "sending" request state. 607 608 You could send as much data as you want with addBodyData(). HTTP 609 server version must be 1.1 in order for this to work finely. 610 611 You can use this with HTTP 1.0, but you must set the 612 Content-length header before calling send(). 613 In this case, finishBodyData() becomes unnecessary and sholdn't 614 be used. 615 The connection will be stalled until you add the right number 616 of bytes. 617 """ 618 self.body = data
619
620 - def setResponseBodyMethod(self, method):
621 """ 622 There is 4 ways to set the method the body will be retreived. 623 624 - method is None 625 Body, if any, is discarded. 626 627 - method is STORE (default) 628 All body will be read and stored in the Response object. The 629 callback of the send method will be called once the body 630 is completly retrieved. 631 632 - method is CALLBACK 633 The callback specified in the send method will be called each 634 time a part of the response is received. The response buffer 635 will contain only the received part. 636 637 - method is a stream (actually a tuple with a communication manager 638 and a cid) 639 The body response will be written on this stream, then the 640 callback will be called. The Response won't contain any body 641 response. 642 """ 643 644 self.responseBodyMethod = method 645 646 if self.responseBodyMethod == STORE: 647 pass 648 elif self.responseBodyMethod == CALLBACK: 649 pass 650 elif type(self.responseBodyMethod) == type(()): 651 cm, cid = self.responseBodyMethod 652 if type(cm) != communicationmanager.CommunicationManager: 653 raise HTTPError("CommunicationManager must be the first tuple element.") 654 if type(cid) != type(1): 655 raise HTTPError("File Descriptor, as int, must be the second tuple element.") 656 elif self.responseBodyMethod == None: 657 pass 658 else: 659 raise HTTPError("ResponseBodyMethod not handled: %s" % str(method))
660 661
662 - def setHeader(self, key, value):
663 self.headers[key] = value
664
665 - def getHeaders(self):
666 return self.headers
667
668 - def send(self, callback, connectionsPool=None):
669 """ 670 Send the request. 671 If the request body has been set to DELAYED (and server has not 672 refused it), callback will be called when reaching the "sending" 673 state. 674 If the response body has been set to CALLBACK, this callback 675 will be called when reaching "receiving" state. 676 Else, callback will be called at the "finish" state. 677 678 It is not wise to change some part of the request after calling 679 this method. 680 681 @param callback: a callable that will be called whenever a caller 682 interaction is needed. See setRequestBody() and setResponseBodyMethod 683 for complete protocol with this callback. This one takes a response 684 as first argument. 685 @param connectionsPool: if None, the global connectionsPool will be 686 used (default case). 687 """ 688 689 if self.state != "new": 690 raise HTTPError("Request is not in the new state.") 691 692 self.state = "constructing" 693 self.callback = callback 694 695 self.conPool = connectionsPool if connectionsPool is not None else _getConnectionPool() 696 697 if self.hints.get("version", None) == "poll" and self._getRemoteVersion() is None: 698 req = Request(self.url, method="HEAD", body=None) 699 req.send(self._pollFinished, self.conPool) 700 else: 701 self._prepareBody()
702
703 - def _pollFinished(self, response):
704 self._prepareBody()
705
707 if "preresponse" not in self.hints: 708 return False 709 710 if not self.hints["preresponse"]: 711 return False 712 713 if self.body is None: 714 return False 715 716 if self._getRemoteVersion() != "HTTP/1.1": 717 return False 718 719 return True
720
721 - def _onEvent(self, event):
722 """ 723 Event from the communication manager. 724 """ 725 if event[1] != self.con.cid: 726 return 727 eventType = event[0] 728 if eventType == "connection closed": 729 self._onConnectionClosed() 730 elif eventType == "packet": 731 self._onData(event[2])
732 733
734 - def _prepareBody(self):
735 """ 736 Fix Content-length, transfert encoding, and so on given various 737 parameters in the requests 738 """ 739 ### XXX Headers are case-insentive 740 if self.body == None: 741 headersToDelete = ["Transfer-Encoding", "Content-Length"] 742 for header in headersToDelete: 743 try: 744 del self.headers[header] 745 except KeyError: 746 pass 747 self._beginConnection() 748 return 749 750 if type(self.body) == type(""): 751 length = len(self.body) 752 self.headers["Content-Length"] = str(length) 753 try: 754 del self.headers["Transfer-Encoding"] 755 except KeyError: 756 pass 757 self._beginConnection() 758 return 759 760 if self.body == DELAYED: 761 if self._getRemoteVersion() == "HTTP/1.1": 762 try: 763 del self.headers["Content-Length"] 764 except KeyError: 765 pass 766 self.headers["Transfer-Encoding"] = "chunked" 767 else: 768 if "Content-Length" not in self.headers.keys(): 769 self._cancelRequest("DELAYED Body without Content-Length is no valid unless server is know to be HTTP/1.1 (which is not the case)") 770 return 771 else: 772 try: 773 self.bodyBytesToSend = int(self.headers["Content-Length"]) 774 except ValueError: 775 self._cancelRequest("Content-Length '%s' is not an integer." % str(self.headers["Content-Length"])) 776 return 777 self._beginConnection() 778 return 779 780 if type(self.body) == type(()): #cm + cid 781 if self._getRemoteVersion() == "HTTP/1.1": 782 try: 783 del self.headers["Content-Length"] 784 except KeyError: 785 pass 786 self.headers["Transfer-Encoding"] = "chunked" 787 self._beginConnection() 788 else: 789 if "Content-Length" not in self.headers.keys(): 790 ## We have to buffer all the body before use 791 self._buffersBody() 792 else: 793 self._beginConnection()
794
795 - def _buffersBody(self, event=None):
796 """ 797 We have to buffer the entire body in order to fix the Content-Length 798 header. We are in the HTTP/1.0 protocol, so we can't use chunked 799 encoding. 800 """ 801 self.buffer = "" 802 cm, cid = self.body 803 cm.registerHighLevelListener(self._onBufferRequestBodyDataRead) 804 cm.unhold(cid)
805
806 - def _onBufferRequestBodyDataRead(self, event):
807 """ 808 Read events when buffering the body request. 809 """ 810 cm, cid = self.body 811 if cid != event[1]: 812 return 813 if event[0] == "packet": 814 self.buffer += event[2] 815 elif event[0] in ("connection closed", "file descriptor unmanaged"): # EOF 816 self.headers["Content-Length"] = str(len(self.buffer)) 817 self.body = self.buffer 818 del self.buffer 819 cm.unregisterHighLevelListener(self._onBufferRequestBodyDataRead) 820 self._beginConnection()
821 822
823 - def _beginConnection(self):
824 """ 825 Ask a connection 826 """ 827 self.state = "waitingforconnection" 828 self._preparePreamble() 829 self.conPool.requestConnection(self.host, self.port, self.ssl, self._onConnectionReady, self._onIOError)
830
831 - def _cancelRequest(self, reason):
832 """ 833 In case of IO errors. 834 """ 835 self.state = "error" 836 resp = Response(request=self) 837 resp.setIoError(error) 838 self.callback(resp)
839
840 - def _preparePreamble(self):
841 """ 842 Construct the HTTP Request based on the informations we have 843 """ 844 845 self.preamble = self.method + " " + self.path + " " + self.version + "\n" 846 847 if self._shouldWeWaitForPreresponse(): 848 self.headers["Expect"] = "100-continue" 849 850 for headerName, headerKey in self.headers.items(): 851 if type(headerKey) == type(""): 852 self.preamble += headerName + ": " + headerKey + "\n" 853 elif type(headerKey) == type([]): 854 self.preamble += headerName + ": " + ", ".join(headerKey) + "\n" 855 self.preamble += "\n" 856 857 self.preamble = self.preamble.replace("\n", "\r\n")
858 859
860 - def _onConnectionReady(self, con):
861 """ 862 The connection is now ready, we can send something on it. 863 """ 864 self.con = con 865 self.con.cm.registerHighLevelListener(self._onEvent) 866 self.con.cm.send(self.con.cid, self.preamble) 867 if self._shouldWeWaitForPreresponse(): 868 self.state = "waitingforpreresponse" 869 self.response = Response(request=self) 870 elif self.body is None: 871 self.state = "waitingforresponse" 872 self.response = Response(request=self) 873 else: 874 self.state = "sending" 875 self._sendBody()
876
877 - def _sendBody(self):
878 """ 879 Send the body... 880 """ 881 if type(self.body) == type(""): 882 self.con.cm.send(self.con.cid, self.body) 883 self.state = "waitingforresponse" 884 self.response = Response(request=self, previousResponse=getattr(self, "response", None)) 885 elif type(self.body) == type(()): 886 cm, cid = self.body 887 if "Content-Length" in self.headers.keys(): 888 cm.hold(cid, after=self.headers["Content-Length"]) 889 890 cm.registerLowLevelListener(self._onLiveBodyDataRead) 891 cm.unhold(cid) 892 893 elif data == DELAYED: 894 self.callback(None)
895
896 - def _onLiveBodyDataRead(self, event):
897 """ 898 Data read from the stream for the request body, then sent. 899 """ 900 cid, eventType = event[0], event[1] 901 if cid != self.body[1]: 902 return 903 if eventType == "READ": 904 self.addBodyData(event[2]) 905 elif eventType in ["FD REMOVED", "CONNECTION CLOSED", "HOLD"]: 906 self.addBodyData("") 907 cm.unregisterLowLevelListener(self._onLiveBodyDataRead)
908
909 - def addBodyData(self, data, endingHeaders=None):
910 """ 911 Send part of the request body. 912 When request body method is DELAYED, the callback is called and 913 you have to furnish some data with this method. 914 915 Once all data has been sent, you have to (HTTP/1.1) call this with 916 an empty string, and some final headers (not yet implemented). It is 917 safe to call this method for request body termination with HTTP/1.0 918 919 @note: do not call this when not in the "sending" state. 920 """ 921 if self.state != "sending": 922 raise HTTPError('Not in the "sending" state.') 923 924 if self.headers["Transfer-Encoding"] == "chunked": 925 self._sendBodyChunk(data) 926 if len(data) == 0: 927 self.state = "waitingforresponse" 928 self.response = Response(request=self) 929 return 930 931 if self.bodyBytesToSend > 0: 932 data = data[:self.bodyBytesToSend] 933 self.bodyBytesToSend -= len(data) 934 self._sendBodyChunk(data) 935 936 if self.bodyBytesToSend == 0: 937 # body completly sent 938 del self.bodyBytesToSend 939 self.state = "waitingforresponse" 940 self.response = Response(request=self)
941 942
943 - def _sendBodyChunk(self, data):
944 """ 945 Send a chunk of the request body. A chunk can just be a part of 946 the stream, or chunked data, as defined in RFC. 947 """ 948 if self.headers["Transfer-Encoding"] == "chunked": 949 data = "%x\r\n%s\r\n" % (len(data), data) 950 self.con.cm.send(self.con.cid, data)
951 952
953 - def _onData(self, data):
954 """ 955 Some data is arriving on our socket... 956 """ 957 if self.state == "waitingforpreresponse": 958 self._onPreResponseHeaders(data) 959 elif self.state == "waitingforresponse": 960 self._onResponseHeaders(data) 961 elif self.state == "receiving": 962 self._onResponseBody(data) 963 elif self.state == "sending": 964 # Receiving response while still sending? We have to stop sending body. 965 self._stopSendingBody() 966 self.state == "waitingforresponse" 967 self.response = Response(request=self) 968 self._onResponseHeaders(data) 969 else: 970 self._onIOError("Receiving Data while being in state %s" % self.state, data)
971
972 - def _stopSendingBody(self):
973 """ 974 In case of emergency... Something went wrong 975 """ 976 if type(self.body) == type(()): 977 cm.unregisterLowLevelListener(self._onLiveBodyDataRead)
978 979
980 - def _onConnectionClosed(self):
981 """ 982 Server has close the connection while we used it. It is ok in 983 receiving state if the server didn't give us a content length or 984 a chunked encoding. 985 In all other case, either its a bug somewhere, or an IO error 986 occured. 987 """ 988 if self.state == "receiving": 989 self.state = "finish" 990 self._cleanup() 991 self._closeResponseBody() 992 else: 993 self._onIOError("Server closed the connection prematurly.")
994
995 - def _onPreResponseHeaders(self, data):
996 """ 997 Response is coming. Headers first. 998 """ 999 allHeadersReceived = self.response.addToBuffer(data) 1000 if allHeadersReceived and self.response.statusCode == "100": 1001 self.state = "sending" 1002 self._sendBody() 1003 elif allHeadersReceived and self.response.statusCode != "100": 1004 finish
1005
1006 - def _getRemoteVersion(self):
1007 return self.conPool.getHostVersion(self.host, self.port)
1008
1009 - def _onResponseHeaders(self, data):
1010 """ 1011 Response is coming. Headers first. 1012 """ 1013 allHeadersReceived = self.response.addToBuffer(data) 1014 if allHeadersReceived: 1015 self.conPool.setHostVersion(self.host, self.port, self.response.httpVersion) 1016 1017 if allHeadersReceived and self.response.hasBody() and self.method != "HEAD": 1018 self.state = "receiving" 1019 self._onResponseBody() 1020 elif allHeadersReceived and (not self.response.hasBody() or self.method == "HEAD"): 1021 self.state = "finish" 1022 if not self.response.hasConnectionClose() and self._getRemoteVersion() == "HTTP/1.1": 1023 self.conPool.releaseValidConnection(self.con, self.freeConTimeout) 1024 self._cleanup() 1025 self._closeResponseBody()
1026
1027 - def _onResponseBody(self, data=None):
1028 """ 1029 Manage data that should be the body of the response... 1030 """ 1031 if data is not None: 1032 self.response.addToBuffer(data) 1033 1034 if self.responseBodyMethod == STORE: 1035 pass 1036 elif self.responseBodyMethod == CALLBACK: 1037 self.callback(self.response) 1038 self.response.clearBody() 1039 elif type(self.responseBodyMethod) == type(()): 1040 cm, cid = self.responseBodyMethod 1041 cm.send(cid, self.response.getBody()) 1042 self.response.clearBody() 1043 elif self.responseBodyMethod == None: 1044 self.response.clearBody() 1045 #import pdb;pdb.set_trace() 1046 if (self.response.bodyLength is not None and \ 1047 self.response.bodyRetrieved >= self.response.bodyLength) or \ 1048 (self.response.finished): 1049 # the entire response has been read (or even more, which is a server bug) 1050 self.state = "finish" 1051 if not self.response.hasConnectionClose() and self._getRemoteVersion() == "HTTP/1.1": 1052 self.conPool.releaseValidConnection(self.con, self.freeConTimeout) 1053 self._cleanup() 1054 self._closeResponseBody()
1055
def _onIOError(self, error, data=None):
    """
        Called when an IO error occurs: abort the request and report the
        error through the request callback.

        The request switches to the "error" state and is detached from its
        connection. If the body was being streamed from a (communication
        manager, cid) tuple, the _onLiveBodyDataRead listener is
        unregistered. A (communication manager, cid) response stream, if any,
        is closed. Finally self.callback receives a fresh Response carrying
        the error.

        @param error: description of the error, stored with setIoError().
        @param data: optional raw data received along with the error; it is
            stored as the response buffer.
    """
    # Remember whether the body was being sent BEFORE overwriting the state.
    # Testing self.state after assigning "error" could never match "sending".
    wasSending = self.state == "sending"
    self.state = "error"
    self._cleanup()
    if wasSending and isinstance(self.body, tuple):
        bodyCm, _ = self.body
        bodyCm.unregisterLowLevelListener(self._onLiveBodyDataRead)

    if isinstance(self.responseBodyMethod, tuple):
        cm, cid = self.responseBodyMethod
        cm.close(cid)
    resp = Response(request=self)
    resp.setIoError(error)
    resp.buffer = data
    self.callback(resp)
1076 - def _cleanup(self):
1077 """ 1078 The HTTP transaction is now completed, but the TCP connection is 1079 still alive. We do some cleanup before returning the one. 1080 """ 1081 if self.con is not None: 1082 self.con.cm.unregisterHighLevelListener(self._onEvent) 1083 del self.con
1084
1085 - def _closeResponseBody(self):
1086 """ 1087 Call the callback when the response is finished. And close its storage 1088 if its a stream 1089 """ 1090 if type(self.responseBodyMethod) == type(()): 1091 cm, cid = self.responseBodyMethod 1092 cm.close(cid) 1093 self.callback(self.response)
1094
class Response(object):
    """
        A Response is the data the server returns when we request it. It
        contains a status line, headers and a body.

        Data is fed incrementally with addToBuffer(). The status line and
        headers are parsed first. The body is then accumulated, and
        de-chunked when the server uses the chunked transfer coding.
    """

    # Header names whose comma-separated values are stored as a list by
    # _parseHeaders(). Empty: every header value is kept as a single string.
    multiValuedHeaders = []

    def __init__(self, request=None, previousResponse=None):
        """
            Create a response.
            @param request: the request this response belongs to.
            @param previousResponse: the previous response, if the transaction
                includes more than one response.
        """
        self.previousResponse = previousResponse
        ## XXX reference cycle: the request usually keeps a reference back
        ## to its response (request.response).
        self.request = request
        self.nextRequest = None
        self.ioError = None
        self.statusCode = None
        self.statusString = None
        self.httpVersion = None
        # Raw received data not consumed yet (headers, then body / chunks).
        self.buffer = ""
        # Body received so far (cleared by clearBody() in streaming modes).
        self.body = ""
        # Announced body length; None when unknown (chunked or read until close).
        self.bodyLength = None
        # Number of body bytes received so far (after de-chunking).
        self.bodyRetrieved = 0
        self.headersRead = False
        self.headers = Headers()
        self.chunked = False
        # True once the terminating zero-length chunk has been seen.
        self.finished = False

        # Line terminator used by the server ("\r\n" or a bare "\n"),
        # detected from the first data containing a newline.
        self.eolMark = None

    def __str__(self):
        if self.ioError is not None:
            return "<Response: ioError(%s)>" % self.ioError
        elif self.statusCode is None:
            return "<Response: pending>"
        else:
            if self.previousResponse:
                return "<Response: %s %s (%s) %s>" % (self.statusCode, self.statusString, self.httpVersion, str(self.previousResponse))
            else:
                return "<Response: %s %s (%s)>" % (self.statusCode, self.statusString, self.httpVersion)

    def setIoError(self, error):
        """
            Record the IO error that aborted this response.
        """
        self.ioError = error

    def clearBody(self):
        """
            Clear the body of the response.
        """
        self.body = ""

    def addToBuffer(self, data):
        """
            Add response data to the response buffer. Once all headers are
            retrieved, this buffer only contains body data (if any).

            @return: True on the call that completes the headers, False
                otherwise (including every later call that only adds body
                data).
        """
        self.buffer += data
        if not self.headersRead:
            if self._manageBufferForHeaders():
                self._manageBufferForBody()
                return True
        else:
            self._manageBufferForBody()

        return False

    def _manageBufferForHeaders(self):
        """
            Check whether the headers have completely arrived, and parse them.
            @return: True if all headers have been read. In this case, you
                should no longer call this method.
        """
        if self.headersRead:
            return True

        # Detect the line terminator once; tolerate servers using a bare LF.
        if self.eolMark is None:
            if "\r\n" in self.buffer:
                self.eolMark = "\r\n"
            elif "\n" in self.buffer:
                self.eolMark = "\n"
            else:
                return False

        if not self.statusCode and self.eolMark in self.buffer:
            self._parseStatusLine(self.buffer.split(self.eolMark)[0])

        terminator = self.eolMark * 2
        end = self.buffer.find(terminator)
        if end < 0:
            return False

        bound = end + len(terminator)
        headers = self.buffer[:bound]
        self.buffer = self.buffer[bound:]
        # Drop the status line and the two empty strings produced by the
        # blank line that ends the header section.
        self._parseHeaders(headers.split(self.eolMark)[1:-2])
        self._checkForBody()
        self.headersRead = True
        return True

    def _manageBufferForBody(self):
        """
            Move body data from the buffer to the body, de-chunking it when
            the chunked transfer coding is used.

            In chunked mode, every complete chunk is consumed. Incomplete data
            stays in the buffer until more arrives. The zero-length chunk sets
            self.finished.

            @raise ValueError: if a complete chunk-size line is not valid hex.
        """
        if not self.chunked:
            self.body += self.buffer
            self.bodyRetrieved += len(self.buffer)
            self.buffer = ""
            return

        eol = self.eolMark
        eolLength = len(eol)
        while True:
            headerEnd = self.buffer.find(eol)
            if headerEnd < 0:
                return  # chunk-size line not complete yet
            chunkHeader = self.buffer[:headerEnd]
            # Chunk extensions (after ';') are ignored.
            chunkLength = int(chunkHeader.split(";", 1)[0], 16)
            if chunkLength == 0:
                self.finished = True
                return
            begin = headerEnd + eolLength
            end = begin + chunkLength
            if len(self.buffer) < end + eolLength:
                return  # chunk data (and its trailing EOL) not complete yet
            chunk = self.buffer[begin:end]
            self.buffer = self.buffer[end + eolLength:]
            self.body += chunk
            self.bodyRetrieved += len(chunk)

    def _checkForBody(self):
        """
            Set the state of the response needed to manage its body.

            1xx, 204 and 304 responses have no body. A chunked
            Transfer-Encoding takes precedence over Content-Length, which must
            then be ignored (RFC 2616 section 4.4).
        """
        if self.statusCode[0] == "1" or self.statusCode in ["204", "304"]:
            self.bodyLength = 0
            return

        headerNames = self.headers.keys()
        if "transfer-encoding" in headerNames \
           and "chunked" in self.headers["transfer-encoding"]:
            self.chunked = True
            return

        if "content-length" in headerNames:
            self.bodyLength = int(self.headers["content-length"])

    def hasBody(self):
        """
            Return True if the response has a body, or would have one if the
            request were not a HEAD request.

            @note: Even if the body length is not known, a body can be sent.
        """
        return self.bodyLength is None or self.bodyLength > 0

    def hasConnectionClose(self):
        """
            Return True if the server indicated that it will close the
            connection ("close" token in the Connection header, compared
            case-insensitively).
        """
        # Same header access pattern as _checkForBody(): lowercase names.
        if "connection" not in self.headers.keys():
            return False
        value = self.headers["connection"]
        if isinstance(value, list):
            value = ",".join(value)
        tokens = [token.strip().lower() for token in value.split(",")]
        return "close" in tokens

    def getBody(self):
        """
            Return the body, or the part received since the last dispatch in
            streaming modes.
        """
        return self.body

    def getHeaders(self):
        """
            Return the headers.
        """
        return self.headers

    def _parseHeaders(self, lines):
        """
            Parse the header lines and store them in self.headers.

            Continuation lines (starting with a space or a tab, RFC 2616
            section 4.2) are appended to the previous header value, separated
            by a single space.

            @param lines: the header lines, without the status line.
            @raise ValueError: if a non-continuation line has no ':'.
        """
        parsed = []
        for line in lines:
            if line[:1] in (" ", "\t") and parsed:
                name, value = parsed[-1]
                parsed[-1] = (name, value.rstrip() + " " + line.strip())
                continue
            name, value = line.split(":", 1)
            parsed.append((name, value))

        for name, value in parsed:
            if "," in value and name in self.multiValuedHeaders:
                self.headers[name] = [v.strip() for v in value.split(",")]
            else:
                self.headers[name] = value.strip()

    def _parseStatusLine(self, line):
        """
            Set the HTTP version of the server, the status code and the status
            string from the status line. The reason phrase may be empty.

            @raise ValueError: if the line has fewer than two fields.
        """
        parts = line.strip().split(" ", 2)
        if len(parts) == 2:
            parts.append("")  # e.g. "HTTP/1.1 204" with no reason phrase
        self.httpVersion, self.statusCode, self.statusString = parts
class BasicAuthRequest(Request):
    """
        HTTP Request conforming to RFC 2617 for Basic Authentication only.

        Call setCredentials() before sending. Without credentials, the
        request is sent without an Authorization header.
    """

    # (login, password) pair set by setCredentials(). None means that no
    # Authorization header is added. Defined at class level so that
    # _prepareBody() works even if setCredentials() was never called.
    credentials = None

    def _prepareBody(self):
        """
            Add the Basic Authorization header when credentials were given,
            then let Request prepare the body as usual.
        """
        if self.credentials is not None:
            login, password = self.credentials
            token = base64.b64encode(login + ":" + password)
            self.setHeader("Authorization", "Basic " + token)
        Request._prepareBody(self)

    def setCredentials(self, login, password):
        """
            Set the credentials sent with the request.
            @param login: the user id (RFC 2617 forbids ':' in it).
            @param password: the password.
        """
        self.credentials = (login, password)
1313
def onHttpResponse(resp):
    """
        Demo callback passed to req.send() by the __main__ block below.

        Calls cm.stopAtLastSocketClosed() (presumably so that cm.main()
        returns once all sockets are closed; confirm against
        communicationmanager). Then prints the response, its headers and
        its body to stdout.

        @param resp: the Response delivered by the request.
        @note: relies on the module-level name C{cm}, which is only bound
            when this module is run as a script.
    """
    cm.stopAtLastSocketClosed()
    print "-> ", resp
    import pprint
    pprint.pprint(resp.getHeaders())
    # The body is wrapped in quotes so empty / whitespace bodies are visible.
    print "'" + str(resp.getBody()) + "'"
import communicationmanager
if __name__ == "__main__":
    # Manual smoke test: request https://localhost:7080/ and hand the
    # response body to a file descriptor through the communication manager.
    # Binds the module-level name `cm`, which onHttpResponse() relies on.
    cm = communicationmanager.CommunicationManager()
    cm.registerLowLevelListener(communicationmanager.allLevelListener)
    #cm.registerHighLevelListener(communicationmanager.allLevelListener)
    setCommunicationManager(cm)

    req = Request()
    req.setUrl("https://localhost:7080/")
    #req.setUrl("https://cyberrail.org/svn/")
    #req.setMethod("POST")

    #req.setHint("version", "poll")
    #req.setHint("preresponse", True)
    #req.setBody("coucou")

    # (cm, cid) tuple mode: the request's response-body handling calls
    # cm.send(cid, ...) for body data and cm.close(cid) at the end.
    # NOTE(review): fh2 is never closed explicitly here; confirm that
    # cm.close(cid) releases the underlying descriptor.
    fh2 = open("/tmp/httpResult.html", "w")
    fd = fh2.fileno()
    cid = cm.addFDescriptor(fd)
    req.setResponseBodyMethod((cm, cid))

    req.send(onHttpResponse)

    cm.main()
    # "fin" is French for "end".
    print "fin"