Skip to content
Snippets Groups Projects
Commit e69838a1 authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

Blueprint memcached-separte-port: a follow up

Make sure we do not try to modify data in
read-only mode via memcached port.
parent 0a13f6a1
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,7 @@
#include <mod/box/index.h>
#include "exception.h"
#include "iproto.h"
#include <tbuf.h>
struct tarantool_cfg;
......@@ -128,14 +129,12 @@ enum box_mode {
ENUM(messages, MESSAGES);
struct box_txn *txn_alloc(u32 flags);
void box_process(struct box_txn *txn, u32 op, struct tbuf *request_data);
extern iproto_callback rw_callback;
/* These 3 are used to implemente memcached 'GET' */
struct box_txn *txn_alloc(u32 flags);
void tuple_txn_ref(struct box_txn *txn, struct box_tuple *tuple);
void txn_cleanup(struct box_txn *txn);
void *next_field(void *f);
void append_field(struct tbuf *b, void *f);
void *tuple_field(struct box_tuple *tuple, size_t i);
#endif /* TARANTOOL_BOX_H_INCLUDED */
......@@ -34,7 +34,6 @@
#include <cfg/warning.h>
#include <errcode.h>
#include <fiber.h>
#include <iproto.h>
#include <log_io.h>
#include <pickle.h>
#include <salloc.h>
......
This diff is collapsed.
......@@ -30,7 +30,7 @@
}%%
static int __attribute__((noinline))
memcached_dispatch(struct box_txn *txn)
memcached_dispatch()
{
int cs;
u8 *p, *pe;
......@@ -154,7 +154,7 @@ memcached_dispatch(struct box_txn *txn)
stats.cmd_set++;
@try {
store(txn, key, exptime, flags, bytes, data);
store(key, exptime, flags, bytes, data);
stats.total_items++;
add_iov(b->data, b->len);
add_iov("\r\n", 2);
......@@ -178,7 +178,7 @@ memcached_dispatch(struct box_txn *txn)
add_iov("NOT_FOUND\r\n", 11);
} else {
@try {
delete(txn, key);
delete(key);
add_iov("DELETED\r\n", 9);
}
@catch (ClientError *e) {
......@@ -190,6 +190,7 @@ memcached_dispatch(struct box_txn *txn)
}
action get {
struct box_txn *txn = txn_alloc(BOX_QUIET);
txn->op = SELECT;
fiber_register_cleanup((void *)txn_cleanup, txn);
stat_collect(stat_base, MEMC_GET, 1);
......
......@@ -65,7 +65,7 @@ natoq(const u8 *start, const u8 *end)
}
static void
store(struct box_txn *txn, void *key, u32 exptime, u32 flags, u32 bytes, u8 *data)
store(void *key, u32 exptime, u32 flags, u32 bytes, u8 *data)
{
u32 box_flags = BOX_QUIET, cardinality = 4;
static u64 cas = 42;
......@@ -96,20 +96,26 @@ store(struct box_txn *txn, void *key, u32 exptime, u32 flags, u32 bytes, u8 *dat
int key_len = load_varint32(&key);
say_debug("memcached/store key:(%i)'%.*s' exptime:%"PRIu32" flags:%"PRIu32" cas:%"PRIu64,
key_len, key_len, (u8 *)key, exptime, flags, cas);
box_process(txn, INSERT, req); /* FIXME: handle RW/RO */
/*
* Use a box dispatch wrapper which handles correctly
* read-only/read-write modes.
*/
rw_callback(INSERT, req);
}
static void
delete(struct box_txn *txn, void *key)
delete(void *key)
{
u32 key_len = 1;
u32 box_flags = BOX_QUIET;
struct tbuf *req = tbuf_alloc(fiber->pool);
tbuf_append(req, &cfg.memcached_namespace, sizeof(u32));
tbuf_append(req, &box_flags, sizeof(box_flags));
tbuf_append(req, &key_len, sizeof(key_len));
tbuf_append_field(req, key);
box_process(txn, DELETE_1_3, req);
rw_callback(DELETE, req);
}
static struct box_tuple *
......@@ -206,7 +212,7 @@ do { \
add_iov("SERVER_ERROR object too large for cache\r\n", 41); \
} else { \
@try { \
store(txn, key, exptime, flags, bytes, data); \
store(key, exptime, flags, bytes, data); \
stats.total_items++; \
add_iov("STORED\r\n", 8); \
} \
......@@ -223,7 +229,6 @@ do { \
void
memcached_handler(void *_data __attribute__((unused)))
{
struct box_txn *txn;
stats.total_connections++;
stats.curr_connections++;
int r, p;
......@@ -237,8 +242,7 @@ memcached_handler(void *_data __attribute__((unused)))
}
dispatch:
txn = txn_alloc(BOX_QUIET);
p = memcached_dispatch(txn);
p = memcached_dispatch();
if (p < 0) {
say_debug("negative dispatch, closing connection");
goto exit;
......@@ -400,12 +404,13 @@ memcached_expire_loop(void *data __attribute__((unused)))
}
while (keys_to_delete->len > 0) {
struct box_txn *txn = txn_alloc(BOX_QUIET);
@try {
delete(txn, read_field(keys_to_delete));
delete(read_field(keys_to_delete));
expired_keys++;
}
@catch (ClientError *e) {
/* expire is off when replication is on */
assert(e->errcode != ER_NONMASTER);
/* The error is already logged. */
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment