qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: Eduardo Habkost <ehabkost@redhat.com>,
	Eric Blake <eblake@redhat.com>,
	Stefan Hajnoczi <stefanha@redhat.com>,
	Markus Armbruster <armbru@redhat.com>,
	Wainer dos Santos Moschetta <wainersm@redhat.com>,
	"Niteesh G . S ." <niteesh.gs@gmail.com>,
	Willian Rampazzo <wrampazz@redhat.com>,
	Cleber Rosa <crosa@redhat.com>, John Snow <jsnow@redhat.com>
Subject: [PATCH v4 09/27] python/aqmp: add AsyncProtocol.accept() method
Date: Wed, 15 Sep 2021 12:29:37 -0400	[thread overview]
Message-ID: <20210915162955.333025-10-jsnow@redhat.com> (raw)
In-Reply-To: <20210915162955.333025-1-jsnow@redhat.com>

It's a little messier than connect, because it wasn't designed to accept
*precisely one* connection. Such is life.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
---
 python/qemu/aqmp/protocol.py | 89 ++++++++++++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 4 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 1dfd12895d..62c26ede5a 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -243,6 +243,24 @@ async def runstate_changed(self) -> Runstate:
         await self._runstate_event.wait()
         return self.runstate
 
+    @upper_half
+    @require(Runstate.IDLE)
+    async def accept(self, address: Union[str, Tuple[str, int]],
+                     ssl: Optional[SSLContext] = None) -> None:
+        """
+        Accept a connection and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :param address:
+            Address to listen to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError: If a connection could not be accepted.
+        """
+        await self._new_session(address, ssl, accept=True)
+
     @upper_half
     @require(Runstate.IDLE)
     async def connect(self, address: Union[str, Tuple[str, int]],
@@ -308,7 +326,8 @@ def _set_state(self, state: Runstate) -> None:
     @upper_half
     async def _new_session(self,
                            address: Union[str, Tuple[str, int]],
-                           ssl: Optional[SSLContext] = None) -> None:
+                           ssl: Optional[SSLContext] = None,
+                           accept: bool = False) -> None:
         """
         Establish a new connection and initialize the session.
 
@@ -317,9 +336,10 @@ async def _new_session(self,
         to be set back to `IDLE`.
 
         :param address:
-            Address to connect to;
+            Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
 
         :raise ConnectError:
             When a connection or session cannot be established.
@@ -333,7 +353,7 @@ async def _new_session(self,
 
         try:
             phase = "connection"
-            await self._establish_connection(address, ssl)
+            await self._establish_connection(address, ssl, accept)
 
             phase = "session"
             await self._establish_session()
@@ -367,6 +387,7 @@ async def _establish_connection(
             self,
             address: Union[str, Tuple[str, int]],
             ssl: Optional[SSLContext] = None,
+            accept: bool = False
     ) -> None:
         """
         Establish a new connection.
