From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id 5D890C46467 for ; Tue, 29 Nov 2022 21:58:34 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S236871AbiK2V6F (ORCPT ); Tue, 29 Nov 2022 16:58:05 -0500 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:39438 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S236961AbiK2V5n (ORCPT ); Tue, 29 Nov 2022 16:57:43 -0500 Received: from kadath.azazel.net (unknown [IPv6:2001:8b0:135f:bcd1:e0cb:4eff:fedf:e608]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id D3E521E3C3 for ; Tue, 29 Nov 2022 13:57:41 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; q=dns/txt; c=relaxed/relaxed; d=azazel.net; s=20220717; h=Content-Transfer-Encoding:MIME-Version:References:In-Reply-To: Message-Id:Date:Subject:To:From:Sender:Reply-To:Cc:Content-Type:Content-ID: Content-Description:Resent-Date:Resent-From:Resent-Sender:Resent-To:Resent-Cc :Resent-Message-ID:List-Id:List-Help:List-Unsubscribe:List-Subscribe: List-Post:List-Owner:List-Archive; bh=vR1fpUF+yPWCqyrCy1m36KmHOvOPbhajvL0mJ6LqN1c=; b=jgV7HTwDBV+Lhk0AcvBx8cWgq1 wqGiW54bGDC31LxE2DiQ/zIEPyUvPrdLO+057Zmuge2GhGk6U5KFeorccwAG5uTmvjJ+SXdMe5pD4 LhJrtAwnoN5NQBJ9jGE5hxHAor39mnx665UqYEzrmKUdyzjbosW5F1m7z0HqhRkGn9Uw0rakNaQYp tVT2WAtxZpW2w4UHAuhZ/qzNt70NRTmwMNeBrCHHM13DW+y8KZmgma4mD2zy8U4xZGMKpPhVoujjx cmELl1CdQkv8aS4l3j27Uac4cwmjJ8qnLvE5KzZWmKyTdniLskJBhJEt9IP4ULkHQfJM4eLPAdahK KpKgcl7w==; Received: from ulthar.dreamlands.azazel.net ([2001:8b0:fb7d:d6d7:2e4d:54ff:fe4b:a9ae]) by kadath.azazel.net with esmtp (Exim 4.94.2) (envelope-from ) id 1p08SL-00DjQp-K8 for netfilter-devel@vger.kernel.org; Tue, 29 Nov 2022 21:47:57 +0000 From: Jeremy Sowden To: Netfilter Devel Subject: [PATCH ulogd2 v2 v2 25/34] db: synchronize access to ring-buffer Date: Tue, 29 Nov 2022 21:47:40 +0000 Message-Id: <20221129214749.247878-26-jeremy@azazel.net> X-Mailer: git-send-email 2.35.1 In-Reply-To: <20221129214749.247878-1-jeremy@azazel.net> References: <20221129214749.247878-1-jeremy@azazel.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SA-Exim-Connect-IP: 2001:8b0:fb7d:d6d7:2e4d:54ff:fe4b:a9ae X-SA-Exim-Mail-From: jeremy@azazel.net X-SA-Exim-Scanned: No (on kadath.azazel.net); SAEximRunCond expanded to false Precedence: bulk List-ID: X-Mailing-List: netfilter-devel@vger.kernel.org There are a mutex and condition-variable associated with the ring-buffer. However, they are not used to synchronize access to the buffer, but only to wake the thread that processes the buffer when new statements are added to it. Thus there is nothing to prevent concurrent modifications of the buffer. Instead, acquire the mutex before adding to the buffer, and, in the processing thread, copy the statement we're about to execute out of the buffer and release the mutex while processing it. Signed-off-by: Jeremy Sowden --- include/ulogd/db.h | 2 ++ util/db.c | 63 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/include/ulogd/db.h b/include/ulogd/db.h index 6c2e3d07f463..bf4a19dea150 100644 --- a/include/ulogd/db.h +++ b/include/ulogd/db.h @@ -34,6 +34,8 @@ struct db_stmt_ring { struct db_stmt *elems; /* Buffer containing `size` statements of * `length` bytes */ + struct db_stmt *stmt; /* Currently executing statement */ + uint32_t size; /* No. of elements in ring buffer */ uint32_t used; /* No. of elements in ring buffer in use */ uint32_t length; /* Length of one element in ring buffer */ diff --git a/util/db.c b/util/db.c index 487eaed26153..6cfbcbc16791 100644 --- a/util/db.c +++ b/util/db.c @@ -372,10 +372,15 @@ _stop_db(struct ulogd_pluginstance *upi) } if (di->ring.size > 0) { pthread_cancel(di->ring.thread_id); + pthread_cond_destroy(&di->ring.cond); pthread_mutex_destroy(&di->ring.mutex); + free(di->ring.elems); + free(di->ring.stmt); + di->ring.elems = NULL; + di->ring.stmt = NULL; } } @@ -737,14 +742,22 @@ _start_ring(struct ulogd_pluginstance *upi) return 0; /* allocate */ + stmt_size = sizeof(*di->stmt) + di->stmt->size; stmt_align = __alignof__(*di->stmt); di->ring.length = stmt_size % stmt_align != 0 ? (1 + stmt_size / stmt_align) * stmt_align : stmt_size; + + di->ring.stmt = malloc(di->ring.length); + if (di->ring.stmt == NULL) + return -1; + di->ring.elems = calloc(di->ring.size, di->ring.length); - if (di->ring.elems == NULL) + if (di->ring.elems == NULL) { + free(di->ring.stmt); return -1; + } di->ring.wr_idx = di->ring.rd_idx = di->ring.used = 0; ulogd_log(ULOGD_NOTICE, "Allocating %" PRIu32 " elements of size %" PRIu32 " for ring\n", @@ -775,6 +788,7 @@ cond_error: pthread_cond_destroy(&di->ring.cond); alloc_error: free(di->ring.elems); + free(di->ring.stmt); return -1; } @@ -784,12 +798,14 @@ _add_to_ring(struct ulogd_pluginstance *upi) { struct db_instance *di = (struct db_instance *) &upi->private; + pthread_mutex_lock(&di->ring.mutex); + if (di->ring.used == di->ring.size) { if (!di->ring.full) { ulogd_log(ULOGD_ERROR, "No place left in ring\n"); di->ring.full = 1; } - return ULOGD_IRET_OK; + goto unlock_mutex; } if (di->ring.full) { @@ -801,6 +817,9 @@ _add_to_ring(struct ulogd_pluginstance *upi) _incr_ring_used(&di->ring, 1); pthread_cond_signal(&di->ring.cond); +unlock_mutex: + pthread_mutex_unlock(&di->ring.mutex); + return ULOGD_IRET_OK; } @@ -809,27 +828,47 @@ _process_ring(void *arg) { struct ulogd_pluginstance *upi = arg; struct db_instance *di = (struct db_instance *) &upi->private; + struct db_stmt *stmt = di->ring.stmt; pthread_mutex_lock(&di->ring.mutex); + while(1) { - /* wait cond */ + pthread_cond_wait(&di->ring.cond, &di->ring.mutex); + while (di->ring.used > 0) { - struct db_stmt *stmt = _get_ring_elem(&di->ring, - di->ring.rd_idx); - - if (di->driver->execute(upi, stmt) < 0) { - di->driver->close_db(upi); - while (di->driver->open_db(upi) < 0) - sleep(1); - /* try to re-run statement */ + + memcpy(stmt, _get_ring_elem(&di->ring, di->ring.rd_idx), + di->ring.length); + + pthread_mutex_unlock(&di->ring.mutex); + +exec_stmt: + if (di->driver->execute(upi, stmt) == 0) { + + pthread_mutex_lock(&di->ring.mutex); + + _incr_ring_used(&di->ring, -1); + continue; + } - _incr_ring_used(&di->ring, -1); + /* If the exec fails, close the DB connexion and try to + * open it again. Once the connexion is made, retry the + * statement. + */ + di->driver->close_db(upi); + while (di->driver->open_db(upi) < 0) + sleep(1); + goto exec_stmt; + } + } + pthread_mutex_unlock(&di->ring.mutex); + return NULL; } -- 2.35.1