1 """
2 An interface to querying TAP servers (i.e., a TAP client).
3 """
4
5
6
7
8
9
10
11 import httplib
12 import socket
13 import time
14 import traceback
15 import urllib
16 import urlparse
17 from cStringIO import StringIO
18 from email.Message import Message
19 from email.MIMEMultipart import MIMEMultipart
20 from xml import sax
21
22 from gavo import utils
23 from gavo.votable import votparse
24 from gavo.votable.model import VOTable as V
25
26
27
# Job phase names.  Presumably these are the UWS execution phases a job
# reports through its /phase resource (see the phase property of the job
# classes); collections of them can be passed to waitForPhases.
PENDING = "PENDING"
QUEUED = "QUEUED"
EXECUTING = "EXECUTING"
COMPLETED = "COMPLETED"
ERROR = "ERROR"
ABORTED = "ABORTED"
UNKNOWN = "UNKNOWN"


# When true, _parseWith prints the traceback for a malformed server
# response and writes the raw response to the file "server_response"
# in the current directory before raising ProtocolError.
debug = False
38
39
class Error(utils.Error):
    """Common ancestor of the TAP-specific exceptions in this module.

    Catch this to handle the failures this client reports itself, as
    opposed to arbitrary Python errors.
    """
43
46 """is raised when the remote server violated the local assumptions.
47 """
48
51 """is raised when request detects the server returned an invalid
52 status.
53
54 These are constructed with the status returnd (available as
55 foundStatus) data payload of the response (available as payload).
56 """
def __init__(self, msg, foundStatus, payload, hint=None):
    # Message and hint are handled by the ProtocolError/Error machinery.
    ProtocolError.__init__(self, msg, hint)
    # HTTP status code and body of the offending response, kept so
    # callers can inspect what the server actually sent.
    self.foundStatus = foundStatus
    self.payload = payload
    # Record every constructor argument in args (not just msg/hint).
    self.args = [msg, foundStatus, payload, hint]
61
64 """is raised when the remote size signals an error.
65
66 The content of the remote error document can be retrieved in the
67 remoteMessage attribute.
68 """
70 self.remoteMessage = remoteMessage
71 Error.__init__(self,
72 "Remote: "+remoteMessage,
73 hint="This means that"
74 " something in your query was bad according to the server."
75 " Details may be available in the Exceptions' remoteMessage"
76 " attribute")
77 self.args = [remoteMessage]
78
80 return self.remoteMessage
81
84 """is raised by certain check functions when the remote side has aborted
85 the job.
86 """
90
92 return "The remote side has aborted the job"
93
96 """is raised when a generic network error happens (can't connect,...)
97 """
98
149
152 """returns the message from a TAP error VOTable.
153
154 if votString is not a TAP error VOTable, it is returned verbatim.
155
156 TODO: For large responses, this may take a while. It's probably
157 not worth it in such cases. Or at all. Maybe we should hunt
158 for the INFO somewhere else?
159 """
160 try:
161 for el in votparse.parseString(votString, watchset=[V.INFO]):
162 if isinstance(el, V.INFO):
163 if el.name=="QUERY_STATUS" and el.value=="ERROR":
164 return el.text_
165 else:
166
167 for _ in el: pass
168 except Exception:
169
170 pass
171 return votString
172
175 """returns a "parser" class for _parseWith just calling a function on a string.
176
177 _parseWith is designed for utils.StartEndParsers, but it's convenient
178 to use it when there's no XML in the responses as well.
179
180 So, this class wraps a simple function into a StartEndParser-compatible
181 form.
182 """
183 class FlatParser(object):
184 def parseString(self, data):
185 self.result = parseFunc(data)
186 def getResult(self):
187 return self.result
188 return FlatParser
189
192 """uses the utils.StartEndParser-compatible parser to parse the string data.
193 """
194 try:
195 parser.parseString(data)
196 return parser.getResult()
197 except (ValueError, IndexError, sax.SAXParseException):
198 if debug:
199 traceback.print_exc()
200 f = open("server_response", "w")
201 f.write(data)
202 f.close()
203 raise ProtocolError("Malformed response document.", hint=
204 "If debug was enabled, you will find the server response in"
205 " the file server_response.")
206
209 """A parser accepting both plain text and XML replies.
210
211 Of course, the XML replies are against the standard, but -- ah, well.
212 """
215
221
224
227 quote = None
229 val = None
230 if literal and literal!="NULL":
231 val = utils.parseISODT(literal)
232 return val
233
236
243
246
249 """A dictionary that only has lower-case keys but treats keys in any
250 capitalization as equivalent.
251 """
254
257
260
263
274
287
288
289 -class _InfoParser(_ParametersParser, _ResultsParser):
291 self.info = {}
292 _ParametersParser._initialize(self)
293 _ResultsParser._initialize(self)
294
297
298 _end_phase = _end_jobId
299
302
305
306 - def _end_job(self,name, attrs, content):
309
312
327
330 return dict((k.split(":")[-1], v) for k,v in attrs.items())
331
334
335
336
340
342 self.curCap = {"interfaces": []}
343 self.curCap["standardID"] = attrs.get("standardID")
344
348
350 attrs = _pruneAttrNS(attrs)
351 self.curInterface = {"type": attrs["type"], "role": attrs.get("role")}
352
354 self.curCap["interfaces"].append(self.curInterface)
355 self.curInterface = None
356
358 self.curInterface["accessURL"] = content.strip()
359 self.curInterface["use"] = attrs.get("use")
360
363
366
371
374
376 self.curCol = V.FIELD()
377
379 self.tables[-1][self.curCol]
380 self.curCol = None
381
391
401
402 _end_name = _endColOrTableAttr
403
406
407 _end_unit = _end_ucd = _endColAttr
408
410 self.curCol(datatype=content.strip())
411 if "arraysize" in attrs:
412 self.curCol(arraysize=attrs["arraysize"])
413
416
419 """a container type for a result returned by an UWS service.
420
421 It exposes id, href, and type attributes.
422 """
def __init__(self, href, id=None, type=None):
    # Plain attribute storage for the result reference; id and type
    # are optional.
    self.href = href
    self.id = id
    self.type = type
