From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: Kevin Wolf <kwolf@redhat.com>,
	Peter Maydell <peter.maydell@linaro.org>,
	Daniel Berrange <berrange@redhat.com>,
	Beraldo Leal <bleal@redhat.com>, Cleber Rosa <crosa@redhat.com>,
	John Snow <jsnow@redhat.com>
Subject: [PATCH 04/10] python/aqmp: split _client_connected_cb() out as _incoming()
Date: Fri, 25 Feb 2022 15:59:42 -0500
Message-ID: <20220225205948.3693480-5-jsnow@redhat.com>
In-Reply-To: <20220225205948.3693480-1-jsnow@redhat.com>

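As part of disentangling the monolithic nature of _do_accept(), split
out the incoming-connection callback to prepare for factoring out the
"wait for a peer" step. The nested _client_connected_cb() closure
becomes the _incoming() method, and the closure's local 'server' and
'connected' variables become self._server and self._accepted, so the
event can be awaited from outside of _do_accept().

Server teardown moves into a new _stop_server() helper, and _incoming()
now closes any extra connection that is accepted while one is already
registered (see https://bugs.python.org/issue46715).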

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/protocol.py | 83 +++++++++++++++++++++++++-----------
 1 file changed, 58 insertions(+), 25 deletions(-)
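
The pattern described in the commit message can be sketched on its own,
outside of the AQMP code. This is a minimal illustration under stated
assumptions, not the implementation in protocol.py: the class name
OneShotServer, the methods start() and wait_for_peer(), and the demo()
helper are all hypothetical. Only asyncio.start_server(), Server.close(),
Server.sockets and asyncio.Event are real library calls. The sketch only
calls close() on the server; the patch additionally awaits wait_closed(),
which is left out here so the example does not depend on how a given
Python version treats still-open connections.

```python
import asyncio
from typing import Optional


class OneShotServer:
    """Hypothetical stand-in for the protocol class; not the AQMP code."""

    def __init__(self) -> None:
        # State lives on the instance so it can be reached from both the
        # callback and whoever waits for the peer.
        self._server: Optional[asyncio.AbstractServer] = None
        self._accepted: Optional[asyncio.Event] = None
        self._reader: Optional[asyncio.StreamReader] = None
        self._writer: Optional[asyncio.StreamWriter] = None

    async def _incoming(self,
                        reader: asyncio.StreamReader,
                        writer: asyncio.StreamWriter) -> None:
        """Register the first peer; close any extra connection."""
        if self._reader or self._writer:
            writer.close()
            return

        assert self._server is not None
        assert self._accepted is not None
        # Stop listening for further peers (simplified: close() only).
        self._server.close()
        self._server = None
        self._reader, self._writer = reader, writer
        self._accepted.set()

    async def start(self, host: str = "127.0.0.1") -> int:
        """Start listening on an ephemeral port and return that port."""
        self._accepted = asyncio.Event()
        self._server = await asyncio.start_server(
            self._incoming, host=host, port=0, backlog=1)
        return self._server.sockets[0].getsockname()[1]

    async def wait_for_peer(self) -> None:
        """Wait on the event from outside the callback."""
        assert self._accepted is not None
        await self._accepted.wait()


async def demo() -> bool:
    """Connect one client and report whether it was registered."""
    srv = OneShotServer()
    port = await srv.start()
    _, client_writer = await asyncio.open_connection("127.0.0.1", port)
    await srv.wait_for_peer()

    accepted = srv._writer is not None and srv._server is None

    # Tidy up both ends of the connection.
    assert srv._writer is not None
    srv._writer.close()
    await srv._writer.wait_closed()
    client_writer.close()
    await client_writer.wait_closed()
    return accepted


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the event and the server handle are instance attributes rather
than closure variables, starting the listener and waiting for the peer
can live in separate methods, which is the split this patch prepares for.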

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index b7e5e635d8..56f05b9030 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -242,6 +242,10 @@ def __init__(self, name: Optional[str] = None) -> None:
         # Workaround for bind()
         self._sock: Optional[socket.socket] = None
 
+        # Server state for start_server() and _incoming()
+        self._server: Optional[asyncio.AbstractServer] = None
+        self._accepted: Optional[asyncio.Event] = None
+
     def __repr__(self) -> str:
         cls_name = type(self).__name__
         tokens = []
@@ -425,6 +429,54 @@ def _set_state(self, state: Runstate) -> None:
         self._runstate_event.set()
         self._runstate_event.clear()
 
+    @bottom_half  # However, it does not run from the R/W tasks.
+    async def _stop_server(self) -> None:
+        """
+        Stop listening for / accepting new incoming connections.
+        """
+        if self._server is None:
+            return
+
+        try:
+            self.logger.debug("Stopping server.")
+            self._server.close()
+            await self._server.wait_closed()
+            self.logger.debug("Server stopped.")
+        finally:
+            self._server = None
+
+    @bottom_half  # However, it does not run from the R/W tasks.
+    async def _incoming(self,
+                        reader: asyncio.StreamReader,
+                        writer: asyncio.StreamWriter) -> None:
+        """
+        Accept an incoming connection and signal the upper_half.
+
+        This method does the minimum necessary to accept a single
+        incoming connection. It signals back to the upper_half ASAP so
+        that any errors during session initialization can occur
+        naturally in the caller's stack.
+
+        :param reader: Incoming `asyncio.StreamReader`
+        :param writer: Incoming `asyncio.StreamWriter`
+        """
+        peer = writer.get_extra_info('peername', 'Unknown peer')
+        self.logger.debug("Incoming connection from %s", peer)
+
+        if self._reader or self._writer:
+            # Sadly, we can have more than one pending connection
+            # because of https://bugs.python.org/issue46715
+            # Close any extra connections we don't actually want.
+            self.logger.warning("Extraneous connection inadvertently accepted")
+            writer.close()
+            return
+
+        # A connection has been accepted; stop listening for new ones.
+        assert self._accepted is not None
+        await self._stop_server()
+        self._reader, self._writer = (reader, writer)
+        self._accepted.set()
+
     def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None:
         """
         Used to create a socket in advance of accept().
@@ -469,30 +521,11 @@ async def _do_accept(self, address: SocketAddrT,
         self._set_state(Runstate.CONNECTING)
 
         self.logger.debug("Awaiting connection on %s ...", address)
-        connected = asyncio.Event()
-        server: Optional[asyncio.AbstractServer] = None
-
-        async def _client_connected_cb(reader: asyncio.StreamReader,
-                                       writer: asyncio.StreamWriter) -> None:
-            """Used to accept a single incoming connection, see below."""
-            nonlocal server
-            nonlocal connected
-
-            # A connection has been accepted; stop listening for new ones.
-            assert server is not None
-            server.close()
-            await server.wait_closed()
-            server = None
-
-            # Register this client as being connected
-            self._reader, self._writer = (reader, writer)
-
-            # Signal back: We've accepted a client!
-            connected.set()
+        self._accepted = asyncio.Event()
 
         if isinstance(address, tuple):
             coro = asyncio.start_server(
-                _client_connected_cb,
+                self._incoming,
                 host=None if self._sock else address[0],
                 port=None if self._sock else address[1],
                 ssl=ssl,
@@ -502,7 +535,7 @@ async def _client_connected_cb(reader: asyncio.StreamReader,
             )
         else:
             coro = asyncio.start_unix_server(
-                _client_connected_cb,
+                self._incoming,
                 path=None if self._sock else address,
                 ssl=ssl,
                 backlog=1,
@@ -515,9 +548,9 @@ async def _client_connected_cb(reader: asyncio.StreamReader,
         # otherwise yield.
         await asyncio.sleep(0)
 
-        server = await coro     # Starts listening
-        await connected.wait()  # Waits for the callback to fire (and finish)
-        assert server is None
+        self._server = await coro    # Starts listening
+        await self._accepted.wait()  # Waits for the callback to finish
+        assert self._server is None
         self._sock = None
 
         self.logger.debug("Connection accepted.")
-- 
2.34.1