@@ -375,6 +396,7 @@ async def _establish_connection(
             Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
         """
         assert self.runstate == Runstate.IDLE
         self._set_state(Runstate.CONNECTING)
@@ -384,7 +406,66 @@ async def _establish_connection(
         # otherwise yield.
         await asyncio.sleep(0)
 
-        await self._do_connect(address, ssl)
+        if accept:
+            await self._do_accept(address, ssl)
+        else:
+            await self._do_connect(address, ssl)
+
+    @upper_half
+    async def _do_accept(self, address: Union[str, Tuple[str, int]],
+                         ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the transport server, accept a single connection.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        self.logger.debug("Awaiting connection on %s ...", address)
+        connected = asyncio.Event()
+        server: Optional[asyncio.AbstractServer] = None
+
+        async def _client_connected_cb(reader: asyncio.StreamReader,
+                                       writer: asyncio.StreamWriter) -> None:
+            """Used to accept a single incoming connection, see below."""
+            nonlocal server
+            nonlocal connected
+
+            # A connection has been accepted; stop listening for new ones.
+            assert server is not None
+            server.close()
+            await server.wait_closed()
+            server = None
+
+            # Register this client as being connected
+            self._reader, self._writer = (reader, writer)
+
+            # Signal back: We've accepted a client!
+            connected.set()
+
+        if isinstance(address, tuple):
+            coro = asyncio.start_server(
+                _client_connected_cb,
+                host=address[0],
+                port=address[1],
+                ssl=ssl,
+                backlog=1,
+            )
+        else:
+            coro = asyncio.start_unix_server(
+                _client_connected_cb,
+                path=address,
+                ssl=ssl,
+                backlog=1,
+            )
+
+        server = await coro     # Starts listening
+        await connected.wait()  # Waits for the callback to fire (and finish)
+        assert server is None
+
+        self.logger.debug("Connection accepted.")
 
     @upper_half
     async def _do_connect(self, address: Union[str, Tuple[str, int]],
-- 
2.31.1



  parent reply	other threads:[~2021-09-15 16:43 UTC|newest]

Thread overview: 29+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-09-15 16:29 [PATCH v4 00/27] python: introduce Asynchronous QMP package John Snow
2021-09-15 16:29 ` [PATCH v4 01/27] python/aqmp: add asynchronous QMP (AQMP) subpackage John Snow
2021-09-15 16:29 ` [PATCH v4 02/27] python/aqmp: add error classes John Snow
2021-09-15 16:29 ` [PATCH v4 03/27] python/pylint: Add exception for TypeVar names ('T') John Snow
2021-09-15 16:29 ` [PATCH v4 04/27] python/aqmp: add asyncio compatibility wrappers John Snow
2021-09-15 16:29 ` [PATCH v4 05/27] python/aqmp: add generic async message-based protocol support John Snow
2021-09-15 16:29 ` [PATCH v4 06/27] python/aqmp: add runstate state machine to AsyncProtocol John Snow
2021-09-15 16:29 ` [PATCH v4 07/27] python/aqmp: Add logging utility helpers John Snow
2021-09-15 16:29 ` [PATCH v4 08/27] python/aqmp: add logging to AsyncProtocol John Snow
2021-09-15 16:29 ` John Snow [this message]
2021-09-15 16:29 ` [PATCH v4 10/27] python/aqmp: add configurable read buffer limit John Snow
2021-09-15 16:29 ` [PATCH v4 11/27] python/aqmp: add _cb_inbound and _cb_outbound logging hooks John Snow
2021-09-15 16:29 ` [PATCH v4 12/27] python/aqmp: add AsyncProtocol._readline() method John Snow
2021-09-15 16:29 ` [PATCH v4 13/27] python/aqmp: add QMP Message format John Snow
2021-09-15 16:29 ` [PATCH v4 14/27] python/aqmp: add well-known QMP object models John Snow
2021-09-15 16:29 ` [PATCH v4 15/27] python/aqmp: add QMP event support John Snow
2021-09-15 16:29 ` [PATCH v4 16/27] python/pylint: disable too-many-function-args John Snow
2021-09-15 16:29 ` [PATCH v4 17/27] python/aqmp: add QMP protocol support John Snow
2021-09-15 16:29 ` [PATCH v4 18/27] python/pylint: disable no-member check John Snow
2021-09-15 16:29 ` [PATCH v4 19/27] python/aqmp: Add message routing to QMP protocol John Snow
2021-09-15 16:29 ` [PATCH v4 20/27] python/aqmp: add execute() interfaces John Snow
2021-09-15 16:29 ` [PATCH v4 21/27] python/aqmp: add _raw() execution interface John Snow
2021-09-15 16:29 ` [PATCH v4 22/27] python/aqmp: add asyncio_run compatibility wrapper John Snow
2021-09-15 16:29 ` [PATCH v4 23/27] python/aqmp: add scary message John Snow
2021-09-15 16:29 ` [PATCH v4 24/27] python: bump avocado to v90.0 John Snow
2021-09-15 16:29 ` [PATCH v4 25/27] python/aqmp: add AsyncProtocol unit tests John Snow
2021-09-15 16:29 ` [PATCH v4 26/27] python/aqmp: add LineProtocol tests John Snow
2021-09-15 16:29 ` [PATCH v4 27/27] python/aqmp: Add Coverage.py support John Snow
2021-09-20 19:51 ` [PATCH v4 00/27] python: introduce Asynchronous QMP package John Snow

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210915162955.333025-10-jsnow@redhat.com \
    --to=jsnow@redhat.com \
    --cc=armbru@redhat.com \
    --cc=crosa@redhat.com \
    --cc=eblake@redhat.com \
    --cc=ehabkost@redhat.com \
    --cc=niteesh.gs@gmail.com \
    --cc=qemu-devel@nongnu.org \
    --cc=stefanha@redhat.com \
    --cc=wainersm@redhat.com \
    --cc=wrampazz@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).