425
430
440
441
def _sendSingleHTTPRequest(scheme, host, path, data, customHeaders, method,
        timeout):
    """performs exactly one HTTP exchange and returns the response.

    The response body is read completely and attached to the response
    as its data attribute.  The connection is closed in all cases.
    Socket and httplib errors are turned into NetworkError.
    """
    if scheme=="http":
        connClass = httplib.HTTPConnection
    elif scheme=="https":
        connClass = httplib.HTTPSConnection
    else:
        # An explicit raise rather than assert so this survives python -O.
        raise AssertionError("Unsupported URL scheme: %r"%(scheme,))

    headers = {"connection": "close",
        "user-agent": "Python TAP library http://soft.g-vo.org/subpkgs"}

    if not isinstance(data, basestring):
        if _canUseFormEncoding(data):
            data = urllib.urlencode(data)
            headers["Content-Type"] = "application/x-www-form-urlencoded"

        else:
            form = _FormData.fromDict(data)
            data = form.forHTTPUpload()
            headers["Content-Type"] = form.get_content_type(
                )+'; boundary="%s"'%(form.get_boundary())
    headers["Content-Length"] = len(data)
    # Custom headers go last so callers can override the defaults above.
    headers.update(customHeaders or {})

    conn = None
    try:
        try:
            conn = connClass(host, timeout=timeout)
        except TypeError:
            # Presumably for httplib versions without a timeout argument;
            # keep the connection class so https is not downgraded to http.
            conn = connClass(host)
        conn.request(method, path, data, headers)
        resp = conn.getresponse()
        resp.data = resp.read()
    except (socket.error, httplib.error) as ex:
        raise NetworkError("Problem connecting to %s (%s)"%
            (host, str(ex)))
    finally:
        if conn is not None:
            conn.close()
    return resp


def _splitRedirectTarget(baseURL, location):
    """returns (scheme, host, path) for a redirect to location.

    Relative locations are resolved against baseURL.  A query string is
    kept as part of the returned path so it reaches the redirect target.
    """
    parts = urlparse.urlsplit(urlparse.urljoin(baseURL, location))
    path = parts.path or "/"
    if parts.query:
        path = "%s?%s"%(path, parts.query)
    return parts.scheme, parts.netloc, path


def request(scheme, host, path, data="", customHeaders=None, method="GET",
        expectedStatus=None, followRedirects=False, setResponse=None,
        timeout=None):
    """returns a HTTPResponse object for an HTTP request to path on host.

    This function builds a new connection for every request.

    On the returned object, you cannot use the read() method.  Instead,
    any data returned by the server is available in the data attribute.

    data usually is a byte string, but you can also pass a dictionary.
    It will then be form-encoded if possible and otherwise serialized
    using _FormData.  customHeaders, if given, is a dictionary of HTTP
    headers that override the defaults.

    You can set followRedirects to True (follow one 303) or to an integer
    (follow up to that many 303s).  This means that the 303 "See other"
    codes that many UWS actions generate will be followed, and the
    document at the other end will be obtained with a GET.  For many
    operations this will lead to an error; only do this for slightly
    broken services.  301 and 302 redirects are always followed.  In
    total, at most 10 redirects are followed per call; after that, the
    redirect response itself is returned (or rejected by expectedStatus).

    In setResponse, you can pass in a callable that is called with the
    server response body as soon as it is in.  This is for when you want
    to store the response even if request raises an error later on
    (i.e., for sync querying).  It is called for every response received,
    including those of followed redirects.

    timeout is passed on to httplib and applies to every request made,
    including redirects.

    Raises NetworkError when talking to the server fails.  Raises
    WrongStatus when expectedStatus is given and the final response has
    a different status.  Raises AssertionError for schemes other than
    http and https.
    """
    redirectsLeft = 10
    while True:
        resp = _sendSingleHTTPRequest(scheme, host, path, data,
            customHeaders, method, timeout)
        if setResponse is not None:
            setResponse(resp.data)

        isRedirect = ((followRedirects and resp.status==303)
            or resp.status in (301, 302))
        location = resp.getheader("location")
        if not isRedirect or not location or redirectsLeft<=0:
            break

        redirectsLeft -= 1
        scheme, host, path = _splitRedirectTarget(
            "%s://%s%s"%(scheme, host, path), location)
        # Redirect targets are always retrieved with a plain GET.
        data, customHeaders, method = "", None, "GET"
        # Clamp at 0: False-1 would be -1, which is truthy and would
        # enable following 303s the caller did not ask for.
        followRedirects = max(0, followRedirects-1)

    if expectedStatus is not None:
        if resp.status!=expectedStatus:
            raise WrongStatus("Expected status %s, got status %s"%(
                expectedStatus, resp.status), resp.status, resp.data)
    return resp
518
521
522 def getter(self):
523 destURL = self.jobPath+methodPath
524 response = request(self.destScheme, self.destHost, destURL,
525 expectedStatus=200)
526 return _parseWith(parser(), response.data)
527 return getter
528
531
532 def setter(self, value):
533 destURL = self.jobPath+methodPath
534 request(self.destScheme, self.destHost, destURL,
535 {parameterName: serializer(value)}, method="POST",
536 expectedStatus=303)
537 return setter
538
541 """A helper class for classes constructed with an ADQL endpoint.
542 """
544 self.endpointURL = endpointURL.rstrip("/")
545 parts = urlparse.urlsplit(self.endpointURL)
546 self.destScheme = parts.scheme
547 self.destHost = parts.hostname
548 if parts.port:
549 self.destHost = "%s:%s"%(self.destHost, parts.port)
550 self.destPath = parts.path
551 if self.destPath.endswith("/"):
552 self.destPath = self.destPath[:-1]
553
556 """A facade for an ADQL-based async TAP job.
557
558 Construct it with the URL of the async endpoint and a query.
559
560 Alternatively, you can give the endpoint URL and a jobId as a
561 keyword parameter. This only makes sense if the service has
562 handed out the jobId before (e.g., when a different program takes
563 up handling of a job started before).
564
565 See :dachsdoc:`adql.html` for details.
566 """
def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",
        userParams={}, timeout=None):
    """sets up the job facade.

    With a query, a new job is created on the remote side (the job
    creation uses lang and userParams).  Otherwise, jobId must name a
    job the service handed out before.  Passing neither raises Error.
    """
    self._defineEndpoint(endpointURL)
    # timeout must be in place before _createJob talks to the server.
    self.timeout = timeout
    self.destPath = utils.ensureOneSlash(self.destPath)+"async"

    if query is None and jobId is None:
        raise Error("Must construct ADQLTAPJob with at least query or jobId")

    if query is None:
        # Re-attach to an existing job.
        self.jobId = jobId
    else:
        self.jobId = self.jobPath = None
        self._createJob(query, lang, userParams)
    self._computeJobPath()
580
582 self.jobPath = "%s/%s"%(self.destPath, self.jobId)
583
585 params = {
586 "REQUEST": "doQuery",
587 "LANG": lang,
588 "QUERY": query}
589 for k,v in userParams.iteritems():
590 params[k] = str(v)
591 response = request(self.destScheme, self.destHost, self.destPath, params,
592 method="POST", expectedStatus=303, timeout=self.timeout)
593
594 try:
595 self.jobId = urlparse.urlsplit(
596 response.getheader("location", "")).path.split("/")[-1]
597 except ValueError:
598 raise utils.logOldExc(
599 ProtocolError("Job creation returned invalid job id"))
600
def delete(self, usePOST=False):
    """removes the job on the remote side.

    By default, an HTTP DELETE is sent to the job URL.  usePOST=True can
    be used for servers that do not support the DELETE HTTP method
    (a.k.a. "are broken"); then ACTION=DELETE is POSTed to the job URL.
    Either way, the server has to answer with a 303, or request raises
    WrongStatus.  Nothing is sent when the job has no jobPath.
    """
    if self.jobPath is None:
        return

    if usePOST:
        httpMethod, payload = "POST", {"ACTION": "DELETE"}
    else:
        httpMethod, payload = "DELETE", ""

    request(self.destScheme, self.destHost, self.jobPath,
        data=payload, method=httpMethod, expectedStatus=303,
        timeout=self.timeout)
615
617 """asks the remote side to start the job.
618 """
619 request(self.destScheme, self.destHost, self.jobPath+"/phase",
620 {"PHASE": "RUN"}, method="POST", expectedStatus=303,
621 timeout=self.timeout)
622
624 """asks the remote side to abort the job.
625 """
626 request(self.destScheme, self.destHost, self.jobPath+"/phase",
627 {"PHASE": "ABORT"}, method="POST", expectedStatus=303,
628 timeout=self.timeout)
629
639
def waitForPhases(self, phases, pollInterval=1, increment=1.189207115002721,
        giveUpAfter=None):
    """waits for the job's phase to become one of the set phases.

    This method polls the phase property.  The first wait between polls
    is pollInterval seconds.  Each following wait is increment times
    longer, but never more than 120 seconds.

    The magic number in increment is 2**(1/4.), i.e., the wait time
    doubles every four polls until it hits the two-minute cap.

    giveUpAfter, if given (and non-zero), is the number of waits this
    method will do.  If none of the desired phases has been seen by the
    poll after the last wait, a ProtocolError is raised right away,
    without sleeping again.
    """
    attempts = 0
    while True:
        curPhase = self.phase
        if curPhase in phases:
            return
        attempts += 1
        # Check the limit before sleeping: sleeping and then giving up
        # without another poll would only waste up to two minutes.
        if giveUpAfter and attempts>giveUpAfter:
            raise ProtocolError("None of the states in %s were reached"
                " in time."%repr(phases),
                hint="After %d attempts, phase was %s"%(attempts, curPhase))
        time.sleep(pollInterval)
        pollInterval = min(120, pollInterval*increment)
666
667 - def run(self, pollInterval=1):
676
# Run-time limit of the job: read as a float from the job's
# /executionduration resource, set by POSTing EXECUTIONDURATION there.
executionDuration = property(
    fget=_makeAtomicValueGetter("/executionduration",
        _makeFlatParser(float)),
    fset=_makeAtomicValueSetter("/executionduration",
        str, "EXECUTIONDURATION"))

# Destruction time of the job: parsed with utils.parseISODT from the
# job's /destruction resource; when set, the datetime is serialized
# (without a time zone) and POSTed as DESTRUCTION.
destruction = property(
    fget=_makeAtomicValueGetter("/destruction",
        _makeFlatParser(utils.parseISODT)),
    fset=_makeAtomicValueSetter("/destruction",
        lambda when: when.strftime("%Y-%m-%dT%H:%M:%S.000"),
        "DESTRUCTION"))
685
687 return self.endpointURL+"/async/%s%s"%(self.jobId, jobPath)
688
690
691 response = request(self.destScheme, self.destHost, self.jobPath+path,
692 expectedStatus=200, timeout=self.timeout)
693 return _parseWith(parser, response.data)
694
695 @property
697 """returns a dictionary of much job-related information.
698 """
699 return self._queryJobResource("", _InfoParser())
700
701 @property
703 """returns the phase the job is in according to the server.
704 """
705 return self._queryJobResource("/phase", _PhaseParser())
706
707 @property
709 """returns the estimate the server gives for the run time of the job.
710 """
711 return self._queryJobResource("/quote", _QuoteParser())
712
713 @property
715 """returns the owner of the job.
716 """
717 return self._queryJobResource("/owner", _makeFlatParser(str)())
718
719 @property
721 """returns a dictionary mapping passed parameters to server-provided
722 string representations.
723
724 To set a parameter, use the setParameter function. Changing the
725 dictionary returned here will have no effect.
726 """
727 return self._queryJobResource("/parameters", _ParametersParser())
728
729 @property
731 """returns a list of UWSResult instances.
732 """
733 return self._queryJobResource("/results", _ResultsParser())
734
736 """returns the URL of the ADQL result table.
737 """
738 if simple:
739 return self.makeJobURL("/results/result")
740 else:
741 return self.allResults[0].href
742
744 """returns a file-like object you can read the default TAP result off.
745
746 To have the embedded VOTable returned, say
747 votable.load(job.openResult()).
748
749 If you pass simple=False, the URL will be taken from the
750 service's result list (the first one given there). Otherwise (the
751 default), results/result is used.
752 """
753 return urllib.urlopen(self.getResultURL())
754
759
761 """returns the error message the server gives, verbatim.
762 """
763 data = request(self.destScheme, self.destHost, self.jobPath+"/error",
764 expectedStatus=200, followRedirects=True,
765 timeout=self.timeout).data
766 return _getErrorInfo(data)
767
769 """adds uploaded tables, either from a file or as a remote URL.
770
771 You should not try to change UPLOAD yourself (e.g., using setParameter).
772
773 Data is either a string (i.e. a URI) or a file-like object (an upload).
774 """
775 uploadFragments = []
776 form = _FormData()
777 if isinstance(data, basestring):
778 assert ',' not in data
779 assert ';' not in data
780 uploadFragments.append("%s,%s"%(name, data))
781
782 else:
783 uploadKey = utils.intToFunnyWord(id(data))
784 form.addFile(uploadKey, uploadKey, data.read())
785 uploadFragments.append("%s,param:%s"%(name, uploadKey))
786
787 form.addParam("UPLOAD", ";".join(uploadFragments))
788 request(self.destScheme, self.destHost, self.jobPath+"/parameters",
789 method="POST",
790 data=form.forHTTPUpload(), expectedStatus=303,
791 customHeaders={"content-type":
792 form.get_content_type()+'; boundary="%s"'%(form.get_boundary())})
793
796 """A facade for a synchronous TAP Job.
797
798 This really is just a very glorified urllib.urlopen. Maybe some
799 superficial parallels to ADQLTAPJob are useful.
800
801 You can construct it, add uploads, and then start or run the thing.
802 Methods that make no sense at all for sync jobs ("phase") silently
803 return some more or less sensible fakes.
804 """
def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",
        userParams={}, timeout=None):
    """prepares (but does not send) a synchronous query.

    jobId is accepted but ignored, presumably so this class can be
    constructed like ADQLTAPJob.  userParams is copied, so later changes
    by the caller do not affect the job.
    """
    self._defineEndpoint(endpointURL)
    self.timeout = timeout
    self.query = query
    self.lang = lang
    self.userParams = userParams.copy()
    # Filled in once the query has been run.
    self.result = None
    self.uploads = []
    self._errorFromServer = None
814
def postToService(self, params):
    """POSTs params to the service's sync endpoint and returns the response.

    Up to three 303 redirects are followed, and the final response must
    have status 200.  Each response body is handed to
    _setErrorFromServer as soon as it arrives, so an error document is
    kept even if request() raises afterwards.
    """
    syncPath = self.destPath+"/sync"
    return request(self.destScheme, self.destHost, syncPath, params,
        method="POST",
        expectedStatus=200,
        followRedirects=3,
        timeout=self.timeout,
        setResponse=self._setErrorFromServer)
820
821 - def delete(self, usePOST=None):
824
826 """does nothing.
827
828 You could argue that this could come from a different thread and we
829 could try to interrupt the ongoing request. Well, if you want it,
830 try it yourself or ask the author.
831 """
832
834 if self._errorFromServer is not None:
835 raise Error(self._errorFromServer)
836
837 - def waitForPhases(self, phases, pollInterval=None, increment=None,
838 giveUpAfter=None):
842
844
845
846
847
848
849
850 self._errorFromServer = _getErrorInfo(data)
851
853 params={
854 "REQUEST": "doQuery",
855 "LANG": self.lang,
856 "QUERY": self.query}
857 params.update(self.userParams)
858 if self.uploads:
859 upFrags = []
860 for name, key, data in self.uploads:
861 upFrags.append("%s,param:%s"%(name, key))
862 params[key] = data
863 params["UPLOAD"] = ";".join(upFrags)
864
865 params = dict((k, str(v)) for k,v in params.iteritems())
866
867 try:
868 resp = self.postToService(params)
869 self.result = LocalResult(resp.data, "TAPResult", resp.getheader(
870 "Content-Type"))
871 except Exception as msg:
872
873
874 if not self._errorFromServer:
875 self._errorFromServer = str(msg)
876 raise
877 else:
878
879 self._errorFromServer = None
880 return self
881
882 - def run(self, pollInterval=None):
884
885 @property
888
889 @property
892
893 @property
896
897 @property
900
901 @property
903 return self.userParameters
904
905 @property
907 if self.result is None:
908 return []
909 else:
910 return [self.result]
911
916
919
921 return self._errorFromServer
922
930
933 """A facade for an ADQL endpoint.
934
935 This is only needed for inspecting server metadata (i.e., in general
936 only for rather fancy applications).
937 """
939 self._defineEndpoint(endpointURL)
940
941 - def createJob(self, query, lang="ADQL-2.0", userParams={}):
943
944 @property
946 """returns True, False, or None (undecidable).
947
948 None is returned when /availability gives a 404 (which is legal)
949 or the returned document doesn't parse.
950 """
951 try:
952 response = request(self.destScheme, self.destHost,
953 self.destPath+"/availability", expectedStatus=200)
954 res = _parseWith(_AvailabilityParser(), response.data)
955 except WrongStatus:
956 res = None
957 return res
958
959 @property
961 """returns a dictionary containing some meta info on the remote service.
962
963 Keys to look for include title, identifier, contact (the mail address),
964 and referenceURL.
965
966 If the remote server doesn't return capabilities as expected, an
967 empty dict is returned.
968 """
969 return _parseWith(_CapabilitiesParser(),
970 request(self.destScheme, self.destHost,
971 self.destPath+"/capabilities").data)
972
973 @property
975 """returns a sequence of table definitions for the tables accessible
976 through this service.
977
978 The table definitions come as gavo.votable.Table instances.
979 """
980 return _parseWith(_TablesParser(),
981 request(self.destScheme, self.destHost, self.destPath+"/tables").data)
982