From: John Snow <jsnow@redhat.com>
To: qemu-devel@nongnu.org
Cc: Eduardo Habkost <ehabkost@redhat.com>,
    Eric Blake <eblake@redhat.com>,
    Stefan Hajnoczi <stefanha@redhat.com>,
    Markus Armbruster <armbru@redhat.com>,
    Wainer dos Santos Moschetta <wainersm@redhat.com>,
    "Niteesh G . S ." <niteesh.gs@gmail.com>,
    Willian Rampazzo <wrampazz@redhat.com>,
    Cleber Rosa <crosa@redhat.com>, John Snow <jsnow@redhat.com>
Subject: [PATCH 09/20] python/aqmp: add AsyncProtocol.accept() method
Date: Thu, 1 Jul 2021 00:13:02 -0400
Message-ID: <20210701041313.1696009-10-jsnow@redhat.com>
In-Reply-To: <20210701041313.1696009-1-jsnow@redhat.com>
It's a little messier than connect(), because asyncio's start_server()
wasn't designed to accept *precisely one* connection. Such is life.
Signed-off-by: John Snow <jsnow@redhat.com>
---
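Not for the commit log: a minimal, standalone sketch of the
accept-exactly-one pattern that _do_accept() relies on, for anyone who
wants to try it outside of AsyncProtocol. The names demo() and on_client()
are hypothetical, the loopback address and ephemeral port are assumptions
made for the demo, and the early-return guard for a second client is an
extra precaution in this sketch rather than something the patch does.
Unlike the patch, the sketch does not await server.wait_closed() inside the
callback; newer Python versions appear to make that call wait for active
connections as well, so treat that difference as an assumption to verify.

```python
import asyncio
from typing import List, Optional, Tuple


async def demo() -> bytes:
    """Listen on loopback, accept exactly one peer, and read one line."""
    connected = asyncio.Event()
    accepted: List[Tuple[asyncio.StreamReader, asyncio.StreamWriter]] = []
    server: Optional[asyncio.AbstractServer] = None

    async def on_client(reader: asyncio.StreamReader,
                        writer: asyncio.StreamWriter) -> None:
        nonlocal server
        if server is None:
            # Sketch-only guard: a second peer slipped in; turn it away.
            writer.close()
            return
        # First peer: stop listening, keep the streams, signal the waiter.
        server.close()
        server = None
        accepted.append((reader, writer))
        connected.set()

    # Port 0 asks the OS for any free port (an assumption for this demo).
    listener = await asyncio.start_server(
        on_client, host="127.0.0.1", port=0, backlog=1)
    port = listener.sockets[0].getsockname()[1]
    server = listener

    # Play the client role from the same event loop.
    _, c_writer = await asyncio.open_connection("127.0.0.1", port)
    c_writer.write(b"hello\n")
    await c_writer.drain()

    await connected.wait()      # Fires once the callback has run
    assert server is None       # The listener has been closed
    reader, writer = accepted[0]
    line = await reader.readline()

    # Tidy up both ends of the connection.
    writer.close()
    await writer.wait_closed()
    c_writer.close()
    await c_writer.wait_closed()
    return line


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The Event plays the same role as `connected` in _do_accept(): the caller
blocks on it until the callback has stored the reader/writer pair and
closed the listening socket.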
python/qemu/aqmp/protocol.py | 85 ++++++++++++++++++++++++++++++++++--
1 file changed, 82 insertions(+), 3 deletions(-)
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index dd8564ee02..a32a8cbbf6 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -242,6 +242,24 @@ def runstate(self) -> Runstate:
         """The current `Runstate` of the connection."""
         return self._runstate
 
+    @upper_half
+    @require(Runstate.IDLE)
+    async def accept(self, address: Union[str, Tuple[str, int]],
+                     ssl: Optional[SSLContext] = None) -> None:
+        """
+        Accept a connection and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :param address:
+            Address to listen to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError: If a connection could not be accepted.
+        """
+        await self._new_session(address, ssl, accept=True)
+
     @upper_half
     @require(Runstate.IDLE)
     async def connect(self, address: Union[str, Tuple[str, int]],
@@ -302,7 +320,8 @@ def _set_state(self, state: Runstate) -> None:
     @upper_half
     async def _new_session(self,
                            address: Union[str, Tuple[str, int]],
-                           ssl: Optional[SSLContext] = None) -> None:
+                           ssl: Optional[SSLContext] = None,
+                           accept: bool = False) -> None:
         """
         Establish a new connection and initialize the session.
 
@@ -311,9 +330,10 @@ async def _new_session(self,
         to be set back to `IDLE`.
 
         :param address:
-            Address to connect to;
+            Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
 
         :raise ConnectError:
             When a connection or session cannot be established.
@@ -332,7 +352,10 @@ async def _new_session(self,
 
         phase = "connection"
         try:
-            await self._do_connect(address, ssl)
+            if accept:
+                await self._do_accept(address, ssl)
+            else:
+                await self._do_connect(address, ssl)
 
             phase = "session"
             await self._begin_new_session()
@@ -351,6 +374,62 @@ async def _new_session(self,
 
         assert self.runstate == Runstate.RUNNING
 
+    @upper_half
+    async def _do_accept(self, address: Union[str, Tuple[str, int]],
+                         ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the transport server, accept a single connection.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        self.logger.debug("Awaiting connection ...")
+        connected = asyncio.Event()
+        server: Optional[asyncio.AbstractServer] = None
+
+        async def _client_connected_cb(reader: asyncio.StreamReader,
+                                       writer: asyncio.StreamWriter) -> None:
+            """Used to accept a single incoming connection, see below."""
+            nonlocal server
+            nonlocal connected
+
+            # A connection has been accepted; stop listening for new ones.
+            assert server is not None
+            server.close()
+            await server.wait_closed()
+            server = None
+
+            # Register this client as being connected
+            self._reader, self._writer = (reader, writer)
+
+            # Signal back: We've accepted a client!
+            connected.set()
+
+        if isinstance(address, tuple):
+            coro = asyncio.start_server(
+                _client_connected_cb,
+                host=address[0],
+                port=address[1],
+                ssl=ssl,
+                backlog=1,
+            )
+        else:
+            coro = asyncio.start_unix_server(
+                _client_connected_cb,
+                path=address,
+                ssl=ssl,
+                backlog=1,
+            )
+
+        server = await coro  # Starts listening
+        await connected.wait()  # Waits for the callback to fire (and finish)
+        assert server is None
+
+        self.logger.debug("Connection accepted")
+
     @upper_half
     async def _do_connect(self, address: Union[str, Tuple[str, int]],
                           ssl: Optional[SSLContext] = None) -> None:
--
2.31.1