From: "Benoît Canet" <benoit@irqsave.net>
To: qemu-devel@nongnu.org
Cc: kwolf@redhat.com, pbonzini@redhat.com,
"Benoît Canet" <benoit@irqsave.net>,
stefanha@redhat.com
Subject: [Qemu-devel] [PATCH V5 1/5] throttle: Add a new throttling API implementing continuous leaky bucket.
Date: Mon, 12 Aug 2013 18:53:12 +0200 [thread overview]
Message-ID: <1376326396-7676-2-git-send-email-benoit@irqsave.net> (raw)
In-Reply-To: <1376326396-7676-1-git-send-email-benoit@irqsave.net>
Implement the continuous leaky bucket algorithm devised on IRC as a separate
module.
Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
include/qemu/throttle.h | 105 +++++++++++++
util/Makefile.objs | 1 +
util/throttle.c | 391 +++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 497 insertions(+)
create mode 100644 include/qemu/throttle.h
create mode 100644 util/throttle.c
diff --git a/include/qemu/throttle.h b/include/qemu/throttle.h
new file mode 100644
index 0000000..e03bc3e
--- /dev/null
+++ b/include/qemu/throttle.h
@@ -0,0 +1,105 @@
+/*
+ * QEMU throttling infrastructure
+ *
+ * Copyright (C) Nodalink, SARL. 2013
+ *
+ * Author:
+ * Benoît Canet <benoit.canet@irqsave.net>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 or
+ * (at your option) version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef THROTTLING_H
+#define THROTTLING_H
+
+#include <stdint.h>
+#include "qemu-common.h"
+#include "qemu/timer.h"
+
+#define NANOSECONDS_PER_SECOND 1000000000.0
+
+#define BUCKETS_COUNT 6
+
+typedef enum {
+ THROTTLE_BPS_TOTAL = 0,
+ THROTTLE_BPS_READ = 1,
+ THROTTLE_BPS_WRITE = 2,
+ THROTTLE_OPS_TOTAL = 3,
+ THROTTLE_OPS_READ = 4,
+ THROTTLE_OPS_WRITE = 5,
+} BucketType;
+
+typedef struct LeakyBucket {
+ double ups; /* units per second */
+ double max; /* leaky bucket max in units */
+ double bucket; /* bucket in units */
+} LeakyBucket;
+
+/* The following structure is used to configure a ThrottleState
+ * It contains a bit of state: the bucket field of the LeakyBucket structure.
+ * However it allows to keep the code clean and the bucket field is reset to
+ * zero at the right time.
+ */
+typedef struct ThrottleConfig {
+ LeakyBucket buckets[6]; /* leaky buckets */
+ uint64_t unit_size; /* size of an unit in bytes */
+ uint64_t op_size; /* size of an operation in units */
+} ThrottleConfig;
+
+typedef struct ThrottleState {
+ ThrottleConfig cfg; /* configuration */
+ int64_t previous_leak; /* timestamp of the last leak done */
+ QEMUTimer * timers[2]; /* timers used to do the throttling */
+ QEMUClock *clock; /* the clock used */
+} ThrottleState;
+
+/* operations on single leaky buckets */
+void throttle_leak_bucket(LeakyBucket *bkt, int64_t delta);
+
+int64_t throttle_compute_wait(LeakyBucket *bkt);
+
+/* expose timer computation function for unit tests */
+bool throttle_compute_timer(ThrottleState *ts,
+ bool is_write,
+ int64_t now,
+ int64_t *next_timer);
+
+/* init/destroy cycle */
+void throttle_init(ThrottleState *ts,
+ QEMUClock *clock,
+ void (read_timer)(void *),
+ void (write_timer)(void *),
+ void *timer_opaque);
+
+void throttle_destroy(ThrottleState *ts);
+
+bool throttle_have_timer(ThrottleState *ts);
+
+/* configuration */
+bool throttle_enabled(ThrottleConfig *cfg);
+
+bool throttle_conflicting(ThrottleConfig *cfg);
+
+bool throttle_is_valid(ThrottleConfig *cfg);
+
+void throttle_config(ThrottleState *ts, ThrottleConfig *cfg);
+
+void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg);
+
+/* usage */
+bool throttle_allowed(ThrottleState *ts, bool is_write);
+
+void throttle_account(ThrottleState *ts, bool is_write, uint64_t size);
+
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index dc72ab0..2bb13a2 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o
util-obj-y += qemu-option.o qemu-progress.o
util-obj-y += hexdump.o
util-obj-y += crc32c.o
+util-obj-y += throttle.o
diff --git a/util/throttle.c b/util/throttle.c
new file mode 100644
index 0000000..2f25d44
--- /dev/null
+++ b/util/throttle.c
@@ -0,0 +1,391 @@
+/*
+ * QEMU throttling infrastructure
+ *
+ * Copyright (C) Nodalink, SARL. 2013
+ *
+ * Author:
+ * Benoît Canet <benoit.canet@irqsave.net>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 or
+ * (at your option) version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "qemu/throttle.h"
+#include "qemu/timer.h"
+
+/* This function make a bucket leak
+ *
+ * @bkt: the bucket to make leak
+ * @delta: the time delta
+ */
+void throttle_leak_bucket(LeakyBucket *bkt, int64_t delta)
+{
+ double leak;
+
+ /* compute how much to leak */
+ leak = (bkt->ups * (double) delta) / NANOSECONDS_PER_SECOND;
+
+ /* make the bucket leak */
+ bkt->bucket = MAX(bkt->bucket - leak, 0);
+}
+
+/* Calculate the time delta since last leak and make proportionals leaks
+ *
+ * @now: the current timestamp in ns
+ */
+static void throttle_do_leak(ThrottleState *ts, int64_t now)
+{
+ /* compute the time elapsed since the last leak */
+ int64_t delta = now - ts->previous_leak;
+ int i;
+
+ ts->previous_leak = now;
+
+ if (delta <= 0) {
+ return;
+ }
+
+ /* make each bucket leak */
+ for (i = 0; i < BUCKETS_COUNT; i++) {
+ throttle_leak_bucket(&ts->cfg.buckets[i], delta);
+ }
+}
+
+/* do the real job of computing the time to wait
+ *
+ * @limit: the throttling limit
+ * @extra: the number of operation to delay
+ * @ret: the time to wait in ns
+ */
+static int64_t throttle_do_compute_wait(double limit, double extra)
+{
+ double wait = extra * NANOSECONDS_PER_SECOND;
+ wait /= limit;
+ return wait;
+}
+
+/* This function compute the wait time in ns that a leaky bucket should trigger
+ *
+ * @bkt: the leaky bucket we operate on
+ * @ret: the resulting wait time in ns or 0 if the operation can go through
+ */
+int64_t throttle_compute_wait(LeakyBucket *bkt)
+{
+ double extra; /* the number of extra units blocking the io */
+
+ if (!bkt->ups) {
+ return 0;
+ }
+
+ extra = bkt->bucket - bkt->max;
+
+ if (extra <= 0) {
+ return 0;
+ }
+
+ return throttle_do_compute_wait(bkt->ups, extra);
+}
+
+/* This function compute the time that must be waited while this IO
+ *
+ * @is_write: true if the current IO is a write, false if it's a read
+ * @ret: time to wait
+ */
+static int64_t throttle_compute_wait_for(ThrottleState *ts,
+ bool is_write,
+ int64_t now)
+{
+ BucketType to_check[2][4] = { {THROTTLE_BPS_TOTAL,
+ THROTTLE_OPS_TOTAL,
+ THROTTLE_BPS_READ,
+ THROTTLE_OPS_READ},
+ {THROTTLE_BPS_TOTAL,
+ THROTTLE_OPS_TOTAL,
+ THROTTLE_BPS_WRITE,
+ THROTTLE_OPS_WRITE}, };
+ int64_t wait, max_wait = 0;
+ int i;
+
+ for (i = 0; i < 4; i++) {
+ BucketType index = to_check[is_write][i];
+ wait = throttle_compute_wait(&ts->cfg.buckets[index]);
+ if (wait > max_wait) {
+ max_wait = wait;
+ }
+ }
+
+ return max_wait;
+}
+
+/* compute the timer for this type of operation
+ *
+ * @is_write: the type of operation
+ * @now: the current clock timerstamp
+ * @next_timer: the resulting timer
+ * @ret: true if a timer must be set
+ */
+bool throttle_compute_timer(ThrottleState *ts,
+ bool is_write,
+ int64_t now,
+ int64_t *next_timer)
+{
+ int64_t wait;
+
+ /* leak proportionally to the time elapsed */
+ throttle_do_leak(ts, now);
+
+ /* compute the wait time if any */
+ wait = throttle_compute_wait_for(ts, is_write, now);
+
+ /* if the code must wait compute when the next timer should fire */
+ if (wait) {
+ *next_timer = now + wait;
+ return true;
+ }
+
+ /* else no need to wait at all */
+ *next_timer = now;
+ return false;
+}
+
+/* To be called first on the ThrottleState */
+void throttle_init(ThrottleState *ts,
+ QEMUClock *clock,
+ void (read_timer)(void *),
+ void (write_timer)(void *),
+ void *timer_opaque)
+{
+ memset(ts, 0, sizeof(ThrottleState));
+
+ ts->clock = clock;
+ ts->timers[0] = qemu_new_timer_ns(ts->clock, read_timer, timer_opaque);
+ ts->timers[1] = qemu_new_timer_ns(ts->clock, write_timer, timer_opaque);
+}
+
+/* destroy a timer */
+static void throttle_timer_destroy(QEMUTimer **timer)
+{
+ assert(*timer != NULL);
+
+ if (qemu_timer_pending(*timer)) {
+ qemu_del_timer(*timer);
+ }
+
+ qemu_free_timer(*timer);
+ *timer = NULL;
+}
+
+/* To be called last on the ThrottleState */
+void throttle_destroy(ThrottleState *ts)
+{
+ int i;
+
+ for (i = 0; i < 2; i++) {
+ throttle_timer_destroy(&ts->timers[i]);
+ }
+}
+
+/* is any throttling timer configured */
+bool throttle_have_timer(ThrottleState *ts)
+{
+ if (ts->timers[0]) {
+ return true;
+ }
+
+ return false;
+}
+
+/* Does any throttling must be done
+ *
+ * @cfg: the throttling configuration to inspect
+ * @ret: true if throttling must be done else false
+ */
+bool throttle_enabled(ThrottleConfig *cfg)
+{
+ int i;
+
+ for (i = 0; i < BUCKETS_COUNT; i++) {
+ if (cfg->buckets[i].ups > 0) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/* return true if any two throttling parameters conflicts
+ *
+ * @cfg: the throttling configuration to inspect
+ * @ret: true if any conflict detected else false
+ */
+bool throttle_conflicting(ThrottleConfig *cfg)
+{
+ bool bps_flag, ops_flag;
+ bool bps_max_flag, ops_max_flag;
+
+ bps_flag = cfg->buckets[THROTTLE_BPS_TOTAL].ups &&
+ (cfg->buckets[THROTTLE_BPS_READ].ups ||
+ cfg->buckets[THROTTLE_BPS_WRITE].ups);
+
+ ops_flag = cfg->buckets[THROTTLE_OPS_TOTAL].ups &&
+ (cfg->buckets[THROTTLE_OPS_READ].ups ||
+ cfg->buckets[THROTTLE_OPS_WRITE].ups);
+
+ bps_max_flag = cfg->buckets[THROTTLE_BPS_TOTAL].max &&
+ (cfg->buckets[THROTTLE_BPS_READ].max ||
+ cfg->buckets[THROTTLE_BPS_WRITE].max);
+
+ ops_max_flag = cfg->buckets[THROTTLE_OPS_TOTAL].max &&
+ (cfg->buckets[THROTTLE_OPS_READ].max ||
+ cfg->buckets[THROTTLE_OPS_WRITE].max);
+
+ return bps_flag || ops_flag || bps_max_flag || ops_max_flag;
+}
+
+/* check if a throttling configuration is valid
+ * @cfg: the throttling configuration to inspect
+ * @ret: true if valid else false
+ */
+bool throttle_is_valid(ThrottleConfig *cfg)
+{
+ bool invalid = false;
+ int i;
+
+ for (i = 0; i < BUCKETS_COUNT; i++) {
+ if (cfg->buckets[i].ups < 0) {
+ invalid = true;
+ }
+ }
+
+ for (i = 0; i < BUCKETS_COUNT; i++) {
+ if (cfg->buckets[i].max < 0) {
+ invalid = true;
+ }
+ }
+
+ return !invalid;
+}
+
+/* fix bucket parameters */
+static void throttle_fix_bucket(LeakyBucket *bkt)
+{
+ double min = bkt->ups / 10;
+ /* zero bucket level */
+ bkt->bucket = 0;
+
+ /* take care of not using cpu and also improve throttling precision */
+ if (bkt->ups &&
+ bkt->max < min) {
+ bkt->max = min;
+ }
+}
+
+/* take care of canceling a timer */
+static void throttle_cancel_timer(QEMUTimer *timer)
+{
+ assert(timer != NULL);
+ if (!qemu_timer_pending(timer)) {
+ return;
+ }
+
+ qemu_del_timer(timer);
+}
+
+/* Used to configure the throttle
+ *
+ * @ts: the throttle state we are working on
+ * @cfg: the config to set
+ */
+void throttle_config(ThrottleState *ts, ThrottleConfig *cfg)
+{
+ int i;
+
+ ts->cfg = *cfg;
+
+ for (i = 0; i < BUCKETS_COUNT; i++) {
+ throttle_fix_bucket(&ts->cfg.buckets[i]);
+ }
+
+ ts->previous_leak = qemu_get_clock_ns(ts->clock);
+
+ for (i = 0; i < 2; i++) {
+ throttle_cancel_timer(ts->timers[i]);
+ }
+}
+
+/* used to get config
+ *
+ * @ts: the throttle state we are working on
+ * @cfg: where to write the config
+ */
+void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg)
+{
+ *cfg = ts->cfg;
+}
+
+
+/* compute if an operation must be allowed and set a timer if not
+ *
+ * NOTE: this function is not unit tested due to it's usage of qemu_mod_timer
+ *
+ * @is_write: the type of operation (read/write)
+ * @ret: true if the operation is allowed to flow else if must wait
+ */
+bool throttle_allowed(ThrottleState *ts, bool is_write)
+{
+ int64_t now = qemu_get_clock_ns(ts->clock);
+ int64_t next_timer;
+ bool must_wait;
+
+ must_wait = throttle_compute_timer(ts,
+ is_write,
+ now,
+ &next_timer);
+
+ /* if the request is throttled arm timer */
+ if (must_wait) {
+ qemu_mod_timer(ts->timers[is_write], next_timer);
+ }
+
+ return !must_wait;
+}
+
+/* do the accounting for this operation
+ *
+ * @is_write: the type of operation (read/write)
+ * size: the size of the operation
+ */
+void throttle_account(ThrottleState *ts, bool is_write, uint64_t size)
+{
+ double bytes_size;
+
+ /* if cfg.op_size is not defined we will acccount exactly 1 operation */
+ double units = 1.0;
+ if (ts->cfg.op_size) {
+ units = (double) size / ts->cfg.op_size;
+ }
+
+ bytes_size = size * ts->cfg.unit_size;
+
+ ts->cfg.buckets[THROTTLE_BPS_TOTAL].bucket += bytes_size;
+ ts->cfg.buckets[THROTTLE_OPS_TOTAL].bucket += units;
+
+ if (is_write) {
+ ts->cfg.buckets[THROTTLE_BPS_WRITE].bucket += bytes_size;
+ ts->cfg.buckets[THROTTLE_OPS_WRITE].bucket += units;
+ } else {
+ ts->cfg.buckets[THROTTLE_BPS_READ].bucket += bytes_size;
+ ts->cfg.buckets[THROTTLE_OPS_READ].bucket += units;
+ }
+}
+
--
1.7.10.4
next prev parent reply other threads:[~2013-08-12 16:52 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2013-08-12 16:53 [Qemu-devel] [PATCH V5 0/5] Continuous Leaky Bucket Throttling Benoît Canet
2013-08-12 16:53 ` Benoît Canet [this message]
2013-08-14 8:52 ` [Qemu-devel] [PATCH V5 1/5] throttle: Add a new throttling API implementing continuous leaky bucket Fam Zheng
2013-08-16 11:45 ` Stefan Hajnoczi
2013-08-19 11:27 ` Paolo Bonzini
2013-08-12 16:53 ` [Qemu-devel] [PATCH V5 2/5] throttle: Add units tests Benoît Canet
2013-08-12 16:53 ` [Qemu-devel] [PATCH V5 3/5] block: Enable the new throttling code in the block layer Benoît Canet
2013-08-14 9:31 ` Fam Zheng
2013-08-14 9:50 ` Fam Zheng
2013-08-16 12:02 ` Stefan Hajnoczi
2013-08-12 16:53 ` [Qemu-devel] [PATCH V5 4/5] block: Add support for throttling burst max in QMP and the command line Benoît Canet
2013-08-16 12:07 ` Stefan Hajnoczi
2013-08-12 16:53 ` [Qemu-devel] [PATCH V5 5/5] block: Add iops_sector_count to do the iops accounting for a given io size Benoît Canet
2013-08-14 9:48 ` Fam Zheng
2013-08-14 18:31 ` Benoît Canet
2013-08-16 12:10 ` Stefan Hajnoczi
2013-08-16 12:14 ` [Qemu-devel] [PATCH V5 0/5] Continuous Leaky Bucket Throttling Stefan Hajnoczi
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1376326396-7676-2-git-send-email-benoit@irqsave.net \
--to=benoit@irqsave.net \
--cc=kwolf@redhat.com \
--cc=pbonzini@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=stefanha@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.