import dbus.service
import dbus.mainloop.glib
import random
+import collections
mainloop = GLib.MainLoop()
return dbus.ObjectPath("/")
class ExportedObj(dbus.service.Object):
+
+ DBusInterface = collections.namedtuple('DBusInterface', ['dbus_iface', 'get_props_func', 'prop_changed_func'])
+
def __init__(self, bus, object_path):
dbus.service.Object.__init__(self, bus, object_path)
self._bus = bus
self.path = object_path
self.__dbus_ifaces = {}
- def add_dbus_interface(self, dbus_iface, get_props_func):
- self.__dbus_ifaces[dbus_iface] = get_props_func
+ def add_dbus_interface(self, dbus_iface, get_props_func, prop_changed_func):
+ self.__dbus_ifaces[dbus_iface] = ExportedObj.DBusInterface(dbus_iface, get_props_func, prop_changed_func)
+
+ def __dbus_interface_get(self, dbus_iface):
+ if dbus_iface not in self.__dbus_ifaces:
+ raise UnknownInterfaceException()
+ return self.__dbus_ifaces[dbus_iface]
+
+ def _dbus_property_get(self, dbus_iface, propname = None):
+ props = self.__dbus_interface_get(dbus_iface).get_props_func()
+ if propname is None:
+ return props
+ if propname not in props:
+ raise UnknownPropertyException()
+ return props[propname]
- def _get_dbus_properties(self, iface):
- return self.__dbus_ifaces[iface]()
+ def _dbus_property_notify(self, dbus_iface, propname):
+ prop = self._dbus_property_get(dbus_iface, propname)
+ self.__dbus_interface_get(dbus_iface).prop_changed_func(self, { propname: prop })
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}')
- def GetAll(self, iface):
- if iface not in self.__dbus_ifaces.keys():
- raise UnknownInterfaceException()
- return self._get_dbus_properties(iface)
+ def GetAll(self, dbus_iface):
+ return self._dbus_property_get(dbus_iface)
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v')
- def Get(self, iface, name):
- if iface not in self.__dbus_ifaces.keys():
- raise UnknownInterfaceException()
- props = self._get_dbus_properties(iface)
- if not name in props.keys():
- raise UnknownPropertyException()
- return props[name]
+ def Get(self, dbus_iface, name):
+ return self._dbus_property_get(dbus_iface, name)
###################################################################
IFACE_DEVICE = 'org.freedesktop.NetworkManager.Device'
object_path = "/org/freedesktop/NetworkManager/Devices/%d" % Device.counter
Device.counter = Device.counter + 1
ExportedObj.__init__(self, bus, object_path)
- self.add_dbus_interface(IFACE_DEVICE, self.__get_props)
+ self.add_dbus_interface(IFACE_DEVICE, self.__get_props, Device.PropertiesChanged)
self.iface = iface
self.udi = "/sys/devices/virtual/%s" % iface
pass
def __notify(self, propname):
- props = self._get_dbus_properties(IFACE_DEVICE)
- changed = { propname: props[propname] }
- Device.PropertiesChanged(self, changed)
+ self._dbus_property_notify(IFACE_DEVICE, propname)
@dbus.service.signal(IFACE_DEVICE, signature='a{sv}')
def PropertiesChanged(self, changed):
class WiredDevice(Device):
def __init__(self, bus, iface, mac, subchannels):
Device.__init__(self, bus, iface, NM_DEVICE_TYPE_ETHERNET)
- self.add_dbus_interface(IFACE_WIRED, self.__get_props)
+ self.add_dbus_interface(IFACE_WIRED, self.__get_props, WiredDevice.PropertiesChanged)
if mac is None:
self.mac = random_mac()
return props
def __notify(self, propname):
- props = self._get_dbus_properties(IFACE_WIRED)
- changed = { propname: props[propname] }
- WiredDevice.PropertiesChanged(self, changed)
+ self._dbus_property_notify(IFACE_WIRED, propname)
@dbus.service.signal(IFACE_WIRED, signature='a{sv}')
def PropertiesChanged(self, changed):
class VlanDevice(Device):
def __init__(self, bus, iface):
Device.__init__(self, bus, iface, NM_DEVICE_TYPE_VLAN)
- self.add_dbus_interface(IFACE_VLAN, self.__get_props)
+ self.add_dbus_interface(IFACE_VLAN, self.__get_props, VlanDevice.PropertiesChanged)
self.mac = random_mac()
self.carrier = False
path = "/org/freedesktop/NetworkManager/AccessPoint/%d" % WifiAp.counter
WifiAp.counter = WifiAp.counter + 1
ExportedObj.__init__(self, bus, path)
- self.add_dbus_interface(IFACE_WIFI_AP, self.__get_props)
+ self.add_dbus_interface(IFACE_WIFI_AP, self.__get_props, WifiAp.PropertiesChanged)
self.ssid = ssid
if mac:
return props
def __notify(self, propname):
- props = self._get_dbus_properties(IFACE_WIFI_AP)
- changed = { propname: props[propname] }
- WifiAp.PropertiesChanged(self, changed)
+ self._dbus_property_notify(IFACE_WIFI_AP, propname)
@dbus.service.signal(IFACE_WIFI_AP, signature='a{sv}')
def PropertiesChanged(self, changed):
class WifiDevice(Device):
def __init__(self, bus, iface):
Device.__init__(self, bus, iface, NM_DEVICE_TYPE_WIFI)
- self.add_dbus_interface(IFACE_WIFI, self.__get_props)
+ self.add_dbus_interface(IFACE_WIFI, self.__get_props, WifiDevice.PropertiesChanged)
self.mac = random_mac()
self.aps = []
return props
def __notify(self, propname):
- props = self._get_dbus_properties(IFACE_WIFI)
- changed = { propname: props[propname] }
- WifiDevice.PropertiesChanged(self, changed)
+ self._dbus_property_notify(IFACE_WIFI, propname)
@dbus.service.signal(IFACE_WIFI, signature='a{sv}')
def PropertiesChanged(self, changed):
path = "/org/freedesktop/NetworkManager/Nsp/%d" % WimaxNsp.counter
WimaxNsp.counter = WimaxNsp.counter + 1
ExportedObj.__init__(self, bus, path)
- self.add_dbus_interface(IFACE_WIMAX_NSP, self.__get_props)
+ self.add_dbus_interface(IFACE_WIMAX_NSP, self.__get_props, WimaxNsp.PropertiesChanged)
self.name = name
self.strength = random.randint(0, 100)
return props
def __notify(self, propname):
- props = self._get_dbus_properties(IFACE_WIMAX_NSP)
- changed = { propname: props[propname] }
- WimaxNsp.PropertiesChanged(self, changed)
+ self._dbus_property_notify(IFACE_WIMAX_NSP, propname)
@dbus.service.signal(IFACE_WIMAX_NSP, signature='a{sv}')
def PropertiesChanged(self, changed):
class WimaxDevice(Device):
def __init__(self, bus, iface):
Device.__init__(self, bus, iface, NM_DEVICE_TYPE_WIMAX)
- self.add_dbus_interface(IFACE_WIMAX, self.__get_props)
+ self.add_dbus_interface(IFACE_WIMAX, self.__get_props, WimaxDevice.PropertiesChanged)
self.mac = random_mac()
self.bsid = random_mac()
return props
def __notify(self, propname):
- props = self._get_dbus_properties(IFACE_WIMAX)
- changed = { propname: props[propname] }
- WimaxDevice.PropertiesChanged(self, changed)
+ self._dbus_property_notify(IFACE_WIMAX, propname)
@dbus.service.signal(IFACE_WIMAX, signature='a{sv}')
def PropertiesChanged(self, changed):
object_path = "/org/freedesktop/NetworkManager/ActiveConnection/%d" % ActiveConnection.counter
ActiveConnection.counter = ActiveConnection.counter + 1
ExportedObj.__init__(self, bus, object_path)
- self.add_dbus_interface(IFACE_ACTIVE_CONNECTION, self.__get_props)
+ self.add_dbus_interface(IFACE_ACTIVE_CONNECTION, self.__get_props, ActiveConnection.PropertiesChanged)
self.device = device
self.conn = connection
class NetworkManager(ExportedObj):
def __init__(self, bus, object_path):
ExportedObj.__init__(self, bus, object_path)
- self.add_dbus_interface(IFACE_NM, self.__get_props)
+ self.add_dbus_interface(IFACE_NM, self.__get_props, NetworkManager.PropertiesChanged)
self._bus = bus;
self.devices = []
@dbus.service.method(dbus_interface=IFACE_NM, in_signature='', out_signature='ao')
def GetDevices(self):
- return self._get_dbus_properties(IFACE_NM)[PM_DEVICES]
+ return to_path_array(self.devices)
@dbus.service.method(dbus_interface=IFACE_NM, in_signature='s', out_signature='o')
def GetDeviceByIpIface(self, ip_iface):
return props
def __notify(self, propname):
- props = self._get_dbus_properties(IFACE_NM)
- changed = { propname: props[propname] }
- NetworkManager.PropertiesChanged(self, changed)
+ self._dbus_property_notify(IFACE_NM, propname)
@dbus.service.signal(IFACE_NM, signature='a{sv}')
def PropertiesChanged(self, changed):