13. Step 2: Go Async

When doing a lot of queries or long-running queries, run them asynchronously and in parallel.

Asynchronous means that you go to a service, submit your query there and immediately receive some sort of token. With this token, you can come back later and retrieve your result. In the meantime, you’re free to do whatever else you have to do – which includes turning off and/or moving your machine, for instance.

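import time
import pyvo

# QUERIES, conn, topcat_id and vohelper are defined elsewhere
# in fetch3_async.py.

# Submit and start all jobs; none of these calls waits for results.
jobs = set()
for short_name, access_url, query in QUERIES:
  job = pyvo.dal.TAPService(access_url).submit_job(
    query.format(**locals()), maxrec=9000000)
  job.run()
  jobs.add((short_name, job))

# Poll until no job is queued or executing any more.
while jobs:
  time.sleep(5)
  for short_name, job in list(jobs):  # copy: we remove while looping
    if job.phase not in ('QUEUED', 'EXECUTING'):
      jobs.remove((short_name, job))
      vohelper.send_table_to(conn, topcat_id,
          job.fetch_result().table, short_name)
      job.delete()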

[See fetch3_async.py]

We told you sync is easier to program with. On the other hand, with this program all three queries run in parallel, which is nice, in particular if they take a while. Additionally, you have a little more control over when you receive the data.

What’s happening here? First, we submit all jobs. Rather than run_sync, we now use TAPService’s submit_job method. It takes the same arguments as run_sync but returns immediately. Since it can’t peek into the future, it can’t return the finished result. Instead, you get an object that you can use to manipulate the remote job. That remote job is not started by submit_job. Instead, it waits for further configuration (e.g., increasing its maximal runtime) or a request to put it into the processing queue.
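As a minimal sketch of that configuration step, the helper below raises a job's maximal runtime before queueing it. It assumes the job object has a writable execution_duration attribute (in seconds), as PyVO's async TAP jobs do; the helper name submit_with_runtime and the one-hour default are made up for illustration.

```python
def submit_with_runtime(service, query, max_seconds=3600):
    """Submit query as an async job, raise its runtime limit, start it.

    service is expected to be a pyvo.dal.TAPService or anything with a
    compatible submit_job method.  Hypothetical helper, not part of PyVO.
    """
    job = service.submit_job(query)        # created remotely, not yet started
    job.execution_duration = max_seconds   # ask the service for more runtime
    job.run()                              # now put it into the queue
    return job
```

Services are free to cap or refuse the requested runtime, so it is probably worth reading execution_duration back if the exact limit matters.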

For our task, it’s enough to just start the job using the run method. We then add it to a watch set of running jobs.

The rest of the code above is all about managing this set. In a polling loop (be sure to introduce sleeps, or your code will hit the remote services all the time), we iterate through the jobs. Actually, we iterate over a copy of the job set, since we want to delete completed jobs from it, and Python does not allow changing a set while an iterator over it is active.
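Why the copy is needed can be seen in a small stand-alone sketch. The strings here are just stand-ins for the (short_name, job) pairs of the real program.

```python
jobs = {"job1", "job2", "job3"}   # stand-ins for (short_name, job) pairs

# Removing from a set while iterating over it directly fails.
error = None
try:
    for name in jobs:
        jobs.remove(name)
except RuntimeError as exc:
    error = str(exc)

# Iterating over a snapshot works; the set itself may shrink freely.
jobs = {"job1", "job2", "job3"}
for name in list(jobs):
    jobs.remove(name)
```

After the first loop, error holds Python's complaint about the set changing size during iteration; after the second, jobs is empty.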

In the loop body, we check the phase attribute of the job. Although this looks like an attribute access, in each iteration PyVO goes to the remote service and asks it what our job is doing. While it is in either the QUEUED or the EXECUTING state, it’s still worth waiting for a result. Other states include PENDING (not yet started), COMPLETED (done, result available), ERROR (done, some kind of failure happened; call the raise_if_error method to turn it into a Python exception), and ABORTED (interrupted by client or operator intervention).
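The program above treats every phase other than QUEUED and EXECUTING as "done" and then tries to fetch a result. A slightly more careful variant could separate successful jobs from the rest. The following is only a sketch: wait_for_jobs is a hypothetical helper, and it relies on nothing but the phase attribute and the fetch_result method mentioned in the text.

```python
import time

ACTIVE_PHASES = ("QUEUED", "EXECUTING")

def wait_for_jobs(jobs, poll_interval=5):
    """Poll the jobs in the mapping name -> job until none is active.

    Returns (results, failed): results maps names to fetched results
    for COMPLETED jobs, failed maps names to the final phase of all
    other jobs (ERROR, ABORTED, or PENDING if a job was never started).
    """
    results, failed = {}, {}
    pending = dict(jobs)
    while pending:
        for name, job in list(pending.items()):
            phase = job.phase            # one request to the service
            if phase in ACTIVE_PHASES:
                continue                 # still worth waiting
            del pending[name]
            if phase == "COMPLETED":
                results[name] = job.fetch_result()
            else:
                failed[name] = phase
        if pending:
            time.sleep(poll_interval)    # don't hammer the services
    return results, failed
```

For jobs that end up in failed with phase ERROR, calling raise_if_error on the job object would produce the service's error message as an exception.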

Once we find that a job is done, we remove it from the job set and send the result over to TOPCAT as before.

Finally, we delete the remote job. That’s a nice thing to do. Services will eventually delete your job anyway (you can figure out when and even change that date in the job’s destruction attribute), but it’s good style to discard jobs once you don’t need them any more.

Note that PyVO also gives you a run_async method on TAPService. From the caller’s point of view, it works exactly like run_sync, i.e., it will block until the results are in. Use it if you have to go async because your job runs too long for sync (in general, sync jobs have to finish in seconds to minutes, while async jobs can run for hours) but you want to avoid the dance with checking the phases.
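To see what such a blocking call has to do behind the scenes, here is a rough sketch built from the job methods discussed above. It is not PyVO's actual implementation; run_blocking is a hypothetical name, and the sketch assumes the job object also offers a wait method that returns once the job has left the active phases, as PyVO's async TAP jobs do.

```python
def run_blocking(service, query):
    """Run query as an async job but block until the result is there.

    Hypothetical helper illustrating the steps; with PyVO you would
    simply call service.run_async(query) instead.
    """
    job = service.submit_job(query)
    try:
        job.run()                  # queue the job
        job.wait()                 # block while it is queued or executing
        job.raise_if_error()       # turn an ERROR phase into an exception
        return job.fetch_result()
    finally:
        job.delete()               # clean up on the service in any case
```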

Markus Demleitner, Hendrik Heinl