qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: Eduardo Habkost <ehabkost@redhat.com>,
	Eric Blake <eblake@redhat.com>,
	Stefan Hajnoczi <stefanha@redhat.com>,
	Markus Armbruster <armbru@redhat.com>,
	Wainer dos Santos Moschetta <wainersm@redhat.com>,
	"Niteesh G . S ." <niteesh.gs@gmail.com>,
	Willian Rampazzo <wrampazz@redhat.com>,
	Cleber Rosa <crosa@redhat.com>, John Snow <jsnow@redhat.com>
Subject: [PATCH 16/20] python/aqmp: Add message routing to QMP protocol
Date: Thu,  1 Jul 2021 00:13:09 -0400	[thread overview]
Message-ID: <20210701041313.1696009-17-jsnow@redhat.com> (raw)
In-Reply-To: <20210701041313.1696009-1-jsnow@redhat.com>

Add the ability to handle and route messages in qmp_protocol.py. The
interface for actually sending anything still isn't added until next
commit.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/qmp_protocol.py | 98 +++++++++++++++++++++++++++++++-
 1 file changed, 96 insertions(+), 2 deletions(-)

diff --git a/python/qemu/aqmp/qmp_protocol.py b/python/qemu/aqmp/qmp_protocol.py
index 5872bfc017..04c8a8cb54 100644
--- a/python/qemu/aqmp/qmp_protocol.py
+++ b/python/qemu/aqmp/qmp_protocol.py
@@ -7,15 +7,18 @@
 incoming connection from that server.
 """
 
+# The import workarounds here are fixed in the next commit.
+import asyncio  # pylint: disable=unused-import # noqa
 import logging
 from typing import (
     Dict,
     List,
     Mapping,
     Optional,
+    Union,
 )
 
-from .error import ProtocolError
+from .error import AQMPError, ProtocolError
 from .events import Events
 from .message import Message
 from .models import Greeting
@@ -56,6 +59,53 @@ class NegotiationError(_WrappedProtocolError):
     """
 
 
+class ExecInterruptedError(AQMPError):
+    """
+    Exception raised when an RPC is interrupted.
+
+    This error is raised when an execute() statement could not be
+    completed.  This can occur because the connection itself was
+    terminated before a reply was received.
+
+    The true cause of the interruption will be available via `disconnect()`.
+    """
+
+
+class _MsgProtocolError(ProtocolError):
+    """
+    Abstract error class for protocol errors that have a `Message` object.
+
+    This Exception class is used for protocol errors where the `Message`
+    was mechanically understood, but was found to be inappropriate or
+    malformed.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+    def __init__(self, error_message: str, msg: Message):
+        super().__init__(error_message)
+        #: The received `Message` that caused the error.
+        self.msg: Message = msg
+
+    def __str__(self) -> str:
+        return "\n".join([
+            super().__str__(),
+            f"  Message was: {str(self.msg)}\n",
+        ])
+
+
+class ServerParseError(_MsgProtocolError):
+    """
+    The Server sent a `Message` indicating parsing failure.
+
+    i.e. A reply has arrived from the server, but it is missing the "ID"
+    field, indicating a parsing error.
+
+    :param error_message: Human-readable string describing the error.
+    :param msg: The QMP `Message` that caused the error.
+    """
+
+
 class QMP(AsyncProtocol[Message], Events):
     """
     Implements a QMP client connection.
@@ -98,6 +148,9 @@ async def run(self, address='/tmp/qemu.socket'):
     #: Logger object used for debugging messages.
     logger = logging.getLogger(__name__)
 
+    # Type alias for pending execute() result items
+    _PendingT = Union[Message, ExecInterruptedError]
+
     def __init__(self, name: Optional[str] = None) -> None:
         super().__init__(name)
         Events.__init__(self)
@@ -112,6 +165,9 @@ def __init__(self, name: Optional[str] = None) -> None:
         # Cached Greeting, if one was awaited.
         self._greeting: Optional[Greeting] = None
 
+        # Incoming RPC reply messages
+        self._pending: Dict[str, 'asyncio.Queue[QMP._PendingT]'] = {}
+
     @upper_half
     async def _begin_new_session(self) -> None:
         """
@@ -191,10 +247,27 @@ async def _negotiate(self) -> None:
             self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
             raise
 
+    @bottom_half
+    async def _bh_disconnect(self, force: bool = False) -> None:
+        await super()._bh_disconnect(force)
+
+        if self._pending:
+            self.logger.debug("Cancelling pending executions")
+        keys = self._pending.keys()
+        for key in keys:
+            self.logger.debug("Cancelling execution '%s'", key)
+            self._pending[key].put_nowait(
+                ExecInterruptedError("Disconnected")
+            )
+
+        self.logger.debug("QMP Disconnected.")
+
     @bottom_half
     async def _on_message(self, msg: Message) -> None:
         """
         Add an incoming message to the appropriate queue/handler.
