trying to fix pipelining

also trying to understand it.

according to Python:
1) you send a request
2) you MUST get response headers for (1) (THIS IS MANDATORY)
3) you send another request
4) you get response body for (2)
5) response headers for (3)
6) response body for (5)

Only two requests can be pipelined. Surely this is an unavoidable limitation — wait, no, it's just written into the code to error out if you don't do it that way.

according to reality:
1) you send a request
2) you do not get response headers for (1)
3) you repeat steps 1-2 until enough responses are queued
4) you receive those responses as header,body,header,body...

they even name it with a __ prefix so as to make it hard to override, but the state can safely go to Idle after a request has been sent, whether or not response headers have come in. Sure, the connection might close, but then you adjust to not pipeline, and re-send the rest of your requests over a new connection.
This commit is contained in:
user 2015-09-21 18:58:20 +00:00
parent 8077a32b99
commit 0021eb65c5
1 changed files with 39 additions and 26 deletions

View File

@ -1,5 +1,5 @@
#!/usr/bin/python3 #!/usr/bin/python3
from http.client import HTTPConnection,HTTPException,BadStatusLine from http.client import HTTPConnection,HTTPException,BadStatusLine,_CS_IDLE
import json import json
import base64 import base64
from contextlib import closing from contextlib import closing
@ -35,57 +35,72 @@ def maybeReplace(location,base,encoding=None):
return replace(location,base,encoding=encoding,path=path)(handle) return replace(location,base,encoding=encoding,path=path)(handle)
return deco return deco
class Penguin:
"idk"
def __init__(self, url, recv, diemessage):
self.url = url
self.recv = recv
self.diemessage = diemessage
class Pipeline(list): class Pipeline(list):
"Gawd why am I being so elaborate?" "Gawd why am I being so elaborate?"
def __init__(self, threshold=10): def __init__(self, threshold=10):
"threshold is how many requests in parallel to pipeline" "threshold is how many requests in parallel to pipeline"
self.threshold = threshold self.threshold = threshold
self.sent = True self.sent = True
def __enter__(self,*a): def __enter__(self):
self.reopen() self.reopen()
def __exit__(self): return self
def __exit__(self,typ,exn,trace):
self.send()
self.drain() self.drain()
def reopen(self): def reopen(self):
self.c = HTTPConnection(server) self.c = HTTPConnection(server)
self.send() self.send()
def append(self,url,recv,diemessage): def append(self,url,recv,diemessage):
super().append((url,recv,diemessage)) self.sent = False
super().append(Penguin(url,recv,diemessage))
if len(self) > self.threshold: if len(self) > self.threshold:
self.send() self.send()
self.drain() self.drain()
def trydrain(self): def trydrain(self):
for url,recv,diemessage in self: for penguin in self:
try: try:
recv(self.c) penguin.response.begin()
penguin.recv(penguin.response)
except BadStatusLine as e: except BadStatusLine as e:
print('derped requesting',penguin.url)
return False return False
except HTTPException as e: except HTTPException as e:
die(diemessage+' (url='+url+')') die(penguin.diemessage+' '+repr(e)+' (url='+penguin.url+')')
self.clear() self.clear()
return True return True
def drain(self): def drain(self):
print('draining pipeline...') print('draining pipeline...')
assert self.sent, "Can't drain without sending the requests!" assert self.sent, "Can't drain without sending the requests!"
self.sent = False self.sent = False
while trydrain() is not True: while self.trydrain() is not True:
self.c.close() self.c.close()
print('derped requesting',url)
print('drain failed, trying again') print('drain failed, trying again')
time.sleep(1) time.sleep(1)
self.reopen() self.reopen()
def trysend(self): def trysend(self):
for url,_,diemessage in pipeline: for penguin in pipeline:
try: try:
self.c.request("GET", url) self.c.request("GET", penguin.url)
self.c._HTTPConnection__state = _CS_IDLE
penguin.response = self.c.response_class(self.c.sock,
method="GET")
# begin LATER so we can send multiple requests w/out response headers
except BadStatusLine: except BadStatusLine:
return False return False
except HTTPException as e: except HTTPException as e:
die(diemessage) die(diemessage+' because of a '+repr(e))
return True return True
def send(self): def send(self):
if self.sent: return if self.sent: return
print('filling pipeline...') print('filling pipeline...')
while self.tryresend() is not True: while self.trysend() is not True:
self.c.close() self.c.close()
print('derped resending') print('derped resending')
time.sleep(1) time.sleep(1)
@ -137,23 +152,21 @@ with Pipeline() as pipeline:
f.write(str(s["author"]) + '\n') f.write(str(s["author"]) + '\n')
f.write(str(s["license"])) f.write(str(s["license"]))
url = "/skins/1/" + str(s["id"]) + ".png" url = "/skins/1/" + str(s["id"]) + ".png"
def tryget(c): def tryget(r):
with closing(c.getresponse()) as r: if r.status != 200:
if r.status != 200: print("Error", r.status)
print("Error", r.status) return
return @replace(skinsdir,previewbase,path=preview)
@replace(skinsdir,previewbase,path=preview) def go(f):
def go(f): shutil.copyfileobj(r,f)
shutil.copyfileobj(r,f)
pipeline.append(url,tryget, pipeline.append(url,tryget,
"Couldn't get {} because of a {}".format( "Couldn't get {} because of a".format(
s["id"], s["id"]))
e))
if not foundOne: if not foundOne:
print("No skins updated on this page. Seems we're done?") print("No skins updated on this page. Seems we're done?")
#raise SystemExit #raise SystemExit
addpage(1) addpage(curpage)
while pages > curpage: while pages > curpage:
curpage = curpage + 1 curpage = curpage + 1
addpage(curpage) addpage(curpage)