diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 2f1dceb874c979fc7a1741f9ca21ce2fc76d8d2b..fc2091bee812bfdd3c590381daf3b45fa798eb43 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -44,6 +44,7 @@ #include "vy_scheduler.h" #include "vy_stat.h" +#include <math.h> #include <small/lsregion.h> #include <coio_file.h> @@ -100,6 +101,17 @@ struct vy_env { struct vy_quota quota; /** Timer for updating quota watermark. */ ev_timer quota_timer; + /** + * Amount of quota used since the last + * invocation of the quota timer callback. + */ + size_t quota_use_curr; + /** + * Quota use rate, in bytes per second. + * Calculated as exponentially weighted + * moving average of quota_use_curr. + */ + size_t quota_use_rate; /** Common index environment. */ struct vy_index_env index_env; /** Environment for cache subsystem */ @@ -126,6 +138,19 @@ struct vy_env { int write_threads; }; +enum { + /** + * Time interval between successive updates of + * quota watermark and use rate, in seconds. + */ + VY_QUOTA_UPDATE_INTERVAL = 1, + /** + * Period of time over which the quota use rate + * is averaged, in seconds. + */ + VY_QUOTA_RATE_AVG_PERIOD = 5, +}; + /** Mask passed to vy_gc(). */ enum { /** Delete incomplete runs. */ @@ -232,12 +257,6 @@ vy_stat_dump_bandwidth(struct vy_stat *s) return histogram_percentile(s->dump_bw, 10); } -static int64_t -vy_stat_tx_write_rate(struct vy_stat *s) -{ - return rmean_mean(s->rmean, VY_STAT_TX_WRITE); -} - /** Cursor. */ struct vy_cursor { /** @@ -2107,6 +2126,7 @@ vy_prepare(struct vy_env *env, struct vy_tx *tx) rmean_collect(env->stat->rmean, VY_STAT_TX, 1); rmean_collect(env->stat->rmean, VY_STAT_TX_OPS, tx->write_count); rmean_collect(env->stat->rmean, VY_STAT_TX_WRITE, write_size); + env->quota_use_curr += write_size; return 0; } @@ -2179,7 +2199,15 @@ vy_env_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events) struct vy_env *e = timer->data; - int64_t tx_write_rate = vy_stat_tx_write_rate(e->stat); + /* + * Update the quota use rate with the new measurement. + */ + static const double weight = 1 - exp(-VY_QUOTA_UPDATE_INTERVAL / + (double)VY_QUOTA_RATE_AVG_PERIOD); + e->quota_use_rate = (1 - weight) * e->quota_use_rate + + weight * e->quota_use_curr / VY_QUOTA_UPDATE_INTERVAL; + e->quota_use_curr = 0; + int64_t dump_bandwidth = vy_stat_dump_bandwidth(e->stat); /* @@ -2191,10 +2219,10 @@ vy_env_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events) * * limit - watermark watermark * ----------------- = -------------- - * tx_write_rate dump_bandwidth + * quota_use_rate dump_bandwidth */ size_t watermark = ((double)e->quota.limit * dump_bandwidth / - (dump_bandwidth + tx_write_rate + 1)); + (dump_bandwidth + e->quota_use_rate + 1)); vy_quota_set_watermark(&e->quota, watermark); } @@ -2307,7 +2335,8 @@ vy_env_new(const char *path, size_t memory, size_t cache, int read_threads, mempool_create(&e->cursor_pool, slab_cache, sizeof(struct vy_cursor)); vy_quota_create(&e->quota, vy_env_quota_exceeded_cb); - ev_timer_init(&e->quota_timer, vy_env_quota_timer_cb, 0, 1.); + ev_timer_init(&e->quota_timer, vy_env_quota_timer_cb, 0, + VY_QUOTA_UPDATE_INTERVAL); e->quota_timer.data = e; ev_timer_start(loop(), &e->quota_timer); vy_cache_env_create(&e->cache_env, slab_cache, cache);