30. Splitting Up Queries

It usually pays to try to optimize ADQL queries (and we’ll finally write a guide on this one of these days). But sometimes you just need to partition queries; for instance, your result set otherwise becomes too large, or your query simply runs for too long. In the latter case, you can play with execution_duration on async jobs:

job = svc.submit_job("...")
job.execution_duration = 10000  # seconds; set this before running the job
job.run()

This will not help you when you hit the hard match limit. In such cases, the recommended way is to use the table’s primary key to partition the data; usually, that should be the column with the UCD meta.id;meta.main. For a rough partition, where the partition sizes may be grossly different, just figure out the maximum value of the identifier. For our light version of Gaia DR2, you could query:

SELECT max(source_id) FROM gaia.dr2light

(if that's slow, you probably haven't chosen a good primary key); in
this case, that yields 6917528997577384320.

With that number, you can enter a program like this:

import pyvo

MAX_ID, N_PART = 6917528997577384320+1, 10
# multiply before the integer division so the limits stay exact
# and the last limit lands exactly on MAX_ID
partition_limits = [MAX_ID*i//N_PART
    for i in range(N_PART+1)]

svc = pyvo.dal.TAPService("http://dc.g-vo.org/tap")
main_query = "SELECT count(*) FROM sample"

for lower, upper in zip(partition_limits[:-1], partition_limits[1:]):
    result = svc.run_sync("WITH sample AS "
        "(SELECT * FROM gaia.dr2light"
        "  WHERE source_id BETWEEN {} AND {}) ".format(lower, upper-1)
        + main_query)
    print(result)

(Exercise: Can you see why the +1 is necessary in the MAX_ID assignment?)
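If you want to convince yourself that the partition arithmetic really tiles the full identifier range without gaps or overlaps, you can check it in plain Python (a standalone sketch; it uses integer division, with the multiplication done first so the limits stay exact for 64-bit identifiers):

```python
MAX_ID, N_PART = 6917528997577384320 + 1, 10

# multiplying before the integer division keeps everything exact and
# guarantees that the last limit is MAX_ID itself
partition_limits = [MAX_ID * i // N_PART for i in range(N_PART + 1)]

# consecutive limits define half-open ranges [lo, hi); combined with
# BETWEEN lo AND hi-1 they are inclusive and non-overlapping
ranges = list(zip(partition_limits[:-1], partition_limits[1:]))

assert partition_limits[0] == 0
assert partition_limits[-1] == MAX_ID
assert all(lo < hi for lo, hi in ranges)
# every id from 0 through MAX_ID-1 falls into exactly one range
assert sum(hi - lo for lo, hi in ranges) == MAX_ID
```

Had we used float division here, the limits would silently lose precision: 64-bit identifiers are larger than what a double can represent exactly.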

You’ll obviously have to adapt that a bit when the primary key is a string, but that’s rare these days.
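If you do hit a string-valued primary key, one possible adaptation (a sketch, not from any specific service; the column name main_id and the table name some.table are made up, and you would adjust the prefix alphabet to what actually occurs in the column) is to partition on the first character of the identifier:

```python
import string

# hypothetical alphabet of leading characters in the identifier column
PREFIXES = string.digits + string.ascii_uppercase

def prefix_conditions(column):
    """yields ADQL conditions selecting identifiers by first character."""
    for ch in PREFIXES:
        # LIKE with a trailing % matches all ids starting with ch
        yield "{} LIKE '{}%'".format(column, ch)

# build one partition query per prefix
queries = ["SELECT count(*) FROM some.table WHERE " + cond
    for cond in prefix_conditions("main_id")]
```

Note the partitions here are only as balanced as the distribution of first characters; for strongly skewed identifiers you would recurse into longer prefixes.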

Since most astronomical objects are distributed highly unevenly on the sky, this will yield chunks of very different sizes for common schemes in which the identifier somehow encodes the sky position.

If you have a use case where you need a guaranteed maximum result size per partition, you will have to use two passes: first figure out the distribution of objects, and then compute the desired partitions from that.

Here’s an example for how one might go about this:

from astropy import table
import pyvo

MAX_ID, ROW_TARGET = 6917528997577384320+1, 10000000

ENDPOINT = "http://dc.g-vo.org/tap"

# the 20000 is just the number of bins to use; make it too small, and
# your initial bins may already overflow ROW_TARGET
ID_DIVISOR = MAX_ID//20000

QUERY = """
select round(source_id/%d) as bin, count(*) as ct
from gaia.dr2light
group by bin
"""%ID_DIVISOR


def get_bin_sizes():
    """returns an ordered sequence of (bin_center, num_objects) rows.
    """
    # since the partitioning query already is expensive, cache it,
    # and use the cache if it's there.
    try:
        with open("partitions.vot", "rb") as f:
            tbl = table.Table.read(f)
    except IOError:
        # Fetch from source; takes about 1 hour
        print("Fetching partitions from source; this will take a while"
            " (provide partitions.vot to avoid re-querying)")
        svc = pyvo.dal.TAPService(ENDPOINT)
        res = svc.run_async(QUERY, maxrec=1000000)
        tbl = res.table
        with open("partitions.vot", "wb") as f:
            tbl.write(output=f, format="votable")

    res = [(row["bin"], row["ct"]) for row in tbl]
    res.sort()
    return res


def get_partition_limits(bin_sizes):
    """returns a list of limits of source_id ranges exhausting the whole
    catalog.

    bin_sizes is what get_bin_sizes returns (and it must be sorted by
    bin center).
    """
    limits, cur_count = [0], 0
    for bin_center, bin_count in bin_sizes:
        if cur_count+bin_count>ROW_TARGET:
            limits.append(int(bin_center*ID_DIVISOR-ID_DIVISOR/2))
            cur_count = 0
        cur_count += bin_count
    limits.append(MAX_ID)
    return limits


def get_data_for(svc, query, low, high):
    """returns a TAP result for the (simple) query in the partition
    between low and high.

    query needs to query the ``sample`` table.
    """
    job = svc.submit_job("WITH sample AS "
        "(SELECT * FROM gaia.dr2light"
        "  WHERE source_id BETWEEN {} AND {}) ".format(low, high)
        + query, maxrec=ROW_TARGET)
    try:
        job.run()
        job.wait()
        return job.fetch_result()
    finally:
        job.delete()


def main():
    svc = pyvo.dal.TAPService(ENDPOINT)
    limits = get_partition_limits(get_bin_sizes())
    for ct, (low, high) in enumerate(zip(limits[:-1], limits[1:])):
        print("{}/{}".format(ct+1, len(limits)-1))
        res = get_data_for(svc, "SELECT count(*) FROM sample", low, high-1)
        # do your thing here


if __name__ == "__main__":
    main()
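The heart of this is the partition-limit computation, and you can convince yourself of its behaviour without touching a TAP service by feeding it synthetic bin counts (a standalone sketch with made-up toy numbers, not Gaia data):

```python
ROW_TARGET = 100   # toy target: at most ~100 rows per partition
ID_DIVISOR = 10    # toy bin width in identifier space
MAX_ID = 200

def get_partition_limits(bin_sizes):
    """returns identifier limits so that no partition exceeds ROW_TARGET
    rows (unless a single bin already does); bin_sizes is a sorted list
    of (bin_center, count) pairs."""
    limits, cur_count = [0], 0
    for bin_center, bin_count in bin_sizes:
        if cur_count + bin_count > ROW_TARGET:
            # close the current partition at the lower edge of this bin
            limits.append(int(bin_center * ID_DIVISOR - ID_DIVISOR / 2))
            cur_count = 0
        cur_count += bin_count
    limits.append(MAX_ID)
    return limits

# a strongly skewed toy distribution: sparse bins first, dense bins later
bins = [(c, 5 if c < 10 else 60) for c in range(1, 20, 2)]
limits = get_partition_limits(bins)
assert limits == [0, 125, 145, 165, 185, 200]
```

Notice how the dense tail of the distribution gets many narrow partitions while the sparse head is covered by a single wide one; that is exactly the behaviour you want when identifiers encode sky position.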

But, most importantly: If you need any of this, you’re probably doing it wrong.


Markus Demleitner, Hendrik Heinl