From 2b6ed72088f3bc64f90eb153d3d6292911be99f8 Mon Sep 17 00:00:00 2001 From: Roman Tsisyk <roman@tsisyk.com> Date: Fri, 20 May 2016 07:21:24 +0300 Subject: [PATCH] phia: remove void *-style polymorphism from struct phia_document (#1452) --- src/box/phia.c | 553 +++++++++++++---------------------------- src/box/phia.h | 34 +++ src/box/phia_engine.cc | 12 +- src/box/phia_index.cc | 43 ++-- src/box/phia_space.cc | 14 +- 5 files changed, 241 insertions(+), 415 deletions(-) diff --git a/src/box/phia.c b/src/box/phia.c index e5f38d4be2..1fd13c765b 100644 --- a/src/box/phia.c +++ b/src/box/phia.c @@ -423,47 +423,6 @@ struct sstrace { char message[100]; }; -enum ssorder { - SS_LT, - SS_LTE, - SS_GT, - SS_GTE, - SS_EQ, - SS_STOP -}; - -static inline enum ssorder -ss_orderof(char *order, int size) -{ - enum ssorder cmp = SS_STOP; - if (strncmp(order, ">", size) == 0) { - cmp = SS_GT; - } else - if (strncmp(order, ">=", size) == 0) { - cmp = SS_GTE; - } else - if (strncmp(order, "<", size) == 0) { - cmp = SS_LT; - } else - if (strncmp(order, "<=", size) == 0) { - cmp = SS_LTE; - } - return cmp; -} - -static inline char* -ss_ordername(enum ssorder o) -{ - switch (o) { - case SS_LT: return "<"; - case SS_LTE: return "<="; - case SS_GT: return ">"; - case SS_GTE: return ">="; - default: break; - } - return NULL; -} - struct ssbuf { char *reserve; char *s, *p, *e; @@ -3051,7 +3010,7 @@ sf_limitfree(struct sflimit *b, struct ssa *a) } static inline void -sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum ssorder order) +sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum phia_order order) { int i; for (i = 0; i < s->fields_count; i++) { @@ -3061,7 +3020,7 @@ sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum ssor struct sffield *part = s->fields[i]; switch (part->type) { case SS_U32: - if (order == SS_LT || order == SS_LTE) { + if (order == PHIA_LT || order == PHIA_LE) { v->pointer = (char*)&b->u32_max; v->size = sizeof(uint32_t); } else { @@ -3070,7 +3029,7 @@ sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum ssor } break; case SS_U32REV: - if (order == SS_LT || order == SS_LTE) { + if (order == PHIA_LT || order == PHIA_LE) { v->pointer = (char*)&b->u32_min; v->size = sizeof(uint32_t); } else { @@ -3079,7 +3038,7 @@ sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum ssor } break; case SS_U64: - if (order == SS_LT || order == SS_LTE) { + if (order == PHIA_LT || order == PHIA_LE) { v->pointer = (char*)&b->u64_max; v->size = sizeof(uint64_t); } else { @@ -3088,7 +3047,7 @@ sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum ssor } break; case SS_U64REV: - if (order == SS_LT || order == SS_LTE) { + if (order == PHIA_LT || order == PHIA_LE) { v->pointer = (char*)&b->u64_min; v->size = sizeof(uint64_t); } else { @@ -3097,7 +3056,7 @@ sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum ssor } break; case SS_I64: - if (order == SS_LT || order == SS_LTE) { + if (order == PHIA_LT || order == PHIA_LE) { v->pointer = (char*)&b->i64_max; v->size = sizeof(int64_t); } else { @@ -3106,7 +3065,7 @@ sf_limitset(struct sflimit *b, struct sfscheme *s, struct sfv *fields, enum ssor } break; case SS_STRING: - if (order == SS_LT || order == SS_LTE) { + if (order == PHIA_LT || order == PHIA_LE) { v->pointer = b->string_max; v->size = b->string_max_size; } else { @@ -5190,7 +5149,7 @@ sv_mergeadd(struct svmerge *m, struct ssiter *i) */ struct PACKED svmergeiter { - enum ssorder order; + enum phia_order order; struct svmerge *merge; struct svmergesrc *src, *end; struct svmergesrc *v; @@ -5303,12 +5262,12 @@ static inline void sv_mergeiter_next(struct svmergeiter *im) { switch (im->order) { - case SS_GT: - case SS_GTE: + case PHIA_GT: + case PHIA_GE: sv_mergeiter_gt(im); break; - case SS_LT: - case SS_LTE: + case PHIA_LT: + case PHIA_LE: sv_mergeiter_lt(im); break; default: assert(0); @@ -5317,7 +5276,7 @@ sv_mergeiter_next(struct svmergeiter *im) static inline int sv_mergeiter_open(struct svmergeiter *im, struct runtime *r, - struct svmerge *m, enum ssorder o) + struct svmerge *m, enum phia_order o) { im->merge = m; im->r = r; @@ -5726,13 +5685,13 @@ struct PACKED svindexiter { struct ssrbnode *v; struct svref *vcur; struct sv current; - enum ssorder order; + enum phia_order order; }; static struct ssiterif sv_indexiterif; static inline int -sv_indexiter_open(struct ssiter *i, struct runtime *r, struct svindex *index, enum ssorder o, void *key, int keysize) +sv_indexiter_open(struct ssiter *i, struct runtime *r, struct svindex *index, enum phia_order o, void *key, int keysize) { i->vif = &sv_indexiterif; struct svindexiter *ii = (struct svindexiter*)i->priv; @@ -5744,8 +5703,8 @@ sv_indexiter_open(struct ssiter *i, struct runtime *r, struct svindex *index, en int rc; int eq = 0; switch (ii->order) { - case SS_LT: - case SS_LTE: + case PHIA_LT: + case PHIA_LE: if (unlikely(key == NULL)) { ii->v = ss_rbmax(&ii->index->i); break; @@ -5756,7 +5715,7 @@ sv_indexiter_open(struct ssiter *i, struct runtime *r, struct svindex *index, en switch (rc) { case 0: eq = 1; - if (ii->order == SS_LT) + if (ii->order == PHIA_LT) ii->v = ss_rbprev(&ii->index->i, ii->v); break; case 1: @@ -5764,8 +5723,8 @@ sv_indexiter_open(struct ssiter *i, struct runtime *r, struct svindex *index, en break; } break; - case SS_GT: - case SS_GTE: + case PHIA_GT: + case PHIA_GE: if (unlikely(key == NULL)) { ii->v = ss_rbmin(&ii->index->i); break; @@ -5776,7 +5735,7 @@ sv_indexiter_open(struct ssiter *i, struct runtime *r, struct svindex *index, en switch (rc) { case 0: eq = 1; - if (ii->order == SS_GT) + if (ii->order == PHIA_GT) ii->v = ss_rbnext(&ii->index->i, ii->v); break; case -1: @@ -5828,12 +5787,12 @@ sv_indexiter_next(struct ssiter *i) return; } switch (ii->order) { - case SS_LT: - case SS_LTE: + case PHIA_LT: + case PHIA_LE: ii->v = ss_rbprev(&ii->index->i, ii->v); break; - case SS_GT: - case SS_GTE: + case PHIA_GT: + case PHIA_GE: ii->v = ss_rbnext(&ii->index->i, ii->v); break; default: assert(0); @@ -7049,7 +7008,7 @@ struct PACKED sdpageiter { int64_t pos; struct sdv *v; struct sv current; - enum ssorder order; + enum phia_order order; void *key; int keysize; struct runtime *r; @@ -7210,7 +7169,7 @@ sd_pageiter_lt(struct sdpageiter *i, int e) static inline int sd_pageiter_open(struct sdpageiter *pi, struct runtime *r, struct ssbuf *xfbuf, - struct sdpage *page, enum ssorder o, void *key, int keysize) + struct sdpage *page, enum phia_order o, void *key, int keysize) { pi->r = r; pi->page = page; @@ -7226,13 +7185,13 @@ sd_pageiter_open(struct sdpageiter *pi, struct runtime *r, struct ssbuf *xfbuf, } int rc = 0; switch (pi->order) { - case SS_GT: rc = sd_pageiter_gt(pi, 0); + case PHIA_GT: rc = sd_pageiter_gt(pi, 0); break; - case SS_GTE: rc = sd_pageiter_gt(pi, 1); + case PHIA_GE: rc = sd_pageiter_gt(pi, 1); break; - case SS_LT: rc = sd_pageiter_lt(pi, 0); + case PHIA_LT: rc = sd_pageiter_lt(pi, 0); break; - case SS_LTE: rc = sd_pageiter_lt(pi, 1); + case PHIA_LE: rc = sd_pageiter_lt(pi, 1); break; default: assert(0); } @@ -7260,8 +7219,8 @@ sd_pageiter_next(struct sdpageiter *pi) if (pi->v == NULL) return; switch (pi->order) { - case SS_GTE: - case SS_GT: + case PHIA_GE: + case PHIA_GT: pi->pos++; if (unlikely(pi->pos >= pi->page->h->count)) { sd_pageiter_end(pi); @@ -7269,8 +7228,8 @@ sd_pageiter_next(struct sdpageiter *pi) } pi->v = sd_pagev(pi->page, pi->pos); break; - case SS_LT: - case SS_LTE: { + case PHIA_LT: + case PHIA_LE: { /* key (dup) (dup) key (eof) */ struct sdv *v; int64_t pos = pi->pos + 1; @@ -7485,7 +7444,7 @@ struct PACKED sdindexiter { struct sdindex *index; struct sdindexpage *v; int pos; - enum ssorder cmp; + enum phia_order cmp; void *key; int keysize; struct runtime *r; @@ -7518,7 +7477,7 @@ sd_indexiter_route(struct sdindexiter *i) static inline int sd_indexiter_open(struct sdindexiter *ii, struct runtime *r, - struct sdindex *index, enum ssorder o, void *key, + struct sdindex *index, enum phia_order o, void *key, int keysize) { ii->r = r; @@ -7536,11 +7495,11 @@ sd_indexiter_open(struct sdindexiter *ii, struct runtime *r, } if (ii->key == NULL) { switch (ii->cmp) { - case SS_LT: - case SS_LTE: ii->pos = ii->index->h->count - 1; + case PHIA_LT: + case PHIA_LE: ii->pos = ii->index->h->count - 1; break; - case SS_GT: - case SS_GTE: ii->pos = 0; + case PHIA_GT: + case PHIA_GE: ii->pos = 0; break; default: assert(0); @@ -7554,18 +7513,18 @@ sd_indexiter_open(struct sdindexiter *ii, struct runtime *r, struct sdindexpage *p = sd_indexpage(ii->index, ii->pos); int rc; switch (ii->cmp) { - case SS_LTE: - case SS_LT: + case PHIA_LE: + case PHIA_LT: rc = sf_compare(ii->r->scheme, sd_indexpage_min(ii->index, p), p->sizemin, ii->key, ii->keysize); - if (rc == 1 || (rc == 0 && ii->cmp == SS_LT)) + if (rc == 1 || (rc == 0 && ii->cmp == PHIA_LT)) ii->pos--; break; - case SS_GTE: - case SS_GT: + case PHIA_GE: + case PHIA_GT: rc = sf_compare(ii->r->scheme, sd_indexpage_max(ii->index, p), p->sizemax, ii->key, ii->keysize); - if (rc == -1 || (rc == 0 && ii->cmp == SS_GT)) + if (rc == -1 || (rc == 0 && ii->cmp == PHIA_GT)) ii->pos++; break; default: assert(0); @@ -7587,11 +7546,11 @@ static inline void sd_indexiter_next(struct sdindexiter *ii) { switch (ii->cmp) { - case SS_LT: - case SS_LTE: ii->pos--; + case PHIA_LT: + case PHIA_LE: ii->pos--; break; - case SS_GT: - case SS_GTE: ii->pos++; + case PHIA_GT: + case PHIA_GE: ii->pos++; break; default: assert(0); @@ -7796,7 +7755,7 @@ struct sdreadarg { struct sdindexiter *index_iter; struct sdpageiter *page_iter; struct ssfile *file; - enum ssorder o; + enum phia_order o; int has; uint64_t has_vlsn; int use_compression; @@ -7914,7 +7873,7 @@ sd_read_open(struct ssiter *iptr, struct sdreadarg *arg, void *key, int keysize) if (i->ref == NULL) return 0; if (arg->has) { - assert(arg->o == SS_GTE); + assert(arg->o == PHIA_GE); if (likely(i->ref->lsnmax <= arg->has_vlsn)) { i->ref = NULL; return 0; @@ -9872,7 +9831,7 @@ static void si_write(struct sitx*, struct svlog*, struct svlogindex*, uint64_t, int); struct siread { - enum ssorder order; + enum phia_order order; void *prefix; void *key; uint32_t keysize; @@ -9893,7 +9852,7 @@ struct siread { }; static int -si_readopen(struct siread*, struct si*, struct sicache*, enum ssorder, +si_readopen(struct siread*, struct si*, struct sicache*, enum phia_order, uint64_t, void*, uint32_t, void*, uint32_t); @@ -9908,7 +9867,7 @@ static int si_readcommited(struct si*, struct runtime*, struct sv*, int); struct PACKED siiter { struct si *index; struct ssrbnode *v; - enum ssorder order; + enum phia_order order; void *key; int keysize; }; @@ -9918,7 +9877,7 @@ ss_rbget(si_itermatch, static inline int si_iter_open(struct siiter *ii, struct runtime *r, - struct si *index, enum ssorder o, void *key, int keysize) + struct si *index, enum phia_order o, void *key, int keysize) { ii->index = index; ii->order = o; @@ -9932,12 +9891,12 @@ si_iter_open(struct siiter *ii, struct runtime *r, } if (unlikely(ii->key == NULL)) { switch (ii->order) { - case SS_LT: - case SS_LTE: + case PHIA_LT: + case PHIA_LE: ii->v = ss_rbmax(&ii->index->i); break; - case SS_GT: - case SS_GTE: + case PHIA_GT: + case PHIA_GE: ii->v = ss_rbmin(&ii->index->i); break; default: @@ -9981,12 +9940,12 @@ static inline void si_iter_next(struct siiter *ii) { switch (ii->order) { - case SS_LT: - case SS_LTE: + case PHIA_LT: + case PHIA_LE: ii->v = ss_rbprev(&ii->index->i, ii->v); break; - case SS_GT: - case SS_GTE: + case PHIA_GT: + case PHIA_GE: ii->v = ss_rbnext(&ii->index->i, ii->v); break; default: assert(0); @@ -10323,9 +10282,9 @@ si_branchcreate(struct si *index, struct sdc *c, if (unlikely(rc == -1)) return -1; struct svmergesrc *s = sv_mergeadd(&vmerge, NULL); - sv_indexiter_open(&s->src, r, vindex, SS_GTE, NULL, 0); + sv_indexiter_open(&s->src, r, vindex, PHIA_GE, NULL, 0); struct svmergeiter im; - sv_mergeiter_open(&im, r, &vmerge, SS_GTE); + sv_mergeiter_open(&im, r, &vmerge, PHIA_GE); /* merge iter is not used */ struct sdmergeconf mergeconf = { @@ -10551,7 +10510,7 @@ si_compact(struct si *index, struct sdc *c, struct siplan *plan, .compression_if = compression_if, .has = 0, .has_vlsn = 0, - .o = SS_GTE, + .o = PHIA_GE, .file = &node->file, .r = r }; @@ -10564,7 +10523,7 @@ si_compact(struct si *index, struct sdc *c, struct siplan *plan, b = b->next; } struct svmergeiter im; - sv_mergeiter_open(&im, r, &merge, SS_GTE); + sv_mergeiter_open(&im, r, &merge, PHIA_GE); rc = si_merge(index, c, node, vlsn, vlsn_lru, &im, size_stream, count); sv_mergefree(&merge, r->a); return rc; @@ -10589,7 +10548,7 @@ si_compact_index(struct si *index, struct sdc *c, struct siplan *plan, uint64_t size_stream = sv_indexused(vindex); struct ssiter i; - sv_indexiter_open(&i, &index->r, vindex, SS_GTE, NULL, 0); + sv_indexiter_open(&i, &index->r, vindex, PHIA_GE, NULL, 0); return si_compact(index, c, plan, vlsn, vlsn_lru, &i, size_stream); } @@ -10702,7 +10661,7 @@ si_redistribute(struct si *index, struct runtime *r, struct sdc *c, struct sinod (void)index; struct svindex *vindex = si_nodeindex(node); struct ssiter i; - sv_indexiter_open(&i, r, vindex, SS_GTE, NULL, 0); + sv_indexiter_open(&i, r, vindex, PHIA_GE, NULL, 0); while (sv_indexiter_has(&i)) { struct sv *v = sv_indexiter_get(&i); @@ -10759,7 +10718,7 @@ si_redistribute_set(struct si *index, struct runtime *r, uint64_t now, struct sv index->update_time = now; /* match node */ struct siiter ii; - si_iter_open(&ii, r, index, SS_GTE, sv_vpointer(v->v), v->v->size); + si_iter_open(&ii, r, index, PHIA_GE, sv_vpointer(v->v), v->v->size); struct sinode *node = si_iter_get(&ii); assert(node != NULL); /* update node */ @@ -10776,7 +10735,7 @@ si_redistribute_index(struct si *index, struct runtime *r, struct sdc *c, struct { struct svindex *vindex = si_nodeindex(node); struct ssiter i; - sv_indexiter_open(&i, r, vindex, SS_GTE, NULL, 0); + sv_indexiter_open(&i, r, vindex, PHIA_GE, NULL, 0); while (sv_indexiter_has(&i)) { struct sv *v = sv_indexiter_get(&i); int rc = ss_bufadd(&c->b, r->a, &v->v, sizeof(struct svref**)); @@ -11801,7 +11760,7 @@ static int si_profiler(struct siprofiler *p) } static int -si_readopen(struct siread *q, struct si *i, struct sicache *c, enum ssorder o, +si_readopen(struct siread *q, struct si *i, struct sicache *c, enum phia_order o, uint64_t vlsn, void *prefix, uint32_t prefixsize, void *key, uint32_t keysize) @@ -11931,7 +11890,7 @@ si_getindex(struct siread *q, struct sinode *n) int rc; if (first->count > 0) { rc = sv_indexiter_open(&i, q->r, first, - SS_GTE, q->key, q->keysize); + PHIA_GE, q->key, q->keysize); if (rc) { goto result; } @@ -11939,7 +11898,7 @@ si_getindex(struct siread *q, struct sinode *n) if (likely(second == NULL || !second->count)) return 0; rc = sv_indexiter_open(&i, q->r, second, - SS_GTE, q->key, q->keysize); + PHIA_GE, q->key, q->keysize); if (! rc) { return 0; } @@ -11991,7 +11950,7 @@ si_getbranch(struct siread *q, struct sinode *n, struct sicachebranch *c) .compression_if = compression_if, .has = q->has, .has_vlsn = q->vlsn, - .o = SS_GTE, + .o = PHIA_GE, .file = &n->file, .r = q->r }; @@ -12004,7 +11963,7 @@ si_getbranch(struct siread *q, struct sinode *n, struct sicachebranch *c) sv_mergereset(&q->merge); sv_mergeadd(&q->merge, &c->i); struct svmergeiter im; - sv_mergeiter_open(&im, q->r, &q->merge, SS_GTE); + sv_mergeiter_open(&im, q->r, &q->merge, PHIA_GE); uint64_t vlsn = q->vlsn; if (unlikely(q->has)) vlsn = UINT64_MAX; @@ -12021,7 +11980,7 @@ si_get(struct siread *q) { assert(q->key != NULL); struct siiter ii; - si_iter_open(&ii, q->r, q->index, SS_GTE, q->key, q->keysize); + si_iter_open(&ii, q->r, q->index, PHIA_GE, q->key, q->keysize); struct sinode *node; node = si_iter_get(&ii); assert(node != NULL); @@ -12205,7 +12164,7 @@ si_range(struct siread *q) } rc = 1; - /* convert upsert search to SS_EQ */ + /* convert upsert search to PHIA_EQ */ if (q->upsert_eq) { rc = sf_compare(q->r->scheme, sv_pointer(v), sv_size(v), q->key, q->keysize); @@ -12230,12 +12189,12 @@ si_range(struct siread *q) static int si_read(struct siread *q) { switch (q->order) { - case SS_EQ: + case PHIA_EQ: return si_get(q); - case SS_LT: - case SS_LTE: - case SS_GT: - case SS_GTE: + case PHIA_LT: + case PHIA_LE: + case PHIA_GT: + case PHIA_GE: return si_range(q); default: break; @@ -12248,7 +12207,7 @@ si_readcommited(struct si *index, struct runtime *r, struct sv *v, int recover) { /* search node index */ struct siiter ii; - si_iter_open(&ii, r, index, SS_GTE, sv_pointer(v), sv_size(v)); + si_iter_open(&ii, r, index, PHIA_GE, sv_pointer(v), sv_size(v)); struct sinode *node; node = si_iter_get(&ii); assert(node != NULL); @@ -12261,7 +12220,7 @@ si_readcommited(struct si *index, struct runtime *r, struct sv *v, int recover) struct svindex *first = si_nodeindex_priority(node, &second); if (likely(first->count > 0)) { struct ssiter i; - rc = sv_indexiter_open(&i, r, first, SS_GTE, + rc = sv_indexiter_open(&i, r, first, PHIA_GE, sv_pointer(v), sv_size(v)); if (rc) { struct sv *ref = sv_indexiter_get(&i); @@ -12271,7 +12230,7 @@ si_readcommited(struct si *index, struct runtime *r, struct sv *v, int recover) } if (second && !second->count) { struct ssiter i; - rc = sv_indexiter_open(&i, r, second, SS_GTE, + rc = sv_indexiter_open(&i, r, second, PHIA_GE, sv_pointer(v), sv_size(v)); if (rc) { struct sv *ref = sv_indexiter_get(&i); @@ -12286,7 +12245,7 @@ si_readcommited(struct si *index, struct runtime *r, struct sv *v, int recover) for (b = node->branch; b; b = b->next) { struct sdindexiter ii; - sd_indexiter_open(&ii, r, &b->index, SS_GTE, + sd_indexiter_open(&ii, r, &b->index, PHIA_GE, sv_pointer(v), sv_size(v)); struct sdindexpage *page = sd_indexiter_get(&ii); if (page == NULL) @@ -13049,7 +13008,7 @@ static inline int si_set(struct sitx *x, struct svv *v, uint64_t time) index->update_time = time; /* match node */ struct siiter ii; - si_iter_open(&ii, &index->r, index, SS_GTE, + si_iter_open(&ii, &index->r, index, PHIA_GE, sv_vpointer(v), v->size); struct sinode *node = si_iter_get(&ii); assert(node != NULL); @@ -13251,7 +13210,7 @@ struct screadarg { struct sv vup; struct sicache *cache; int cachegc; - enum ssorder order; + enum phia_order order; int has; int upsert; int upsert_eq; @@ -13975,12 +13934,25 @@ struct phia_env { struct runtime r; }; +struct phia_index { + struct phia_env *env; + uint32_t created; + struct siprofiler rtp; + struct sischeme *scheme; + struct si *index; + struct runtime *r; + struct sxindex coindex; + uint64_t txn_min; + uint64_t txn_max; + /** Member of env->db list. */ + struct rlist link; +}; + struct phia_document { - struct so o; struct phia_index *db; int created; struct sv v; - enum ssorder order; + enum phia_order order; int orderset; int flagset; struct sfv fields[8]; @@ -14023,11 +13995,10 @@ phia_env_get_error(struct phia_env *env) static struct phia_document * phia_document_new(struct phia_env*, struct phia_index *, const struct sv*); -static int -phia_document_destroy(struct so *o) +int +phia_document_delete(struct phia_document *v) { - struct phia_document *v = se_cast(o, struct phia_document*, SEDOCUMENT); - struct phia_env *e = o->env; + struct phia_env *e = v->db->env; if (v->v.v) si_gcv(&e->r, v->v.v); v->v.v = NULL; @@ -14043,7 +14014,7 @@ phia_document_destroy(struct so *o) static inline int phia_document_validate_ro(struct phia_document *o, struct phia_index *dest) { - struct phia_env *e = o->o.env; + struct phia_env *e = o->db->env; if (unlikely(o->db != dest)) return sr_error(&e->error, "%s", "incompatible document parent db"); struct svv *v = o->v.v; @@ -14057,10 +14028,9 @@ phia_document_validate_ro(struct phia_document *o, struct phia_index *dest) static inline int phia_document_create(struct phia_document *o); -static int -phia_document_open(struct so *o) +int +phia_document_open(struct phia_document *v) { - struct phia_document *v = se_cast(o, struct phia_document*, SEDOCUMENT); if (likely(v->created)) { assert(v->v.v != NULL); return 0; @@ -14075,7 +14045,7 @@ phia_document_open(struct so *o) static inline int phia_document_validate(struct phia_document *o, struct phia_index *dest, uint8_t flags) { - struct phia_env *e = o->o.env; + struct phia_env *e = o->db->env; if (unlikely(o->db != dest)) return sr_error(&e->error, "%s", "incompatible document parent db"); struct svv *v = o->v.v; @@ -14094,20 +14064,6 @@ phia_document_validate(struct phia_document *o, struct phia_index *dest, uint8_t return 0; } -struct phia_index { - struct phia_env *env; - uint32_t created; - struct siprofiler rtp; - struct sischeme *scheme; - struct si *index; - struct runtime *r; - struct sxindex coindex; - uint64_t txn_min; - uint64_t txn_max; - /** Member of env->db list. */ - struct rlist link; -}; - static inline int phia_index_active(struct phia_index *o) { return si_active(o->index); @@ -14848,7 +14804,7 @@ phia_cursor_get(struct phia_cursor *c, struct phia_document *key) { struct phia_index *db = key->db; if (unlikely(! key->orderset)) - key->order = SS_GTE; + key->order = PHIA_GE; /* this statistics might be not complete, because * last statement is not accounted here */ c->read_disk += key->read_disk; @@ -15200,10 +15156,10 @@ phia_index_result(struct phia_env *e, struct scread *r) * the result one */ v->orderset = 1; v->order = r->arg.order; - if (v->order == SS_GTE) - v->order = SS_GT; - else if (v->order == SS_LTE) - v->order = SS_LT; + if (v->order == PHIA_GE) + v->order = PHIA_GT; + else if (v->order == PHIA_LE) + v->order = PHIA_LT; /* set prefix */ if (r->arg.prefix) { @@ -15226,7 +15182,7 @@ phia_index_read(struct phia_index *db, struct phia_document *o, /* prepare the key */ int auto_close = !o->created; - int rc = phia_document_open(&o->o); + int rc = phia_document_open(o); if (unlikely(rc == -1)) goto error; rc = phia_document_validate_ro(o, db); @@ -15241,7 +15197,7 @@ phia_index_read(struct phia_index *db, struct phia_document *o, struct phia_document *ret = NULL; /* concurrent */ - if (x_search && o->order == SS_EQ) { + if (x_search && o->order == PHIA_EQ) { /* note: prefix is ignored during concurrent * index search */ int rc = sx_get(x, &db->coindex, &o->v, &vup); @@ -15259,7 +15215,7 @@ phia_index_read(struct phia_index *db, struct phia_document *o, sv_vunref(db->r, vup.v); } if (auto_close) - phia_document_destroy(&o->o); + phia_document_delete(o); return ret; } } else { @@ -15307,8 +15263,8 @@ phia_index_read(struct phia_index *db, struct phia_document *o, } if (sf_upserthas(&db->scheme->fmt_upsert)) { arg->upsert = 1; - if (arg->order == SS_EQ) { - arg->order = SS_GTE; + if (arg->order == PHIA_EQ) { + arg->order = PHIA_GE; arg->upsert_eq = 1; } } @@ -15323,11 +15279,11 @@ phia_index_read(struct phia_index *db, struct phia_document *o, sc_readclose(&q); if (auto_close) - phia_document_destroy(&o->o); + phia_document_delete(o); return ret; error: if (auto_close) - phia_document_destroy(&o->o); + phia_document_delete(o); return NULL; } @@ -15456,61 +15412,6 @@ static int phia_index_recoverend(struct phia_index *db) return 0; } - -enum { - SE_DOCUMENT_FIELD, - SE_DOCUMENT_ORDER, - SE_DOCUMENT_PREFIX, - SE_DOCUMENT_LSN, - SE_DOCUMENT_RAW, - SE_DOCUMENT_FLAGS, - SE_DOCUMENT_CACHE_ONLY, - SE_DOCUMENT_OLDEST_ONLY, - SE_DOCUMENT_EVENT, - SE_DOCUMENT_REUSE, - SE_DOCUMENT_UNKNOWN -}; - -static inline int -phia_document_opt(const char *path) -{ - switch (path[0]) { - case 'o': - if (likely(strcmp(path, "order") == 0)) - return SE_DOCUMENT_ORDER; - if (likely(strcmp(path, "oldest_only") == 0)) - return SE_DOCUMENT_OLDEST_ONLY; - break; - case 'l': - if (likely(strcmp(path, "lsn") == 0)) - return SE_DOCUMENT_LSN; - break; - case 'p': - if (likely(strcmp(path, "prefix") == 0)) - return SE_DOCUMENT_PREFIX; - break; - case 'r': - if (likely(strcmp(path, "raw") == 0)) - return SE_DOCUMENT_RAW; - if (likely(strcmp(path, "reuse") == 0)) - return SE_DOCUMENT_REUSE; - break; - case 'f': - if (likely(strcmp(path, "flags") == 0)) - return SE_DOCUMENT_FLAGS; - break; - case 'c': - if (likely(strcmp(path, "cache_only") == 0)) - return SE_DOCUMENT_CACHE_ONLY; - break; - case 'e': - if (likely(strcmp(path, "event") == 0)) - return SE_DOCUMENT_EVENT; - break; - } - return SE_DOCUMENT_FIELD; -} - static inline int phia_document_create(struct phia_document *o) { @@ -15545,7 +15446,7 @@ phia_document_create(struct phia_document *o) memset(o->fields, 0, sizeof(o->fields)); o->fields[0].pointer = o->prefix; o->fields[0].size = o->prefixsize; - sf_limitset(&e->limit, &db->scheme->scheme, o->fields, SS_GTE); + sf_limitset(&e->limit, &db->scheme->scheme, o->fields, PHIA_GE); goto allocate; } } @@ -15570,14 +15471,15 @@ phia_document_create(struct phia_document *o) return 0; } -static struct sfv* -phia_document_setfield(struct phia_document *v, const char *path, void *pointer, int size) +int +phia_document_set_field(struct phia_document *v, const char *path, + const char *pointer, int size) { - struct phia_env *e = v->o.env; struct phia_index *db = v->db; + struct phia_env *e = db->env; struct sffield *field = sf_schemefind(&db->scheme->scheme, (char*)path); if (unlikely(field == NULL)) - return NULL; + return -1; assert(field->position < (int)(sizeof(v->fields) / sizeof(struct sfv))); struct sfv *fv = &v->fields[field->position]; if (size == 0) @@ -15591,169 +15493,61 @@ phia_document_setfield(struct phia_document *v, const char *path, void *pointer, if (unlikely(size > fieldsize_max)) { sr_error(&e->error, "field '%s' is too big (%d limit)", pointer, fieldsize_max); - return NULL; + return -1; } if (fv->pointer == NULL) { v->fields_count++; if (field->key) v->fields_count_keys++; } - fv->pointer = pointer; + fv->pointer = (char *)pointer; fv->size = size; sr_statkey(&e->stat, size); - return fv; -} - -static int -phia_document_setstring(struct so *o, const char *path, void *pointer, int size) -{ - struct phia_document *v = se_cast(o, struct phia_document*, SEDOCUMENT); - struct phia_env *e = o->env; - if (unlikely(v->v.v)) - return sr_error(&e->error, "%s", "document is read-only"); - switch (phia_document_opt(path)) { - case SE_DOCUMENT_FIELD: { - struct sfv *fv = phia_document_setfield(v, path, pointer, size); - if (unlikely(fv == NULL)) - return -1; - break; - } - case SE_DOCUMENT_ORDER: - if (size == 0) - size = strlen(pointer); - enum ssorder cmp = ss_orderof(pointer, size); - if (unlikely(cmp == SS_STOP)) { - sr_error(&e->error, "%s", "bad order name"); - return -1; - } - v->order = cmp; - v->orderset = 1; - break; - case SE_DOCUMENT_PREFIX: - v->prefix = pointer; - v->prefixsize = size; - break; - case SE_DOCUMENT_RAW: - v->raw = pointer; - v->rawsize = size; - break; - default: - return -1; - } return 0; } -static void* -phia_document_getstring(struct so *o, const char *path, int *size) -{ - struct phia_document *v = se_cast(o, struct phia_document*, SEDOCUMENT); - switch (phia_document_opt(path)) { - case SE_DOCUMENT_FIELD: { - /* match field */ - struct phia_index *db = v->db; - struct sffield *field = sf_schemefind(&db->scheme->scheme, (char*)path); - if (unlikely(field == NULL)) - return NULL; - /* result document */ - if (v->v.v) - return sv_field(&v->v, db->r, field->position, (uint32_t*)size); - /* field document */ - assert(field->position < (int)(sizeof(v->fields) / sizeof(struct sfv))); - struct sfv *fv = &v->fields[field->position]; - if (fv->pointer == NULL) - return NULL; - if (size) - *size = fv->size; - return fv->pointer; - } - case SE_DOCUMENT_PREFIX: { - if (v->prefix == NULL) - return NULL; - if (size) - *size = v->prefixsize; - return v->prefix; - } - case SE_DOCUMENT_ORDER: { - char *order = ss_ordername(v->order); - if (size) - *size = strlen(order) + 1; - return order; - } - case SE_DOCUMENT_EVENT: { - char *type = "none"; - if (v->event == 1) - type = "on_backup"; - if (size) - *size = strlen(type); - return type; - } - case SE_DOCUMENT_RAW: - if (v->raw) { - if (size) - *size = v->rawsize; - return v->raw; - } - if (v->v.v == NULL) - return NULL; - if (size) - *size = sv_size(&v->v); - return sv_pointer(&v->v); - } - return NULL; +void +phia_document_set_order(struct phia_document *doc, enum phia_order order) +{ + doc->order = order; + doc->orderset = 1; } -static int -phia_document_setint(struct so *o, const char *path, int64_t num) +char * +phia_document_field(struct phia_document *v, const char *path, int *size) { - struct phia_document *v = se_cast(o, struct phia_document*, SEDOCUMENT); - switch (phia_document_opt(path)) { - case SE_DOCUMENT_CACHE_ONLY: - v->cache_only = num; - break; - case SE_DOCUMENT_OLDEST_ONLY: - v->oldest_only = num; - break; - default: - return -1; - } - return 0; + /* match field */ + struct phia_index *db = v->db; + struct sffield *field = sf_schemefind(&db->scheme->scheme, (char*)path); + if (unlikely(field == NULL)) + return NULL; + /* result document */ + if (v->v.v) + return sv_field(&v->v, db->r, field->position, (uint32_t*)size); + /* field document */ + assert(field->position < (int)(sizeof(v->fields) / sizeof(struct sfv))); + struct sfv *fv = &v->fields[field->position]; + if (fv->pointer == NULL) + return NULL; + if (size) + *size = fv->size; + return fv->pointer; } -static int64_t -phia_document_getint(struct so *o, const char *path) -{ - struct phia_document *v = se_cast(o, struct phia_document*, SEDOCUMENT); - switch (phia_document_opt(path)) { - case SE_DOCUMENT_LSN: { - uint64_t lsn = -1; - if (v->v.v) - lsn = ((struct svv*)(v->v.v))->lsn; - return lsn; - } - case SE_DOCUMENT_EVENT: - return v->event; - case SE_DOCUMENT_CACHE_ONLY: - return v->cache_only; - case SE_DOCUMENT_FLAGS: { - uint64_t flags = -1; - if (v->v.v) - flags = ((struct svv*)(v->v.v))->flags; - return flags; - } - } - return -1; +void +phia_document_set_cache_only(struct phia_document *doc, bool cache_only) +{ + doc->cache_only = cache_only; } -static struct soif sedocumentif = +int64_t +phia_document_lsn(struct phia_document *v) { - .open = phia_document_open, - .destroy = phia_document_destroy, - .setstring = phia_document_setstring, - .setint = phia_document_setint, - .getstring = phia_document_getstring, - .getint = phia_document_getint, - .get = NULL, -}; + uint64_t lsn = -1; + if (v->v.v) + lsn = ((struct svv*)(v->v.v))->lsn; + return lsn; +} static struct phia_document * phia_document_new(struct phia_env *e, struct phia_index *db, const struct sv *vp) @@ -15765,8 +15559,7 @@ phia_document_new(struct phia_env *e, struct phia_index *db, const struct sv *vp return NULL; } memset(doc, 0, sizeof(*doc)); - so_init(&doc->o, &se_o[SEDOCUMENT], &sedocumentif, e); - doc->order = SS_EQ; + doc->order = PHIA_EQ; doc->db = db; if (vp) { doc->v = *vp; @@ -15820,7 +15613,7 @@ phia_tx_write(struct phia_tx *t, struct phia_document *o, uint8_t flags) } /* create document */ - int rc = phia_document_open(&o->o); + int rc = phia_document_open(o); if (unlikely(rc == -1)) goto error; rc = phia_document_validate(o, db, flags); @@ -15834,7 +15627,7 @@ phia_tx_write(struct phia_tx *t, struct phia_document *o, uint8_t flags) int size = sv_vsize(v); if (auto_close) { ss_quota(&e->quota, SS_QADD, size); - phia_document_destroy(&o->o); + phia_document_delete(o); } /* concurrent index only */ @@ -15847,7 +15640,7 @@ phia_tx_write(struct phia_tx *t, struct phia_document *o, uint8_t flags) return 0; error: if (auto_close) - phia_document_destroy(&o->o); + phia_document_delete(o); return -1; } @@ -15895,7 +15688,7 @@ phia_tx_get(struct phia_tx *t, struct phia_document *key) } return phia_index_read(db, key, &t->t, 1, NULL); error: - phia_document_destroy(&key->o); + phia_document_delete(key); return NULL; } @@ -15945,7 +15738,7 @@ phia_tx_prepare(struct sx *x, struct sv *v, struct phia_index *db, void *ptr) arg->prefixsize = 0; arg->cache = cache; arg->cachegc = 0; - arg->order = SS_EQ; + arg->order = PHIA_EQ; arg->has = 1; arg->upsert = 0; arg->upsert_eq = 0; diff --git a/src/box/phia.h b/src/box/phia.h index dc77ce61cf..babcfa3884 100644 --- a/src/box/phia.h +++ b/src/box/phia.h @@ -164,6 +164,40 @@ phia_tx_set_half_commit(struct phia_tx *tx, bool half_commit); struct phia_document * phia_tx_get(struct phia_tx *t, struct phia_document *key); +/* + * Document + */ + +enum phia_order { + PHIA_LT, + PHIA_LE, + PHIA_GT, + PHIA_GE, + PHIA_EQ +}; + +int +phia_document_open(struct phia_document *v); + +int +phia_document_delete(struct phia_document *v); + +int +phia_document_set_field(struct phia_document *doc, const char *path, + const char *value, int size); + +char * +phia_document_field(struct phia_document *v, const char *path, int *size); + +void +phia_document_set_cache_only(struct phia_document *doc, bool cache_only); + +void +phia_document_set_order(struct phia_document *doc, enum phia_order order); + +int64_t +phia_document_lsn(struct phia_document *doc); + #ifdef __cplusplus } #endif diff --git a/src/box/phia_engine.cc b/src/box/phia_engine.cc index b84416a82d..5a1a0a4375 100644 --- a/src/box/phia_engine.cc +++ b/src/box/phia_engine.cc @@ -69,7 +69,7 @@ phia_get_parts(struct key_def *key_def, struct phia_document *obj, }; for (uint32_t i = 0; i < key_def->part_count; i++) { int len = 0; - parts[i].iov_base = phia_getstring(obj, PARTNAMES[i], &len); + parts[i].iov_base = phia_document_field(obj, PARTNAMES[i], &len); parts[i].iov_len = len; assert(parts[i].iov_base != NULL); switch (key_def->parts[i].type) { @@ -124,7 +124,7 @@ phia_tuple_new(struct phia_document *obj, struct key_def *key_def, assert(key_def->part_count <= 8); struct iovec parts[8]; int valuesize = 0; - void *value = phia_getstring(obj, "value", &valuesize); + void *value = phia_document_field(obj, "value", &valuesize); uint32_t field_count = 0; size_t size = phia_get_parts(key_def, obj, value, valuesize, parts, &field_count); @@ -149,7 +149,7 @@ phia_tuple_data_new(struct phia_document *obj, struct key_def *key_def, assert(key_def->part_count <= 8); struct iovec parts[8]; int valuesize = 0; - void *value = phia_getstring(obj, "value", &valuesize); + void *value = phia_document_field(obj, "value", &valuesize); uint32_t field_count = 0; size_t size = phia_get_parts(key_def, obj, value, valuesize, parts, &field_count); @@ -279,7 +279,7 @@ phia_read_free_cb(struct coio_task *ptr) struct phia_read_task *task = (struct phia_read_task *) ptr; if (task->result != NULL) - phia_destroy(task->result); + phia_document_delete(task->result); mempool_free(&phia_read_pool, task); return 0; } @@ -434,7 +434,7 @@ join_send_space(struct space *sp, void *data) struct phia_document *obj = phia_document(pk->db); while ((obj = phia_cursor_get(cursor, obj))) { - int64_t lsn = phia_getint(obj, "lsn"); + int64_t lsn = phia_document_lsn(obj); uint32_t tuple_size; char *tuple = phia_tuple_data_new(obj, pk->key_def, &tuple_size); @@ -443,7 +443,7 @@ join_send_space(struct space *sp, void *data) tuple, tuple_size, lsn); } catch (Exception *e) { free(tuple); - phia_destroy(obj); + phia_document_delete(obj); throw; } free(tuple); diff --git a/src/box/phia_index.cc b/src/box/phia_index.cc index c067286a18..d5dd97c402 100644 --- a/src/box/phia_index.cc +++ b/src/box/phia_index.cc @@ -53,7 +53,6 @@ PhiaIndex::createDocument(const char *key, const char **keyend) struct phia_document *obj = phia_document(db); if (obj == NULL) tnt_raise(ClientError, ER_PHIA, phia_env_get_error(env)); - phia_setstring(obj, "arg", fiber(), 0); if (key == NULL) return obj; uint32_t i = 0; @@ -71,7 +70,7 @@ PhiaIndex::createDocument(const char *key, const char **keyend) } if (partsize == 0) part = ""; - if (phia_setstring(obj, partname, part, partsize) == -1) + if (phia_document_set_field(obj, partname, part, partsize) == -1) tnt_raise(ClientError, ER_PHIA, phia_env_get_error(env)); i++; @@ -149,11 +148,11 @@ PhiaIndex::findByKey(const char *key, uint32_t part_count = 0) const transaction = (struct phia_tx *) in_txn()->engine_tx; /* try to read from cache first, if nothing is found * retry using disk */ - phia_setint(obj, "cache_only", 1); + phia_document_set_cache_only(obj, true); int rc; - rc = phia_open(obj); + rc = phia_document_open(obj); if (rc == -1) { - phia_destroy(obj); + phia_document_delete(obj); tnt_raise(ClientError, ER_PHIA, phia_env_get_error(env)); } struct phia_document *result; @@ -163,20 +162,20 @@ PhiaIndex::findByKey(const char *key, uint32_t part_count = 0) const result = phia_tx_get(transaction, obj); } if (result == NULL) { - phia_setint(obj, "cache_only", 0); + phia_document_set_cache_only(obj, false); if (transaction == NULL) { result = phia_index_read(db, obj); } else { result = phia_tx_read(transaction, obj); } - phia_destroy(obj); + phia_document_delete(obj); if (result == NULL) return NULL; } else { - phia_destroy(obj); + phia_document_delete(obj); } struct tuple *tuple = phia_tuple_new(result, key_def, format); - phia_destroy(result); + phia_document_delete(result); return tuple; } @@ -209,7 +208,7 @@ phia_iterator_free(struct iterator *ptr) assert(ptr->free == phia_iterator_free); struct phia_iterator *it = (struct phia_iterator *) ptr; if (it->current) { - phia_destroy(it->current); + phia_document_delete(it->current); it->current = NULL; } if (it->cursor) { @@ -234,29 +233,29 @@ phia_iterator_next(struct iterator *ptr) /* read from cache */ struct phia_document *obj = phia_cursor_get(it->cursor, it->current); if (likely(obj != NULL)) { - phia_destroy(it->current); + phia_document_delete(it->current); it->current = obj; return phia_tuple_new(obj, it->key_def, it->space->format); } /* switch to asynchronous mode (read from disk) */ - phia_setint(it->current, "cache_only", 0); + phia_document_set_cache_only(it->current, false); obj = phia_cursor_read(it->cursor, it->current); if (obj == NULL) { ptr->next = phia_iterator_last; /* immediately close the cursor */ phia_cursor_delete(it->cursor); - phia_destroy(it->current); + phia_document_delete(it->current); it->current = NULL; it->cursor = NULL; return NULL; } - phia_destroy(it->current); + phia_document_delete(it->current); it->current = obj; /* switch back to synchronous mode */ - phia_setint(obj, "cache_only", 1); + phia_document_set_cache_only(obj, true); return phia_tuple_new(obj, it->key_def, it->space->format); } @@ -311,22 +310,22 @@ PhiaIndex::initIterator(struct iterator *ptr, it->env = env; it->db = db; it->current = NULL; - const char *compare; /* point-lookup iterator */ if (type == ITER_EQ) { ptr->next = phia_iterator_eq; return; } /* prepare for the range scan */ + enum phia_order order; switch (type) { case ITER_ALL: - case ITER_GE: compare = ">="; + case ITER_GE: order = PHIA_GE; break; - case ITER_GT: compare = ">"; + case ITER_GT: order = PHIA_GT; break; - case ITER_LE: compare = "<="; + case ITER_LE: order = PHIA_LE; break; - case ITER_LT: compare = "<"; + case ITER_LT: order = PHIA_LT; break; default: return initIterator(ptr, type, key, part_count); @@ -341,7 +340,7 @@ PhiaIndex::initIterator(struct iterator *ptr, */ PhiaIndex *index = (PhiaIndex *)this; struct phia_document *obj = index->createDocument(key, &it->keyend); - phia_setstring(obj, "order", compare, 0); + phia_document_set_order(obj, order); obj = phia_cursor_read(it->cursor, obj); if (obj == NULL) { phia_cursor_delete(it->cursor); @@ -350,6 +349,6 @@ PhiaIndex::initIterator(struct iterator *ptr, } it->current = obj; /* switch to sync mode (cache read) */ - phia_setint(obj, "cache_only", 1); + phia_document_set_cache_only(obj, 1); ptr->next = phia_iterator_first; } diff --git a/src/box/phia_space.cc b/src/box/phia_space.cc index c196546da5..696fd889da 100644 --- a/src/box/phia_space.cc +++ b/src/box/phia_space.cc @@ -66,13 +66,13 @@ PhiaSpace::applySnapshotRow(struct space *space, struct request *request) struct phia_document *obj = index->createDocument(key, &value); size_t valuesize = size - (value - request->tuple); if (valuesize > 0) - phia_setstring(obj, "value", value, valuesize); + phia_document_set_field(obj, "value", value, valuesize); assert(request->header != NULL); struct phia_tx *tx = phia_begin(index->env); if (tx == NULL) { - phia_destroy(obj); + phia_document_delete(obj); tnt_raise(ClientError, ER_PHIA, phia_env_get_error(index->env)); } @@ -90,7 +90,7 @@ PhiaSpace::applySnapshotRow(struct space *space, struct request *request) case 1: /* rollback */ return; case 2: /* lock */ - phia_destroy(tx); + phia_rollback(tx); /* must never happen during JOIN */ tnt_raise(ClientError, ER_TRANSACTION_CONFLICT); return; @@ -140,7 +140,7 @@ PhiaSpace::executeReplace(struct txn*, struct phia_document *obj = index->createDocument(key, &value); size_t valuesize = size - (value - request->tuple); if (valuesize > 0) - phia_setstring(obj, "value", value, valuesize); + phia_document_set_field(obj, "value", value, valuesize); int rc; rc = phia_replace(tx, obj); if (rc == -1) @@ -205,7 +205,7 @@ PhiaSpace::executeUpdate(struct txn*, struct space *space, struct phia_document *obj = index->createDocument(key, &value); size_t valuesize = new_tuple->bsize - (value - new_tuple->data); if (valuesize > 0) - phia_setstring(obj, "value", value, valuesize); + phia_document_set_field(obj, "value", value, valuesize); int rc; rc = phia_replace(tx, obj); if (rc == -1) @@ -424,7 +424,7 @@ PhiaSpace::executeUpsert(struct txn*, struct space *space, sizeof(uint8_t) + sizeof(uint32_t) + tuple_value_size + expr_size; char *value = (char *)malloc(value_size); if (value == NULL) { - phia_destroy(obj); + phia_document_delete(obj); tnt_raise(OutOfMemory, sizeof(value_size), "Phia Space", "executeUpsert"); } @@ -436,7 +436,7 @@ PhiaSpace::executeUpsert(struct txn*, struct space *space, memcpy(p, tuple_value, tuple_value_size); p += tuple_value_size; memcpy(p, expr, expr_size); - phia_setstring(obj, "value", value, value_size); + phia_document_set_field(obj, "value", value, value_size); struct phia_tx *tx = (struct phia_tx *)(in_txn()->engine_tx); int rc = phia_upsert(tx, obj); free(value); -- GitLab