From: "Gix, Brian" <brian.gix@intel.com>
To: "linux-bluetooth@vger.kernel.org"
<linux-bluetooth@vger.kernel.org>,
"Stotland, Inga" <inga.stotland@intel.com>
Cc: "luiz.dentz@gmail.com" <luiz.dentz@gmail.com>,
"johan.hedberg@gmail.com" <johan.hedberg@gmail.com>
Subject: Re: [PATCH BlueZ v3] test: Add unified test for mesh node example app
Date: Mon, 11 Mar 2019 22:20:50 +0000 [thread overview]
Message-ID: <1552342848.29289.3.camel@intel.com> (raw)
In-Reply-To: <20190310075313.13914-1-inga.stotland@intel.com>
Applied
On Sat, 2019-03-09 at 23:53 -0800, Inga Stotland wrote:
> This adds one script, test-mesh, to replace three test-join,
> example-onoff-server and example-onoff-client.
> This is menu driven test that allows provisioning (join) and/or
> connecting existing (attach) nodes.
> ---
> Makefile.tools | 2 +-
> test/agent.py | 14 +-
> test/example-onoff-client | 288 -------------
> test/example-onoff-server | 365 -----------------
> test/test-mesh | 842 ++++++++++++++++++++++++++++++++++++++
> 5 files changed, 854 insertions(+), 657 deletions(-)
> delete mode 100644 test/example-onoff-client
> delete mode 100644 test/example-onoff-server
> create mode 100755 test/test-mesh
>
> diff --git a/Makefile.tools b/Makefile.tools
> index 0f94bbbe7..379e127b6 100644
> --- a/Makefile.tools
> +++ b/Makefile.tools
> @@ -463,7 +463,7 @@ test_scripts += test/sap_client.py test/bluezutils.py \
> test/test-hfp test/opp-client test/ftp-client \
> test/pbap-client test/map-client test/example-advertisement \
> test/example-gatt-server test/example-gatt-client \
> - test/test-gatt-profile
> + test/test-gatt-profile test/test-mesh test/agent.py
>
> if BTPCLIENT
> noinst_PROGRAMS += tools/btpclient
> diff --git a/test/agent.py b/test/agent.py
> index 22c92f952..778dbe092 100755
> --- a/test/agent.py
> +++ b/test/agent.py
> @@ -1,9 +1,16 @@
> -#!/usr/bin/python
> +#!/usr/bin/python3
>
> import sys
> import dbus
> import dbus.service
> -import dbus.mainloop.glib
> +
> +try:
> + from termcolor import colored, cprint
> + set_green = lambda x: colored(x, 'green', attrs=['bold'])
> + set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
> +except ImportError:
> + set_green = lambda x: x
> + set_cyan = lambda x: x
>
> AGENT_IFACE = 'org.bluez.mesh.ProvisionAgent1'
> AGENT_PATH = "/mesh/test/agent"
> @@ -37,4 +44,5 @@ class Agent(dbus.service.Object):
>
> @dbus.service.method(AGENT_IFACE, in_signature="su", out_signature="")
> def DisplayNumeric(self, type, value):
> - print("DisplayNumeric type=", type, " number=", value)
> + print(set_cyan('DisplayNumeric ('), type,
> + set_cyan(') number ='), set_green(value))
> diff --git a/test/example-onoff-client b/test/example-onoff-client
> deleted file mode 100644
> index e4a87eb12..000000000
> --- a/test/example-onoff-client
> +++ /dev/null
> @@ -1,288 +0,0 @@
> -#!/usr/bin/env python3
> -
> -import sys
> -import struct
> -import numpy
> -import dbus
> -import dbus.service
> -import dbus.exceptions
> -
> -try:
> - from gi.repository import GObject
> -except ImportError:
> - import gobject as GObject
> -from dbus.mainloop.glib import DBusGMainLoop
> -
> -MESH_SERVICE_NAME = 'org.bluez.mesh'
> -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
> -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
> -
> -MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
> -MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
> -MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
> -
> -VENDOR_ID_NONE = 0xffff
> -
> -app = None
> -bus = None
> -mainloop = None
> -node = None
> -token = numpy.uint64(0x76bd4f2372477600)
> -
> -def unwrap(item):
> - if isinstance(item, dbus.Boolean):
> - return bool(item)
> - if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
> - dbus.UInt64, dbus.Int64)):
> - return int(item)
> - if isinstance(item, dbus.Byte):
> - return bytes([int(item)])
> - if isinstance(item, dbus.String):
> - return item
> - if isinstance(item, (dbus.Array, list, tuple)):
> - return [unwrap(x) for x in item]
> - if isinstance(item, (dbus.Dictionary, dict)):
> - return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
> -
> - print('Dictionary item not handled')
> - print(type(item))
> - return item
> -
> -def attach_app_cb(node_path, dict_array):
> - print('Mesh application registered ', node_path)
> - print(type(node_path))
> - print(type(dict_array))
> - print(dict_array)
> -
> - els = unwrap(dict_array)
> - print("Get Elements")
> - for el in els:
> - print(el)
> - idx = struct.unpack('b', el[0])[0]
> - print('Configuration for Element ', end='')
> - print(idx)
> - models = el[1]
> -
> - element = app.get_element(idx)
> - element.set_model_config(models)
> -
> - obj = bus.get_object(MESH_SERVICE_NAME, node_path)
> - global node
> - node = dbus.Interface(obj, MESH_NODE_IFACE)
> -
> -def error_cb(error):
> - print('D-Bus call failed: ' + str(error))
> -
> -def generic_reply_cb():
> - print('D-Bus call done')
> -
> -def interfaces_removed_cb(object_path, interfaces):
> - if not mesh_net:
> - return
> -
> - if object_path == mesh_net[2]:
> - print('Service was removed')
> - mainloop.quit()
> -
> -class Application(dbus.service.Object):
> -
> - def __init__(self, bus):
> - self.path = '/example'
> - self.elements = []
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> - def add_element(self, element):
> - self.elements.append(element)
> -
> - def get_element(self, idx):
> - for ele in self.elements:
> - if ele.get_index() == idx:
> - return ele
> -
> - @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
> - def GetManagedObjects(self):
> - response = {}
> - print('GetManagedObjects')
> - for element in self.elements:
> - response[element.get_path()] = element.get_properties()
> - return response
> -
> -class Element(dbus.service.Object):
> - PATH_BASE = '/example/ele'
> -
> - def __init__(self, bus, index):
> - self.path = self.PATH_BASE + format(index, '02x')
> - print(self.path)
> - self.models = []
> - self.bus = bus
> - self.index = index
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def _get_sig_models(self):
> - ids = []
> - for model in self.models:
> - id = model.get_id()
> - vendor = model.get_vendor()
> - if vendor == VENDOR_ID_NONE:
> - ids.append(id)
> - return ids
> -
> - def get_properties(self):
> - return {
> - MESH_ELEMENT_IFACE: {
> - 'Index': dbus.Byte(self.index),
> - 'Models': dbus.Array(
> - self._get_sig_models(), signature='q')
> - }
> - }
> -
> - def add_model(self, model):
> - model.set_path(self.path)
> - self.models.append(model)
> -
> - def get_index(self):
> - return self.index
> -
> - def set_model_config(self, config):
> - print('Set element models config')
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qqbay", out_signature="")
> - def MessageReceived(self, source, key, is_sub, data):
> - print('Message Received on Element ', end='')
> - print(self.index)
> - for model in self.models:
> - model.process_message(source, key, data)
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qa{sv}", out_signature="")
> -
> - def UpdateModelConfiguration(self, model_id, config):
> - print('UpdateModelConfig ', end='')
> - print(hex(model_id))
> - for model in self.models:
> - if model_id == model.get_id():
> - model.set_config(config)
> - return
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="", out_signature="")
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> -class Model():
> - def __init__(self, model_id):
> - self.cmd_ops = []
> - self.model_id = model_id
> - self.vendor = VENDOR_ID_NONE
> - self.path = None
> -
> - def set_path(self, path):
> - self.path = path
> -
> - def get_id(self):
> - return self.model_id
> -
> - def get_vendor(self):
> - return self.vendor
> -
> - def process_message(self, source, key, data):
> - print('Model process message')
> -
> - def set_publication(self, period):
> - self.period = period
> -
> - def set_bindings(self, bindings):
> - self.bindings = bindings
> -
> - def set_config(self, config):
> - if 'Bindings' in config:
> - self.bindings = config.get('Bindings')
> - print('Bindings: ', end='')
> - print(self.bindings)
> - if 'PublicationPeriod' in config:
> - self.set_publication(config.get('PublicationPeriod'))
> - print('Model publication period ', end='')
> - print(self.pub_period, end='')
> - print(' ms')
> -
> -class OnOffClient(Model):
> - def __init__(self, model_id):
> - Model.__init__(self, model_id)
> - self.cmd_ops = { 0x8201, # get
> - 0x8202, # set
> - 0x8203 } # set unacknowledged
> - print('OnOff Client')
> -
> - def _reply_cb(state):
> - print('State ', end='');
> - print(state)
> -
> - def _send_message(self, dest, key, data, reply_cb):
> - print('OnOffClient send data')
> - node.Send(self.path, dest, key, data, reply_handler=reply_cb,
> - error_handler=error_cb)
> -
> - def get_state(self, dest, key):
> - opcode = 0x8201
> - data = struct.pack('<H', opcode)
> - self._send_message(dest, key, data, self._reply_cb)
> -
> - def set_state(self, dest, key, state):
> - opcode = 0x8202
> - data = struct.pack('<HB', opcode, state)
> - self._send_message(dest, key, data, self._reply_cb)
> -
> - def process_message(self, source, key, data):
> - print('OnOffClient process message len ', end = '')
> - datalen = len(data)
> - print(datalen)
> -
> - if datalen!=3:
> - return
> -
> - opcode, state=struct.unpack('<HB',bytes(data))
> - if opcode != 0x8202 :
> - print('Bad opcode ', end='')
> - print(hex(opcode))
> - return
> -
> - print('Got state ', end = '')
> - print(hex(state))
> -
> -def attach_app_error_cb(error):
> - print('Failed to register application: ' + str(error))
> - mainloop.quit()
> -
> -def main():
> -
> - DBusGMainLoop(set_as_default=True)
> -
> - global bus
> - bus = dbus.SystemBus()
> - global mainloop
> - global app
> -
> - mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
> - "/org/bluez/mesh"),
> - MESH_NETWORK_IFACE)
> - mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> -
> - app = Application(bus)
> - first_ele = Element(bus, 0x00)
> - first_ele.add_model(OnOffClient(0x1001))
> - app.add_element(first_ele)
> -
> - mainloop = GObject.MainLoop()
> -
> - print('Attach')
> - mesh_net.Attach(app.get_path(), token,
> - reply_handler=attach_app_cb,
> - error_handler=attach_app_error_cb)
> - mainloop.run()
> -
> -if __name__ == '__main__':
> - main()
> diff --git a/test/example-onoff-server b/test/example-onoff-server
> deleted file mode 100644
> index 131b6415c..000000000
> --- a/test/example-onoff-server
> +++ /dev/null
> @@ -1,365 +0,0 @@
> -#!/usr/bin/env python3
> -
> -import sys
> -import struct
> -import numpy
> -import dbus
> -import dbus.service
> -import dbus.exceptions
> -
> -from threading import Timer
> -import time
> -
> -
> -try:
> - from gi.repository import GObject
> -except ImportError:
> - import gobject as GObject
> -from dbus.mainloop.glib import DBusGMainLoop
> -
> -MESH_SERVICE_NAME = 'org.bluez.mesh'
> -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
> -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
> -
> -MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
> -MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
> -MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
> -MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
> -
> -APP_COMPANY_ID = 0x05f1
> -APP_PRODUCT_ID = 0x0001
> -APP_VERSION_ID = 0x0001
> -
> -VENDOR_ID_NONE = 0xffff
> -
> -app = None
> -bus = None
> -mainloop = None
> -node = None
> -
> -token = numpy.uint64(0x76bd4f2372476578)
> -
> -def generic_error_cb(error):
> - print('D-Bus call failed: ' + str(error))
> -
> -def generic_reply_cb():
> - print('D-Bus call done')
> -
> -def unwrap(item):
> - if isinstance(item, dbus.Boolean):
> - return bool(item)
> - if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
> - dbus.UInt64, dbus.Int64)):
> - return int(item)
> - if isinstance(item, dbus.Byte):
> - return bytes([int(item)])
> - if isinstance(item, dbus.String):
> - return item
> - if isinstance(item, (dbus.Array, list, tuple)):
> - return [unwrap(x) for x in item]
> - if isinstance(item, (dbus.Dictionary, dict)):
> - return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
> -
> - print('Dictionary item not handled')
> - print(type(item))
> - return item
> -
> -def attach_app_cb(node_path, dict_array):
> - print('Mesh application registered ', node_path)
> -
> - obj = bus.get_object(MESH_SERVICE_NAME, node_path)
> -
> - global node
> - node = dbus.Interface(obj, MESH_NODE_IFACE)
> -
> - els = unwrap(dict_array)
> - print("Get Elements")
> -
> - for el in els:
> - idx = struct.unpack('b', el[0])[0]
> - print('Configuration for Element ', end='')
> - print(idx)
> -
> - models = el[1]
> - element = app.get_element(idx)
> - element.set_model_config(models)
> -
> -def interfaces_removed_cb(object_path, interfaces):
> - if not mesh_net:
> - return
> -
> - if object_path == mesh_net[2]:
> - print('Service was removed')
> - mainloop.quit()
> -
> -def send_response(path, dest, key, data):
> - print('send response ', end='')
> - print(data)
> - node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
> - error_handler=generic_error_cb)
> -
> -def send_publication(path, model_id, data):
> - print('send publication ', end='')
> - print(data)
> - node.Publish(path, model_id, data,
> - reply_handler=generic_reply_cb,
> - error_handler=generic_error_cb)
> -
> -class PubTimer():
> - def __init__(self):
> - self.seconds = None
> - self.func = None
> - self.thread = None
> - self.busy = False
> -
> - def _timeout_cb(self):
> - self.func()
> - self.busy = True
> - self._schedule_timer()
> - self.busy =False
> -
> - def _schedule_timer(self):
> - self.thread = Timer(self.seconds, self._timeout_cb)
> - self.thread.start()
> -
> - def start(self, seconds, func):
> - self.func = func
> - self.seconds = seconds
> - if not self.busy:
> - self._schedule_timer()
> -
> - def cancel(self):
> - print('Cancel timer')
> - if self.thread is not None:
> - print('Cancel thread')
> - self.thread.cancel()
> - self.thread = None
> -
> -class Application(dbus.service.Object):
> -
> - def __init__(self, bus):
> - self.path = '/example'
> - self.elements = []
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> - def add_element(self, element):
> - self.elements.append(element)
> -
> - def get_element(self, idx):
> - for ele in self.elements:
> - if ele.get_index() == idx:
> - return ele
> -
> - def get_properties(self):
> - return {
> - MESH_APPLICATION_IFACE: {
> - 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
> - 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
> - 'VersionID': dbus.UInt16(APP_VERSION_ID)
> - }
> - }
> -
> - @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
> - def GetManagedObjects(self):
> - response = {}
> - print('GetManagedObjects')
> - response[self.path] = self.get_properties()
> - for element in self.elements:
> - response[element.get_path()] = element.get_properties()
> - return response
> -
> -class Element(dbus.service.Object):
> - PATH_BASE = '/example/ele'
> -
> - def __init__(self, bus, index):
> - self.path = self.PATH_BASE + format(index, '02x')
> - print(self.path)
> - self.models = []
> - self.bus = bus
> - self.index = index
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def _get_sig_models(self):
> - ids = []
> - for model in self.models:
> - id = model.get_id()
> - vendor = model.get_vendor()
> - if vendor == VENDOR_ID_NONE:
> - ids.append(id)
> - return ids
> -
> - def get_properties(self):
> - return {
> - MESH_ELEMENT_IFACE: {
> - 'Index': dbus.Byte(self.index),
> - 'Models': dbus.Array(
> - self._get_sig_models(), signature='q')
> - }
> - }
> -
> - def add_model(self, model):
> - model.set_path(self.path)
> - self.models.append(model)
> -
> - def get_index(self):
> - return self.index
> -
> - def set_model_config(self, configs):
> - print('Set element models config')
> - for config in configs:
> - mod_id = config[0]
> - self.UpdateModelConfiguration(mod_id, config[1])
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qqbay", out_signature="")
> - def MessageReceived(self, source, key, is_sub, data):
> - print('Message Received on Element ', end='')
> - print(self.index)
> - for model in self.models:
> - model.process_message(source, key, data)
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qa{sv}", out_signature="")
> -
> - def UpdateModelConfiguration(self, model_id, config):
> - print('UpdateModelConfig ', end='')
> - print(hex(model_id))
> - for model in self.models:
> - if model_id == model.get_id():
> - model.set_config(config)
> - return
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="", out_signature="")
> -
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> -class Model():
> - def __init__(self, model_id):
> - self.cmd_ops = []
> - self.model_id = model_id
> - self.vendor = VENDOR_ID_NONE
> - self.bindings = []
> - self.pub_period = 0
> - self.pub_id = 0
> - self.path = None
> -
> - def set_path(self, path):
> - self.path = path
> -
> - def get_id(self):
> - return self.model_id
> -
> - def get_vendor(self):
> - return self.vendor
> -
> - def process_message(self, source, key, data):
> - print('Model process message')
> -
> - def set_publication(self, period):
> - self.pub_period = period
> -
> - def set_config(self, config):
> - if 'Bindings' in config:
> - self.bindings = config.get('Bindings')
> - print('Bindings: ', end='')
> - print(self.bindings)
> - if 'PublicationPeriod' in config:
> - self.set_publication(config.get('PublicationPeriod'))
> - print('Model publication period ', end='')
> - print(self.pub_period, end='')
> - print(' ms')
> -
> -class OnOffServer(Model):
> - def __init__(self, model_id):
> - Model.__init__(self, model_id)
> - self.cmd_ops = { 0x8201, # get
> - 0x8202, # set
> - 0x8203 } # set unacknowledged
> -
> - print("OnOff Server ", end="")
> - self.state = 0
> - print('State ', end='')
> - self.timer = PubTimer()
> -
> - def process_message(self, source, key, data):
> - datalen = len(data)
> - print('OnOff Server process message len ', datalen)
> -
> - if datalen!=2 and datalen!=3:
> - return
> -
> - if datalen==2:
> - op_tuple=struct.unpack('<H',bytes(data))
> - opcode = op_tuple[0]
> - if opcode != 0x8201:
> - print(hex(opcode))
> - return
> - print('Get state')
> - elif datalen==3:
> - opcode,self.state=struct.unpack('<HB',bytes(data))
> - if opcode != 0x8202 and opcode != 0x8203:
> - print(hex(opcode))
> - return
> - print('Set state: ', end='')
> - print(self.state)
> -
> - rsp_data = struct.pack('<HB', 0x8204, self.state)
> - send_response(self.path, source, key, rsp_data)
> -
> - def publish(self):
> - print('Publish')
> - data = struct.pack('B', self.state)
> - send_publication(self.path, self.model_id, data)
> -
> - def set_publication(self, period):
> - if period == 0:
> - self.pub_period = 0
> - self.timer.cancel()
> - return
> -
> - # We do not handle ms in this example
> - if period < 1000:
> - return
> -
> - self.pub_period = period
> - self.timer.start(period/1000, self.publish)
> -
> -def attach_app_error_cb(error):
> - print('Failed to register application: ' + str(error))
> - mainloop.quit()
> -
> -def main():
> -
> - DBusGMainLoop(set_as_default=True)
> -
> - global bus
> - bus = dbus.SystemBus()
> - global mainloop
> - global app
> -
> - mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
> - "/org/bluez/mesh"),
> - MESH_NETWORK_IFACE)
> - mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> -
> - app = Application(bus)
> - first_ele = Element(bus, 0x00)
> - first_ele.add_model(OnOffServer(0x1000))
> - app.add_element(first_ele)
> -
> - mainloop = GObject.MainLoop()
> -
> - print('Attach')
> - mesh_net.Attach(app.get_path(), token,
> - reply_handler=attach_app_cb,
> - error_handler=attach_app_error_cb)
> -
> - mainloop.run()
> -
> -if __name__ == '__main__':
> - main()
> diff --git a/test/test-mesh b/test/test-mesh
> new file mode 100755
> index 000000000..fd02207bc
> --- /dev/null
> +++ b/test/test-mesh
> @@ -0,0 +1,842 @@
> +#!/usr/bin/env python3
> +
> +###################################################################
> +#
> +# This is a unified test sample for BT Mesh
> +#
> +# To run the test:
> +# test-mesh [token]
> +#
> +# 'token' is an optional argument. It must be a 16-digit
> +# hexadecimal number. The token must be associated with
> +# an existing node. The token is generated and assigned
> +# to a node as a result of successful provisioning (see
> +# explanation of "join" option).
> +# When the token is set, the menu operations "attach"
> +# and "remove" may be performed on a node specified
> +# by this token.
> +#
> +# The test imitates a device with 2 elements:
> +# element 0: OnOff Server model
> +# element 1: OnOff Client model
> +#
> +# The main menu:
> +# 1 - set node ID (token)
> +# 2 - join mesh network
> +# 3 - attach mesh node
> +# 4 - remove node
> +# 5 - client menu
> +# 6 - exit
> +#
> +# The main menu options explained:
> +# 1 - set token
> +# Set the unique node token.
> +# The token can be set from command line arguments as
> +# well.
> +#
> +# 2 - join
> +# Request provisioning of a device to become a node
> +# on a mesh network. The test generates device UUID
> +# which is displayed and will need to be provided to
> +# an outside entity that acts as a Provisioner. Also,
> +# during the provisioning process, an agent that is
> +# part of the test, will request (or will be requested)
> +# to perform a specified operation, e.g., a number will
> +# be displayed and this number will need to be entered
> +# on the Provisioner's side.
> +# In case of successful provisioning, the application
> +# automatically attaches as a node to the daemon. A node
> +# 'token' is returned to the application and is used
> +# for the runtime of the test.
> +#
> +# 3 - attach
> +# Attach the application to bluetoothd-daemon as a node.
> +# For the call to be successful, the valid node token must
> +# be already set, either from command arguments or by
> +# executing "set token" operation or automatically after
> +# successfully executing "join" operation in the same test
> +# run.
> +#
> +# 4 - remove
> +# Permanently removes any node configuration from daemon
> +# and persistent storage. After this operation, the node
> +# is permanently forgotten by the daemon and the associated
> +# node token is no longer valid.
> +#
> +# 5 - client menu
> +# Enter On/Off client submenu.
> +#
> +# 6 - exit
> +#
> +###################################################################
> +import sys
> +import struct
> +import fcntl
> +import os
> +import numpy
> +import random
> +import dbus
> +import dbus.service
> +import dbus.exceptions
> +
> +from threading import Timer
> +import time
> +
> +try:
> + from gi.repository import GLib
> +except ImportError:
> + import glib as GLib
> +from dbus.mainloop.glib import DBusGMainLoop
> +
> +try:
> + from termcolor import colored, cprint
> + set_error = lambda x: colored('!' + x, 'red', attrs=['bold'])
> + set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
> + set_green = lambda x: colored(x, 'green', attrs=['bold'])
> + set_yellow = lambda x: colored(x, 'yellow', attrs=['bold'])
> +except ImportError:
> + print('!!! Install termcolor module for better experience !!!')
> + set_error = lambda x: x
> + set_cyan = lambda x: x
> + set_green = lambda x: x
> + set_yellow = lambda x: x
> +
> +# Provisioning agent
> +try:
> + import agent
> +except ImportError:
> + agent = None
> +
> +MESH_SERVICE_NAME = 'org.bluez.mesh'
> +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
> +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
> +
> +MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
> +MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
> +MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
> +MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
> +
> +APP_COMPANY_ID = 0x05f1
> +APP_PRODUCT_ID = 0x0001
> +APP_VERSION_ID = 0x0001
> +
> +VENDOR_ID_NONE = 0xffff
> +
> +app = None
> +bus = None
> +mainloop = None
> +node = None
> +mesh_net = None
> +
> +menu_level = 0
> +dst_addr = 0x0000
> +app_idx = 0
> +
> +# Node token housekeeping
> +token = None
> +have_token = False
> +
> +user_input = 0
> +
> +
> +def app_exit():
> + global mainloop
> + global app
> +
> + for el in app.elements:
> + for model in el.models:
> + if model.timer != None:
> + model.timer.cancel()
> + mainloop.quit()
> +
> +def array_to_string(b_array):
> + str = ""
> + for b in b_array:
> + str += "%02x" % b
> + return str
> +
> +def generic_error_cb(error):
> + print(set_error('D-Bus call failed: ') + str(error))
> +
> +def generic_reply_cb():
> + return
> +
> +def attach_app_error_cb(error):
> + print(set_error('Failed to register application: ') + str(error))
> +
> +def attach(token):
> + print('Attach mesh node to bluetooth-meshd daemon')
> +
> + mesh_net.Attach(app.get_path(), token,
> + reply_handler=attach_app_cb,
> + error_handler=attach_app_error_cb)
> +
> +def join_cb():
> + print('Join procedure started')
> +
> +def join_error_cb(reason):
> + print('Join procedure failed: ', reason)
> +
> +def unwrap(item):
> + if isinstance(item, dbus.Boolean):
> + return bool(item)
> + if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
> + dbus.UInt64, dbus.Int64)):
> + return int(item)
> + if isinstance(item, dbus.Byte):
> + return bytes([int(item)])
> + if isinstance(item, dbus.String):
> + return item
> + if isinstance(item, (dbus.Array, list, tuple)):
> + return [unwrap(x) for x in item]
> + if isinstance(item, (dbus.Dictionary, dict)):
> + return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
> +
> + print(set_error('Dictionary item not handled: ') + type(item))
> +
> + return item
> +
> +def attach_app_cb(node_path, dict_array):
> + print('Mesh application registered ', node_path)
> +
> + obj = bus.get_object(MESH_SERVICE_NAME, node_path)
> +
> + global node
> + node = dbus.Interface(obj, MESH_NODE_IFACE)
> +
> + els = unwrap(dict_array)
> +
> + for el in els:
> + idx = struct.unpack('b', el[0])[0]
> +
> + models = el[1]
> + element = app.get_element(idx)
> + element.set_model_config(models)
> +
> +def interfaces_removed_cb(object_path, interfaces):
> + print('Removed')
> + if not mesh_net:
> + return
> +
> + print(object_path)
> + if object_path == mesh_net[2]:
> + print('Service was removed')
> + app_exit()
> +
> +def send_response(path, dest, key, data):
> + node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
> + error_handler=generic_error_cb)
> +
> +def send_publication(path, model_id, data):
> + print('Send publication ', end='')
> + print(data)
> + node.Publish(path, model_id, data,
> + reply_handler=generic_reply_cb,
> + error_handler=generic_error_cb)
> +
> +def print_state(state):
> + print('State is ', end='')
> + if state == 0:
> + print('OFF')
> + elif state == 1:
> + print('ON')
> + else:
> + print('UNKNOWN')
> +class PubTimer():
> + def __init__(self):
> + self.seconds = None
> + self.func = None
> + self.thread = None
> + self.busy = False
> +
> + def _timeout_cb(self):
> + self.func()
> + self.busy = True
> + self._schedule_timer()
> + self.busy =False
> +
> + def _schedule_timer(self):
> + self.thread = Timer(self.seconds, self._timeout_cb)
> + self.thread.start()
> +
> + def start(self, seconds, func):
> + self.func = func
> + self.seconds = seconds
> + if not self.busy:
> + self._schedule_timer()
> +
> + def cancel(self):
> + if self.thread is not None:
> + self.thread.cancel()
> + self.thread = None
> +
> +class Application(dbus.service.Object):
> +
> + def __init__(self, bus):
> + self.path = '/example'
> + self.agent = None
> + self.elements = []
> + dbus.service.Object.__init__(self, bus, self.path)
> +
> + def set_agent(self, agent):
> + self.agent = agent
> +
> + def get_path(self):
> + return dbus.ObjectPath(self.path)
> +
> + def add_element(self, element):
> + self.elements.append(element)
> +
> + def get_element(self, idx):
> + for ele in self.elements:
> + if ele.get_index() == idx:
> + return ele
> +
> + def get_properties(self):
> + return {
> + MESH_APPLICATION_IFACE: {
> + 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
> + 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
> + 'VersionID': dbus.UInt16(APP_VERSION_ID)
> + }
> + }
> +
> + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
> + def GetManagedObjects(self):
> + response = {}
> + response[self.path] = self.get_properties()
> + response[self.agent.get_path()] = self.agent.get_properties()
> + for element in self.elements:
> + response[element.get_path()] = element.get_properties()
> + return response
> +
> + @dbus.service.method(MESH_APPLICATION_IFACE,
> + in_signature="t", out_signature="")
> + def JoinComplete(self, value):
> + global token
> + global have_token
> +
> + print('JoinComplete with token ' + set_green(hex(value)))
> +
> + token = value
> + have_token = True
> +
> + attach(token)
> +
> + @dbus.service.method(MESH_APPLICATION_IFACE,
> + in_signature="s", out_signature="")
> + def JoinFailed(self, value):
> + print(set_error('JoinFailed '), value)
> +
> +
> +class Element(dbus.service.Object):
> + PATH_BASE = '/example/ele'
> +
> + def __init__(self, bus, index):
> + self.path = self.PATH_BASE + format(index, '02x')
> + self.models = []
> + self.bus = bus
> + self.index = index
> + dbus.service.Object.__init__(self, bus, self.path)
> +
> + def _get_sig_models(self):
> + ids = []
> + for model in self.models:
> + id = model.get_id()
> + vendor = model.get_vendor()
> + if vendor == VENDOR_ID_NONE:
> + ids.append(id)
> + return ids
> +
> + def get_properties(self):
> + return {
> + MESH_ELEMENT_IFACE: {
> + 'Index': dbus.Byte(self.index),
> + 'Models': dbus.Array(
> + self._get_sig_models(), signature='q')
> + }
> + }
> +
> + def add_model(self, model):
> + model.set_path(self.path)
> + self.models.append(model)
> +
> + def get_index(self):
> + return self.index
> +
> + def set_model_config(self, configs):
> + for config in configs:
> + mod_id = config[0]
> + self.UpdateModelConfiguration(mod_id, config[1])
> +
> + @dbus.service.method(MESH_ELEMENT_IFACE,
> + in_signature="qqbay", out_signature="")
> + def MessageReceived(self, source, key, is_sub, data):
> + print('Message Received on Element ', end='')
> + print(self.index)
> + for model in self.models:
> + model.process_message(source, key, data)
> +
> + @dbus.service.method(MESH_ELEMENT_IFACE,
> + in_signature="qa{sv}", out_signature="")
> +
> + def UpdateModelConfiguration(self, model_id, config):
> + print('UpdateModelConfig ', end='')
> + print(hex(model_id))
> + for model in self.models:
> + if model_id == model.get_id():
> + model.set_config(config)
> + return
> +
> + @dbus.service.method(MESH_ELEMENT_IFACE,
> + in_signature="", out_signature="")
> +
> + def get_path(self):
> + return dbus.ObjectPath(self.path)
> +
> +class Model():
> + def __init__(self, model_id):
> + self.cmd_ops = []
> + self.model_id = model_id
> + self.vendor = VENDOR_ID_NONE
> + self.bindings = []
> + self.pub_period = 0
> + self.pub_id = 0
> + self.path = None
> + self.timer = None
> +
> + def set_path(self, path):
> + self.path = path
> +
> + def get_id(self):
> + return self.model_id
> +
> + def get_vendor(self):
> + return self.vendor
> +
> + def process_message(self, source, key, data):
> + return
> +
> + def set_publication(self, period):
> + self.pub_period = period
> +
> + def set_config(self, config):
> + if 'Bindings' in config:
> + self.bindings = config.get('Bindings')
> + print('Bindings: ', end='')
> + print(self.bindings)
> + if 'PublicationPeriod' in config:
> + self.set_publication(config.get('PublicationPeriod'))
> + print('Model publication period ', end='')
> + print(self.pub_period, end='')
> + print(' ms')
> +
> + def print_bindings(self):
> + print(set_cyan('Model'), set_cyan('%04x' % self.model_id),
> + set_cyan('is bound to application key(s): '), end = '')
> +
> + if len(self.bindings) == 0:
> + print(set_cyan('** None **'))
> + for b in self.bindings:
> + print(set_cyan('%04x' % b), set_cyan(', '))
> +
> +########################
> +# On Off Server Model
> +########################
> +class OnOffServer(Model):
> + def __init__(self, model_id):
> + Model.__init__(self, model_id)
> + self.cmd_ops = { 0x8201, # get
> + 0x8202, # set
> + 0x8203, # set unacknowledged
> + 0x8204 } # status
> +
> + print("OnOff Server ")
> + self.state = 0
> + print_state(self.state)
> + self.timer = PubTimer()
> +
> + def process_message(self, source, key, data):
> + datalen = len(data)
> + print('OnOff Server process message len: ', datalen)
> +
> + if datalen != 2 and datalen != 3:
> + # The opcode is not recognized by this model
> + return
> +
> + if datalen == 2:
> + op_tuple=struct.unpack('<H',bytes(data))
> + opcode = op_tuple[0]
> + if opcode != 0x8201:
> + # The opcode is not recognized by this model
> + return
> + print('Get state')
> + elif datalen == 3:
> + opcode,self.state=struct.unpack('<HB',bytes(data))
> + if opcode != 0x8202 and opcode != 0x8203:
> + # The opcode is not recognized by this model
> + return
> + print_state(self.state)
> +
> + rsp_data = struct.pack('<HB', 0x8204, self.state)
> + send_response(self.path, source, key, rsp_data)
> +
> + def set_publication(self, period):
> +
> + # We do not handle ms in this example
> + if period < 1000:
> + return
> +
> + self.pub_period = period
> + if period == 0:
> + self.timer.cancel()
> + return
> +
> + self.timer.start(period/1000, self.publish)
> +
> +
> + def publish(self):
> + print('Publish')
> + data = struct.pack('<HB', 0x8204, self.state)
> + send_publication(self.path, self.model_id, data)
> +
> +########################
> +# On Off Client Model
> +########################
> +class OnOffClient(Model):
> + def __init__(self, model_id):
> + Model.__init__(self, model_id)
> + self.cmd_ops = { 0x8201, # get
> + 0x8202, # set
> + 0x8203, # set unacknowledged
> + 0x8204 } # status
> + print('OnOff Client')
> +
> + def _reply_cb(state):
> + print('State ', end='');
> + print(state)
> +
> + def _send_message(self, dest, key, data, reply_cb):
> + print('OnOffClient send data')
> + node.Send(self.path, dest, key, data, reply_handler=reply_cb,
> + error_handler=generic_error_cb)
> +
> + def get_state(self, dest, key):
> + opcode = 0x8201
> + data = struct.pack('<H', opcode)
> + self._send_message(dest, key, data, self._reply_cb)
> +
> + def set_state(self, dest, key, state):
> + opcode = 0x8202
> + print('State:', state)
> + data = struct.pack('<HB', opcode, state)
> + self._send_message(dest, key, data, self._reply_cb)
> +
> + def process_message(self, source, key, data):
> + print('OnOffClient process message len = ', end = '')
> + datalen = len(data)
> + print(datalen)
> +
> + if datalen != 3:
> + # The opcode is not recognized by this model
> + return
> +
> + opcode, state=struct.unpack('<HB',bytes(data))
> +
> + if opcode != 0x8204 :
> + # The opcode is not recognized by this model
> + return
> +
> + print(set_yellow('Got state '), end = '')
> +
> + state_str = "ON"
> + if state == 0:
> + state_str = "OFF"
> +
> + print(set_green(state_str), set_yellow('from'),
> + set_green('%04x' % source))
> +
> +########################
> +# Menu functions
> +########################
> +class MenuHandler(object):
> + def __init__(self, callback):
> + self.cb = callback
> + flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
> + flags |= os.O_NONBLOCK
> + fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
> + sys.stdin.flush()
> + GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.input_callback)
> +
> + def input_callback(self, fd, condition):
> + chunk = fd.read()
> + buffer = ''
> + for char in chunk:
> + buffer += char
> + if char == '\n':
> + self.cb(buffer)
> +
> + return True
> +
> +def process_input(input_str):
> + if menu_level == 0:
> + process_main_menu(input_str)
> + elif menu_level == 1:
> + process_client_menu(input_str)
> + else:
> + print(set_error('BUG: bad menu level'))
> +
> +def switch_menu(level):
> + global menu_level
> +
> + if level > 1:
> + return
> +
> + if level == 0:
> + main_menu()
> + elif level == 1:
> + client_menu()
> +
> + menu_level = level
> +
> +########################
> +# Main menu functions
> +########################
> +def process_main_menu(input_str):
> + global token
> + global user_input
> + global have_token
> +
> + str = input_str.strip()
> +
> + if user_input == 1:
> + res = set_token(str)
> + user_input = 0
> +
> + if res == False:
> + main_menu()
> +
> + return
> +
> + # Allow entering empty lines for better output visibility
> + if len(str) == 0:
> + return
> +
> + if str.isdigit() == False:
> + main_menu()
> + return
> +
> + opt = int(str)
> +
> + if opt > 6:
> + print(set_error('Unknown menu option: '), opt)
> + main_menu()
> + elif opt == 1:
> + if have_token:
> + print('Token already set')
> + return
> +
> + user_input = 1;
> + print(set_cyan('Enter 16-digit hex node ID:'))
> + elif opt == 2:
> + if agent == None:
> + print(set_error('Provisioning agent not found'))
> + return
> +
> + join_mesh()
> + elif opt == 3:
> + if have_token == False:
> + print(set_error('Token is not set'))
> + main_menu()
> + return
> +
> + attach(token)
> + elif opt == 4:
> + if have_token == False:
> + print(set_error('Token is not set'))
> + main_menu()
> + return
> +
> + print('Remove mesh node')
> + mesh_net.Leave(token, reply_handler=generic_reply_cb,
> + error_handler=generic_error_cb)
> + have_token = False
> + elif opt == 5:
> + switch_menu(1)
> + elif opt == 6:
> + app_exit()
> +
> +
> +def main_menu():
> + print(set_cyan('*** MAIN MENU ***'))
> + print(set_cyan('1 - set node ID (token)'))
> + print(set_cyan('2 - join mesh network'))
> + print(set_cyan('3 - attach mesh node'))
> + print(set_cyan('4 - remove node'))
> + print(set_cyan('5 - client menu'))
> + print(set_cyan('6 - exit'))
> +
> +def set_token(str):
> + global token
> + global have_token
> +
> + if len(str) != 16:
> + print(set_error('Expected 16 digits'))
> + return False
> +
> + try:
> + input_number = int(str, 16)
> + except ValueError:
> + print(set_error('Not a valid hexadecimal number'))
> + return False
> +
> + token = numpy.uint64(input_number)
> + have_token = True
> +
> + return True
> +
> +def join_mesh():
> + uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
> +
> + caps = ["out-numeric"]
> + oob = ["other"]
> +
> + random.shuffle(uuid)
> + uuid_str = array_to_string(uuid)
> + print('Joining with UUID ' + set_green(uuid_str))
> +
> + mesh_net.Join(app.get_path(), uuid,
> + reply_handler=join_cb,
> + error_handler=join_error_cb)
> +
> +##############################
> +# On/Off Client menu functions
> +##############################
> +def process_client_menu(input_str):
> + global user_input
> + global dst_addr
> + global app_idx
> +
> + res = -1
> + str = input_str.strip()
> +
> + if user_input == 1:
> + res = set_value(str)
> + if res != -1:
> + dst_addr = res
> + elif user_input == 2:
> + res = set_value(str)
> + if res != -1:
> + app_idx = res
> +
> + if user_input != 0:
> + user_input = 0
> + if res == -1:
> + client_menu()
> + return
> +
> + # Allow entering empty lines for better output visibility
> + if len(str) == 0:
> + return
> +
> + if str.isdigit() == False:
> + client_menu()
> + return
> +
> + opt = int(str)
> +
> + if opt > 7:
> + print(set_error('Unknown menu option: '), opt)
> + client_menu()
> + return
> +
> + if opt >= 3 and opt <= 5 and dst_addr == 0x0000:
> + print(set_error('Destination address not set!'))
> + return
> +
> + if opt == 1:
> + user_input = 1;
> + print(set_cyan('Enter 4-digit hex destination address:'))
> + elif opt == 2:
> + user_input = 2;
> + app.elements[1].models[0].print_bindings()
> + print(set_cyan('Choose application key index:'))
> + elif opt == 3:
> + app.elements[1].models[0].get_state(dst_addr, app_idx)
> + elif opt == 4 or opt == 5:
> + app.elements[1].models[0].set_state(dst_addr, app_idx, opt - 4)
> + elif opt == 6:
> + switch_menu(0)
> + elif opt == 7:
> + app_exit()
> +
> +def client_menu():
> + print(set_cyan('*** ON/OFF CLIENT MENU ***'))
> + print(set_cyan('1 - set destination address'))
> + print(set_cyan('2 - set application key index'))
> + print(set_cyan('3 - get state'))
> + print(set_cyan('4 - set state OFF'))
> + print(set_cyan('5 - set state ON'))
> + print(set_cyan('6 - back to main menu'))
> + print(set_cyan('7 - exit'))
> +
> +def set_value(str):
> +
> + if len(str) != 4:
> + print(set_error('Expected 4 digits'))
> + return -1
> +
> + try:
> + value = int(str, 16)
> + except ValueError:
> + print(set_error('Not a valid hexadecimal number'))
> + return -1
> +
> + return value
> +
> +########################
> +# Main entry
> +########################
> +def main():
> +
> + DBusGMainLoop(set_as_default=True)
> +
> + global bus
> + bus = dbus.SystemBus()
> + global mainloop
> + global app
> + global mesh_net
> +
> + if len(sys.argv) > 1 :
> + set_token(sys.argv[1])
> +
> + mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
> + "/org/bluez/mesh"),
> + MESH_NETWORK_IFACE)
> + mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> +
> + app = Application(bus)
> +
> + # Provisioning agent
> + if agent != None:
> + app.set_agent(agent.Agent(bus))
> +
> + first_ele = Element(bus, 0x00)
> + second_ele = Element(bus, 0x01)
> +
> + print(set_yellow('Register OnOff Server model on element 0'))
> + first_ele.add_model(OnOffServer(0x1000))
> +
> + print(set_yellow('Register OnOff Client model on element 1'))
> + second_ele.add_model(OnOffClient(0x1001))
> + app.add_element(first_ele)
> + app.add_element(second_ele)
> +
> + mainloop = GLib.MainLoop()
> +
> + main_menu()
> + event_catcher = MenuHandler(process_input);
> + mainloop.run()
> +
> +if __name__ == '__main__':
> + main()
prev parent reply other threads:[~2019-03-11 22:20 UTC|newest]
Thread overview: 2+ messages / expand[flat|nested] mbox.gz Atom feed top
2019-03-10 7:53 [PATCH BlueZ v3] test: Add unified test for mesh node example app Inga Stotland
2019-03-11 22:20 ` Gix, Brian [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1552342848.29289.3.camel@intel.com \
--to=brian.gix@intel.com \
--cc=inga.stotland@intel.com \
--cc=johan.hedberg@gmail.com \
--cc=linux-bluetooth@vger.kernel.org \
--cc=luiz.dentz@gmail.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).