Skip to content
Snippets Groups Projects
Commit c20e0449 authored by Vladislav Shpilevoy's avatar Vladislav Shpilevoy Committed by Kirill Yukhin
Browse files

cord_buf: introduce ownership management

The global ibuf used for hot Lua and Lua C code didn't have
ownership management. As a result, it could be reused in some
unexpected ways during Lua GC via __gc handlers, even if it was
currently in use in some code below the stack.

The patch makes cord_ibuf_take() steal the global buffer from its
global stash, and assign to the current fiber. cord_ibuf_put()
puts it back to the stash, and detaches from the fiber. If yield
happens before cord_ibuf_put(), the buffer is detached
automatically.

Fiber attach/detach is done via on_yield/on_stop triggers. The
buffer is not supposed to survive a yield, so this allows to
free/put the buffer back to the stash even if the owner didn't do
that. For instance, if a Lua exception was raised before
cord_ibuf_put() was called.

This makes cord buffer being safe to use in any yield-free code,
even if Lua GC might be started. And in non-Lua code as well.

Part of #5632
parent ade45685
No related merge requests found
......@@ -5,6 +5,7 @@
*/
#include "cord_buf.h"
#include "fiber.h"
#include "trigger.h"
#include "small/ibuf.h"
......@@ -13,35 +14,154 @@ enum {
CORD_IBUF_START_CAPACITY = 16384,
};
static struct ibuf *cord_buf_global = NULL;
/** Global buffer with automatic collection on fiber yield. */
struct cord_buf {
/** Base buffer. */
struct ibuf base;
/**
* Triggers on fiber stop/yield when the buffer is either destroyed or
* cached to the global stash for later reuse.
*/
struct trigger on_stop;
struct trigger on_yield;
#ifndef NDEBUG
/**
* Fiber owning the buffer right now. Used for debug and sanity checks
* only.
*/
struct fiber *owner;
#endif
};
struct ibuf *
cord_ibuf_take(void)
/**
* The global buffer last saved to the cache. Having it here is supposed to
* help to reuse the buffer's already allocated data sometimes.
*/
static struct cord_buf *cord_buf_global = NULL;
static inline void
cord_buf_put(struct cord_buf *buf);
static void
cord_buf_delete(struct cord_buf *buf);
static inline void
cord_buf_set_owner(struct cord_buf *buf)
{
assert(cord_is_main());
struct ibuf *buf = cord_buf_global;
if (buf != NULL) {
ibuf_reset(buf);
return buf;
}
buf = malloc(sizeof(*buf));
assert(buf->owner == NULL);
struct fiber *f = fiber();
trigger_add(&f->on_stop, &buf->on_stop);
trigger_add(&f->on_yield, &buf->on_yield);
#ifndef NDEBUG
buf->owner = f;
#endif
ibuf_reset(&buf->base);
}
static inline void
cord_buf_clear_owner(struct cord_buf *buf)
{
assert(buf->owner == fiber());
trigger_clear(&buf->on_stop);
trigger_clear(&buf->on_yield);
#ifndef NDEBUG
buf->owner = NULL;
#endif
}
static int
cord_buf_on_stop(struct trigger *trigger, void *event)
{
(void)event;
struct cord_buf *buf = trigger->data;
assert(trigger == &buf->on_stop);
cord_buf_put(buf);
return 0;
}
static int
cord_buf_on_yield(struct trigger *trigger, void *event)
{
(void)event;
struct cord_buf *buf = trigger->data;
assert(trigger == &buf->on_yield);
cord_buf_put(buf);
return 0;
}
static struct cord_buf *
cord_buf_new(void)
{
struct cord_buf *buf = malloc(sizeof(*buf));
if (buf == NULL)
panic("Couldn't allocate thread buffer");
ibuf_create(buf, &cord()->slabc, CORD_IBUF_START_CAPACITY);
cord_buf_global = buf;
ibuf_create(&buf->base, &cord()->slabc, CORD_IBUF_START_CAPACITY);
trigger_create(&buf->on_stop, cord_buf_on_stop, buf, NULL);
trigger_create(&buf->on_yield, cord_buf_on_yield, buf, NULL);
#ifndef NDEBUG
buf->owner = NULL;
#endif
return buf;
}
static inline void
cord_buf_put(struct cord_buf *buf)
{
assert(cord_is_main());
cord_buf_clear_owner(buf);
/*
* Delete if the stash is busy. It could happen if there was >= 2
* buffers at some point and one of them is already saved back to the
* stash.
*
* XXX: in future it might be useful to consider saving the buffers into
* a list. Maybe keep always at most 2 buffers, because usually there
* are at most 2 contexts: normal Lua and Lua during GC. Recursive
* GC is supposed to be rare, no need to optimize it.
*/
if (cord_buf_global == NULL)
cord_buf_global = buf;
else
cord_buf_delete(buf);
}
static inline struct cord_buf *
cord_buf_take(void)
{
assert(cord_is_main());
struct cord_buf *buf = cord_buf_global;
if (buf != NULL)
cord_buf_global = NULL;
else
buf = cord_buf_new();
cord_buf_set_owner(buf);
return buf;
}
static void
cord_buf_delete(struct cord_buf *buf)
{
assert(buf->owner == NULL);
ibuf_destroy(&buf->base);
TRASH(buf);
free(buf);
}
struct ibuf *
cord_ibuf_take(void)
{
return &cord_buf_take()->base;
}
void
cord_ibuf_put(struct ibuf *ibuf)
{
(void)ibuf;
assert(ibuf == cord_buf_global);
cord_buf_put((struct cord_buf *)ibuf);
}
void
cord_ibuf_drop(struct ibuf *ibuf)
{
ibuf_reinit(ibuf);
assert(ibuf == cord_buf_global);
cord_ibuf_put(ibuf);
}
......@@ -18,7 +18,9 @@ struct ibuf *
cord_ibuf_take(void);
/**
* Put the global ibuf back.
* Put the global ibuf back. It is not necessary - the buffer is put back on the
* next yield. But then it can't be reused/freed until the yield. Put it back
* manually when possible.
*/
void
cord_ibuf_put(struct ibuf *ibuf);
......@@ -29,6 +31,8 @@ cord_ibuf_put(struct ibuf *ibuf);
* because it is often needed from Lua, and allows not to call :recycle() there,
* which would be an additional FFI call before cord_ibuf_put().
*
* Drop is not necessary though, see the put() comment.
*
* XXX: recycle of the global buffer is a workaround for the ibuf being used in
* some places working with Lua API, where it wasn't wanted to "reuse" it
* anyhow. Instead, the global buffer is used to protect from the buffer leak in
......
#!/usr/bin/env tarantool
local tap = require('tap')
local fiber = require('fiber')
local buffer = require('buffer')
local cord_ibuf_take = buffer.internal.cord_ibuf_take
local cord_ibuf_put = buffer.internal.cord_ibuf_put
local cord_ibuf_drop = buffer.internal.cord_ibuf_drop
local function test_cord_ibuf(test)
test:plan(10)
local ibuf1 = cord_ibuf_take()
test:is(ibuf1:size(), 0, 'is empty')
ibuf1:alloc(1)
test:is(ibuf1:size(), 1, 'alloc 1')
cord_ibuf_put(ibuf1)
ibuf1 = cord_ibuf_take()
test:is(ibuf1:size(), 0, 'is empty again')
ibuf1:alloc(1)
cord_ibuf_drop(ibuf1)
ibuf1 = cord_ibuf_take()
test:is(ibuf1:capacity(), 0, 'has no capacity')
local pos1 = ibuf1:alloc(1)
pos1[0] = 1
local ibuf2 = cord_ibuf_take()
test:isnt(ibuf1, ibuf2, 'can have 2 cord buffers')
test:is(ibuf2:size(), 0, 'second is empty')
local pos2 = ibuf2:alloc(1)
pos2[0] = 2
test:is(pos1[0], 1, 'change does not affect the first buffer')
cord_ibuf_put(ibuf2)
ibuf1 = ibuf2
fiber.yield()
ibuf2 = cord_ibuf_take()
test:is(ibuf1, ibuf2, 'yield drops the ownership')
cord_ibuf_put(ibuf2)
ibuf1 = nil
local f = fiber.new(function()
ibuf1 = cord_ibuf_take()
end)
f:set_joinable(true)
f:join()
test:isnt(ibuf1, nil, 'took a cord buf in a new fiber')
ibuf2 = cord_ibuf_take()
test:is(ibuf1, ibuf2, 'was freed on fiber stop and reused')
cord_ibuf_put(ibuf2)
end
local test = tap.test('buffer')
test:plan(1)
test:test("cord buffer", test_cord_ibuf)
os.exit(test:check() and 0 or 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment