1  """ 
  2  An interface to querying TAP servers (i.e., a TAP client). 
  3  """ 
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11  import httplib 
 12  import socket 
 13  import time 
 14  import traceback 
 15  import urllib 
 16  import urlparse 
 17  from cStringIO import StringIO 
 18  from email.Message import Message 
 19  from email.MIMEMultipart import MIMEMultipart 
 20  from xml import sax 
 21   
 22  from gavo import utils 
 23  from gavo.votable import votparse 
 24  from gavo.votable.model import VOTable as V 
 25   
 26   
 27   
# Names of the phases a job can be reported to be in (presumably these are
# the values the jobs' phase property yields -- confirm against the phase
# parser).
PENDING = "PENDING"
QUEUED = "QUEUED"
EXECUTING = "EXECUTING"
COMPLETED = "COMPLETED"
ERROR = "ERROR"
ABORTED = "ABORTED"
UNKNOWN = "UNKNOWN"


# When true, the response parsing code prints a traceback and dumps the
# server response to the file server_response if a document fails to parse.
debug = False
 38   
 39   
 40 -class Error(utils.Error): 
  41          """The base class for all TAP-related exceptions. 
 42          """ 
  43   
 46          """is raised when the remote server violated the local assumptions. 
 47          """ 
  48   
 51          """is raised when request detects the server returned an invalid 
 52          status. 
 53   
 54          These are constructed with the status returned (available as 
 55          foundStatus) and the data payload of the response (available as payload). 
 56          """ 
 57 -        def __init__(self, msg, foundStatus, payload, hint=None): 
  58                  ProtocolError.__init__(self, msg, hint) 
 59                  self.args = [msg, foundStatus, payload, hint] 
 60                  self.payload, self.foundStatus = payload, foundStatus 
   61   
 64          """is raised when the remote side signals an error. 
 65   
 66          The content of the remote error document can be retrieved in the  
 67          remoteMessage attribute. 
 68          """ 
 70                  self.remoteMessage = remoteMessage 
 71                  Error.__init__(self,  
 72                          "Remote: "+remoteMessage, 
 73                          hint="This means that" 
 74                          " something in your query was bad according to the server." 
 75                          "  Details may be available in the Exceptions' remoteMessage" 
 76                          " attribute") 
 77                  self.args = [remoteMessage] 
  78   
 80                  return self.remoteMessage 
   81   
 84          """is raised by certain check functions when the remote side has aborted 
 85          the job. 
 86          """ 
 90           
 92                  return "The remote side has aborted the job" 
   93   
 96          """is raised when a generic network error happens (can't connect,...) 
 97          """ 
  98   
149                   
152          """returns the message from a TAP error VOTable. 
153   
154          if votString is not a TAP error VOTable, it is returned verbatim. 
155   
156          TODO: For large responses, this may take a while.  It's probably 
157          not worth it in such cases.  Or at all.  Maybe we should  hunt 
158          for the INFO somewhere else? 
159          """ 
160          try: 
161                  for el in votparse.parseString(votString, watchset=[V.INFO]): 
162                          if isinstance(el, V.INFO): 
163                                  if el.name=="QUERY_STATUS" and el.value=="ERROR": 
164                                          return el.text_ 
165                          else: 
166                                   
167                                  for _ in el: pass 
168          except Exception: 
169                   
170                  pass 
171          return votString 
 172   
175          """returns a "parser" class for _parseWith just calling a function on a string. 
176   
177          _parseWith is designed for utils.StartEndParsers, but it's convenient 
178          to use it when there's no XML in the responses as well. 
179   
180          So, this class wraps a simple function into a StartEndParser-compatible 
181          form. 
182          """ 
183          class FlatParser(object): 
184                  def parseString(self, data): 
185                          self.result = parseFunc(data) 
 186                  def getResult(self): 
187                          return self.result 
188          return FlatParser 
189   
192          """uses the utils.StartEndParser-compatible parser to parse the string data. 
193          """ 
194          try: 
195                  parser.parseString(data) 
196                  return parser.getResult() 
197          except (ValueError, IndexError, sax.SAXParseException): 
198                  if debug: 
199                          traceback.print_exc() 
200                          f = open("server_response", "w") 
201                          f.write(data) 
202                          f.close() 
203                  raise ProtocolError("Malformed response document.", hint= 
204                          "If debug was enabled, you will find the server response in" 
205                          " the file server_response.") 
 206   
209          """A parser accepting both plain text and XML replies. 
210   
211          Of course, the XML replies are against the standard, but -- ah, well. 
212          """ 
215           
221           
 224   
227          quote = None 
229                  val = None 
230                  if literal and literal!="NULL": 
231                          val = utils.parseISODT(literal) 
232                  return val 
 233   
236   
243   
 246   
249          """A dictionary that only has lower-case keys but treats keys in any 
250          capitalization as equivalent. 
251          """ 
254           
257           
260           
 263           
274   
287   
288   
289 -class _InfoParser(_ParametersParser, _ResultsParser): 
 291                  self.info = {} 
292                  _ParametersParser._initialize(self) 
293                  _ResultsParser._initialize(self) 
 294           
297           
298          _end_phase = _end_jobId 
299           
302           
305           
306 -        def _end_job(self,name, attrs, content): 
 309           
 312           
327   
330          return dict((k.split(":")[-1], v) for k,v in attrs.items()) 
 331   
334   
335   
336   
340   
342                  self.curCap = {"interfaces": []} 
343                  self.curCap["standardID"] = attrs.get("standardID") 
 344   
348   
350                  attrs = _pruneAttrNS(attrs) 
351                  self.curInterface = {"type": attrs["type"], "role": attrs.get("role")} 
 352   
354                  self.curCap["interfaces"].append(self.curInterface) 
355                  self.curInterface = None 
 356   
358                  self.curInterface["accessURL"] = content.strip() 
359                  self.curInterface["use"] = attrs.get("use") 
 360           
 363   
366   
371           
374           
376                  self.curCol = V.FIELD() 
 377   
379                  self.tables[-1][self.curCol] 
380                  self.curCol = None 
 381   
391   
401           
402          _end_name = _endColOrTableAttr 
403           
406   
407          _end_unit = _end_ucd = _endColAttr 
408           
410                  self.curCol(datatype=content.strip()) 
411                  if "arraysize" in attrs: 
412                          self.curCol(arraysize=attrs["arraysize"]) 
 413   
 416   
419          """a container type for a result returned by an UWS service. 
420   
421          It exposes id, href, and type attributes. 
422          """ 
423 -        def __init__(self, href, id=None, type=None): 
 424                  self.href, self.id, self.type = href, id, type 
  425   
430   
440   
441   
def request(scheme, host, path, data="", customHeaders={}, method="GET",
                expectedStatus=None, followRedirects=False, setResponse=None,
                timeout=None):
        """returns a HTTPResponse object for an HTTP request to path on host.

        This function builds a new connection for every request.

        On the returned object, you cannot use the read() method.  Instead
        any data returned by the server is available in the data attribute.

        scheme must be http or https; anything else raises a NetworkError.

        data usually is a byte string, but you can also pass a dictionary.
        Dictionaries are form-urlencoded if _canUseFormEncoding accepts them
        and serialized through _FormData otherwise.

        customHeaders are merged into the request headers last, so they
        override what this function sets itself.  The dictionary passed in is
        not modified.

        You can set followRedirects to True (or to a number of hops).  This
        means that the 303 "See other" codes that many UWS actions generate
        will be followed and the document at the other end will be obtained.
        For many operations this will lead to an error; only do this for
        slightly broken services.  301 and 302 redirects are followed
        regardless of followRedirects.  Redirect targets are always retrieved
        with GET; relative Location headers are resolved against the URL
        originally requested.

        In setResponse, you can pass in a callable that is called with the
        server response body as soon as it is in.  This is for when you want
        to store the response even if request raises an error later on
        (i.e., for sync querying).  It is called for every response received,
        including the ones at the end of a redirect.

        timeout is passed on to the connection class.

        If expectedStatus is not None and the status of the final response
        differs from it, a WrongStatus is raised.  Problems while connecting
        or sending yield a NetworkError, a redirect lacking a Location header
        a ProtocolError.
        """
        if scheme=="http":
                connClass = httplib.HTTPConnection
        elif scheme=="https":
                connClass = httplib.HTTPSConnection
        else:
                # an assert would vanish under -O and leave connClass unbound
                raise NetworkError("Unsupported URL scheme %s for host %s"%(
                        repr(scheme), host))

        headers = {"connection": "close",
                "user-agent": "Python TAP library http://soft.g-vo.org/subpkgs"}

        if not isinstance(data, basestring):
                if _canUseFormEncoding(data):
                        data = urllib.urlencode(data)
                        headers["Content-Type"] = "application/x-www-form-urlencoded"

                else:
                        form = _FormData.fromDict(data)
                        data = form.forHTTPUpload()
                        headers["Content-Type"] = form.get_content_type()+'; boundary="%s"'%(
                                        form.get_boundary())
        headers["Content-Length"] = len(data)
        headers.update(customHeaders)

        conn = None
        try:
                try:
                        conn = connClass(host, timeout=timeout)
                except TypeError:
                        # presumably an httplib without the timeout argument; stay
                        # with the class selected for the scheme so https requests
                        # are not silently sent in the clear.
                        conn = connClass(host)
                conn.request(method, path, data, headers)
        except (socket.error, httplib.error) as ex:
                if conn is not None:
                        conn.close()
                raise NetworkError("Problem connecting to %s (%s)"%
                        (host, str(ex)))

        # make sure the connection is released even if reading fails
        try:
                resp = conn.getresponse()
                resp.data = resp.read()
        finally:
                conn.close()
        if setResponse is not None:
                setResponse(resp.data)

        if ((followRedirects and resp.status==303)
                        or resp.status in (301, 302)):
                location = resp.getheader("location")
                if location is None:
                        raise ProtocolError("Server sent a redirect (status %s)"
                                " without a Location header"%resp.status)
                # Location may be relative; resolve it against what we asked for.
                target = urlparse.urlsplit(urlparse.urljoin(
                        "%s://%s%s"%(scheme, host, path), location))
                targetPath = target.path or "/"
                if target.query:
                        # the query string is part of the resource to fetch
                        targetPath = targetPath+"?"+target.query
                # Never let the hop counter go negative: a negative number is
                # true and would switch on 303 following the caller didn't
                # ask for.
                # NOTE(review): chains of 301/302 redirects are not bounded here.
                return request(target.scheme, target.netloc, targetPath,
                        method="GET", expectedStatus=expectedStatus,
                        followRedirects=max(0, int(followRedirects)-1),
                        setResponse=setResponse, timeout=timeout)

        if expectedStatus is not None:
                if resp.status!=expectedStatus:
                        raise WrongStatus("Expected status %s, got status %s"%(
                                expectedStatus, resp.status), resp.status, resp.data)
        return resp
 518   
521   
522          def getter(self): 
523                  destURL = self.jobPath+methodPath 
524                  response = request(self.destScheme, self.destHost, destURL,  
525                          expectedStatus=200) 
526                  return _parseWith(parser(), response.data) 
 527          return getter 
528   
531   
532          def setter(self, value): 
533                  destURL = self.jobPath+methodPath 
534                  request(self.destScheme, self.destHost, destURL,  
535                          {parameterName: serializer(value)}, method="POST", 
536                          expectedStatus=303) 
 537          return setter 
538   
541          """A helper class for classes constructed with an ADQL endpoint. 
542          """ 
544                  self.endpointURL = endpointURL.rstrip("/") 
545                  parts = urlparse.urlsplit(self.endpointURL) 
546                  self.destScheme = parts.scheme 
547                  self.destHost = parts.hostname 
548                  if parts.port: 
549                          self.destHost = "%s:%s"%(self.destHost, parts.port) 
550                  self.destPath = parts.path 
551                  if self.destPath.endswith("/"): 
552                          self.destPath = self.destPath[:-1] 
  553   
556          """A facade for an ADQL-based async TAP job. 
557   
558          Construct it with the URL of the async endpoint and a query. 
559   
560          Alternatively, you can give the endpoint URL and a jobId as a 
561          keyword parameter.  This only makes sense if the service has 
562          handed out the jobId before (e.g., when a different program takes 
563          up handling of a job started before). 
564   
565          See :dachsdoc:`adql.html` for details. 
566          """ 
567 -        def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",  
568                          userParams={}, timeout=None): 
 569                  self._defineEndpoint(endpointURL) 
570                  self.timeout = timeout 
571                  self.destPath = utils.ensureOneSlash(self.destPath)+"async" 
572                  if query is not None: 
573                          self.jobId, self.jobPath = None, None 
574                          self._createJob(query, lang, userParams) 
575                  elif jobId is not None: 
576                          self.jobId = jobId 
577                  else: 
578                          raise Error("Must construct ADQLTAPJob with at least query or jobId") 
579                  self._computeJobPath() 
 580           
582                  self.jobPath = "%s/%s"%(self.destPath, self.jobId) 
 583   
585                  params = { 
586                          "REQUEST": "doQuery", 
587                          "LANG": lang, 
588                          "QUERY": query} 
589                  for k,v in userParams.iteritems(): 
590                          params[k] = str(v) 
591                  response = request(self.destScheme, self.destHost, self.destPath, params, 
592                          method="POST", expectedStatus=303, timeout=self.timeout) 
593                   
594                  try: 
595                          self.jobId = urlparse.urlsplit( 
596                                  response.getheader("location", "")).path.split("/")[-1] 
597                  except ValueError: 
598                          raise utils.logOldExc( 
599                                  ProtocolError("Job creation returned invalid job id")) 
 600   
601 -        def delete(self, usePOST=False): 
 602                  """removes the job on the remote side. 
603   
604                  usePOST=True can be used for servers that do not support the DELETE 
605                  HTTP method (a.k.a. "are broken"). 
606                  """ 
607                  if self.jobPath is not None: 
608                          if usePOST: 
609                                  request(self.destScheme, self.destHost, self.jobPath, method="POST", 
610                                          data={"ACTION": "DELETE"}, expectedStatus=303,  
611                                          timeout=self.timeout) 
612                          else: 
613                                  request(self.destScheme, self.destHost, self.jobPath, method="DELETE", 
614                                          expectedStatus=303, timeout=self.timeout) 
 615   
617                  """asks the remote side to start the job. 
618                  """ 
619                  request(self.destScheme, self.destHost, self.jobPath+"/phase",  
620                          {"PHASE": "RUN"}, method="POST", expectedStatus=303,  
621                          timeout=self.timeout) 
 622   
624                  """asks the remote side to abort the job. 
625                  """ 
626                  request(self.destScheme, self.destHost, self.jobPath+"/phase",  
627                          {"PHASE": "ABORT"}, method="POST", expectedStatus=303, 
628                          timeout=self.timeout) 
 629   
639   
640 -        def waitForPhases(self, phases, pollInterval=1, increment=1.189207115002721, 
641                          giveUpAfter=None): 
 642                  """waits for the job's phase to become one of the set phases. 
643   
644                  This method polls.  Initially, it does increases poll times 
645                  exponentially with increment until it queries every two minutes. 
646   
647                  The magic number in increment is 2**(1/4.). 
648   
649                  giveUpAfter, if given, is the number of iterations this method will 
650                  do.  If none of the desired phases have been found until then, 
651                  raise a ProtocolError. 
652                  """ 
653                  attempts = 0 
654                  while True: 
655                          curPhase = self.phase  
656                          if curPhase in phases: 
657                                  break 
658                          time.sleep(pollInterval) 
659                          pollInterval = min(120, pollInterval*increment) 
660                          attempts += 1 
661                          if giveUpAfter: 
662                                  if attempts>giveUpAfter: 
663                                          raise ProtocolError("None of the states in %s were reached" 
664                                                  " in time."%repr(phases), 
665                                          hint="After %d attempts, phase was %s"%(attempts, curPhase)) 
 666   
667 -        def run(self, pollInterval=1): 
 676   
677          executionDuration = property( 
678                  _makeAtomicValueGetter("/executionduration", _makeFlatParser(float)), 
679                  _makeAtomicValueSetter("/executionduration", str, "EXECUTIONDURATION")) 
680   
681          destruction = property( 
682                  _makeAtomicValueGetter("/destruction", _makeFlatParser(utils.parseISODT)), 
683                  _makeAtomicValueSetter("/destruction",  
684                          lambda dt: dt.strftime("%Y-%m-%dT%H:%M:%S.000"), "DESTRUCTION")) 
685   
687                  return self.endpointURL+"/async/%s%s"%(self.jobId, jobPath) 
 688   
690                   
691                  response = request(self.destScheme, self.destHost, self.jobPath+path, 
692                          expectedStatus=200, timeout=self.timeout) 
693                  return _parseWith(parser, response.data) 
 694   
695          @property 
697                  """returns a dictionary of much job-related information. 
698                  """ 
699                  return self._queryJobResource("", _InfoParser()) 
 700   
701          @property 
703                  """returns the phase the job is in according to the server. 
704                  """ 
705                  return self._queryJobResource("/phase", _PhaseParser()) 
 706   
707          @property 
709                  """returns the estimate the server gives for the run time of the job. 
710                  """ 
711                  return self._queryJobResource("/quote", _QuoteParser()) 
 712   
713          @property 
715                  """returns the owner of the job. 
716                  """ 
717                  return self._queryJobResource("/owner", _makeFlatParser(str)()) 
 718   
719          @property 
721                  """returns a dictionary mapping passed parameters to server-provided 
722                  string representations. 
723   
724                  To set a parameter, use the setParameter function.  Changing the 
725                  dictionary returned here will have no effect. 
726                  """ 
727                  return self._queryJobResource("/parameters", _ParametersParser()) 
 728   
729          @property 
731                  """returns a list of UWSResult instances. 
732                  """ 
733                  return self._queryJobResource("/results", _ResultsParser()) 
 734   
736                  """returns the URL of the ADQL result table. 
737                  """ 
738                  if simple: 
739                          return self.makeJobURL("/results/result") 
740                  else: 
741                          return self.allResults[0].href 
 742   
744                  """returns a file-like object you can read the default TAP result off. 
745   
746                  To have the embedded VOTable returned, say 
747                  votable.load(job.openResult()). 
748   
749                  If you pass simple=False, the URL will be taken from the 
750                  service's result list (the first one given there).  Otherwise (the 
751                  default), results/result is used. 
752                  """ 
753                  return urllib.urlopen(self.getResultURL()) 
 754   
759   
761                  """returns the error message the server gives, verbatim. 
762                  """ 
763                  data = request(self.destScheme, self.destHost, self.jobPath+"/error", 
764                          expectedStatus=200, followRedirects=True, 
765                          timeout=self.timeout).data 
766                  return _getErrorInfo(data) 
 767   
769                  """adds uploaded tables, either from a file or as a remote URL. 
770   
771                  You should not try to change UPLOAD yourself (e.g., using setParameter). 
772   
773                  Data is either a string (i.e. a URI) or a file-like object (an upload). 
774                  """ 
775                  uploadFragments = [] 
776                  form = _FormData() 
777                  if isinstance(data, basestring):  
778                          assert ',' not in data 
779                          assert ';' not in data 
780                          uploadFragments.append("%s,%s"%(name, data)) 
781   
782                  else:  
783                          uploadKey = utils.intToFunnyWord(id(data)) 
784                          form.addFile(uploadKey, uploadKey, data.read()) 
785                          uploadFragments.append("%s,param:%s"%(name, uploadKey)) 
786   
787                  form.addParam("UPLOAD", ";".join(uploadFragments)) 
788                  request(self.destScheme, self.destHost, self.jobPath+"/parameters",  
789                          method="POST", 
790                          data=form.forHTTPUpload(), expectedStatus=303,  
791                          customHeaders={"content-type":  
792                                  form.get_content_type()+'; boundary="%s"'%(form.get_boundary())}) 
  793   
796          """A facade for a synchronous TAP Job. 
797   
798          This really is just a very glorified urllib.urlopen.  Maybe some 
799          superficial parallels to ADQLTAPJob are useful. 
800   
801          You can construct it, add uploads, and then start or run the thing. 
802          Methods that make no sense at all for sync jobs ("phase") silently 
803          return some more or less sensible fakes. 
804          """ 
805 -        def __init__(self, endpointURL, query=None, jobId=None, lang="ADQL",  
806                          userParams={}, timeout=None): 
 807                  self._defineEndpoint(endpointURL) 
808                  self.query, self.lang = query, lang 
809                  self.userParams = userParams.copy() 
810                  self.result = None 
811                  self.uploads = [] 
812                  self._errorFromServer = None 
813                  self.timeout = timeout 
 814           
815 -        def postToService(self, params): 
 816                  return request(self.destScheme, self.destHost, self.destPath+"/sync",  
817                          params, 
818                          method="POST", followRedirects=3, expectedStatus=200, 
819                          setResponse=self._setErrorFromServer, timeout=self.timeout) 
 820   
821 -        def delete(self, usePOST=None): 
 824           
826                  """does nothing. 
827   
828                  You could argue that this could come from a different thread and we 
829                  could try to interrupt the ongoing request.  Well, if you want it, 
830                  try it yourself or ask the author. 
831                  """ 
 832   
834                  if self._errorFromServer is not None: 
835                          raise Error(self._errorFromServer) 
 836   
837 -        def waitForPhases(self, phases, pollInterval=None, increment=None, 
838                          giveUpAfter=None): 
 842   
844                   
845                   
846                   
847                   
848                   
849                   
850                  self._errorFromServer = _getErrorInfo(data) 
 851   
853                  params={ 
854                          "REQUEST": "doQuery", 
855                          "LANG": self.lang, 
856                          "QUERY": self.query} 
857                  params.update(self.userParams) 
858                  if self.uploads: 
859                          upFrags = [] 
860                          for name, key, data in self.uploads: 
861                                  upFrags.append("%s,param:%s"%(name, key)) 
862                                  params[key] = data 
863                          params["UPLOAD"] = ";".join(upFrags) 
864   
865                  params = dict((k, str(v)) for k,v in params.iteritems()) 
866   
867                  try: 
868                          resp = self.postToService(params) 
869                          self.result = LocalResult(resp.data, "TAPResult", resp.getheader( 
870                                  "Content-Type")) 
871                  except Exception as msg: 
872                           
873                           
874                          if not self._errorFromServer: 
875                                  self._errorFromServer = str(msg) 
876                          raise 
877                  else: 
878                           
879                          self._errorFromServer = None 
880                  return self 
 881   
882 -        def run(self, pollInterval=None): 
 884   
885          @property 
888   
889          @property 
892           
893          @property 
896           
897          @property 
900           
901          @property 
903                  return self.userParameters 
 904           
905          @property 
907                  if self.result is None: 
908                          return [] 
909                  else: 
910                          return [self.result] 
 911   
916   
919   
921                  return self._errorFromServer 
 922   
 930   
933          """A facade for an ADQL endpoint. 
934   
935          This is only needed for inspecting server metadata (i.e., in general 
936          only for rather fancy applications). 
937          """ 
939                  self._defineEndpoint(endpointURL) 
 940           
941 -        def createJob(self, query, lang="ADQL-2.0", userParams={}): 
 943   
944          @property 
946                  """returns True, False, or None (undecidable). 
947   
948                  None is returned when /availability gives a 404 (which is legal) 
949                  or the returned document doesn't parse. 
950                  """ 
951                  try: 
952                          response = request(self.destScheme, self.destHost,  
953                                  self.destPath+"/availability", expectedStatus=200) 
954                          res = _parseWith(_AvailabilityParser(), response.data) 
955                  except WrongStatus: 
956                          res = None 
957                  return res 
 958           
959          @property 
961                  """returns a dictionary containing some meta info on the remote service. 
962   
963                  Keys to look for include title, identifier, contact (the mail address), 
964                  and referenceURL. 
965   
966                  If the remote server doesn't return capabilities as expected, an 
967                  empty dict is returned. 
968                  """ 
969                  return _parseWith(_CapabilitiesParser(),  
970                                  request(self.destScheme, self.destHost,  
971                                          self.destPath+"/capabilities").data) 
 972   
973          @property 
975                  """returns a sequence of table definitions for the tables accessible 
976                  through this service. 
977   
978                  The table definitions come as gavo.votable.Table instances. 
979                  """ 
980                  return _parseWith(_TablesParser(), 
981                                  request(self.destScheme, self.destHost, self.destPath+"/tables").data) 
  982