# -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
#
# Autopilot Functional Test Tool
# Copyright (C) 2012-2013 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""This module contains the code to retrieve state via DBus calls.
Under normal circumstances, the only thing you should need to use from this
module is the DBusIntrospectableObject class.
"""
from __future__ import absolute_import
from contextlib import contextmanager
import sys
import logging
import re
import six
from uuid import uuid4
from autopilot.introspection.types import create_value_instance
from autopilot.introspection.utilities import translate_state_keys
from autopilot.utilities import (
get_debug_logger,
sleep,
Timer,
)
_object_registry = {}
_logger = logging.getLogger(__name__)
[docs]class StateNotFoundError(RuntimeError):
"""Raised when a piece of state information is not found.
This exception is commonly raised when the application has destroyed (or
not yet created) the object you are trying to access in autopilot. This
typically happens for a number of possible reasons:
* The UI widget you are trying to access with
:py:meth:`~DBusIntrospectionObject.select_single` or
:py:meth:`~DBusIntrospectionObject.wait_select_single` or
:py:meth:`~DBusIntrospectionObject.select_many` does not exist yet.
* The UI widget you are trying to access has been destroyed by the
application.
"""
def __init__(self, class_name=None, **filters):
"""Construct a StateNotFoundError.
:raises ValueError: if neither the class name not keyword arguments
are specified.
"""
if class_name is None and not filters:
raise ValueError("Must specify either class name or filters.")
if class_name is None:
self._message = \
u"Object not found with properties {}.".format(
repr(filters)
)
elif not filters:
self._message = u"Object not found with name '{}'.".format(
class_name
)
else:
self._message = \
u"Object not found with name '{}' and properties {}.".format(
class_name,
repr(filters)
)
def __str__(self):
if six.PY3:
return self._message
else:
return self._message.encode('utf8')
def __unicode__(self):
return self._message
class IntrospectableObjectMetaclass(type):
"""Metaclass to insert appropriate classes into the object registry."""
def __new__(cls, classname, bases, classdict):
"""Add class name to type registry."""
class_object = type.__new__(cls, classname, bases, classdict)
if classname in (
'ApplicationProxyObject',
'CustomEmulatorBase',
'DBusIntrospectionObject',
):
return class_object
if getattr(class_object, '_id', None) is not None:
if class_object._id in _object_registry:
_object_registry[class_object._id][classname] = class_object
else:
_object_registry[class_object._id] = {classname: class_object}
return class_object
def get_classname_from_path(object_path):
return object_path.split("/")[-1]
def _object_passes_filters(instance, **kwargs):
"""Return true if *instance* satisifies all the filters present in
kwargs."""
with instance.no_automatic_refreshing():
for attr, val in kwargs.items():
if not hasattr(instance, attr) or getattr(instance, attr) != val:
# Either attribute is not present, or is present but with
# the wrong value - don't add this instance to the results
# list.
return False
return True
DBusIntrospectionObjectBase = IntrospectableObjectMetaclass(
'DBusIntrospectionObjectBase',
(object,),
{}
)
[docs]class DBusIntrospectionObject(DBusIntrospectionObjectBase):
"""A class that supports transparent data retrieval from the application
under test.
This class is the base class for all objects retrieved from the application
under test. It handles transparently refreshing attribute values when
needed, and contains many methods to select child objects in the
introspection tree.
"""
def __init__(self, state_dict, path, backend):
self.__state = {}
self.__refresh_on_attribute = True
self._set_properties(state_dict)
self._path = path
self._poll_time = 10
self._backend = backend
def _set_properties(self, state_dict):
"""Creates and set attributes of *self* based on contents of
*state_dict*.
.. note:: Translates '-' to '_', so a key of 'icon-type' for example
becomes 'icon_type'.
"""
self.__state = {}
for key, value in translate_state_keys(state_dict).items():
# don't store id in state dictionary -make it a proper instance
# attribute
if key == 'id':
self.id = int(value[1])
try:
self.__state[key] = create_value_instance(value, self, key)
except ValueError as e:
_logger.warning(
"While constructing attribute '%s.%s': %s",
self.__class__.__name__,
key,
str(e)
)
[docs] def get_children_by_type(self, desired_type, **kwargs):
"""Get a list of children of the specified type.
Keyword arguments can be used to restrict returned instances. For
example::
get_children_by_type('Launcher', monitor=1)
will return only Launcher instances that have an attribute 'monitor'
that is equal to 1. The type can also be specified as a string, which
is useful if there is no emulator class specified::
get_children_by_type('Launcher', monitor=1)
Note however that if you pass a string, and there is an emulator class
defined, autopilot will not use it.
:param desired_type: Either a string naming the type you want, or a
class of the type you want (the latter is used when defining
custom emulators)
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
#TODO: if kwargs has exactly one item in it we should specify the
# restriction in the XPath query, so it gets processed in the Unity C++
# code rather than in Python.
instances = self.get_children()
result = []
for instance in instances:
# Skip items that are not instances of the desired type:
if isinstance(desired_type, six.string_types):
if instance.__class__.__name__ != desired_type:
continue
elif not isinstance(instance, desired_type):
continue
#skip instances that fail attribute check:
if _object_passes_filters(instance, **kwargs):
result.append(instance)
return result
[docs] def get_properties(self):
"""Returns a dictionary of all the properties on this class.
This can be useful when you want to log all the properties exported
from your application for a particular object. Every property in the
returned dictionary can be accessed as attributes of the object as
well.
"""
# Since we're grabbing __state directly there's no implied state
# refresh, so do it manually:
self.refresh_state()
props = self.__state.copy()
props['id'] = self.id
return props
[docs] def get_children(self):
"""Returns a list of all child objects.
This returns a list of all children. To return only children of a
specific type, use :meth:`get_children_by_type`. To get objects
further down the introspection tree (i.e.- nodes that may not
necessarily be immeadiate children), use :meth:`select_single` and
:meth:`select_many`.
"""
# Thomi: 2014-03-20: There used to be a call to 'self.refresh_state()'
# here. That's not needed, since the only thing we use is the proxy
# path, which isn't affected by the current state.
query = self.get_class_query_string() + "/*"
state_dicts = self.get_state_by_path(query)
children = [self.make_introspection_object(i) for i in state_dicts]
return children
[docs] def get_parent(self):
"""Returns the parent of this object.
If this object has no parent (i.e.- it is the root of the introspection
tree). Then it returns itself.
"""
query = self.get_class_query_string() + "/.."
parent_state_dicts = self.get_state_by_path(query)
parent = self.make_introspection_object(parent_state_dicts[0])
return parent
[docs] def select_single(self, type_name='*', **kwargs):
"""Get a single node from the introspection tree, with type equal to
*type_name* and (optionally) matching the keyword filters present in
*kwargs*.
You must specify either *type_name*, keyword filters or both.
This method searches recursively from the instance this method is
called on. Calling :meth:`select_single` on the application (root)
proxy object will search the entire tree. Calling
:meth:`select_single` on an object in the tree will only search it's
descendants.
Example usage::
app.select_single('QPushButton', objectName='clickme')
# returns a QPushButton whose 'objectName' property is 'clickme'.
If nothing is returned from the query, this method raises
StateNotFoundError.
:param type_name: Either a string naming the type you want, or a class
of the appropriate type (the latter case is for overridden emulator
classes).
:raises ValueError: if the query returns more than one item. *If
you want more than one item, use select_many instead*.
:raises TypeError: if neither *type_name* or keyword filters are
provided.
:raises StateNotFoundError: if the requested object was not found.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
instances = self.select_many(type_name, **kwargs)
if len(instances) > 1:
raise ValueError("More than one item was returned for query")
if not instances:
raise StateNotFoundError(type_name, **kwargs)
return instances[0]
[docs] def wait_select_single(self, type_name='*', **kwargs):
"""Get a proxy object matching some search criteria, retrying if no
object is found until a timeout is reached.
This method is identical to the :meth:`select_single` method, except
that this method will poll the application under test for 10 seconds
in the event that the search criteria does not match anything.
This method will return single proxy object from the introspection
tree, with type equal to *type_name* and (optionally) matching the
keyword filters present in *kwargs*.
You must specify either *type_name*, keyword filters or both.
This method searches recursively from the proxy object this method is
called on. Calling :meth:`select_single` on the application (root)
proxy object will search the entire tree. Calling
:meth:`select_single` on an object in the tree will only search it's
descendants.
Example usage::
app.wait_select_single('QPushButton', objectName='clickme')
# returns a QPushButton whose 'objectName' property is 'clickme'.
# will poll the application until such an object exists, or will
# raise StateNotFoundError after 10 seconds.
If nothing is returned from the query, this method raises
StateNotFoundError after 10 seconds.
:param type_name: Either a string naming the type you want, or a class
of the appropriate type (the latter case is for overridden emulator
classes).
:raises ValueError: if the query returns more than one item. *If
you want more than one item, use select_many instead*.
:raises TypeError: if neither *type_name* or keyword filters are
provided.
:raises StateNotFoundError: if the requested object was not found.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
for i in range(self._poll_time):
try:
return self.select_single(type_name, **kwargs)
except StateNotFoundError:
if i == self._poll_time - 1:
raise
sleep(1)
[docs] def select_many(self, type_name='*', **kwargs):
"""Get a list of nodes from the introspection tree, with type equal to
*type_name* and (optionally) matching the keyword filters present in
*kwargs*.
You must specify either *type_name*, keyword filters or both.
This method searches recursively from the instance this method is
called on. Calling :meth:`select_many` on the application (root) proxy
object will search the entire tree. Calling :meth:`select_many` on an
object in the tree will only search it's descendants.
Example Usage::
app.select_many('QPushButton', enabled=True)
# returns a list of QPushButtons that are enabled.
As mentioned above, this method searches the object tree recursively::
file_menu = app.select_one('QMenu', title='File')
file_menu.select_many('QAction')
# returns a list of QAction objects who appear below file_menu in
# the object tree.
.. warning::
The order in which objects are returned is not guaranteed. It is
bad practise to write tests that depend on the order in which
this method returns objects. (see :ref:`object_ordering` for more
information).
If you only want to get one item, use :meth:`select_single` instead.
:param type_name: Either a string naming the type you want, or a class
of the appropriate type (the latter case is for overridden emulator
classes).
:raises TypeError: if neither *type_name* or keyword filters are
provided.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
"""
if not isinstance(type_name, str) and issubclass(
type_name, DBusIntrospectionObject):
type_name = type_name.__name__
if type_name == "*" and not kwargs:
raise TypeError("You must specify either a type name or a filter.")
_logger.debug(
"Selecting objects of %s with attributes: %r",
'any type' if type_name == '*' else 'type ' + type_name, kwargs)
server_side_filters = []
client_side_filters = {}
for k, v in kwargs.items():
# LP Bug 1209029: The XPathSelect protocol does not allow all valid
# node names or values. We need to decide here whether the filter
# parameters are going to work on the backend or not. If not, we
# just do the processing client-side. See the
# _is_valid_server_side_filter_param function (below) for the
# specific requirements.
if _is_valid_server_side_filter_param(k, v):
server_side_filters.append(
_get_filter_string_for_key_value_pair(k, v)
)
else:
client_side_filters[k] = v
filter_str = '[{}]'.format(','.join(server_side_filters)) \
if server_side_filters else ""
query_path = "%s//%s%s" % (
self.get_class_query_string(),
type_name,
filter_str
)
state_dicts = self.get_state_by_path(query_path)
instances = [self.make_introspection_object(i) for i in state_dicts]
return [i for i in instances
if _object_passes_filters(i, **client_side_filters)]
[docs] def refresh_state(self):
"""Refreshes the object's state.
You should probably never have to call this directly. Autopilot
automatically retrieves new state every time this object's attributes
are read.
:raises StateNotFound: if the object in the application under test
has been destroyed.
"""
_, new_state = self.get_new_state()
self._set_properties(new_state)
[docs] def get_all_instances(self):
"""Get all instances of this class that exist within the Application
state tree.
For example, to get all the LauncherIcon instances::
icons = LauncherIcon.get_all_instances()
.. warning::
Using this method is slow - it requires a complete scan of the
introspection tree. You should only use this when you're not sure
where the objects you are looking for are located. Depending on
the application you are testing, you may get duplicate results
using this method.
:return: List (possibly empty) of class instances.
"""
cls_name = type(self).__name__
instances = self.get_state_by_path("//%s" % (cls_name))
return [self.make_introspection_object(i) for i in instances]
[docs] def get_root_instance(self):
"""Get the object at the root of this tree.
This will return an object that represents the root of the
introspection tree.
"""
instances = self.get_state_by_path("/")
if len(instances) != 1:
_logger.error("Could not retrieve root object.")
return None
return self.make_introspection_object(instances[0])
def __getattr__(self, name):
# avoid recursion if for some reason we have no state set (should never
# happen).
if name == '__state':
raise AttributeError()
if name in self.__state:
if self.__refresh_on_attribute:
self.refresh_state()
return self.__state[name]
# attribute not found.
raise AttributeError(
"Class '%s' has no attribute '%s'." %
(self.__class__.__name__, name))
[docs] def get_state_by_path(self, piece):
"""Get state for a particular piece of the state tree.
You should probably never need to call this directly.
:param piece: an XPath-like query that specifies which bit of the tree
you want to look at.
:raises TypeError: on invalid *piece* parameter.
"""
if not isinstance(piece, six.string_types):
raise TypeError(
"XPath query must be a string, not %r", type(piece))
with Timer("GetState %s" % piece):
data = self._backend.introspection_iface.GetState(piece)
if len(data) > 15:
_logger.warning(
"Your query '%s' returned a lot of data (%d items). This "
"is likely to be slow. You may want to consider optimising"
" your query to return fewer items.",
piece,
len(data)
)
return data
[docs] def get_new_state(self):
"""Retrieve a new state dictionary for this class instance.
You should probably never need to call this directly.
.. note:: The state keys in the returned dictionary are not translated.
"""
try:
return self.get_state_by_path(self.get_class_query_string())[0]
except IndexError:
raise StateNotFoundError(self.__class__.__name__, id=self.id)
[docs] def wait_until_destroyed(self, timeout=10):
"""Block until this object is destroyed in the application.
Block until the object this instance is a proxy for has been destroyed
in the applicaiton under test. This is commonly used to wait until a
UI component has been destroyed.
:param timeout: The number of seconds to wait for the object to be
destroyed. If not specified, defaults to 10 seconds.
:raises RuntimeError: if the method timed out.
"""
for i in range(timeout):
try:
self.get_new_state()
sleep(1)
except StateNotFoundError:
return
else:
raise RuntimeError(
"Object was not destroyed after %d seconds" % timeout
)
[docs] def get_class_query_string(self):
"""Get the XPath query string required to refresh this class's
state."""
if not self._path.startswith('/'):
return "//" + self._path + "[id=%d]" % self.id
else:
return self._path + "[id=%d]" % self.id
[docs] def make_introspection_object(self, dbus_tuple):
"""Make an introspection object given a DBus tuple of
(path, state_dict).
This only works for classes that derive from DBusIntrospectionObject.
:returns: A proxy object that derives from DBusIntrospectionObject
:raises ValueError: if more than one class is appropriate for this
dbus_tuple
"""
path, state = dbus_tuple
class_object = _get_proxy_object_class(
_object_registry[self._id], type(self), path, state)
return class_object(state, path, self._backend)
[docs] def print_tree(self, output=None, maxdepth=None, _curdepth=0):
"""Print properties of the object and its children to a stream.
When writing new tests, this can be called when it is too difficult to
find the widget or property that you are interested in in "vis".
.. warning:: Do not use this in production tests, this is expensive and
not at all appropriate for actual testing. Only call this
temporarily and replace with proper select_single/select_many
calls.
:param output: A file object or path name where the output will be
written to. If not given, write to stdout.
:param maxdepth: If given, limit the maximum recursion level to that
number, i. e. only print children which have at most maxdepth-1
intermediate parents.
"""
if maxdepth is not None and _curdepth > maxdepth:
return
indent = " " * _curdepth
if output is None:
output = sys.stdout
elif isinstance(output, six.string_types):
output = open(output, 'w')
# print path
if _curdepth > 0:
output.write("\n")
output.write("%s== %s ==\n" % (indent, self._path))
# Thomi 2014-03-20: For all levels other than the top level, we can
# avoid an entire dbus round trip if we grab the underlying property
# dictionary directly. We can do this since the print_tree function
# that called us will have retrieved us via a call to get_children(),
# which gets the latest state anyway.
if _curdepth > 0:
properties = self.__state.copy()
else:
properties = self.get_properties()
# print properties
try:
for key in sorted(properties.keys()):
output.write("%s%s: %r\n" % (indent, key, properties[key]))
# print children
if maxdepth is None or _curdepth < maxdepth:
for c in self.get_children():
c.print_tree(output, maxdepth, _curdepth + 1)
except StateNotFoundError as error:
output.write("%sError: %s\n" % (indent, error))
@contextmanager
[docs] def no_automatic_refreshing(self):
"""Context manager function to disable automatic DBus refreshing when
retrieving attributes.
Example usage:
with instance.no_automatic_refreshing():
# access lots of attributes.
This can be useful if you need to check lots of attributes in a tight
loop, or if you want to atomicaly check several attributes at once.
"""
try:
self.__refresh_on_attribute = False
yield
finally:
self.__refresh_on_attribute = True
@classmethod
[docs] def validate_dbus_object(cls, path, _state):
"""Return whether this class is the appropriate proxy object class for
a given dbus path and state.
The default version matches the name of the dbus object and the class.
Subclasses of CustomProxyObject can override it to define a different
validation method.
:param path: The dbus path of the object to check
:param state: The dbus state dict of the object to check
(ignored in default implementation)
:returns: Whether this class is appropriate for the dbus object
"""
name = get_classname_from_path(path)
return cls.__name__ == name
def _is_valid_server_side_filter_param(key, value):
"""Return True if the key and value parameters are valid for server-side
processing.
"""
key_is_valid = re.match(
r'^[a-zA-Z0-9_\-]+( [a-zA-Z0-9_\-])*$',
key
) is not None
if type(value) == int:
return key_is_valid and (-2**31 <= value <= 2**31 - 1)
elif type(value) == bool:
return key_is_valid
elif type(value) == six.binary_type:
return key_is_valid
elif type(value) == six.text_type:
try:
value.encode('ascii')
return key_is_valid
except UnicodeEncodeError:
pass
return False
def _get_filter_string_for_key_value_pair(key, value):
"""Return a string representing the filter query for this key/value pair.
The value must be suitable for server-side filtering. Raises ValueError if
this is not the case.
"""
if isinstance(value, six.text_type):
if six.PY3:
escaped_value = value.encode("unicode_escape")\
.decode('ASCII')\
.replace("'", "\\'")
else:
escaped_value = value.encode('utf-8').encode("string_escape")
# note: string_escape codec escapes "'" but not '"'...
escaped_value = escaped_value.replace('"', r'\"')
return '{}="{}"'.format(key, escaped_value)
elif isinstance(value, six.binary_type):
if six.PY3:
escaped_value = value.decode('utf-8')\
.encode("unicode_escape")\
.decode('ASCII')\
.replace("'", "\\'")
else:
escaped_value = value.encode("string_escape")
# note: string_escape codec escapes "'" but not '"'...
escaped_value = escaped_value.replace('"', r'\"')
return '{}="{}"'.format(key, escaped_value)
elif isinstance(value, int) or isinstance(value, bool):
return "{}={}".format(key, repr(value))
else:
raise ValueError("Unsupported value type: {}".format(type(value)))
def _get_proxy_object_class(proxy_class_dict, default_class, path, state):
"""Return a custom proxy class, either from the list or the default.
Use helper functions to check the class list or return the default.
:param proxy_class_dict: dict of proxy classes to try
:param default_class: default class to use if nothing in dict matches
:param path: dbus path
:param state: dbus state
:returns: appropriate custom proxy class
:raises ValueError: if more than one class in the dict matches
"""
class_type = _try_custom_proxy_classes(proxy_class_dict, path, state)
if class_type:
return class_type
return _get_default_proxy_class(default_class,
get_classname_from_path(path))
def _try_custom_proxy_classes(proxy_class_dict, path, state):
"""Identify which custom proxy class matches the dbus path and state.
If more than one class in proxy_class_dict matches, raise an exception.
:param proxy_class_dict: dict of proxy classes to try
:param path: dbus path
:param state: dbus state dict
:returns: matching custom proxy class
:raises ValueError: if more than one class matches
"""
possible_classes = [c for c in proxy_class_dict.values() if
c.validate_dbus_object(path, state)]
if len(possible_classes) > 1:
raise ValueError(
'More than one custom proxy class matches this object: '
'Matching classes are: %s. State is %s. Path is %s.'
','.join([repr(c) for c in possible_classes]),
repr(state),
path)
if len(possible_classes) == 1:
return possible_classes[0]
return None
def _get_default_proxy_class(default_class, name):
"""Return a custom proxy object class of the default or a base class.
We want the object to inherit from CustomEmulatorBase, not the object
class that is doing the selecting.
:param default_class: default class to use if no bases match
:param name: name of new class
:returns: custom proxy object class
"""
get_debug_logger().warning(
"Generating introspection instance for type '%s' based on generic "
"class.", name)
for base in default_class.__bases__:
if issubclass(base, CustomEmulatorBase):
base_class = base
break
else:
base_class = default_class
return type(str(name), (base_class,), {})
class _CustomEmulatorMeta(IntrospectableObjectMetaclass):
def __new__(cls, name, bases, d):
# only consider classes derived from CustomEmulatorBase
if name != 'CustomEmulatorBase':
# and only if they don't already have an Id set.
have_id = False
for base in bases:
if hasattr(base, '_id'):
have_id = True
break
if not have_id:
d['_id'] = uuid4()
return super(_CustomEmulatorMeta, cls).__new__(cls, name, bases, d)
CustomEmulatorBase = _CustomEmulatorMeta('CustomEmulatorBase',
(DBusIntrospectionObject, ),
{})
CustomEmulatorBase.__doc__ = \
"""This class must be used as a base class for any custom emulators defined
within a test case.
.. seealso::
Tutorial Section :ref:`custom_proxy_classes`
Information on how to write custom emulators.
"""