diff --git a/changelogs/unreleased/gh-1629-varbinary.md b/changelogs/unreleased/gh-1629-varbinary.md
new file mode 100644
index 0000000000000000000000000000000000000000..505bb96bc7ad53507a2e57548048b9e551d97213
--- /dev/null
+++ b/changelogs/unreleased/gh-1629-varbinary.md
@@ -0,0 +1,9 @@
+## feature/lua
+
+* **[Breaking change]** Added the new `varbinary` type to Lua. An object of
+  this type is similar to a plain string but encoded in MsgPack as `MP_BIN` so
+  it can be used for storing binary blobs in the database. This also works the
+  other way round: data fields stored as `MP_BIN` are now decoded in Lua as
+  varbinary objects, not as plain strings, as they used to be. Since the latter
+  may cause compatibility issues, the new compat option `binary_data_decoding`
+  was introduced to revert the built-in decoder to the old behavior (gh-1629).
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 134055995d915f0c3152131b8d4b38802e122369..547ee68f7efb838e0ee123fe26c1bf06f93db991 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -71,6 +71,7 @@ lua_source(lua_sources lua/timezones.lua timezones_lua)
 lua_source(lua_sources lua/print.lua print_lua)
 lua_source(lua_sources lua/pairs.lua pairs_lua)
 lua_source(lua_sources lua/compat.lua compat_lua)
+lua_source(lua_sources lua/varbinary.lua varbinary_lua)
 if (ENABLE_COMPRESS_MODULE)
     lua_source(lua_sources ${COMPRESS_MODULE_LUA_SOURCE} compress_lua)
 endif()
diff --git a/src/box/tuple_convert.c b/src/box/tuple_convert.c
index ba9a7e72ae887ab975f9264dedc88ea07a11d535..7c66f9d78d9010d9f3152feb409654af1aa14b16 100644
--- a/src/box/tuple_convert.c
+++ b/src/box/tuple_convert.c
@@ -176,14 +176,14 @@ encode_node(yaml_emitter_t *emitter, const char **data)
 		str = *data;
 		*data += len;
 		style = YAML_ANY_SCALAR_STYLE;
-		binlen = base64_bufsize(len, 0);
+		binlen = base64_encode_bufsize(len, BASE64_NOWRAP);
 		bin = (char *) malloc(binlen);
 		if (bin == NULL) {
 			diag_set(OutOfMemory, binlen, "malloc",
 				 "tuple_to_yaml");
 			return 0;
 		}
-		binlen = base64_encode(str, len, bin, binlen, 0);
+		binlen = base64_encode(str, len, bin, binlen, BASE64_NOWRAP);
 		str = bin;
 		len = binlen;
 		tag = (yaml_char_t *) LUAYAML_TAG_PREFIX "binary";
