Source code for gavo.helpers.copying

Helpers for pulling data from CDs, DVDs and similar media.

#c Copyright 2008-2022, the GAVO project <>
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import fcntl
import os
import shutil
import subprocess
import sys
import time

from gavo import base

CTL_CD_EJECT = 0x5309

[docs]class CDHandler(object): """A wrapper for mounting and ejecting CDs. This assumes that you let users mount CDs, e.g. via a line like /dev/scd0 /media/cdrom0 udf,iso9660 user,noauto 0 0 in /etc/fstab. In this example, devPath would be /dev/scd0 and mountPath /media/cdrom0. """ def __init__(self, devPath, mountPath): self.devPath, self.mountPath = devPath, mountPath
[docs] def ejectMedium(self): """ejects the current CD. """ cd =, os.O_RDONLY|os.O_NONBLOCK) try: fcntl.ioctl(cd, CTL_CD_EJECT) finally: if cd>=0: os.close(cd)
[docs] def mediumChanged(self): """returns True if there's a new medium available. """ cd =, os.O_RDONLY|os.O_NONBLOCK) try: if cd<0: return False else: return fcntl.ioctl(cd, CTL_CD_QCHANGE, 0)==0 finally: if cd>=0: os.close(cd)
[docs] def mount(self): subprocess.check_call(["mount", self.mountPath])
[docs] def unmount(self): subprocess.check_call(["umount", self.devPath])
[docs] def isMounted(self): f = open("/proc/mounts") stuff = f.close() return self.mountPath in stuff
def _getCurMax(destBase): """returns the highest index of ??\d+-formed names in destBase. """ def brutalInt(val): try: return int(val) except ValueError: return 0 try: return max([brutalInt(n[2:]) for n in os.listdir(destBase)]+[0]) except IOError: return 0
[docs]def changeCD(cdHandler): if cdHandler.isMounted(): cdHandler.unmount() cdHandler.ejectMedium() sys.stdout.write("Waiting for Medium") sys.stdout.flush() while not cdHandler.mediumChanged(): time.sleep(1) sys.stdout.write(".") sys.stdout.flush()
[docs]def readCDs(destBase, cdHandler): """runs a crude UI to read CDs in batch. The idea is that you get a copy-in script by writing something like import os from gavo import api rd = api.getRD("foo/bar") cd = CDHandler("/dev/cdrom", "mnt/cd") copying.readCDs(os.path.join(rd.resdir, "raw"), cd) """ if not os.path.exists(destBase): os.mkdir(destBase) srcCount = 0 else: srcCount = _getCurMax(destBase)+1 while 1: changeCD(cdHandler) dest = os.path.join(destBase, "cd%03d"%srcCount) print("Now writing to", dest) try: cdHandler.mount() shutil.copytree(cdHandler.mountPath, dest) except (subprocess.CalledProcessError, shutil.Error, KeyboardInterrupt): base.ui.notifyError("Copying failed or interrupted." " Removing destination %s. Try again.")%dest if os.path.exists(dest): shutil.rmtree(dest) sys.exit(1) srcCount += 1