+
+        :raise ServerParseError: When Message has no 'event' nor 'id' member
         """
         # Incoming messages are not fully parsed/validated here;
         # do only light peeking to know how to route the messages.
@@ -204,7 +277,27 @@ async def _on_message(self, msg: Message) -> None:
             return
 
         # Below, we assume everything left is an execute/exec-oob response.
-        # ... Which we'll implement in the next commit!
+
+        if 'id' not in msg:
+            # This is (very likely) a server parsing error.
+            # It doesn't inherently belong to any pending execution.
+            # Instead of performing clever recovery, just terminate.
+            # See "NOTE" in qmp-spec.txt, section 2.4.2
+            raise ServerParseError("Server sent a message without an ID,"
+                                   " indicating parse failure.", msg)
+
+        assert 'id' in msg
+        exec_id = str(msg['id'])
+
+        if exec_id not in self._pending:
+            # qmp-spec.txt, section 2.4:
+            # 'Clients should drop all the responses
+            # that have an unknown "id" field.'
+            self.logger.warning("Unknown ID '%s', message dropped.", exec_id)
+            self.logger.debug("Unroutable message: %s", str(msg))
+            return
+
+        await self._pending[exec_id].put(msg)
 
     @upper_half
     @bottom_half
@@ -237,6 +330,7 @@ def _do_send(self, msg: Message) -> None:
     def _cleanup(self) -> None:
         super()._cleanup()
         self._greeting = None
+        assert not self._pending
 
     @classmethod
     def make_execute_msg(cls, cmd: str,
-- 
2.31.1



  parent reply	other threads:[~2021-07-01  4:23 UTC|newest]

Thread overview: 25+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-07-01  4:12 [PATCH 00/20] python: introduce Asynchronous QMP package John Snow
2021-07-01  4:12 ` [PATCH 01/20] python/pylint: Add exception for TypeVar names ('T') John Snow
2021-07-01  4:12 ` [PATCH 02/20] python/pylint: disable too-many-function-args John Snow
2021-07-01  4:12 ` [PATCH 03/20] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
2021-07-01  4:12 ` [PATCH 04/20] python/aqmp: add error classes John Snow
2021-07-01  4:12 ` [PATCH 05/20] python/aqmp: add asyncio compatibility wrappers John Snow
2021-07-01  4:12 ` [PATCH 06/20] python/aqmp: add generic async message-based protocol support John Snow
2021-07-01  4:13 ` [PATCH 07/20] python/aqmp: add runstate state machine to AsyncProtocol John Snow
2021-07-01  4:13 ` [PATCH 08/20] python/aqmp: add logging " John Snow
2021-07-01  4:13 ` [PATCH 09/20] python/aqmp: add AsyncProtocol.accept() method John Snow
2021-07-01  4:13 ` [PATCH 10/20] python/aqmp: add _cb_inbound and _cb_inbound logging hooks John Snow
2021-07-01  4:13 ` [PATCH 11/20] python/aqmp: add AsyncProtocol._readline() method John Snow
2021-07-01  4:13 ` [PATCH 12/20] python/aqmp: add QMP Message format John Snow
2021-07-07 14:52   ` Niteesh G. S.
2021-07-08 16:50     ` John Snow
2021-07-01  4:13 ` [PATCH 13/20] python/aqmp: add well-known QMP object models John Snow
2021-07-01  4:13 ` [PATCH 14/20] python/aqmp: add QMP event support John Snow
2021-07-01  4:13 ` [PATCH 15/20] python/aqmp: add QMP protocol support John Snow
2021-07-01  4:13 ` John Snow [this message]
2021-07-01  4:13 ` [PATCH 17/20] python/aqmp: add execute() interfaces John Snow
2021-07-01  4:13 ` [PATCH 18/20] python/aqmp: add _raw() execution interface John Snow
2021-07-01  4:13 ` [PATCH 19/20] python/aqmp: add asyncio_run compatibility wrapper John Snow
2021-07-01  4:13 ` [PATCH 20/20] python/aqmp: add scary message John Snow
2021-07-05 13:19 ` [PATCH 00/20] python: introduce Asynchronous QMP package Stefan Hajnoczi
2021-07-08 13:24   ` John Snow

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210701041313.1696009-17-jsnow@redhat.com \
    --to=jsnow@redhat.com \
    --cc=armbru@redhat.com \
    --cc=crosa@redhat.com \
    --cc=eblake@redhat.com \
    --cc=ehabkost@redhat.com \
    --cc=niteesh.gs@gmail.com \
    --cc=qemu-devel@nongnu.org \
    --cc=stefanha@redhat.com \
    --cc=wainersm@redhat.com \
    --cc=wrampazz@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).