/usr/share/checkbox/scripts/removable_storage_watcher is in checkbox 0.13.7.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | #!/usr/bin/python
import sys
import gobject
import dbus
import argparse
from dbus.mainloop.glib import DBusGMainLoop
class StorageDeviceListener(object):
def __init__(self, action, devices):
self._action = action
self._devices = devices
self._bus = dbus.SystemBus(mainloop=DBusGMainLoop())
self._loop = gobject.MainLoop()
self._error = False
def check(self, timeout):
if self._action == 'insert':
self._bus.add_signal_receiver(self.add_detected, signal_name='DeviceAdded', dbus_interface='org.freedesktop.UDisks')
elif self._action == 'remove':
self._bus.add_signal_receiver(self.remove_detected, signal_name='DeviceRemoved', dbus_interface='org.freedesktop.UDisks')
def timeout_callback():
print "%s seconds have expired waiting for the device to be inserted." % timeout
self._error = True
self._loop.quit()
gobject.timeout_add_seconds(timeout, timeout_callback)
self._loop.run()
return self._error
def job_change_detected(self, devices, job_in_progress, job_id, job_num_tasks, job_cur_task_id, job_cur_task_percentage):
if job_id == "FilesystemMount" and self.is_device_inserted():
print "Expected device %s inserted" % self._device
self._loop.quit()
def add_detected(self, added_path):
self._bus.add_signal_receiver(self.job_change_detected, signal_name='DeviceJobChanged', dbus_interface='org.freedesktop.UDisks')
def remove_detected(self, removed_path):
if not self.is_device_inserted():
print "Removable storage device has been removed"
#TODO: figure out a way to get the DriveConnectionInterface of the
#device that was just removed.
self._loop.quit()
def is_device_inserted(self):
ud_manager_obj = self._bus.get_object("org.freedesktop.UDisks", "/org/freedesktop/UDisks")
ud_manager = dbus.Interface(ud_manager_obj, 'org.freedesktop.UDisks')
for dev in ud_manager.EnumerateDevices():
try:
device_obj = self._bus.get_object("org.freedesktop.UDisks", dev)
device_props = dbus.Interface(device_obj, dbus.PROPERTIES_IFACE)
self._device = device_props.Get('org.freedesktop.UDisks.Device', "DriveConnectionInterface")
if not device_props.Get('org.freedesktop.UDisks.Device',"DeviceIsDrive"):
if self._device in self._devices:
return True
except dbus.DBusException:
pass
return False
def main():
parser = argparse.ArgumentParser(description="Wait for the specified device to be inserted or removed.")
parser.add_argument('action', choices=['insert','remove'])
parser.add_argument('device', choices=['usb','sdio','firewire','scsi'],nargs=argparse.REMAINDER)
parser.add_argument('--timeout', type=int, default=10)
args = parser.parse_args()
listener = StorageDeviceListener(args.action, args.device)
return(listener.check(args.timeout))
if __name__ == "__main__":
sys.exit(main())
|