diff --git a/src/box/tuple_convert_BACKUP_246187.c b/src/box/tuple_convert_BACKUP_246187.c
new file mode 100644
index 0000000000000000000000000000000000000000..c60f9a0d570d76980463165963167bb059ed3a27
--- /dev/null
+++ b/src/box/tuple_convert_BACKUP_246187.c
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "tuple.h"
+#include <msgpuck/msgpuck.h>
+#include <yaml.h>
+#include <base64.h>
+#include <small/region.h>
+#include <small/obuf.h>
+#include "fiber.h"
+#include <trivia/util.h>
+
+int
+tuple_to_obuf(struct tuple *tuple, struct obuf *buf)
+{
+	uint32_t bsize;
+	const char *data = tuple_data_range(tuple, &bsize);
+	if (obuf_dup(buf, data, bsize) != bsize) {
+		diag_set(OutOfMemory, bsize, "tuple_to_obuf", "dup");
+		return -1;
+	}
+	return 0;
+}
+
+int
+append_output(void *arg, unsigned char *buf, size_t len)
+{
+	(void) arg;
+	char *buf_out = region_alloc(&fiber()->gc, len + 1);
+	if (!buf_out) {
+		diag_set(OutOfMemory, len , "region", "tuple_to_yaml");
+		return 0;
+	}
+	memcpy(buf_out, buf, len);
+	buf_out[len] = '\0';
+	return 1;
+}
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data);
+
+static int
+encode_table(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_mapping_style_t yaml_style = YAML_FLOW_MAPPING_STYLE;
+	if (!yaml_mapping_start_event_initialize(&ev, NULL, NULL, 0, yaml_style)
+			|| !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_map(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+			return 0;
+		if (!encode_node(emitter, data))
+			return 0;
+	}
+
+	if (!yaml_mapping_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+
+static int
+encode_array(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_sequence_style_t yaml_style = YAML_FLOW_SEQUENCE_STYLE;
+	if (!yaml_sequence_start_event_initialize(&ev, NULL, NULL, 0,
+				yaml_style) ||
+			!yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_array(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+		   return 0;
+	}
+
+	if (!yaml_sequence_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+#define LUAYAML_TAG_PREFIX "tag:yaml.org,2002:"
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data)
+{
+	size_t len = 0;
+	const char *str = "";
+	size_t binlen = 0;
+	char *bin = NULL;
+	yaml_char_t *tag = NULL;
+	yaml_event_t ev;
+	yaml_scalar_style_t style = YAML_PLAIN_SCALAR_STYLE;
+	char buf[FPCONV_G_FMT_BUFSIZE];
+	int type = mp_typeof(**data);
+	switch(type) {
+	case MP_UINT:
+		len = snprintf(buf, sizeof(buf), "%llu",
+			       (unsigned long long) mp_decode_uint(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_INT:
+		len = snprintf(buf, sizeof(buf), "%lld",
+			       (long long) mp_decode_int(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_FLOAT:
+		fpconv_g_fmt(buf, mp_decode_float(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_DOUBLE:
+		fpconv_g_fmt(buf, mp_decode_double(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_ARRAY:
+		return encode_array(emitter, data);
+	case MP_MAP:
+		return encode_table(emitter, data);
+	case MP_STR:
+		len = mp_decode_strl(data);
+		str = *data;
+		*data += len;
+		style = YAML_SINGLE_QUOTED_SCALAR_STYLE;
+		break;
+	case MP_BIN:
+		len = mp_decode_binl(data);
+		str = *data;
+		*data += len;
+		style = YAML_ANY_SCALAR_STYLE;
+<<<<<<< HEAD
+		binlen = base64_bufsize(len, 0);
+=======
+		binlen = base64_encode_bufsize(len, BASE64_NOWRAP);
+>>>>>>> ba749e820b (lua: add varbinary type)
+		bin = (char *) malloc(binlen);
+		if (bin == NULL) {
+			diag_set(OutOfMemory, binlen, "malloc",
+				 "tuple_to_yaml");
+			return 0;
+		}
+		binlen = base64_encode(str, len, bin, binlen, BASE64_NOWRAP);
+		str = bin;
+		len = binlen;
+		tag = (yaml_char_t *) LUAYAML_TAG_PREFIX "binary";
+		break;
+	case MP_BOOL:
+		if (mp_decode_bool(data)) {
+			str = "true";
+			len = 4;
+		} else {
+			str = "false";
+			len = 5;
+		}
+		break;
+	case MP_NIL:
+	case MP_EXT:
+		if (type == MP_NIL) {
+			mp_decode_nil(data);
+		} else {
+			mp_next(data);
+		}
+		style = YAML_PLAIN_SCALAR_STYLE;
+		str = "null";
+		len = 4;
+		break;
+	default:
+		unreachable();
+	}
+
+	int rc = 1;
+	if (!yaml_scalar_event_initialize(&ev, NULL, tag, (unsigned char *)str,
+					  len, bin == NULL, bin == NULL,
+					  style) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(OutOfMemory, len, "malloc", "tuple_to_yaml");
+		rc = 0;
+	}
+	if (bin != NULL)
+		free(bin);
+
+	return rc;
+}
+
+char *
+tuple_to_yaml(struct tuple *tuple)
+{
+	const char *data = tuple_data(tuple);
+	yaml_emitter_t emitter;
+	yaml_event_t ev;
+
+	size_t used = region_used(&fiber()->gc);
+
+	if (!yaml_emitter_initialize(&emitter)) {
+		diag_set(SystemError, "failed to init libyaml");
+		return NULL;
+	}
+	yaml_emitter_set_unicode(&emitter, 1);
+	yaml_emitter_set_indent(&emitter, 2);
+	yaml_emitter_set_width(&emitter, INT_MAX);
+	yaml_emitter_set_output(&emitter, &append_output, NULL);
+
+	if (!yaml_stream_start_event_initialize(&ev, YAML_UTF8_ENCODING) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_document_start_event_initialize(&ev, NULL, NULL, NULL, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		goto error;
+	}
+	if (!encode_node(&emitter, &data))
+		goto error;
+
+	if (!yaml_document_end_event_initialize(&ev, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_stream_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_emitter_flush(&emitter)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		goto error;
+	}
+
+	yaml_emitter_delete(&emitter);
+
+	size_t total_len = region_used(&fiber()->gc) - used;
+	char *buf = (char *) region_join(&fiber()->gc, total_len);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, total_len, "region", "tuple_to_yaml");
+		return NULL;
+	}
+	/* Remove trailing "\n\0" added by libyaml */
+	assert(total_len > 2);
+	assert(buf[total_len - 1] == '\0' && buf[total_len - 2] == '\n');
+	buf[total_len - 2] = '\0';
+	return buf;
+error:
+	yaml_emitter_delete(&emitter);
+	return NULL;
+}
diff --git a/src/box/tuple_convert_BASE_246187.c b/src/box/tuple_convert_BASE_246187.c
new file mode 100644
index 0000000000000000000000000000000000000000..e8110b52d521e92a7fd309db2cb49e2388e2611b
--- /dev/null
+++ b/src/box/tuple_convert_BASE_246187.c
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "tuple.h"
+#include <msgpuck/msgpuck.h>
+#include <yaml.h>
+#include <base64.h>
+#include <small/region.h>
+#include <small/obuf.h>
+#include "fiber.h"
+#include <trivia/util.h>
+
+int
+tuple_to_obuf(struct tuple *tuple, struct obuf *buf)
+{
+	uint32_t bsize;
+	const char *data = tuple_data_range(tuple, &bsize);
+	if (obuf_dup(buf, data, bsize) != bsize) {
+		diag_set(OutOfMemory, bsize, "tuple_to_obuf", "dup");
+		return -1;
+	}
+	return 0;
+}
+
+int
+append_output(void *arg, unsigned char *buf, size_t len)
+{
+	(void) arg;
+	char *buf_out = region_alloc(&fiber()->gc, len + 1);
+	if (!buf_out) {
+		diag_set(OutOfMemory, len , "region", "tuple_to_yaml");
+		return 0;
+	}
+	memcpy(buf_out, buf, len);
+	buf_out[len] = '\0';
+	return 1;
+}
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data);
+
+static int
+encode_table(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_mapping_style_t yaml_style = YAML_FLOW_MAPPING_STYLE;
+	if (!yaml_mapping_start_event_initialize(&ev, NULL, NULL, 0, yaml_style)
+			|| !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_map(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+			return 0;
+		if (!encode_node(emitter, data))
+			return 0;
+	}
+
+	if (!yaml_mapping_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+
+static int
+encode_array(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_sequence_style_t yaml_style = YAML_FLOW_SEQUENCE_STYLE;
+	if (!yaml_sequence_start_event_initialize(&ev, NULL, NULL, 0,
+				yaml_style) ||
+			!yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_array(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+		   return 0;
+	}
+
+	if (!yaml_sequence_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+#define LUAYAML_TAG_PREFIX "tag:yaml.org,2002:"
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data)
+{
+	size_t len = 0;
+	const char *str = "";
+	size_t binlen = 0;
+	char *bin = NULL;
+	yaml_char_t *tag = NULL;
+	yaml_event_t ev;
+	yaml_scalar_style_t style = YAML_PLAIN_SCALAR_STYLE;
+	char buf[FPCONV_G_FMT_BUFSIZE];
+	int type = mp_typeof(**data);
+	switch(type) {
+	case MP_UINT:
+		len = snprintf(buf, sizeof(buf), "%llu",
+			       (unsigned long long) mp_decode_uint(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_INT:
+		len = snprintf(buf, sizeof(buf), "%lld",
+			       (long long) mp_decode_int(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_FLOAT:
+		fpconv_g_fmt(buf, mp_decode_float(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_DOUBLE:
+		fpconv_g_fmt(buf, mp_decode_double(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_ARRAY:
+		return encode_array(emitter, data);
+	case MP_MAP:
+		return encode_table(emitter, data);
+	case MP_STR:
+		len = mp_decode_strl(data);
+		str = *data;
+		*data += len;
+		style = YAML_SINGLE_QUOTED_SCALAR_STYLE;
+		break;
+	case MP_BIN:
+		len = mp_decode_binl(data);
+		str = *data;
+		*data += len;
+		style = YAML_ANY_SCALAR_STYLE;
+		binlen = base64_encode_bufsize(len, 0);
+		bin = (char *) malloc(binlen);
+		if (bin == NULL) {
+			diag_set(OutOfMemory, binlen, "malloc",
+				 "tuple_to_yaml");
+			return 0;
+		}
+		binlen = base64_encode(str, len, bin, binlen, 0);
+		str = bin;
+		len = binlen;
+		tag = (yaml_char_t *) LUAYAML_TAG_PREFIX "binary";
+		break;
+	case MP_BOOL:
+		if (mp_decode_bool(data)) {
+			str = "true";
+			len = 4;
+		} else {
+			str = "false";
+			len = 5;
+		}
+		break;
+	case MP_NIL:
+	case MP_EXT:
+		if (type == MP_NIL) {
+			mp_decode_nil(data);
+		} else {
+			mp_next(data);
+		}
+		style = YAML_PLAIN_SCALAR_STYLE;
+		str = "null";
+		len = 4;
+		break;
+	default:
+		unreachable();
+	}
+
+	int rc = 1;
+	if (!yaml_scalar_event_initialize(&ev, NULL, tag, (unsigned char *)str,
+					  len, bin == NULL, bin == NULL,
+					  style) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(OutOfMemory, len, "malloc", "tuple_to_yaml");
+		rc = 0;
+	}
+	if (bin != NULL)
+		free(bin);
+
+	return rc;
+}
+
+char *
+tuple_to_yaml(struct tuple *tuple)
+{
+	const char *data = tuple_data(tuple);
+	yaml_emitter_t emitter;
+	yaml_event_t ev;
+
+	size_t used = region_used(&fiber()->gc);
+
+	if (!yaml_emitter_initialize(&emitter)) {
+		diag_set(SystemError, "failed to init libyaml");
+		return NULL;
+	}
+	yaml_emitter_set_unicode(&emitter, 1);
+	yaml_emitter_set_indent(&emitter, 2);
+	yaml_emitter_set_width(&emitter, INT_MAX);
+	yaml_emitter_set_output(&emitter, &append_output, NULL);
+
+	if (!yaml_stream_start_event_initialize(&ev, YAML_UTF8_ENCODING) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_document_start_event_initialize(&ev, NULL, NULL, NULL, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		goto error;
+	}
+	if (!encode_node(&emitter, &data))
+		goto error;
+
+	if (!yaml_document_end_event_initialize(&ev, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_stream_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_emitter_flush(&emitter)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		goto error;
+	}
+
+	yaml_emitter_delete(&emitter);
+
+	size_t total_len = region_used(&fiber()->gc) - used;
+	char *buf = (char *) region_join(&fiber()->gc, total_len);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, total_len, "region", "tuple_to_yaml");
+		return NULL;
+	}
+	/* Remove trailing "\n\0" added by libyaml */
+	assert(total_len > 2);
+	assert(buf[total_len - 1] == '\0' && buf[total_len - 2] == '\n');
+	buf[total_len - 2] = '\0';
+	return buf;
+error:
+	yaml_emitter_delete(&emitter);
+	return NULL;
+}
diff --git a/src/box/tuple_convert_LOCAL_246187.c b/src/box/tuple_convert_LOCAL_246187.c
new file mode 100644
index 0000000000000000000000000000000000000000..ba9a7e72ae887ab975f9264dedc88ea07a11d535
--- /dev/null
+++ b/src/box/tuple_convert_LOCAL_246187.c
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "tuple.h"
+#include <msgpuck/msgpuck.h>
+#include <yaml.h>
+#include <base64.h>
+#include <small/region.h>
+#include <small/obuf.h>
+#include "fiber.h"
+#include <trivia/util.h>
+
+int
+tuple_to_obuf(struct tuple *tuple, struct obuf *buf)
+{
+	uint32_t bsize;
+	const char *data = tuple_data_range(tuple, &bsize);
+	if (obuf_dup(buf, data, bsize) != bsize) {
+		diag_set(OutOfMemory, bsize, "tuple_to_obuf", "dup");
+		return -1;
+	}
+	return 0;
+}
+
+int
+append_output(void *arg, unsigned char *buf, size_t len)
+{
+	(void) arg;
+	char *buf_out = region_alloc(&fiber()->gc, len + 1);
+	if (!buf_out) {
+		diag_set(OutOfMemory, len , "region", "tuple_to_yaml");
+		return 0;
+	}
+	memcpy(buf_out, buf, len);
+	buf_out[len] = '\0';
+	return 1;
+}
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data);
+
+static int
+encode_table(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_mapping_style_t yaml_style = YAML_FLOW_MAPPING_STYLE;
+	if (!yaml_mapping_start_event_initialize(&ev, NULL, NULL, 0, yaml_style)
+			|| !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_map(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+			return 0;
+		if (!encode_node(emitter, data))
+			return 0;
+	}
+
+	if (!yaml_mapping_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+
+static int
+encode_array(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_sequence_style_t yaml_style = YAML_FLOW_SEQUENCE_STYLE;
+	if (!yaml_sequence_start_event_initialize(&ev, NULL, NULL, 0,
+				yaml_style) ||
+			!yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_array(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+		   return 0;
+	}
+
+	if (!yaml_sequence_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+#define LUAYAML_TAG_PREFIX "tag:yaml.org,2002:"
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data)
+{
+	size_t len = 0;
+	const char *str = "";
+	size_t binlen = 0;
+	char *bin = NULL;
+	yaml_char_t *tag = NULL;
+	yaml_event_t ev;
+	yaml_scalar_style_t style = YAML_PLAIN_SCALAR_STYLE;
+	char buf[FPCONV_G_FMT_BUFSIZE];
+	int type = mp_typeof(**data);
+	switch(type) {
+	case MP_UINT:
+		len = snprintf(buf, sizeof(buf), "%llu",
+			       (unsigned long long) mp_decode_uint(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_INT:
+		len = snprintf(buf, sizeof(buf), "%lld",
+			       (long long) mp_decode_int(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_FLOAT:
+		fpconv_g_fmt(buf, mp_decode_float(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_DOUBLE:
+		fpconv_g_fmt(buf, mp_decode_double(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_ARRAY:
+		return encode_array(emitter, data);
+	case MP_MAP:
+		return encode_table(emitter, data);
+	case MP_STR:
+		len = mp_decode_strl(data);
+		str = *data;
+		*data += len;
+		style = YAML_SINGLE_QUOTED_SCALAR_STYLE;
+		break;
+	case MP_BIN:
+		len = mp_decode_binl(data);
+		str = *data;
+		*data += len;
+		style = YAML_ANY_SCALAR_STYLE;
+		binlen = base64_bufsize(len, 0);
+		bin = (char *) malloc(binlen);
+		if (bin == NULL) {
+			diag_set(OutOfMemory, binlen, "malloc",
+				 "tuple_to_yaml");
+			return 0;
+		}
+		binlen = base64_encode(str, len, bin, binlen, 0);
+		str = bin;
+		len = binlen;
+		tag = (yaml_char_t *) LUAYAML_TAG_PREFIX "binary";
+		break;
+	case MP_BOOL:
+		if (mp_decode_bool(data)) {
+			str = "true";
+			len = 4;
+		} else {
+			str = "false";
+			len = 5;
+		}
+		break;
+	case MP_NIL:
+	case MP_EXT:
+		if (type == MP_NIL) {
+			mp_decode_nil(data);
+		} else {
+			mp_next(data);
+		}
+		style = YAML_PLAIN_SCALAR_STYLE;
+		str = "null";
+		len = 4;
+		break;
+	default:
+		unreachable();
+	}
+
+	int rc = 1;
+	if (!yaml_scalar_event_initialize(&ev, NULL, tag, (unsigned char *)str,
+					  len, bin == NULL, bin == NULL,
+					  style) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(OutOfMemory, len, "malloc", "tuple_to_yaml");
+		rc = 0;
+	}
+	if (bin != NULL)
+		free(bin);
+
+	return rc;
+}
+
+char *
+tuple_to_yaml(struct tuple *tuple)
+{
+	const char *data = tuple_data(tuple);
+	yaml_emitter_t emitter;
+	yaml_event_t ev;
+
+	size_t used = region_used(&fiber()->gc);
+
+	if (!yaml_emitter_initialize(&emitter)) {
+		diag_set(SystemError, "failed to init libyaml");
+		return NULL;
+	}
+	yaml_emitter_set_unicode(&emitter, 1);
+	yaml_emitter_set_indent(&emitter, 2);
+	yaml_emitter_set_width(&emitter, INT_MAX);
+	yaml_emitter_set_output(&emitter, &append_output, NULL);
+
+	if (!yaml_stream_start_event_initialize(&ev, YAML_UTF8_ENCODING) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_document_start_event_initialize(&ev, NULL, NULL, NULL, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		goto error;
+	}
+	if (!encode_node(&emitter, &data))
+		goto error;
+
+	if (!yaml_document_end_event_initialize(&ev, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_stream_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_emitter_flush(&emitter)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		goto error;
+	}
+
+	yaml_emitter_delete(&emitter);
+
+	size_t total_len = region_used(&fiber()->gc) - used;
+	char *buf = (char *) region_join(&fiber()->gc, total_len);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, total_len, "region", "tuple_to_yaml");
+		return NULL;
+	}
+	/* Remove trailing "\n\0" added by libyaml */
+	assert(total_len > 2);
+	assert(buf[total_len - 1] == '\0' && buf[total_len - 2] == '\n');
+	buf[total_len - 2] = '\0';
+	return buf;
+error:
+	yaml_emitter_delete(&emitter);
+	return NULL;
+}
diff --git a/src/box/tuple_convert_REMOTE_246187.c b/src/box/tuple_convert_REMOTE_246187.c
new file mode 100644
index 0000000000000000000000000000000000000000..7c66f9d78d9010d9f3152feb409654af1aa14b16
--- /dev/null
+++ b/src/box/tuple_convert_REMOTE_246187.c
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "tuple.h"
+#include <msgpuck/msgpuck.h>
+#include <yaml.h>
+#include <base64.h>
+#include <small/region.h>
+#include <small/obuf.h>
+#include "fiber.h"
+#include <trivia/util.h>
+
+int
+tuple_to_obuf(struct tuple *tuple, struct obuf *buf)
+{
+	uint32_t bsize;
+	const char *data = tuple_data_range(tuple, &bsize);
+	if (obuf_dup(buf, data, bsize) != bsize) {
+		diag_set(OutOfMemory, bsize, "tuple_to_obuf", "dup");
+		return -1;
+	}
+	return 0;
+}
+
+int
+append_output(void *arg, unsigned char *buf, size_t len)
+{
+	(void) arg;
+	char *buf_out = region_alloc(&fiber()->gc, len + 1);
+	if (!buf_out) {
+		diag_set(OutOfMemory, len , "region", "tuple_to_yaml");
+		return 0;
+	}
+	memcpy(buf_out, buf, len);
+	buf_out[len] = '\0';
+	return 1;
+}
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data);
+
+static int
+encode_table(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_mapping_style_t yaml_style = YAML_FLOW_MAPPING_STYLE;
+	if (!yaml_mapping_start_event_initialize(&ev, NULL, NULL, 0, yaml_style)
+			|| !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_map(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+			return 0;
+		if (!encode_node(emitter, data))
+			return 0;
+	}
+
+	if (!yaml_mapping_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+
+static int
+encode_array(yaml_emitter_t *emitter, const char **data)
+{
+	yaml_event_t ev;
+	yaml_sequence_style_t yaml_style = YAML_FLOW_SEQUENCE_STYLE;
+	if (!yaml_sequence_start_event_initialize(&ev, NULL, NULL, 0,
+				yaml_style) ||
+			!yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		return 0;
+	}
+
+	uint32_t size = mp_decode_array(data);
+	for (uint32_t i = 0; i < size; i++) {
+		if (!encode_node(emitter, data))
+		   return 0;
+	}
+
+	if (!yaml_sequence_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		return 0;
+	}
+
+	return 1;
+}
+
+#define LUAYAML_TAG_PREFIX "tag:yaml.org,2002:"
+
+static int
+encode_node(yaml_emitter_t *emitter, const char **data)
+{
+	size_t len = 0;
+	const char *str = "";
+	size_t binlen = 0;
+	char *bin = NULL;
+	yaml_char_t *tag = NULL;
+	yaml_event_t ev;
+	yaml_scalar_style_t style = YAML_PLAIN_SCALAR_STYLE;
+	char buf[FPCONV_G_FMT_BUFSIZE];
+	int type = mp_typeof(**data);
+	switch(type) {
+	case MP_UINT:
+		len = snprintf(buf, sizeof(buf), "%llu",
+			       (unsigned long long) mp_decode_uint(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_INT:
+		len = snprintf(buf, sizeof(buf), "%lld",
+			       (long long) mp_decode_int(data));
+		buf[len] = 0;
+		str = buf;
+		break;
+	case MP_FLOAT:
+		fpconv_g_fmt(buf, mp_decode_float(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_DOUBLE:
+		fpconv_g_fmt(buf, mp_decode_double(data),
+			     FPCONV_G_FMT_MAX_PRECISION);
+		str = buf;
+		len = strlen(buf);
+		break;
+	case MP_ARRAY:
+		return encode_array(emitter, data);
+	case MP_MAP:
+		return encode_table(emitter, data);
+	case MP_STR:
+		len = mp_decode_strl(data);
+		str = *data;
+		*data += len;
+		style = YAML_SINGLE_QUOTED_SCALAR_STYLE;
+		break;
+	case MP_BIN:
+		len = mp_decode_binl(data);
+		str = *data;
+		*data += len;
+		style = YAML_ANY_SCALAR_STYLE;
+		binlen = base64_encode_bufsize(len, BASE64_NOWRAP);
+		bin = (char *) malloc(binlen);
+		if (bin == NULL) {
+			diag_set(OutOfMemory, binlen, "malloc",
+				 "tuple_to_yaml");
+			return 0;
+		}
+		binlen = base64_encode(str, len, bin, binlen, BASE64_NOWRAP);
+		str = bin;
+		len = binlen;
+		tag = (yaml_char_t *) LUAYAML_TAG_PREFIX "binary";
+		break;
+	case MP_BOOL:
+		if (mp_decode_bool(data)) {
+			str = "true";
+			len = 4;
+		} else {
+			str = "false";
+			len = 5;
+		}
+		break;
+	case MP_NIL:
+	case MP_EXT:
+		if (type == MP_NIL) {
+			mp_decode_nil(data);
+		} else {
+			mp_next(data);
+		}
+		style = YAML_PLAIN_SCALAR_STYLE;
+		str = "null";
+		len = 4;
+		break;
+	default:
+		unreachable();
+	}
+
+	int rc = 1;
+	if (!yaml_scalar_event_initialize(&ev, NULL, tag, (unsigned char *)str,
+					  len, bin == NULL, bin == NULL,
+					  style) ||
+	    !yaml_emitter_emit(emitter, &ev)) {
+		diag_set(OutOfMemory, len, "malloc", "tuple_to_yaml");
+		rc = 0;
+	}
+	if (bin != NULL)
+		free(bin);
+
+	return rc;
+}
+
+char *
+tuple_to_yaml(struct tuple *tuple)
+{
+	const char *data = tuple_data(tuple);
+	yaml_emitter_t emitter;
+	yaml_event_t ev;
+
+	size_t used = region_used(&fiber()->gc);
+
+	if (!yaml_emitter_initialize(&emitter)) {
+		diag_set(SystemError, "failed to init libyaml");
+		return NULL;
+	}
+	yaml_emitter_set_unicode(&emitter, 1);
+	yaml_emitter_set_indent(&emitter, 2);
+	yaml_emitter_set_width(&emitter, INT_MAX);
+	yaml_emitter_set_output(&emitter, &append_output, NULL);
+
+	if (!yaml_stream_start_event_initialize(&ev, YAML_UTF8_ENCODING) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_document_start_event_initialize(&ev, NULL, NULL, NULL, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev)) {
+		diag_set(SystemError, "failed to init event libyaml");
+		goto error;
+	}
+	if (!encode_node(&emitter, &data))
+		goto error;
+
+	if (!yaml_document_end_event_initialize(&ev, 1) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_stream_end_event_initialize(&ev) ||
+	    !yaml_emitter_emit(&emitter, &ev) ||
+	    !yaml_emitter_flush(&emitter)) {
+		diag_set(SystemError, "failed to end event libyaml");
+		goto error;
+	}
+
+	yaml_emitter_delete(&emitter);
+
+	size_t total_len = region_used(&fiber()->gc) - used;
+	char *buf = (char *) region_join(&fiber()->gc, total_len);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, total_len, "region", "tuple_to_yaml");
+		return NULL;
+	}
+	/* Remove trailing "\n\0" added by libyaml */
+	assert(total_len > 2);
+	assert(buf[total_len - 1] == '\0' && buf[total_len - 2] == '\n');
+	buf[total_len - 2] = '\0';
+	return buf;
+error:
+	yaml_emitter_delete(&emitter);
+	return NULL;
+}
diff --git a/src/lua/compat.lua b/src/lua/compat.lua
index 3e0438011e8e13f8daf7962f55bed0b390971cd4..a047f300a35102cc83e51811458548d91ec56e90 100644
--- a/src/lua/compat.lua
+++ b/src/lua/compat.lua
@@ -58,6 +58,13 @@ additional msgpack array when returning them via iproto.
 https://tarantool.io/compat/c_func_iproto_multireturn
 ]]
 
+local BINARY_DATA_DECODING_BRIEF = [[
+Whether a binary data field should be stored in a varbinary object or a plain
+string when decoded in Lua.
+
+https://tarantool.io/compat/binary_data_decoding
+]]
+
 -- Returns an action callback that toggles a tweak.
 local function tweak_action(tweak_name, old_tweak_value, new_tweak_value)
     return function(is_new)
@@ -110,6 +117,15 @@ local options = {
         run_action_now = true,
         action = tweak_action('c_func_iproto_multireturn', false, true),
     },
+    binary_data_decoding = {
+        default = 'new',
+        obsolete = nil,
+        brief = BINARY_DATA_DECODING_BRIEF,
+        action = function(is_new)
+            tweaks.yaml_decode_binary_as_string = not is_new
+            tweaks.msgpack_decode_binary_as_string = not is_new
+        end,
+    },
 }
 
 -- Array with option names in order of addition.
diff --git a/src/lua/init.c b/src/lua/init.c
index 940efd054bbf0e9cf4a220b24e531ee15fee1d6d..a8b8fe67a9cc09c234a5ab04b3ae03288d6028c1 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -157,6 +157,7 @@ extern char minifio_lua[],
 	table_lua[],
 	trigger_lua[],
 	string_lua[],
+	varbinary_lua[],
 	swim_lua[],
 	jit_p_lua[], /* LuaJIT 2.1 profiler */
 	jit_zone_lua[], /* LuaJIT 2.1 profiler */
@@ -288,6 +289,7 @@ static const char *lua_modules[] = {
 	"env", env_lua,
 	"buffer", buffer_lua,
 	"string", string_lua,
+	"varbinary", varbinary_lua,
 	"table", table_lua,
 	"msgpackffi", msgpackffi_lua,
 	"crypto", crypto_lua,
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index c67b0ca9a2b46f3c4f2f0cec5b7c5cac1217c3dc..73c3b16f0e0f92db8260fdc1254e386e2e4a3ac2 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -44,6 +44,7 @@
 
 #include "core/assoc.h"
 #include "core/decimal.h" /* decimal_unpack() */
+#include "core/tweaks.h"
 #include "lua/decimal.h" /* luaT_newdecimal() */
 #include "mp_extension_types.h"
 #include "mp_uuid.h" /* mp_decode_uuid() */
@@ -106,6 +107,13 @@ struct luamp_iterator {
 
 static const char luamp_iterator_typename[] = "msgpack.iterator";
 
+/**
+ * If this flag is set, a binary data field will be decoded to a plain Lua
+ * string, not a varbinary object.
+ */
+static bool msgpack_decode_binary_as_string = false;
+TWEAK_BOOL(msgpack_decode_binary_as_string);
+
 void
 luamp_error(void *error_ctx)
 {
@@ -223,7 +231,8 @@ luamp_encode_with_translation_r(struct lua_State *L,
 		type = MP_STR;
 		break;
 	case MP_BIN:
-		mpstream_encode_strn(stream, field->sval.data, field->sval.len);
+		mpstream_encode_binl(stream, field->sval.len);
+		mpstream_memcpy(stream, field->sval.data, field->sval.len);
 		type = MP_BIN;
 		break;
 	case MP_INT:
@@ -431,7 +440,10 @@ luamp_decode(struct lua_State *L, struct luaL_serializer *cfg,
 	{
 		uint32_t len = 0;
 		const char *str = mp_decode_bin(data, &len);
-		lua_pushlstring(L, str, len);
+		if (msgpack_decode_binary_as_string)
+			lua_pushlstring(L, str, len);
+		else
+			luaT_pushvarbinary(L, str, len);
 		return;
 	}
 	case MP_BOOL:
diff --git a/src/lua/msgpackffi.lua b/src/lua/msgpackffi.lua
index 3eec796b4ce69724dd206b54fa9105b5d01fef9c..4c2bd689150e186cf11b8bd2393d3fc662d05afb 100644
--- a/src/lua/msgpackffi.lua
+++ b/src/lua/msgpackffi.lua
@@ -1,9 +1,11 @@
 -- msgpackffi.lua (internal file)
 
+local tweaks = require('internal.tweaks')
 local ffi = require('ffi')
 local buffer = require('buffer')
 local builtin = ffi.C
 local msgpack = require('msgpack') -- .NULL, .array_mt, .map_mt, .cfg
+local varbinary = require('varbinary')
 local int8_ptr_t = ffi.typeof('int8_t *')
 local uint8_ptr_t = ffi.typeof('uint8_t *')
 local uint16_ptr_t = ffi.typeof('uint16_t *')
@@ -216,6 +218,20 @@ local function encode_str(buf, str)
     ffi.copy(p, str, len)
 end
 
+local function encode_bin(buf, bin)
+    local len = #bin
+    buf:reserve(5 + len)
+    if len <= 0xff then
+        encode_u8(buf, 0xc4, len)
+    elseif len <= 0xffff then
+        encode_u16(buf, 0xc5, len)
+    else
+        encode_u32(buf, 0xc6, len)
+    end
+    local p = buf:alloc(len)
+    ffi.copy(p, bin, len)
+end
+
 local function encode_array(buf, size)
     if size <= 0xf then
         encode_fix(buf, 0x90, size)
@@ -357,6 +373,7 @@ on_encode(ffi.typeof('const unsigned char'), encode_int)
 on_encode(ffi.typeof('bool'), encode_bool_cdata)
 on_encode(ffi.typeof('float'), encode_float)
 on_encode(ffi.typeof('double'), encode_double)
+on_encode(ffi.typeof('struct varbinary'), encode_bin)
 on_encode(ffi.typeof('decimal_t'), encode_decimal)
 on_encode(ffi.typeof('struct tt_uuid'), encode_uuid)
 on_encode(ffi.typeof('const struct error &'), encode_error)
@@ -518,6 +535,15 @@ local function decode_str(data, size)
     return ret
 end
 
+local function decode_bin(data, size)
+    if tweaks.msgpack_decode_binary_as_string then
+        return decode_str(data, size)
+    end
+    local ret = varbinary.new(data[0], size)
+    data[0] = data[0] + size
+    return ret
+end
+
 local function decode_array(data, size)
     assert (type(size) == "number")
     local arr = {}
@@ -599,9 +625,9 @@ end
 
 local decoder_hint = {
     --[[{{{ MP_BIN]]
-    [0xc4] = function(data) return decode_str(data, decode_u8(data)) end;
-    [0xc5] = function(data) return decode_str(data, decode_u16(data)) end;
-    [0xc6] = function(data) return decode_str(data, decode_u32(data)) end;
+    [0xc4] = function(data) return decode_bin(data, decode_u8(data)) end;
+    [0xc5] = function(data) return decode_bin(data, decode_u16(data)) end;
+    [0xc6] = function(data) return decode_bin(data, decode_u32(data)) end;
 
     --[[MP_FLOAT, MP_DOUBLE]]
     [0xca] = decode_float;
diff --git a/src/lua/serializer.c b/src/lua/serializer.c
index f613dc9bd53c656553c35645b5d189de3d8a0246..add81ad6c960bc6d1fa20f048ab4539f727c2b3d 100644
--- a/src/lua/serializer.c
+++ b/src/lua/serializer.c
@@ -540,8 +540,17 @@ luaL_tofield(struct lua_State *L, struct luaL_serializer *cfg, int index,
 				field->type = MP_NIL;
 				return 0;
 			}
-			/* Fall through */
+			field->type = MP_EXT;
+			field->ext_type = MP_UNKNOWN_EXTENSION;
+			return 0;
 		default:
+			if (ctypeid == CTID_VARBINARY) {
+				field->type = MP_BIN;
+				field->sval.data = luaT_tovarbinary(
+					L, index, &field->sval.len);
+				assert(field->sval.data != NULL);
+				return 0;
+			}
 			field->type = MP_EXT;
 			if (ctypeid == CTID_DECIMAL) {
 				field->ext_type = MP_DECIMAL;
diff --git a/src/lua/utils.c b/src/lua/utils.c
index 01b963c6003b63e3f3c172f4a2ff43b65e06dbc2..192b21766d624a0e646fd4c407271739aa717686 100644
--- a/src/lua/utils.c
+++ b/src/lua/utils.c
@@ -54,6 +54,7 @@ static uint32_t CTID_STRUCT_IBUF;
 static uint32_t CTID_STRUCT_IBUF_PTR;
 uint32_t CTID_CHAR_PTR;
 uint32_t CTID_CONST_CHAR_PTR;
+uint32_t CTID_VARBINARY;
 uint32_t CTID_UUID;
 uint32_t CTID_DATETIME = 0;
 uint32_t CTID_INTERVAL = 0;
@@ -156,6 +157,48 @@ luaT_pushvclock(struct lua_State *L, const struct vclock *vclock)
 	luaL_setmaphint(L, -1); /* compact flow */
 }
 
+/*
+ * Note: varbinary is a VLS object so we can't use luaL_pushcdata and
+ * luaL_checkcdata helpers.
+ */
+void
+luaT_pushvarbinary(struct lua_State *L, const char *data, uint32_t len)
+{
+	assert(CTID_VARBINARY != 0);
+	/* Calculate the cdata size. */
+	CTState *cts = ctype_cts(L);
+	CType *ct = ctype_raw(cts, CTID_VARBINARY);
+	CTSize size;
+	CTInfo info = lj_ctype_info(cts, CTID_VARBINARY, &size);
+	size = lj_ctype_vlsize(cts, ct, (CTSize)len);
+	assert(size != CTSIZE_INVALID);
+	/* Allocate a new cdata. */
+	GCcdata *cd = lj_cdata_newx(cts, CTID_VARBINARY, size, info);
+	/* Anchor the uninitialized cdata with the stack. */
+	TValue *o = L->top;
+	setcdataV(L, o, cd);
+	incr_top(L);
+	/* Initialize the cdata. */
+	memcpy(cdataptr(cd), data, len);
+	lj_gc_check(L);
+}
+
+const char *
+luaT_tovarbinary(struct lua_State *L, int index, uint32_t *len)
+{
+	assert(CTID_VARBINARY != 0);
+	TValue *o = index2adr(L, index);
+	if (!tviscdata(o))
+		return NULL;
+	GCcdata *cd = cdataV(o);
+	if (cd->ctypeid != CTID_VARBINARY)
+		return NULL;
+	CTSize size = cdatavlen(cd);
+	assert(size != CTSIZE_INVALID);
+	*len = size;
+	return cdataptr(cd);
+}
+
 struct tt_uuid *
 luaT_newuuid(struct lua_State *L)
 {
@@ -934,6 +977,10 @@ tarantool_lua_utils_init(struct lua_State *L)
 	assert(CTID_CHAR_PTR != 0);
 	CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *");
 	assert(CTID_CONST_CHAR_PTR != 0);
+	rc = luaL_cdef(L, "struct varbinary { char data[?]; };");
+	assert(rc == 0);
+	CTID_VARBINARY = luaL_ctypeid(L, "struct varbinary");
+	assert(CTID_VARBINARY != 0);
 	rc = luaL_cdef(L, "struct tt_uuid {"
 				  "uint32_t time_low;"
 				  "uint16_t time_mid;"
diff --git a/src/lua/utils.h b/src/lua/utils.h
index 9fb97002cb569895f41e0567c0013b7ec45f5759..adf84562d121bd75b6950e80f9ef8dc2ad545f27 100644
--- a/src/lua/utils.h
+++ b/src/lua/utils.h
@@ -72,11 +72,26 @@ extern struct lua_State *tarantool_L;
 
 extern uint32_t CTID_CHAR_PTR;
 extern uint32_t CTID_CONST_CHAR_PTR;
+/** Type ID of struct varbinary. */
+extern uint32_t CTID_VARBINARY;
 extern uint32_t CTID_UUID;
 extern uint32_t CTID_DATETIME;
 /** Type ID of struct interval. */
 extern uint32_t CTID_INTERVAL;
 
+/**
+ * Pushes a new varbinary object with the given content to the Lua stack.
+ */
+void
+luaT_pushvarbinary(struct lua_State *L, const char *data, uint32_t len);
+
+/**
+ * If the value stored in the Lua stack at the given index is a varbinary
+ * object, returns its content, otherwise returns NULL.
+ */
+const char *
+luaT_tovarbinary(struct lua_State *L, int index, uint32_t *len);
+
 /**
  * Push vclock to the Lua stack as a plain Lua table.
  */
diff --git a/src/lua/varbinary.lua b/src/lua/varbinary.lua
new file mode 100644
index 0000000000000000000000000000000000000000..fa32cb71c37fb25be60e922be98c267f4f316b9c
--- /dev/null
+++ b/src/lua/varbinary.lua
@@ -0,0 +1,65 @@
+local ffi = require('ffi')
+
+ffi.cdef([[
+    int memcmp(const char *s1, const char *s2, size_t n);
+]])
+
+local memcmp = ffi.C.memcmp
+
+local const_char_ptr_t = ffi.typeof('const char *')
+local varbinary_t = ffi.typeof('struct varbinary')
+
+local function is_varbinary(obj)
+    return ffi.istype(varbinary_t, obj)
+end
+
+local function new_varbinary(data, size)
+    if data == nil then
+        size = 0
+    elseif type(data) == 'string' then
+        size = #data
+    elseif ffi.istype(varbinary_t, data) then
+        size = ffi.sizeof(data)
+    elseif not ffi.istype(const_char_ptr_t, data) or type(size) ~= 'number' then
+        error('Usage: varbinary.new(str) or varbinary.new(ptr, size)', 2)
+    end
+    local bin = ffi.new(varbinary_t, size)
+    ffi.copy(bin, data, size)
+    return bin
+end
+
+local function varbinary_len(bin)
+    assert(ffi.istype(varbinary_t, bin))
+    return ffi.sizeof(bin)
+end
+
+local function varbinary_tostring(bin)
+    assert(ffi.istype(varbinary_t, bin))
+    return ffi.string(bin, ffi.sizeof(bin))
+end
+
+local function varbinary_eq(a, b)
+    if not (type(a) == 'string' or ffi.istype(varbinary_t, a)) or
+            not (type(b) == 'string' or ffi.istype(varbinary_t, b)) then
+        return false
+    end
+    local size_a = #a
+    local size_b = #b
+    if size_a ~= size_b then
+        return false
+    end
+    local data_a = ffi.cast(const_char_ptr_t, a)
+    local data_b = ffi.cast(const_char_ptr_t, b)
+    return memcmp(data_a, data_b, size_a) == 0
+end
+
+ffi.metatype(varbinary_t, {
+    __len = varbinary_len,
+    __tostring = varbinary_tostring,
+    __eq = varbinary_eq,
+})
+
+return {
+    is = is_varbinary,
+    new = new_varbinary,
+}
diff --git a/test/app-luatest/varbinary_test.lua b/test/app-luatest/varbinary_test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..a31d8fd808dc5b62b993a9bd963eed21dd8954fe
--- /dev/null
+++ b/test/app-luatest/varbinary_test.lua
@@ -0,0 +1,212 @@
+local compat = require('compat')
+local ffi = require('ffi')
+local json = require('json')
+local msgpack = require('msgpack')
+local msgpackffi = require('msgpackffi')
+local varbinary = require('varbinary')
+local yaml = require('yaml')
+
+local t = require('luatest')
+local g = t.group()
+
+g.test_new_invalid_args = function()
+    local errmsg = 'Usage: varbinary.new(str) or varbinary.new(ptr, size)'
+    t.assert_error_msg_equals(errmsg, varbinary.new, 1)
+    t.assert_error_msg_equals(errmsg, varbinary.new, {})
+    t.assert_error_msg_equals(errmsg, varbinary.new, true)
+    t.assert_error_msg_equals(errmsg, varbinary.new,
+                              ffi.cast('const char *', 'foo'))
+    t.assert_error_msg_equals(errmsg, varbinary.new,
+                              ffi.cast('const char *', 'foo'), 'bar')
+end
+
+g.test_new_from_nil = function()
+    local v = varbinary.new()
+    t.assert(varbinary.is(v))
+    t.assert_equals(#v, 0)
+    t.assert_equals(tostring(v), '')
+    t.assert_equals(v, '')
+    t.assert_equals('', v)
+    t.assert_equals(v, v)
+    t.assert_equals(v, varbinary.new())
+    t.assert_equals(v, varbinary.new(''))
+end
+
+g.test_new_from_str = function()
+    local v = varbinary.new('foo')
+    t.assert(varbinary.is(v))
+    t.assert_equals(#v, 3)
+    t.assert_equals(tostring(v), 'foo')
+    t.assert_equals(v, 'foo')
+    t.assert_equals('foo', v)
+    t.assert_equals(v, v)
+    t.assert_equals(v, varbinary.new('foo'))
+    t.assert_equals(v, varbinary.new(ffi.cast('const char *', 'foo'), 3))
+end
+
+g.test_new_from_ptr = function()
+    local v = varbinary.new(ffi.cast('const char *', 'foo'), 3)
+    t.assert(varbinary.is(v))
+    t.assert_equals(#v, 3)
+    t.assert_equals(tostring(v), 'foo')
+    t.assert_equals(v, 'foo')
+    t.assert_equals('foo', v)
+    t.assert_equals(v, v)
+    t.assert_equals(v, varbinary.new('foo'))
+    t.assert_equals(v, varbinary.new(ffi.cast('const char *', 'foo'), 3))
+end
+
+g.test_is = function()
+    t.assert_equals(varbinary.is(varbinary.new()), true)
+    t.assert_equals(varbinary.is(varbinary.new('')), true)
+    t.assert_equals(varbinary.is(varbinary.new('foo')), true)
+    t.assert_equals(varbinary.is(nil), false)
+    t.assert_equals(varbinary.is(msgpack.NULL), false)
+    t.assert_equals(varbinary.is(1), false)
+    t.assert_equals(varbinary.is({}), false)
+    t.assert_equals(varbinary.is(''), false)
+    t.assert_equals(varbinary.is('foo'), false)
+end
+
+g.test_len = function()
+    t.assert_equals(#varbinary.new(), 0)
+    t.assert_equals(#varbinary.new(''), 0)
+    t.assert_equals(#varbinary.new('foo'), 3)
+    t.assert_equals(#varbinary.new(ffi.cast('const char *', 'foobar'), 6), 6)
+end
+
+g.test_tostring = function()
+    t.assert_equals(tostring(varbinary.new()), '')
+    t.assert_equals(tostring(varbinary.new('')), '')
+    t.assert_equals(tostring(varbinary.new('foo')), 'foo')
+    t.assert_equals(tostring(varbinary.new(
+        ffi.cast('const char *', 'foobar'), 6)), 'foobar')
+end
+
+g.test_eq = function()
+    local v1 = varbinary.new('foo')
+    local v2 = varbinary.new(ffi.cast('const char *', 'foo'), 3)
+    local v3 = varbinary.new('foobar')
+    t.assert_equals('foo', v1)
+    t.assert_equals('foo', v2)
+    t.assert_equals('foobar', v3)
+    t.assert_not_equals('foobar', v1)
+    t.assert_not_equals('foobar', v2)
+    t.assert_not_equals('foo', v3)
+    t.assert_equals(v1, 'foo')
+    t.assert_not_equals(v1, 'foobar')
+    t.assert_equals(v1, v1)
+    t.assert_equals(v1, v2)
+    t.assert_not_equals(v1, v3)
+    t.assert_equals(v2, 'foo')
+    t.assert_not_equals(v2, 'foobar')
+    t.assert_equals(v2, v1)
+    t.assert_equals(v2, v2)
+    t.assert_not_equals(v2, v3)
+    t.assert_equals(v3, 'foobar')
+    t.assert_not_equals(v3, 'foo')
+    t.assert_not_equals(v3, v1)
+    t.assert_not_equals(v3, v2)
+    t.assert_equals(v3, v3)
+end
+
+-- Map: string => expected base64 encoding.
+local base64_tests = {
+    {'', ''},
+    {'\xFF', '/w=='},
+    {'foo', 'Zm9v'},
+    {
+        string.rep('x', 100),
+        string.rep('eHh4', 33) .. 'eA=='
+    },
+}
+
+g.test_yaml = function()
+    for _, i in ipairs(base64_tests) do
+        local v = varbinary.new(i[1])
+        local r = '--- !!binary ' .. i[2] .. '\n...\n'
+        local v2 = yaml.decode(r)
+        local r2 = yaml.encode(v)
+        t.assert_equals(r2, r)
+        t.assert_equals(v2, v)
+        t.assert(varbinary.is(v2))
+    end
+end
+
+g.test_tuple_tostring = function()
+    for _, i in ipairs(base64_tests) do
+        local v = varbinary.new(i[1])
+        local r = '[!!binary ' .. (i[2] == '' and "''" or i[2]) .. ']'
+        t.assert_equals(tostring(box.tuple.new(v)), r)
+    end
+end
+
+-- Map: string => expected msgpack encoding.
+local msgpack_tests = {
+    {'', '\xC4\x00'},
+    {'\xFF', '\xC4\x01\xFF'},
+    {'foo', '\xC4\x03foo'},
+    {string.rep('x', 300), '\xC5\x01\x2C' .. string.rep('x', 300)},
+    {
+        string.rep('x', 70000),
+        '\xC6\x00\x01\x11\x70' .. string.rep('x', 70000)
+    },
+}
+
+g.test_msgpack = function()
+    for _, i in ipairs(msgpack_tests) do
+        local v = varbinary.new(i[1])
+        local r = i[2]
+        local v2 = msgpack.decode(r)
+        local r2 = msgpack.encode(v)
+        t.assert_equals(r2, r)
+        t.assert_equals(v2, v)
+        t.assert(varbinary.is(v2))
+    end
+end
+
+g.test_msgpackffi = function()
+    for _, i in ipairs(msgpack_tests) do
+        local v = varbinary.new(i[1])
+        local r = i[2]
+        local v2 = msgpackffi.decode(r)
+        local r2 = msgpackffi.encode(v)
+        t.assert_equals(r2, r)
+        t.assert_equals(v2, v)
+        t.assert(varbinary.is(v2))
+    end
+end
+
+-- JSON encoder converts binary data to string.
+g.test_json = function()
+    t.assert_equals(json.encode(varbinary.new()), "\"\"")
+    t.assert_equals(json.encode(varbinary.new('foo')), "\"foo\"")
+    t.assert_equals(json.encode(varbinary.new('\xFF')), "\"\xFF\"")
+end
+
+-- Lua console converts binary data to string.
+g.test_lua_console = function()
+    local function format(v)
+        return require('console.lib').format_lua({block = false, indent = 2}, v)
+    end
+    t.assert_equals(format(varbinary.new()), "\"\"")
+    t.assert_equals(format(varbinary.new('foo')), "\"foo\"")
+    t.assert_equals(format(varbinary.new('\xFF')), "\"\xFF\"")
+end
+
+g.after_test('test_compat', function()
+    compat.binary_data_decoding = 'default'
+end)
+
+g.test_compat = function()
+    t.assert_equals(compat.binary_data_decoding.current, 'default')
+    t.assert_equals(compat.binary_data_decoding.default, 'new')
+    local v = varbinary.new()
+    t.assert(varbinary.is(yaml.decode(yaml.encode(v))))
+    t.assert(varbinary.is(msgpack.decode(msgpack.encode(v))))
+    t.assert(varbinary.is(msgpackffi.decode(msgpackffi.encode(v))))
+    compat.binary_data_decoding = 'old'
+    t.assert_equals(type(yaml.decode(yaml.encode(v))), 'string')
+    t.assert_equals(type(msgpack.decode(msgpack.encode(v))), 'string')
+    t.assert_equals(type(msgpackffi.decode(msgpackffi.encode(v))), 'string')
+end
diff --git a/test/app/uuid.result b/test/app/uuid.result
index f0bfda0c13d3a7077d41b5b7d158f95f583d73bc..a6808961781f7b2bf44c72cb1a077f0e03fdc6e7 100644
--- a/test/app/uuid.result
+++ b/test/app/uuid.result
@@ -485,7 +485,7 @@ s:select()
 - - [true]
   - [1]
   - ['1']
-  - ['000']
+  - [!!binary MDAw]
   - [11111111-1111-1111-1111-111111111111]
   - [11111111-1111-1111-1111-111111111112]
 ...
@@ -493,7 +493,7 @@ s:select({}, {iterator='LE'})
 ---
 - - [11111111-1111-1111-1111-111111111112]
   - [11111111-1111-1111-1111-111111111111]
-  - ['000']
+  - [!!binary MDAw]
   - ['1']
   - [1]
   - [true]
diff --git a/test/box-luatest/varbinary_test.lua b/test/box-luatest/varbinary_test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..37e5c8e159728deee037be80fe149b6b2a100b90
--- /dev/null
+++ b/test/box-luatest/varbinary_test.lua
@@ -0,0 +1,139 @@
+local server = require('luatest.server')
+local t = require('luatest')
+local g = t.group()
+
+g.before_all(function(cg)
+    cg.server = server:new()
+    cg.server:start()
+end)
+
+g.after_all(function(cg)
+    cg.server:drop()
+end)
+
+--
+-- gh-4201: Introduce varbinary field type.
+--
+g.test_field_type = function(cg)
+    cg.server:exec(function()
+        local varbinary = require('varbinary')
+        local s = box.schema.space.create('withdata')
+        s:format({{"b", "integer"}})
+        t.assert_error_msg_equals(
+            "Field 1 (b) has type 'integer' in space format, " ..
+            "but type 'varbinary' in index definition",
+            s.create_index, s, 'pk', {parts = {1, "varbinary"}})
+        s:format({{"b", "varbinary"}})
+        t.assert_error_msg_equals(
+            "Field 1 (b) has type 'varbinary' in space format, " ..
+            "but type 'integer' in index definition",
+            s.create_index, s, 'pk', {parts = {1, "integer"}})
+        local pk = s:create_index('pk', {parts = {1, "varbinary"}})
+        s:insert({varbinary.new('\xDE\xAD\xBE\xAF')})
+        s:insert({varbinary.new('\xFE\xED\xFA\xCE')})
+        local result = s:select()
+        t.assert_equals(result, {
+            {'\xDE\xAD\xBE\xAF'},
+            {'\xFE\xED\xFA\xCE'},
+        })
+        t.assert(varbinary.is(result[1].b))
+        t.assert(varbinary.is(result[2].b))
+        result = box.execute("SELECT * FROM \"withdata\" " ..
+                             "WHERE \"b\" < x'FEEDFACE';")
+        t.assert_equals(result, {
+            metadata = {
+                {name = 'b', type = 'varbinary'},
+            },
+            rows = {
+                {'\xDE\xAD\xBE\xAF'},
+            },
+        })
+        t.assert(varbinary.is(result.rows[1][1]))
+        pk:alter({parts = {1, 'scalar'}})
+        s:format({{'b', 'scalar'}})
+        s:insert({11})
+        s:insert({22})
+        s:insert({'11'})
+        s:insert({'22'})
+        t.assert_equals(s:select(), {
+            {11},
+            {22},
+            {'11'},
+            {'22'},
+            {'\xDE\xAD\xBE\xAF'},
+            {'\xFE\xED\xFA\xCE'},
+        })
+        t.assert_equals(box.execute("SELECT * FROM \"withdata\" " ..
+                                    "WHERE \"b\" <= x'DEADBEAF';"), {
+            metadata = {
+                {name = 'b', type = 'scalar'},
+            },
+            rows = {
+                {11},
+                {22},
+                {'11'},
+                {'22'},
+                {'\xDE\xAD\xBE\xAF'},
+            },
+        })
+        t.assert_error_msg_equals(
+            "Tuple field 1 (b) type does not match one " ..
+            "required by operation: expected varbinary, got unsigned",
+            pk.alter, pk, {parts = {1, 'varbinary'}})
+        s:delete({11})
+        s:delete({22})
+        s:delete({'11'})
+        s:delete({'22'})
+        s:insert({varbinary.new('\xFA\xDE\xDE\xAD')})
+        pk:alter({parts = {1, 'varbinary'}})
+        t.assert_equals(s:select(), {
+            {'\xDE\xAD\xBE\xAF'},
+            {'\xFA\xDE\xDE\xAD'},
+            {'\xFE\xED\xFA\xCE'},
+        })
+    end)
+end
+
+g.after_test('test_field_type', function(cg)
+    cg.server:exec(function()
+        if box.space.withdata then
+            box.space.withdata:drop()
+        end
+    end)
+end)
+
+--
+-- gh-5071: Bitset index for binary fields.
+--
+g.test_bitset_index = function(cg)
+    cg.server:exec(function()
+        local varbinary = require('varbinary')
+        local s = box.schema.space.create('withdata')
+        s:create_index('pk', {parts = {1, "varbinary"}})
+        local bs = s:create_index('bitset', {type = 'bitset',
+                                             parts = {1, 'varbinary'}})
+        s:insert({varbinary.new('\xDE\xAD\xBE\xAF')})
+        s:insert({varbinary.new('\xFA\xDE\xDE\xAD')})
+        s:insert({varbinary.new('\xFE\xED\xFA\xCE')})
+        s:insert({varbinary.new('\xFF')})
+        t.assert_equals(bs:select(varbinary.new('\xFF'), 'BITS_ALL_SET'), {
+            {'\xFF'},
+        })
+        t.assert_equals(bs:select(varbinary.new('\x04'), 'BITS_ANY_SET'), {
+            {'\xDE\xAD\xBE\xAF'},
+            {'\xFE\xED\xFA\xCE'},
+            {'\xFF'},
+        })
+        t.assert_equals(bs:select(varbinary.new('\x04'), 'BITS_ALL_NOT_SET'), {
+            {'\xFA\xDE\xDE\xAD'},
+        })
+    end)
+end
+
+g.after_test('test_bitset_index', function(cg)
+    cg.server:exec(function()
+        if box.space.withdata then
+            box.space.withdata:drop()
+        end
+    end)
+end)
diff --git a/test/box/varbinary_type.result b/test/box/varbinary_type.result
deleted file mode 100644
index 5bf9ed449a5cb1de242dd57d649410bbf746b39b..0000000000000000000000000000000000000000
--- a/test/box/varbinary_type.result
+++ /dev/null
@@ -1,236 +0,0 @@
-env = require('test_run')
----
-...
-test_run = env.new()
----
-...
---
--- gh-4201: Introduce varbinary field type.
---
-s = box.schema.space.create('withdata')
----
-...
-s:format({{"b", "integer"}})
----
-...
-_ = s:create_index('pk', {parts = {1, "varbinary"}})
----
-- error: Field 1 (b) has type 'integer' in space format, but type 'varbinary' in index
-    definition
-...
-s:format({{"b", "varbinary"}})
----
-...
-_ = s:create_index('pk', {parts = {1, "integer"}})
----
-- error: Field 1 (b) has type 'varbinary' in space format, but type 'integer' in index
-    definition
-...
-pk = s:create_index('pk', {parts = {1, "varbinary"}})
----
-...
-buffer = require('buffer')
----
-...
-ffi = require('ffi')
----
-...
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-function encode_bin(bytes)
-    local tmpbuf = buffer.ibuf()
-    local p = tmpbuf:alloc(3 + #bytes)
-    p[0] = 0x91
-    p[1] = 0xC4
-    p[2] = #bytes
-    for i, c in pairs(bytes) do
-        p[i + 3 - 1] = c
-    end
-    return tmpbuf
-end
-test_run:cmd("setopt delimiter ''");
----
-...
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-function bintuple_insert(space, bytes)
-    local tmpbuf = encode_bin(bytes)
-    ffi.cdef[[int box_insert(uint32_t space_id, const char *tuple, const char *tuple_end, box_tuple_t **result);]]
-    ffi.C.box_insert(space.id, tmpbuf.rpos, tmpbuf.wpos, nil)
-end
-test_run:cmd("setopt delimiter ''");
----
-...
-bintuple_insert(s, {0xDE, 0xAD, 0xBE, 0xAF})
----
-...
-bintuple_insert(s, {0xFE, 0xED, 0xFA, 0xCE})
----
-...
-s:select()
----
-- - ["Þ­\xBE\xAF"]
-  - ["\xFE\xED\xFA\xCE"]
-...
-box.execute("SELECT * FROM \"withdata\" WHERE \"b\" < x'FEEDFACE';")
----
-- metadata:
-  - name: b
-    type: varbinary
-  rows:
-  - ["Þ­\xBE\xAF"]
-...
-pk:alter({parts = {1, "scalar"}})
----
-...
-s:format({{"b", "scalar"}})
----
-...
-s:insert({11})
----
-- [11]
-...
-s:insert({22})
----
-- [22]
-...
-s:insert({"11"})
----
-- ['11']
-...
-s:insert({"22"})
----
-- ['22']
-...
-s:select()
----
-- - [11]
-  - [22]
-  - ['11']
-  - ['22']
-  - ["Þ­\xBE\xAF"]
-  - ["\xFE\xED\xFA\xCE"]
-...
-box.execute("SELECT * FROM \"withdata\" WHERE \"b\" <= x'DEADBEAF';")
----
-- metadata:
-  - name: b
-    type: scalar
-  rows:
-  - [11]
-  - [22]
-  - ['11']
-  - ['22']
-  - ["Þ­\xBE\xAF"]
-...
-pk:alter({parts = {1, "varbinary"}})
----
-- error: 'Tuple field 1 (b) type does not match one required by operation: expected
-    varbinary, got unsigned'
-...
-s:delete({11})
----
-- [11]
-...
-s:delete({22})
----
-- [22]
-...
-s:delete({"11"})
----
-- ['11']
-...
-s:delete({"22"})
----
-- ['22']
-...
-bintuple_insert(s, {0xFA, 0xDE, 0xDE, 0xAD})
----
-...
-pk:alter({parts = {1, "varbinary"}})
----
-...
-s:select()
----
-- - ["Þ­\xBE\xAF"]
-  - ["\xFA\xDEÞ­"]
-  - ["\xFE\xED\xFA\xCE"]
-...
---
--- gh-5071: bitset index for binary fields
---
-bs = s:create_index('bitset', {type = 'bitset', parts = {1, "varbinary"}})
----
-...
-bintuple_insert(s, {0xFF})
----
-...
-ITER_BITS_ALL_SET = 7
----
-...
-ITER_BITS_ANY_SET = 8
----
-...
-ITER_BITS_ALL_NOT_SET = 9
----
-...
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-function varbinary_select(space, idx, bytes, flag)
-    local tmpbuf = encode_bin(bytes)
-    ffi.cdef[[
-    box_iterator_t *box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
-           const char *key, const char *key_end);//
-    int box_iterator_next(box_iterator_t *iterator, box_tuple_t **result);//
-    const char *box_tuple_field(box_tuple_t *tuple, uint32_t fieldno);//
-    ]]
-    local res = ffi.new("box_tuple_t*[1]")
-    local it = ffi.C.box_index_iterator(space.id, idx.id, flag, tmpbuf.rpos, tmpbuf.wpos)
-
-    ffi.C.box_iterator_next(it, res)
-
-    local output_table = {}
-
-    while res[0] ~= nil do
-        local field = ffi.C.box_tuple_field(res[0], 0)
-        assert(bit.band(field[0], 0xff) == 0xc4)
-        local len = field[1]
-        assert(len >= 0)
-
-        local output = ''
-        for i = 0, len - 1 do
-            output = output .. string.format("%x", bit.band(field[i+2], 0xff))
-        end
-        table.insert(output_table, output)
-
-        ffi.C.box_iterator_next(it, res)
-    end
-
-    return output_table
-end
-test_run:cmd("setopt delimiter ''");
----
-...
-varbinary_select(s, bs, { 0xff }, ITER_BITS_ALL_SET)
----
-- - ff
-...
-varbinary_select(s, bs, { 0x04 }, ITER_BITS_ANY_SET)
----
-- - deadbeaf
-  - feedface
-  - ff
-...
-varbinary_select(s, bs, { 0x04 }, ITER_BITS_ALL_NOT_SET)
----
-- - fadedead
-...
-s:drop()
----
-...
diff --git a/test/box/varbinary_type.test.lua b/test/box/varbinary_type.test.lua
deleted file mode 100644
index 7b9a1e7215ce70fed4b4aca42c96496077340822..0000000000000000000000000000000000000000
--- a/test/box/varbinary_type.test.lua
+++ /dev/null
@@ -1,110 +0,0 @@
-env = require('test_run')
-test_run = env.new()
-
---
--- gh-4201: Introduce varbinary field type.
---
-s = box.schema.space.create('withdata')
-s:format({{"b", "integer"}})
-_ = s:create_index('pk', {parts = {1, "varbinary"}})
-s:format({{"b", "varbinary"}})
-_ = s:create_index('pk', {parts = {1, "integer"}})
-pk = s:create_index('pk', {parts = {1, "varbinary"}})
-
-buffer = require('buffer')
-ffi = require('ffi')
-
-test_run:cmd("setopt delimiter ';'")
-function encode_bin(bytes)
-    local tmpbuf = buffer.ibuf()
-    local p = tmpbuf:alloc(3 + #bytes)
-    p[0] = 0x91
-    p[1] = 0xC4
-    p[2] = #bytes
-    for i, c in pairs(bytes) do
-        p[i + 3 - 1] = c
-    end
-    return tmpbuf
-end
-test_run:cmd("setopt delimiter ''");
-
-test_run:cmd("setopt delimiter ';'")
-function bintuple_insert(space, bytes)
-    local tmpbuf = encode_bin(bytes)
-    ffi.cdef[[int box_insert(uint32_t space_id, const char *tuple, const char *tuple_end, box_tuple_t **result);]]
-    ffi.C.box_insert(space.id, tmpbuf.rpos, tmpbuf.wpos, nil)
-end
-test_run:cmd("setopt delimiter ''");
-
-bintuple_insert(s, {0xDE, 0xAD, 0xBE, 0xAF})
-bintuple_insert(s, {0xFE, 0xED, 0xFA, 0xCE})
-s:select()
-box.execute("SELECT * FROM \"withdata\" WHERE \"b\" < x'FEEDFACE';")
-pk:alter({parts = {1, "scalar"}})
-s:format({{"b", "scalar"}})
-s:insert({11})
-s:insert({22})
-s:insert({"11"})
-s:insert({"22"})
-s:select()
-box.execute("SELECT * FROM \"withdata\" WHERE \"b\" <= x'DEADBEAF';")
-pk:alter({parts = {1, "varbinary"}})
-s:delete({11})
-s:delete({22})
-s:delete({"11"})
-s:delete({"22"})
-bintuple_insert(s, {0xFA, 0xDE, 0xDE, 0xAD})
-pk:alter({parts = {1, "varbinary"}})
-s:select()
-
---
--- gh-5071: bitset index for binary fields
---
-bs = s:create_index('bitset', {type = 'bitset', parts = {1, "varbinary"}})
-
-bintuple_insert(s, {0xFF})
-
-ITER_BITS_ALL_SET = 7
-ITER_BITS_ANY_SET = 8
-ITER_BITS_ALL_NOT_SET = 9
-
-test_run:cmd("setopt delimiter ';'")
-function varbinary_select(space, idx, bytes, flag)
-    local tmpbuf = encode_bin(bytes)
-    ffi.cdef[[
-    box_iterator_t *box_index_iterator(uint32_t space_id, uint32_t index_id, int type,
-           const char *key, const char *key_end);//
-    int box_iterator_next(box_iterator_t *iterator, box_tuple_t **result);//
-    const char *box_tuple_field(box_tuple_t *tuple, uint32_t fieldno);//
-    ]]
-    local res = ffi.new("box_tuple_t*[1]")
-    local it = ffi.C.box_index_iterator(space.id, idx.id, flag, tmpbuf.rpos, tmpbuf.wpos)
-
-    ffi.C.box_iterator_next(it, res)
-
-    local output_table = {}
-
-    while res[0] ~= nil do
-        local field = ffi.C.box_tuple_field(res[0], 0)
-        assert(bit.band(field[0], 0xff) == 0xc4)
-        local len = field[1]
-        assert(len >= 0)
-
-        local output = ''
-        for i = 0, len - 1 do
-            output = output .. string.format("%x", bit.band(field[i+2], 0xff))
-        end
-        table.insert(output_table, output)
-
-        ffi.C.box_iterator_next(it, res)
-    end
-
-    return output_table
-end
-test_run:cmd("setopt delimiter ''");
-
-varbinary_select(s, bs, { 0xff }, ITER_BITS_ALL_SET)
-varbinary_select(s, bs, { 0x04 }, ITER_BITS_ANY_SET)
-varbinary_select(s, bs, { 0x04 }, ITER_BITS_ALL_NOT_SET)
-
-s:drop()
diff --git a/test/sql-tap/array.test.lua b/test/sql-tap/array.test.lua
index 01c08c2d390438a19ba6b9c3ce86774380785178..339277df09f50bed1d173c9d48474022fa9b57bb 100755
--- a/test/sql-tap/array.test.lua
+++ b/test/sql-tap/array.test.lua
@@ -984,7 +984,8 @@ test:do_execsql_test(
     [[
         SELECT [a, g, t, n, f, i, b, v, s, d, u] FROM t1 WHERE id = 1;
     ]], {
-        {{1}, 1, '1', 1, 1, 1, true, '1', 1, require('decimal').new(1),
+        {{1}, 1, '1', 1, 1, 1, true, require('varbinary').new('1'), 1,
+         require('decimal').new(1),
          require('uuid').fromstr('11111111-1111-1111-1111-111111111111')}
     })
 
@@ -993,7 +994,8 @@ test:do_execsql_test(
     [[
         SELECT [1, true, 1.5e0, ['asd', x'32'], 1234.0];
     ]], {
-        {1, true, 1.5, {'asd', '2'}, require('decimal').new(1234)}
+        {1, true, 1.5, {'asd', require('varbinary').new('2')},
+         require('decimal').new(1234)}
     })
 
 test:do_execsql_test(
diff --git a/test/sql-tap/blob.test.lua b/test/sql-tap/blob.test.lua
index 719b25deb0a8febba8cffbfb69997eb09f148dda..1d5dad6a759380c6c82913c6005afc895dd9fb99 100755
--- a/test/sql-tap/blob.test.lua
+++ b/test/sql-tap/blob.test.lua
@@ -21,6 +21,7 @@ test:plan(20)
 
 
 local function bin_to_hex(blob)
+    blob = tostring(blob)
     local bytes2 = {  }
     for i = 1, string.len(blob), 1 do
         string.byte("ABCDE")
diff --git a/test/sql-tap/cast.test.lua b/test/sql-tap/cast.test.lua
index 3ae1d7739d78cb51a3ba6269b36a8c9c58c32025..8b1f53d3d82cf69d1226613a66c8449107f2ede2 100755
--- a/test/sql-tap/cast.test.lua
+++ b/test/sql-tap/cast.test.lua
@@ -30,7 +30,7 @@ test:do_execsql_test(
         SELECT x'616263'
     ]], {
         -- <cast-1.1>
-        "abc"
+        require('varbinary').new("abc")
         -- </cast-1.1>
     })
 
@@ -80,7 +80,7 @@ test:do_execsql_test(
         SELECT CAST(x'616263' AS SCALAR)
     ]], {
         -- <cast-1.7>
-        "abc"
+        require('varbinary').new("abc")
         -- </cast-1.7>
     })
 
diff --git a/test/sql-tap/default.test.lua b/test/sql-tap/default.test.lua
index da911c61b7e7a57a01c8fb6307b170531c99cc6f..aec3ec44587dc590f9bf434be0481d1504afb6ce 100755
--- a/test/sql-tap/default.test.lua
+++ b/test/sql-tap/default.test.lua
@@ -34,7 +34,7 @@ test:do_execsql_test(
 		SELECT a, b from t1;
 	]], {
 	-- <default-1.1>
-	1, "hi"
+       1, require('varbinary').new("hi")
 	-- </default-1.1>
 })
 
diff --git a/test/sql-tap/func.test.lua b/test/sql-tap/func.test.lua
index 475aa26938570f141af61f2d73abe76214229406..856a9246625afac85636b1e2d622a71b9afd0917 100755
--- a/test/sql-tap/func.test.lua
+++ b/test/sql-tap/func.test.lua
@@ -1946,7 +1946,7 @@ test:do_execsql_test(
         SELECT TRIM(X'00' FROM X'004100');
     ]], {
         -- <func-22.23>
-        "A"
+        require('varbinary').new("A")
         -- </func-22.23>
     })
 
@@ -1956,7 +1956,7 @@ test:do_execsql_test(
         SELECT TRIM(X'0000' FROM X'004100');
     ]], {
         -- <func-22.24>
-        "A"
+        require('varbinary').new("A")
         -- </func-22.24>
     })
 
@@ -1966,7 +1966,7 @@ test:do_execsql_test(
         SELECT TRIM(X'0042' FROM X'004100');
     ]], {
         -- <func-22.25>
-        "A"
+        require('varbinary').new("A")
         -- </func-22.25>
     })
 
@@ -1976,7 +1976,7 @@ test:do_execsql_test(
         SELECT TRIM(X'00' FROM X'00004100420000');
     ]], {
         -- <func-22.26>
-        "A\0B"
+        require('varbinary').new("A\0B")
         -- </func-22.26>
     })
 
@@ -1986,7 +1986,7 @@ test:do_execsql_test(
         SELECT TRIM(LEADING X'00' FROM X'004100');
     ]], {
         -- <func-22.27>
-        "A\0"
+        require('varbinary').new("A\0")
         -- </func-22.27>
     })
 
@@ -1996,7 +1996,7 @@ test:do_execsql_test(
         SELECT TRIM(LEADING X'0000' FROM X'004100');
     ]], {
         -- <func-22.28>
-        "A\0"
+        require('varbinary').new("A\0")
         -- </func-22.28>
     })
 
@@ -2006,7 +2006,7 @@ test:do_execsql_test(
         SELECT TRIM(LEADING X'0042' FROM X'004100');
     ]], {
         -- <func-22.29>
-        "A\0"
+        require('varbinary').new("A\0")
         -- </func-22.29>
     })
 
@@ -2016,7 +2016,7 @@ test:do_execsql_test(
         SELECT TRIM(LEADING X'00' FROM X'00004100420000');
     ]], {
         -- <func-22.30>
-        "A\0B\0\0"
+        require('varbinary').new("A\0B\0\0")
         -- </func-22.30>
     })
 
@@ -2026,7 +2026,7 @@ test:do_execsql_test(
         SELECT TRIM(TRAILING X'00' FROM X'004100');
     ]], {
         -- <func-22.31>
-        "\0A"
+        require('varbinary').new("\0A")
         -- </func-22.31>
     })
 
@@ -2036,7 +2036,7 @@ test:do_execsql_test(
         SELECT TRIM(TRAILING X'0000' FROM X'004100');
     ]], {
         -- <func-22.32>
-        "\0A"
+        require('varbinary').new("\0A")
         -- </func-22.32>
     })
 
@@ -2046,7 +2046,7 @@ test:do_execsql_test(
         SELECT TRIM(TRAILING X'0042' FROM X'004100');
     ]], {
         -- <func-22.33>
-        "\0A"
+        require('varbinary').new("\0A")
         -- </func-22.33>
     })
 
@@ -2056,7 +2056,7 @@ test:do_execsql_test(
         SELECT TRIM(TRAILING X'00' FROM X'00004100420000');
     ]], {
         -- <func-22.34>
-        "\0\0A\0B"
+        require('varbinary').new("\0\0A\0B")
         -- </func-22.34>
     })
 
@@ -2546,7 +2546,7 @@ test:do_execsql_test(
 test:do_execsql_test(
     "func-36",
     [[VALUES (RANDOMBLOB(0))]],
-    {''})
+    {require('varbinary').new('')})
 
 -- gh-3542
 -- In SQL '\0' is NOT a end-of-string signal. Tests below ensures
diff --git a/test/sql-tap/func2.test.lua b/test/sql-tap/func2.test.lua
index b786b4d96faf1f6a476c05675e0d3d9bd23c7178..6835b2c5912280563163b231ff26e161bc9bbaea 100755
--- a/test/sql-tap/func2.test.lua
+++ b/test/sql-tap/func2.test.lua
@@ -25,6 +25,7 @@ test:plan(130)
 --   func2-3.*: substr implementation (blob)
 --
 local function bin_to_hex(blob)
+    blob = tostring(blob)
     return (blob:gsub('.', function (c)
         return string.format('%02X', string.byte(c))
     end))
diff --git a/test/sql-tap/gh-5890-wrong-select-with-groupby.test.lua b/test/sql-tap/gh-5890-wrong-select-with-groupby.test.lua
index 2b7e198626fd5546f2ad0ae354e91330b3054f34..7711e02cfc6e11339eb92753156c02dc4977d0e0 100755
--- a/test/sql-tap/gh-5890-wrong-select-with-groupby.test.lua
+++ b/test/sql-tap/gh-5890-wrong-select-with-groupby.test.lua
@@ -14,7 +14,8 @@ test:do_execsql_test(
         INSERT INTO t VALUES(1, x'6178'), (2, x'6278'), (3, x'6379');
         SELECT count(*), substr(v,2,1) AS m FROM t GROUP BY m;
     ]], {
-        2, 'x', 1, 'y'
+        2, require('varbinary').new('x'),
+        1, require('varbinary').new('y')
     })
 
 test:do_execsql_test(
@@ -22,7 +23,9 @@ test:do_execsql_test(
     [[
         SELECT count(*), v || v AS m FROM t GROUP BY m;
     ]], {
-        1, 'axax', 1, 'bxbx', 1, 'cycy'
+        1, require('varbinary').new('axax'),
+        1, require('varbinary').new('bxbx'),
+        1, require('varbinary').new('cycy')
     })
 
 test:finish_test()
diff --git a/test/sql-tap/metatypes.test.lua b/test/sql-tap/metatypes.test.lua
index 912b2674cf3ae80f4ffb7782019311be55dc4d87..89b4454300b75a97cd0c4ea1338b2e2e2eaeb160 100755
--- a/test/sql-tap/metatypes.test.lua
+++ b/test/sql-tap/metatypes.test.lua
@@ -17,6 +17,7 @@ box.execute([[INSERT INTO t1(id) VALUES(NULL);]])
 local uuid_str = [[11111111-1111-1111-1111-111111111111]]
 local uuid = require('uuid').fromstr(uuid_str)
 local dec = require('decimal').new(1.5)
+local bin = require('varbinary').new("5")
 test:do_execsql_test(
     "metatypes-1.1",
     [[
@@ -38,11 +39,11 @@ test:do_execsql_test(
         2, 2, 2, 2,
         3, "3", "", "",
         4, true, "", "",
-        5, "5", "", "",
+        5, bin, "", "",
         6, uuid, "", "",
         7, "", "", "3",
         8, "", "", true,
-        9, "", "", "5",
+        9, "", "", bin,
         10, "", "", uuid,
         11, "", "", 3,
         12, "", "", dec
@@ -447,7 +448,7 @@ test:do_execsql_test(
     [[
         SELECT COALESCE(s, a) FROM t;
     ]], {
-        1, 2, "3", true, "5", uuid, "3", true, "5", uuid, 3, dec
+        1, 2, "3", true, bin, uuid, "3", true, bin, uuid, 3, dec
     })
 
 test:do_execsql_test(
@@ -487,7 +488,7 @@ test:do_execsql_test(
     [[
         SELECT IFNULL(s, a) FROM t;
     ]], {
-        1, 2, "3", true, "5", uuid, "3", true, "5", uuid, 3, dec
+        1, 2, "3", true, bin, uuid, "3", true, bin, uuid, 3, dec
     })
 
 test:do_catchsql_test(
@@ -520,7 +521,7 @@ test:do_execsql_test(
     [[
         SELECT LIKELIHOOD(a, 0.5e0) FROM t;
     ]], {
-        1, 2, "", "", "", "", "3", true, "5", uuid, 3, dec
+        1, 2, "", "", "", "", "3", true, bin, uuid, 3, dec
     })
 
 test:do_execsql_test(
@@ -528,7 +529,7 @@ test:do_execsql_test(
     [[
         SELECT LIKELY(a) FROM t;
     ]], {
-        1, 2, "", "", "", "", "3", true, "5", uuid, 3, dec
+        1, 2, "", "", "", "", "3", true, bin, uuid, 3, dec
     })
 
 test:do_catchsql_test(
@@ -677,7 +678,7 @@ test:do_execsql_test(
     [[
         SELECT UNLIKELY(a) FROM t;
     ]], {
-        1, 2, "", "", "", "", "3", true, "5", uuid, 3, dec
+        1, 2, "", "", "", "", "3", true, bin, uuid, 3, dec
     })
 
 test:do_catchsql_test(
diff --git a/test/sql-tap/view.test.lua b/test/sql-tap/view.test.lua
index e84fc033d1b4b0b88f8f1bd1a0ce602d2d91c515..3dfabb9f25ca3cb66df2f8341fe80054ee6dacfe 100755
--- a/test/sql-tap/view.test.lua
+++ b/test/sql-tap/view.test.lua
@@ -1284,7 +1284,7 @@ test:do_execsql_test(
         SELECT * FROM v;
     ]], {
         -- <view-24.5>
-        "aaaaaaaa\naaaaaaaa\naaaaaaaa\naaaaaaaa\naaaaaaaa"
+        require('varbinary').new("aaaaaaaa\naaaaaaaa\naaaaaaaa\naaaaaaaa\naaaaaaaa")
         -- </view-24.5>
     })
 
diff --git a/test/sql-tap/with1.test.lua b/test/sql-tap/with1.test.lua
index e30d67141623d51b882501353ae40784c9f6acaa..0b761a2c78f43bef56b424d96b78413fe039de09 100755
--- a/test/sql-tap/with1.test.lua
+++ b/test/sql-tap/with1.test.lua
@@ -553,7 +553,7 @@ test:do_execsql_test("8.1-mandelbrot", [[
   SELECT group_concat(CAST(TRIM(TRAILING FROM t) AS VARBINARY),x'0a') FROM a;
 ]], {
   -- <8.1-mandelbrot>
-  [[                                    ....#
+  require('varbinary').new([[                                    ....#
                                    ..#*..
                                  ..+####+.
                             .......+####....   +
@@ -574,7 +574,7 @@ test:do_execsql_test("8.1-mandelbrot", [[
                                  ..+####+.
                                    ..#*..
                                     ....#
-                                    +.]]
+                                    +.]])
   -- </8.1-mandelbrot>
 })
 
diff --git a/test/sql/gh-3888-values-blob-assert.result b/test/sql/gh-3888-values-blob-assert.result
index ce3f58cdcbd22ff41c1463a6bdd31756c24df952..4b2287b920e3f808914bc57b30020586a31db430 100644
--- a/test/sql/gh-3888-values-blob-assert.result
+++ b/test/sql/gh-3888-values-blob-assert.result
@@ -62,7 +62,7 @@ box.execute('SELECT X\'507265766564\'')
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['Preved']
+  - [!!binary UHJldmVk]
 ...
 -- check 'SELECT' well-formed expression  (return value)
 box.execute('SELECT 3.14')
@@ -79,5 +79,5 @@ box.execute('SELECT X\'4D6564766564\'')
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['Medved']
+  - [!!binary TWVkdmVk]
 ...
diff --git a/test/sql/iproto.result b/test/sql/iproto.result
index 17ddb52d3149ee4c087f9bc15c0fc75c1235ea16..0e3f86a3d7c1c3e5c3756fee546eb59c507464b0 100644
--- a/test/sql/iproto.result
+++ b/test/sql/iproto.result
@@ -744,7 +744,7 @@ cn:execute("SELECT zeroblob(1);")
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ["\0"]
+  - [!!binary AA==]
 ...
 -- randomblob() returns different results each time, so check only
 -- type in meta.
diff --git a/test/sql/misc.result b/test/sql/misc.result
index 17489f6c21d7a25a492cee33921c0bbf4567137c..c46bee4e24338955d5320cbbcce6e23ef0b38175 100644
--- a/test/sql/misc.result
+++ b/test/sql/misc.result
@@ -133,7 +133,7 @@ box.execute('SELECT X\'4D6564766564\'')
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['Medved']
+  - [!!binary TWVkdmVk]
 ...
 --
 -- gh-4139: assertion when reading a data-temporary space.
@@ -301,7 +301,7 @@ box.execute([[SELECT * FROM j;]])
     type: varbinary
   rows:
   - [1, null, null]
-  - [2, '', "\0"]
+  - [2, '', !!binary AA==]
 ...
 box.execute([[SELECT * FROM j INDEXED BY I3;]])
 ---
@@ -313,7 +313,7 @@ box.execute([[SELECT * FROM j INDEXED BY I3;]])
   - name: S3
     type: varbinary
   rows:
-  - [2, '', "\0"]
+  - [2, '', !!binary AA==]
 ...
 box.execute([[SELECT COUNT(*) FROM j GROUP BY s2;]])
 ---
@@ -351,8 +351,8 @@ box.execute([[SELECT * FROM j;]])
     type: varbinary
   rows:
   - [1, null, null]
-  - [2, null, "\0"]
-  - [3, 'a', '3']
+  - [2, null, !!binary AA==]
+  - [3, 'a', !!binary Mw==]
 ...
 box.execute([[SELECT * FROM j INDEXED BY I3;]])
 ---
@@ -364,7 +364,7 @@ box.execute([[SELECT * FROM j INDEXED BY I3;]])
   - name: S3
     type: varbinary
   rows:
-  - [3, 'a', '3']
+  - [3, 'a', !!binary Mw==]
 ...
 box.execute([[UPDATE j INDEXED BY i3 SET s3 = NULL;]])
 ---
@@ -373,7 +373,7 @@ box.execute([[UPDATE j INDEXED BY i3 SET s3 = NULL;]])
 s:select{}
 ---
 - - [1, null, null]
-  - [2, null, "\0"]
+  - [2, null, !!binary AA==]
   - [3, 'a', null]
 ...
 s:drop()
diff --git a/test/sql/types.result b/test/sql/types.result
index ea1cc1b8509ab5e129bc2fdd286c7346725593e6..2cfb8fe4a1fd87cf37590ddb6fe459e7c04b6beb 100644
--- a/test/sql/types.result
+++ b/test/sql/types.result
@@ -1128,7 +1128,7 @@ box.execute("SELECT CASE 1 WHEN 1 THEN x'0000000000' WHEN 2 THEN 'str' END")
   - name: COLUMN_1
     type: scalar
   rows:
-  - ["\0\0\0\0\0"]
+  - [!!binary AAAAAAA=]
 ...
 box.execute("SELECT CASE 1 WHEN 1 THEN 666 WHEN 2 THEN 123 END")
 ---
@@ -1239,7 +1239,7 @@ box.execute("SELECT * FROM t WHERE v = x'616263'")
   - name: V
     type: varbinary
   rows:
-  - [1, 'abc']
+  - [1, !!binary YWJj]
 ...
 box.execute("SELECT sum(v) FROM t;")
 ---
@@ -1262,7 +1262,7 @@ box.execute("SELECT min(v) FROM t;")
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['abc']
+  - [!!binary YWJj]
 ...
 box.execute("SELECT max(v) FROM t;")
 ---
@@ -1270,7 +1270,7 @@ box.execute("SELECT max(v) FROM t;")
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['abc']
+  - [!!binary YWJj]
 ...
 box.execute("SELECT count(v) FROM t;")
 ---
@@ -1286,7 +1286,7 @@ box.execute("SELECT group_concat(v) FROM t;")
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['abc']
+  - [!!binary YWJj]
 ...
 box.execute("SELECT lower(v) FROM t;")
 ---
@@ -1325,7 +1325,7 @@ box.execute("SELECT LEAST(v, x'') FROM t;")
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['']
+  - [!!binary '']
 ...
 box.execute("CREATE INDEX iv ON t(v);")
 ---
@@ -1337,7 +1337,7 @@ box.execute("SELECT v FROM t WHERE v = x'616263';")
   - name: V
     type: varbinary
   rows:
-  - ['abc']
+  - [!!binary YWJj]
 ...
 box.execute("SELECT v FROM t ORDER BY v;")
 ---
@@ -1345,7 +1345,7 @@ box.execute("SELECT v FROM t ORDER BY v;")
   - name: V
     type: varbinary
   rows:
-  - ['abc']
+  - [!!binary YWJj]
 ...
 box.execute("UPDATE t SET v = x'636261' WHERE v = x'616263';")
 ---
@@ -1357,7 +1357,7 @@ box.execute("SELECT v FROM t;")
   - name: V
     type: varbinary
   rows:
-  - ['cba']
+  - [!!binary Y2Jh]
 ...
 box.execute("CREATE TABLE parent (id INT PRIMARY KEY, a VARBINARY UNIQUE);")
 ---
@@ -1422,7 +1422,7 @@ box.execute("INSERT INTO t1 (id) VALUES (1);")
 ...
 box.space.T1:select()
 ---
-- - [1, 'abc']
+- - [1, !!binary YWJj]
 ...
 box.space.T1:drop()
 ---
@@ -1448,7 +1448,7 @@ box.execute("SELECT CAST('asd' AS VARBINARY);")
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['asd']
+  - [!!binary YXNk]
 ...
 box.execute("SELECT CAST(x'' AS VARBINARY);")
 ---
@@ -1456,7 +1456,7 @@ box.execute("SELECT CAST(x'' AS VARBINARY);")
   - name: COLUMN_1
     type: varbinary
   rows:
-  - ['']
+  - [!!binary '']
 ...
 -- gh-4148: make sure that typeof() returns origin type of column
 -- even if value is null.
@@ -2376,7 +2376,7 @@ box.execute([[SELECT * FROM tv;]])
     type: varbinary
   rows:
   - [1, null]
-  - [2, '44']
+  - [2, !!binary NDQ=]
 ...
 box.execute([[INSERT INTO ts(s) VALUES (NULL);]])
 ---
@@ -2427,7 +2427,7 @@ box.execute([[SELECT * FROM ts;]])
   - [3, 22.2]
   - [4, true]
   - [5, '33']
-  - [6, '44']
+  - [6, !!binary NDQ=]
 ...
 -- Check for UPDATE.
 box.execute([[DELETE FROM ti;]])
@@ -2704,7 +2704,7 @@ box.execute([[SELECT * FROM tv;]])
   - name: V
     type: varbinary
   rows:
-  - [1, '44']
+  - [1, !!binary NDQ=]
 ...
 box.execute([[UPDATE ts SET s = NULL WHERE a = 1;]])
 ---
@@ -2738,7 +2738,7 @@ box.execute([[SELECT * FROM ts;]])
   - name: S
     type: scalar
   rows:
-  - [1, '44']
+  - [1, !!binary NDQ=]
 ...
 box.execute([[DROP TABLE ti;]])
 ---
diff --git a/third_party/lua-yaml/lyaml.cc b/third_party/lua-yaml/lyaml.cc
index 3d3249d0c8f0c4f78239ca5ca68986b7726334c3..90de31510f2293f096122bd3cbac63c162e6b3cb 100644
--- a/third_party/lua-yaml/lyaml.cc
+++ b/third_party/lua-yaml/lyaml.cc
@@ -46,13 +46,12 @@ extern "C" {
 #include <lj_state.h>
 
 #include "yaml.h"
-#include "b64.h"
 } /* extern "C" */
+
+#include "base64.h"
 #include "lua/utils.h"
 #include "lua/serializer.h"
 #include "lib/core/decimal.h"
-#include "diag.h"
-#include "tt_static.h"
 #include "mp_extension_types.h" /* MP_DECIMAL, MP_UUID */
 #include "tt_uuid.h" /* tt_uuid_to_string(), UUID_STR_LEN */
 #include "tweaks.h"
@@ -99,17 +98,24 @@ struct lua_yaml_dumper {
 
    lua_State *outputL;
    luaL_Buffer yamlbuf;
-   int reftable_index;
 };
 
 /**
- * By default, only strings that contain a '\n\n' substring are encoded in
- * the block scalar style. Setting this flag, makes the encoder use the block
- * scalar style for all multiline strings.
+ * By default, all strings that contain '\n' are encoded in the block scalar
+ * style. Setting this flag to false, makes the encoder use default yaml style
+ * with excessive newlines for all strins without "\n\n" substring. This is a
+ * compatibility-only feature.
  */
-static bool yaml_pretty_multiline;
+static bool yaml_pretty_multiline = true;
 TWEAK_BOOL(yaml_pretty_multiline);
 
+/**
+ * If this flag is set, a binary data field will be decoded to a plain Lua
+ * string, not a varbinary object.
+ */
+static bool yaml_decode_binary_as_string = false;
+TWEAK_BOOL(yaml_decode_binary_as_string);
+
 /**
  * Verify whether a string represents a boolean literal in YAML.
  *
@@ -159,54 +165,6 @@ yaml_is_null(const char *str, size_t len)
    return false;
 }
 
-/**
- * Verify whether a string represents a number literal in YAML.
- *
- * Non-standard:
- *
- * False-positives:
- * - 'inf', 'nan' literals despite the case are parsed as numbers
- *   (the standard specifies only 'inf', 'Inf', 'INF', 'nan',
- *   'NaN', 'NAN').
- * - 'infinity' (ignoring case) is considered a number.
- * - Binary literals ('0b...') are considered numbers.
- *
- * Bugs:
- * - Octal numbers are not supported.
- *
- * This function is used only in encoding for wrapping strings
- * containing number literals in quotes to make YAML parser
- * handle them as strings. It means false-positives will lead to
- * extra quotation marks and are not dangerous at all.
- *
- * @param str Literal to check.
- * @param len Length of @a str.
- *
- * @retval Whether @a str represents a number value.
- */
-static inline bool
-yaml_is_number(const char *str, size_t len, struct lua_State *L)
-{
-   /*
-    * TODO: Should be implemented with the literal parser
-    * instead of using strtod() and lua_isnumber().
-    * Using parser will make it possible to remove the third
-    * argument.
-    */
-   if (len == 0)
-      return false;
-
-   if (lua_isnumber(L, -1))
-      return true;
-
-   char *endptr = NULL;
-   fpconv_strtod(str, &endptr);
-   if (endptr == str + len)
-      return true;
-
-   return false;
-}
-
 static void generate_error_message(struct lua_yaml_loader *loader) {
    char buf[256];
    luaL_Buffer b;
@@ -327,7 +285,14 @@ static void load_scalar(struct lua_yaml_loader *loader) {
          lua_pushboolean(loader->L, value);
          return;
       } else if (!strcmp(tag, "binary")) {
-         frombase64(loader->L, (const unsigned char *)str, length);
+         int bufsize = base64_decode_bufsize(length);
+         char *buf = (char *)xmalloc(bufsize);
+         int size = base64_decode(str, length, buf, bufsize);
+         if (yaml_decode_binary_as_string)
+            lua_pushlstring(loader->L, buf, size);
+         else
+            luaT_pushvarbinary(loader->L, buf, size);
+         free(buf);
          return;
       }
    }
@@ -665,6 +630,8 @@ static int yaml_is_flow_mode(struct lua_yaml_dumper *dumper) {
    return 0;
 }
 
+static void find_references(struct lua_yaml_dumper *dumper);
+
 static int dump_node(struct lua_yaml_dumper *dumper)
 {
    size_t len = 0;
@@ -679,13 +646,14 @@ static int dump_node(struct lua_yaml_dumper *dumper)
    bool unused;
    (void) unused;
 
-   luaT_reftable_serialize(dumper->L, dumper->reftable_index);
    yaml_char_t *anchor = get_yaml_anchor(dumper);
    if (anchor && !*anchor)
       return 1;
 
    int top = lua_gettop(dumper->L);
    luaL_checkfield(dumper->L, dumper->cfg, top, &field);
+   if (field.serialized)
+      find_references(dumper);
    switch(field.type) {
    case MP_UINT:
       snprintf(buf, sizeof(buf) - 1, "%" PRIu64, field.ival);
@@ -714,9 +682,10 @@ static int dump_node(struct lua_yaml_dumper *dumper)
    case MP_MAP:
       return dump_table(dumper, &field, anchor);
    case MP_STR:
-      str = lua_tolstring(dumper->L, -1, &len);
+      str = field.sval.data;
+      len = field.sval.len;
       if (yaml_is_null(str, len) || yaml_is_bool(str, len, &unused) ||
-          yaml_is_number(str, len, dumper->L)) {
+          lua_isnumber(dumper->L, -1)) {
          /*
           * The string is convertible to a null, a boolean or
           * a number, quote it to preserve its type.
@@ -739,8 +708,10 @@ static int dump_node(struct lua_yaml_dumper *dumper)
       break;
    case MP_BIN:
       is_binary = 1;
-      tobase64(dumper->L, -1);
-      str = lua_tolstring(dumper->L, -1, &len);
+      len = base64_encode_bufsize(field.sval.len, BASE64_NOWRAP);
+      str = (char *)xmalloc(len);
+      len = base64_encode(field.sval.data, field.sval.len, (char *)str, len,
+                          BASE64_NOWRAP);
       tag = (yaml_char_t *) LUAYAML_TAG_PREFIX "binary";
       break;
    case MP_BOOL:
@@ -785,15 +756,16 @@ static int dump_node(struct lua_yaml_dumper *dumper)
       break;
     }
 
+   int rc = 1;
    if (!yaml_scalar_event_initialize(&ev, NULL, tag, (unsigned char *)str, len,
                                      !is_binary, !is_binary, style) ||
        !yaml_emitter_emit(&dumper->emitter, &ev))
-      return 0;
+      rc = 0;
 
    if (is_binary)
-      lua_pop(dumper->L, 1);
+      free((void *)str);
 
-   return 1;
+   return rc;
 }
 
 static void dump_document(struct lua_yaml_dumper *dumper) {
@@ -819,12 +791,9 @@ static int append_output(void *arg, unsigned char *buf, size_t len) {
 }
 
 static void find_references(struct lua_yaml_dumper *dumper) {
-   int newval = -1;
-
-   lua_pushvalue(dumper->L, -1); /* push copy of table */
-   luaT_reftable_serialize(dumper->L, dumper->reftable_index);
-   if (lua_type(dumper->L, -1) != LUA_TTABLE)
-      goto done;
+   int newval = -1, type = lua_type(dumper->L, -1);
+   if (type != LUA_TTABLE)
+      return;
 
    lua_pushvalue(dumper->L, -1); /* push copy of table */
    lua_rawget(dumper->L, dumper->anchortable_index);
@@ -839,7 +808,7 @@ static void find_references(struct lua_yaml_dumper *dumper) {
       lua_rawset(dumper->L, dumper->anchortable_index);
    }
    if (newval)
-      goto done;
+      return;
 
    /* recursively process other table values */
    lua_pushnil(dumper->L);
@@ -848,17 +817,6 @@ static void find_references(struct lua_yaml_dumper *dumper) {
       lua_pop(dumper->L, 1);
       find_references(dumper); /* find references on key */
    }
-
-done:
-   /*
-    * Pop the serialized object, leave the original object on top
-    * of the Lua stack.
-    *
-    * NB: It is important for the cycle above: it assumes that
-    * table keys are not changed in the recursive call. Otherwise
-    * it would feed an incorrect key to lua_next().
-    */
-   lua_pop(dumper->L, 1);
 }
 
 int
@@ -905,16 +863,12 @@ lua_yaml_encode(lua_State *L, struct luaL_serializer *serializer,
    lua_newtable(L);
    dumper.anchortable_index = lua_gettop(L);
    dumper.anchor_number = 0;
-
-   luaT_reftable_new(L, dumper.cfg, 1);
-   dumper.reftable_index = lua_gettop(L);
-
    lua_pushvalue(L, 1); /* push copy of arg we're processing */
    find_references(&dumper);
    dump_document(&dumper);
    if (dumper.error)
       goto error;
-   lua_pop(L, 3); /* pop copied arg and anchor/ref tables */
+   lua_pop(L, 2); /* pop copied arg and anchor table */
 
    if (!yaml_stream_end_event_initialize(&ev) ||
        !yaml_emitter_emit(&dumper.emitter, &ev) ||