From 4be5de4c9e34d0680d53ccc01d07098e7ec3f89a Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov@tarantool.org>
Date: Wed, 6 Oct 2021 19:21:48 +0300
Subject: [PATCH] iproto: add watchers support

Part of #6257

@TarantoolBot document
Title: Document IPROTO watchers

There are three new commands to support asynchronous server->client
notifications signaled with `box.broadcast()`:

 - `IPROTO_WATCH` (code 74). Registers a new watcher for the given
   notification key or acknowledges a notification if a watcher is
   already registered. The key name is passed in `IPROTO_EVENT_KEY`
   (code 0x56). The watcher will be notified unconditionally after
   registration and then every time the key is updated with
   `box.broadcast()` provided the last notification was acknowledged.
   The server doesn't reply to the request unless it fails to parse
   the packet.

 - `IPROTO_UNWATCH` (code 75). Unregisters a watcher registered for the
   given notification key. The key name is passed in `IPROTO_EVENT_KEY`
   (code 0x56). A server doesn't reply to the request unless it fails to
   parse the packet.

 - `IPROTO_EVENT` (code 76). Sent by the server to notify a client
   about a key update. The key name is passed in `IPROTO_EVENT_KEY`
   (code 0x56). The key data (optional) is passed in `IPROTO_EVENT_DATA`
   (code 0x57).

When a connection is closed, all watchers registered for it are
unregistered.

Servers that support the new feature set the `IPROTO_FEATURE_WATCHERS`
feature bit (bit 3) in reply to the `IPROTO_ID` command.
---
 .../unreleased/gh-6257-iproto-watcher.md      |   3 +
 src/box/iproto.cc                             |  59 +++++++++-
 src/box/iproto_constants.c                    |   4 +
 src/box/iproto_constants.h                    |  25 +++++
 src/box/iproto_features.c                     |   2 +
 src/box/iproto_features.h                     |   7 +-
 src/box/lua/net_box.lua                       |   1 +
 src/box/session.cc                            |  94 ++++++++++++++++
 src/box/session.h                             |  28 +++++
 src/box/xrow.c                                |  88 +++++++++++++++
 src/box/xrow.h                                |  35 ++++++
 test/box-py/iproto.result                     |  51 ++++++++-
 test/box-py/iproto.test.py                    | 101 +++++++++++++++++-
 test/box/net.box_iproto_id.result             |  11 +-
 14 files changed, 496 insertions(+), 13 deletions(-)
 create mode 100644 changelogs/unreleased/gh-6257-iproto-watcher.md

diff --git a/changelogs/unreleased/gh-6257-iproto-watcher.md b/changelogs/unreleased/gh-6257-iproto-watcher.md
new file mode 100644
index 0000000000..0876dcc938
--- /dev/null
+++ b/changelogs/unreleased/gh-6257-iproto-watcher.md
@@ -0,0 +1,3 @@
+## feature/core
+
+* Added watchers support to the network protocol (gh-6257).
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index e4d9f8c72c..2aefbe1eb0 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -315,6 +315,8 @@ struct iproto_msg
 		struct request dml;
 		/** Box request, if this is a call or eval. */
 		struct call_request call;
+		/** Watch request. */
+		struct watch_request watch;
 		/** Authentication request. */
 		struct auth_request auth;
 		/** Features request. */
@@ -1541,6 +1543,12 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 			goto error;
 		cmsg_init(&msg->base, iproto_thread->call_route);
 		break;
+	case IPROTO_WATCH:
+	case IPROTO_UNWATCH:
+		if (xrow_decode_watch(&msg->header, &msg->watch) != 0)
+			goto error;
+		cmsg_init(&msg->base, iproto_thread->misc_route);
+		break;
 	case IPROTO_EXECUTE:
 	case IPROTO_PREPARE:
 		if (xrow_decode_sql(&msg->header, &msg->sql) != 0)
@@ -2112,6 +2120,11 @@ tx_process_call(struct cmsg *m)
 	tx_end_msg(msg);
 }
 
+static void
+iproto_session_notify(struct session *session,
+		      const char *key, size_t key_len,
+		      const char *data, const char *data_end);
+
 static void
 tx_process_misc(struct cmsg *m)
 {
@@ -2149,6 +2162,17 @@ tx_process_misc(struct cmsg *m)
 			iproto_reply_vote_xc(out, &ballot, msg->header.sync,
 					     ::schema_version);
 			break;
+		case IPROTO_WATCH:
+			session_watch(con->session, msg->watch.key,
+				      msg->watch.key_len,
+				      iproto_session_notify);
+			/* Sic: no reply. */
+			break;
+		case IPROTO_UNWATCH:
+			session_unwatch(con->session, msg->watch.key,
+					msg->watch.key_len);
+			/* Sic: no reply. */
+			break;
 		default:
 			unreachable();
 		}
@@ -2644,6 +2668,15 @@ tx_end_push(struct cmsg *m)
 		tx_begin_push(con);
 }
 
+static void
+tx_push(struct iproto_connection *con)
+{
+	if (!con->tx.is_push_sent)
+		tx_begin_push(con);
+	else
+		con->tx.is_push_pending = true;
+}
+
 /**
  * Push a message from @a port to a remote client.
  * @param session iproto session.
@@ -2668,13 +2701,31 @@ iproto_session_push(struct session *session, struct port *port)
 	}
 	iproto_reply_chunk(con->tx.p_obuf, &svp, iproto_session_sync(session),
 			   ::schema_version);
-	if (! con->tx.is_push_sent)
-		tx_begin_push(con);
-	else
-		con->tx.is_push_pending = true;
+	tx_push(con);
 	return 0;
 }
 
+/**
+ * Sends a notification to a remote watcher when a key is updated.
+ * Uses IPROTO_PUSH (kharon) infrastructure to signal the iproto thread
+ * about new data.
+ */
+static void
+iproto_session_notify(struct session *session,
+		      const char *key, size_t key_len,
+		      const char *data, const char *data_end)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *)session->meta.connection;
+	if (iproto_send_event(con->tx.p_obuf, key, key_len,
+			      data, data_end) != 0) {
+		/* Nothing we can do on error but log the error. */
+		diag_log();
+		return;
+	}
+	tx_push(con);
+}
+
 /** }}} */
 
 static inline void
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index e3ffb0ab92..8a49257743 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -150,6 +150,8 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 	/* 0x54 */	MP_UINT, /* IPROTO_VERSION */
 	/* 0x55 */	MP_ARRAY, /* IPROTO_FEATURES */
 	/* 0x56 */	MP_DOUBLE, /* IPROTO_TIMEOUT */
+	/* 0x57 */	MP_STR, /* IPROTO_EVENT_KEY */
+	/* 0x58 */	MP_NIL, /* IPROTO_EVENT_DATA (can be any) */
 	/* }}} */
 };
 
@@ -284,6 +286,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
 	"version",          /* 0x54 */
 	"features",         /* 0x55 */
 	"timeout",          /* 0x56 */
+	"event key",        /* 0x57 */
+	"event data",       /* 0x58 */
 };
 
 const char *vy_page_info_key_strs[VY_PAGE_INFO_KEY_MAX] = {
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index f616e66b2f..f31915483f 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -145,6 +145,9 @@ enum iproto_key {
 	IPROTO_FEATURES = 0x55,
 	/** Operation timeout. Specific to request type. */
 	IPROTO_TIMEOUT = 0x56,
+	/** Key name and data sent to a remote watcher. */
+	IPROTO_EVENT_KEY = 0x57,
+	IPROTO_EVENT_DATA = 0x58,
 	/*
 	 * Be careful to not extend iproto_key values over 0x7f.
 	 * iproto_keys are encoded in msgpack as positive fixnum, which ends at
@@ -257,6 +260,28 @@ enum iproto_type {
 	IPROTO_JOIN_SNAPSHOT = 72,
 	/** Protocol features request. */
 	IPROTO_ID = 73,
+	/**
+	 * The following three request types are used by the remote watcher
+	 * protocol (box.watch over network), which operates as follows:
+	 *
+	 *  1. The client sends an IPROTO_WATCH packet to subscribe to changes
+	 *     of a specified key defined on the server.
+	 *  2. The server sends an IPROTO_EVENT packet to the subscribed client
+	 *     with the key name and its current value unconditionally after
+	 *     registration and then every time the key value is updated
+	 *     provided the last notification was acknowledged (see below).
+	 *  3. Upon receiving a notification, the client sends an IPROTO_WATCH
+	 *     packet to acknowledge the notification.
+	 *  4. When the client doesn't want to receive any more notifications,
+	 *     it unsubscribes by sending an IPROTO_UNWATCH packet.
+	 *
+	 * All the three request types are fully asynchronous - a receiving end
+	 * doesn't send a packet in reply to any of them (therefore neither of
+	 * them has a sync number).
+	 */
+	IPROTO_WATCH = 74,
+	IPROTO_UNWATCH = 75,
+	IPROTO_EVENT = 76,
 
 	/** Vinyl run info stored in .index file */
 	VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/iproto_features.c b/src/box/iproto_features.c
index d9b5ff69c2..ee1e922762 100644
--- a/src/box/iproto_features.c
+++ b/src/box/iproto_features.c
@@ -64,4 +64,6 @@ iproto_features_init(void)
 			    IPROTO_FEATURE_TRANSACTIONS);
 	iproto_features_set(&IPROTO_CURRENT_FEATURES,
 			    IPROTO_FEATURE_ERROR_EXTENSION);
+	iproto_features_set(&IPROTO_CURRENT_FEATURES,
+			    IPROTO_FEATURE_WATCHERS);
 }
diff --git a/src/box/iproto_features.h b/src/box/iproto_features.h
index d6c3f6bffb..25eb3932a7 100644
--- a/src/box/iproto_features.h
+++ b/src/box/iproto_features.h
@@ -40,6 +40,11 @@ enum iproto_feature_id {
 	 * MsgPack extension.
 	 */
 	IPROTO_FEATURE_ERROR_EXTENSION = 2,
+	/**
+	 * Remote watchers support:
+	 * IPROTO_WATCH, IPROTO_UNWATCH, IPROTO_EVENT commands.
+	 */
+	IPROTO_FEATURE_WATCHERS = 3,
 	iproto_feature_id_MAX,
 };
 
@@ -55,7 +60,7 @@ struct iproto_features {
  * It should be incremented every time a new feature is added or removed.
  */
 enum {
-	IPROTO_CURRENT_VERSION = 2,
+	IPROTO_CURRENT_VERSION = 3,
 };
 
 /**
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index c1383261e0..da9bca9c3e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -64,6 +64,7 @@ local IPROTO_FEATURE_NAMES = {
     [0]     = 'streams',
     [1]     = 'transactions',
     [2]     = 'error_extension',
+    [3]     = 'watchers',
 }
 
 -- Given an array of IPROTO feature ids, returns a map {feature_name: bool}.
diff --git a/src/box/session.cc b/src/box/session.cc
index 844eddeca5..7fb1d1abf5 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -37,6 +37,7 @@
 #include "error.h"
 #include "tt_static.h"
 #include "sql_stmt_cache.h"
+#include "watcher.h"
 
 const char *session_type_strs[] = {
 	"background",
@@ -95,6 +96,95 @@ session_on_stop(struct trigger *trigger, void * /* event */)
 	return 0;
 }
 
+/**
+ * Watcher registered for a session. Unregistered when the session is closed.
+ */
+struct session_watcher {
+	struct watcher base;
+	struct session *session;
+	session_notify_f cb;
+};
+
+static void
+session_watcher_run_f(struct watcher *base)
+{
+	struct session_watcher *watcher = (struct session_watcher *)base;
+	size_t key_len;
+	const char *key = watcher_key(base, &key_len);
+	const char *data_end;
+	const char *data = watcher_data(base, &data_end);
+	watcher->cb(watcher->session, key, key_len, data, data_end);
+}
+
+void
+session_watch(struct session *session, const char *key,
+	      size_t key_len, session_notify_f cb)
+{
+	/* Look up a watcher for the specified key in this session. */
+	struct mh_strnptr_t *h = session->watchers;
+	if (h == NULL)
+		h = session->watchers = mh_strnptr_new();
+	uint32_t key_hash = mh_strn_hash(key, key_len);
+	struct mh_strnptr_key_t k = {key, key_len, key_hash};
+	mh_int_t i = mh_strnptr_find(h, &k, NULL);
+	/* If a watcher is already registered, acknowledge a notification. */
+	if (i != mh_end(h)) {
+		struct session_watcher *watcher =
+			(struct session_watcher *)mh_strnptr_node(h, i)->val;
+		watcher_ack(&watcher->base);
+		return;
+	}
+	/* Otherwise register a new watcher. */
+	struct session_watcher *watcher =
+		(struct session_watcher *)xmalloc(sizeof(*watcher));
+	watcher->session = session;
+	watcher->cb = cb;
+	box_register_watcher(key, key_len, session_watcher_run_f,
+			     (watcher_destroy_f)free, WATCHER_EXPLICIT_ACK,
+			     &watcher->base);
+	key = watcher_key(&watcher->base, &key_len);
+	struct mh_strnptr_node_t n = {
+		key, key_len, key_hash, watcher
+	};
+	mh_strnptr_put(h, &n, NULL, NULL);
+}
+
+void
+session_unwatch(struct session *session, const char *key,
+		size_t key_len)
+{
+	struct mh_strnptr_t *h = session->watchers;
+	if (h == NULL)
+		return;
+	mh_int_t i = mh_strnptr_find_inp(h, key, key_len);
+	if (i == mh_end(h))
+		return;
+	struct session_watcher *watcher =
+		(struct session_watcher *)mh_strnptr_node(h, i)->val;
+	mh_strnptr_del(h, i, NULL);
+	watcher_unregister(&watcher->base);
+}
+
+/**
+ * Unregisters all watchers registered in this session.
+ * Called when the session is closed.
+ */
+static void
+session_unregister_all_watchers(struct session *session)
+{
+	struct mh_strnptr_t *h = session->watchers;
+	if (h == NULL)
+		return;
+	mh_int_t i;
+	mh_foreach(h, i) {
+		struct session_watcher *watcher =
+			(struct session_watcher *)mh_strnptr_node(h, i)->val;
+		watcher_unregister(&watcher->base);
+	}
+	mh_strnptr_delete(h);
+	session->watchers = NULL;
+}
+
 static int
 closed_session_push(struct session *session, struct port *port)
 {
@@ -114,6 +204,7 @@ void
 session_close(struct session *session)
 {
 	session->vtab = &closed_session_vtab;
+	session_unregister_all_watchers(session);
 }
 
 void
@@ -142,6 +233,7 @@ session_create(enum session_type type)
 	session->sql_flags = default_flags;
 	session->sql_default_engine = SQL_STORAGE_ENGINE_MEMTX;
 	session->sql_stmts = NULL;
+	session->watchers = NULL;
 
 	/* For on_connect triggers. */
 	credentials_create(&session->credentials, guest_user);
@@ -244,6 +336,8 @@ session_run_on_auth_triggers(const struct on_auth_trigger_ctx *result)
 void
 session_destroy(struct session *session)
 {
+	/* Watchers are unregistered in session_close(). */
+	assert(session->watchers == NULL);
 	session_storage_cleanup(session->id);
 	struct mh_i64ptr_node_t node = { session->id, NULL };
 	mh_i64ptr_remove(session_registry, &node, NULL);
diff --git a/src/box/session.h b/src/box/session.h
index 2d1517d5bd..fd5bb146ff 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -114,6 +114,11 @@ struct session {
 	const struct session_vtab *vtab;
 	/** Session metadata. */
 	struct session_meta meta;
+	/**
+	 * Watchers registered for this session (key -> session_watcher).
+	 * Allocated on demand.
+	 */
+	struct mh_strnptr_t *watchers;
 	/**
 	 * ID of statements prepared in current session.
 	 * This map is allocated on demand.
@@ -319,6 +324,29 @@ session_run_on_disconnect_triggers(struct session *session);
 int
 session_run_on_auth_triggers(const struct on_auth_trigger_ctx *result);
 
+typedef void
+(*session_notify_f)(struct session *session, const char *key, size_t key_len,
+		    const char *data, const char *data_end);
+
+/**
+ * If there's no watcher registered for the specified key in the given session,
+ * this function registers one, otherwise it acknowledges a notification.
+ *
+ * The callback will be called unconditionally right after registration and
+ * then every time the key is updated provided the last notification was
+ * acknowledged.
+ */
+void
+session_watch(struct session *session, const char *key, size_t key_len,
+	      session_notify_f cb);
+
+/**
+ * Unregisters a watcher registered for the given session and notification key.
+ * Does nothing if there's no watcher registered
+ */
+void
+session_unwatch(struct session *session, const char *key, size_t key_len);
+
 /**
  * Check whether or not the current user is authorized to connect
  */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 825067728d..5cba746b2c 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -750,6 +750,53 @@ iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
 	memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
 }
 
+int
+iproto_send_event(struct obuf *out, const char *key, size_t key_len,
+		  const char *data, const char *data_end)
+{
+	/* Calculate the packet size. */
+	size_t size = 5;
+	/* Packet header. Note: no sync and schema version. */
+	size += mp_sizeof_map(1);
+	size += mp_sizeof_uint(IPROTO_REQUEST_TYPE);
+	size += mp_sizeof_uint(IPROTO_EVENT);
+	/* Packet body. */
+	size += mp_sizeof_map(data != NULL ? 2 : 1);
+	size += mp_sizeof_uint(IPROTO_EVENT_KEY);
+	size += mp_sizeof_str(key_len);
+	if (data != NULL) {
+		size += mp_sizeof_uint(IPROTO_EVENT_DATA);
+		size += data_end - data;
+	}
+	/* Encode the packet. */
+	char *buf = obuf_alloc(out, size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, size, "obuf_alloc", "buf");
+		return -1;
+	}
+	char *p = buf;
+	/* Fix header. */
+	*(p++) = 0xce;
+	mp_store_u32(p, size - 5);
+	p += 4;
+	/* Packet header. */
+	p = mp_encode_map(p, 1);
+	p = mp_encode_uint(p, IPROTO_REQUEST_TYPE);
+	p = mp_encode_uint(p, IPROTO_EVENT);
+	/* Packet body. */
+	p = mp_encode_map(p, data != NULL ? 2 : 1);
+	p = mp_encode_uint(p, IPROTO_EVENT_KEY);
+	p = mp_encode_str(p, key, key_len);
+	if (data != NULL) {
+		p = mp_encode_uint(p, IPROTO_EVENT_DATA);
+		memcpy(p, data, data_end - data);
+		p += data_end - data;
+	}
+	assert(size == (size_t)(p - buf));
+	(void)p;
+	return 0;
+}
+
 int
 xrow_decode_dml(struct xrow_header *row, struct request *request,
 		uint64_t key_map)
@@ -1269,6 +1316,47 @@ xrow_decode_call(const struct xrow_header *row, struct call_request *request)
 	return 0;
 }
 
+int
+xrow_decode_watch(const struct xrow_header *row, struct watch_request *request)
+{
+	if (row->bodycnt == 0) {
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "missing request body");
+		return -1;
+	}
+	assert(row->bodycnt == 1);
+	const char *data = (const char *)row->body[0].iov_base;
+	if (mp_typeof(*data) != MP_MAP) {
+error:
+		xrow_on_decode_err(row, ER_INVALID_MSGPACK, "packet body");
+		return -1;
+	}
+	memset(request, 0, sizeof(*request));
+	uint32_t map_size = mp_decode_map(&data);
+	for (uint32_t i = 0; i < map_size; i++) {
+		if (mp_typeof(*data) != MP_UINT)
+			goto error;
+		uint64_t key = mp_decode_uint(&data);
+		if (key < IPROTO_KEY_MAX &&
+		    iproto_key_type[key] != mp_typeof(*data))
+			goto error;
+		switch (key) {
+		case IPROTO_EVENT_KEY:
+			request->key = mp_decode_str(&data, &request->key_len);
+			break;
+		default:
+			mp_next(&data);
+			break;
+		}
+	}
+	if (request->key == NULL) {
+		xrow_on_decode_err(row, ER_MISSING_REQUEST_FIELD,
+				   iproto_key_name(IPROTO_EVENT_KEY));
+		return -1;
+	}
+	return 0;
+}
+
 int
 xrow_decode_auth(const struct xrow_header *row, struct auth_request *request)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 762b6e36b6..78b71b6e69 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -350,6 +350,26 @@ struct call_request {
 int
 xrow_decode_call(const struct xrow_header *row, struct call_request *request);
 
+/**
+ * WATCH/UNWATCH request.
+ */
+struct watch_request {
+	/** Notification key name. String, not null-terminated. */
+	const char *key;
+	/** Length of the notification key name string. */
+	uint32_t key_len;
+};
+
+/**
+ * Decode WATCH/UNWATCH request from MessagePack.
+ * @param row Request header.
+ * @param[out] request Request to decode to.
+ * @retval  0 on success
+ * @retval -1 on error
+ */
+int
+xrow_decode_watch(const struct xrow_header *row, struct watch_request *request);
+
 /**
  * AUTH request
  */
@@ -762,6 +782,21 @@ void
 iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
 		   uint32_t schema_version);
 
+/**
+ * Encode IPROTO_EVENT packet.
+ * @param out Encode to.
+ * @param key Notification key name.
+ * @param key_len Length of the notification key name.
+ * @param data Notification data (MsgPack).
+ * @param data_end End of notification data.
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_send_event(struct obuf *out, const char *key, size_t key_len,
+		  const char *data, const char *data_end);
+
 /** Write error directly to a socket. */
 void
 iproto_do_write_error(int fd, const struct error *e, uint32_t schema_version,
diff --git a/test/box-py/iproto.result b/test/box-py/iproto.result
index 60e0ced124..ea32f8bc87 100644
--- a/test/box-py/iproto.result
+++ b/test/box-py/iproto.result
@@ -208,6 +208,53 @@ Invalid MsgPack - request body
 # Invalid features
 Invalid MsgPack - request body
 # Empty request body
-version=2, features=[0, 1, 2]
+version=3, features=[0, 1, 2, 3]
 # Unknown version and features
-version=2, features=[0, 1, 2]
+version=3, features=[0, 1, 2, 3]
+
+#
+# gh-6257 Watchers
+#
+
+# Missing key
+error: Missing mandatory field 'event key' in request
+# Invalid key type
+error: Invalid MsgPack - packet body
+# Watch key 'foo'
+# Recieve event
+key='foo', value=None
+# Watch key 'bar'
+# Recieve event
+key='bar', value=None
+# Unwatch key 'bar'
+# Watch key 'bar'
+# Recieve event
+key='bar', value=None
+box.broadcast('foo', {1, 2, 3})
+---
+...
+# Recieve event
+   =>  Failed to recv response
+<no event received>
+# Watch key 'foo'
+# Recieve event
+key='foo', value=[1, 2, 3]
+# Watch key 'bar'
+box.broadcast('bar', 123)
+---
+...
+# Recieve event
+key='bar', value=123
+box.broadcast('bar', 456)
+---
+...
+# Unwatch key 'bar'
+# Recieve event
+   =>  Failed to recv response
+<no event received>
+box.broadcast('foo', nil)
+---
+...
+box.broadcast('bar', nil)
+---
+...
diff --git a/test/box-py/iproto.test.py b/test/box-py/iproto.test.py
index f11bd8161c..8c8181d260 100644
--- a/test/box-py/iproto.test.py
+++ b/test/box-py/iproto.test.py
@@ -17,6 +17,13 @@ if not 'REQUEST_TYPE_ID' in locals():
     IPROTO_VERSION = 0x54
     IPROTO_FEATURES = 0x55
 
+if not 'REQUEST_TYPE_WATCH' in locals():
+    REQUEST_TYPE_WATCH = 74
+    REQUEST_TYPE_UNWATCH = 75
+    REQUEST_TYPE_EVENT = 76
+    IPROTO_EVENT_KEY = 0x57
+    IPROTO_EVENT_DATA = 0x58
+
 admin("box.schema.user.grant('guest', 'read,write,execute', 'universe')")
 
 print("""
@@ -250,24 +257,33 @@ def receive_response():
         unpacker.feed(resp_headerbody)
         resp_header = unpacker.unpack()
         resp_body = unpacker.unpack()
-    except OSError as e:
+    except (OSError, socket.timeout) as e:
         print("   => ", "Failed to recv response")
     res = {}
     res["header"] = resp_header
     res["body"] = resp_body
     return res
 
-def test_request(req_header, req_body):
+def send_request(req_header, req_body):
     query_header = msgpack.dumps(req_header)
     query_body = msgpack.dumps(req_body)
     packet_len = len(query_header) + len(query_body)
     query = msgpack.dumps(packet_len) + query_header + query_body
     try:
         s.send(query)
-    except OSError as e:
+    except (OSError, socket.timeout) as e:
         print("   => ", "Failed to send request")
+
+def test_request(req_header, req_body):
+    send_request(req_header, req_body)
     return receive_response()
 
+def resp_status(resp):
+    if resp["header"][IPROTO_CODE] == REQUEST_TYPE_OK:
+        return "ok"
+    else:
+        return "error: {}".format(resp["body"][IPROTO_ERROR].decode("utf-8"))
+
 header = { IPROTO_CODE : REQUEST_TYPE_SELECT}
 body = { IPROTO_SPACE_ID: space_id,
     IPROTO_INDEX_ID: 0,
@@ -477,3 +493,82 @@ resp = test_request(header, { IPROTO_VERSION: 99999999,
 print("version={}, features={}".format(
     resp["body"][IPROTO_VERSION], resp["body"][IPROTO_FEATURES]))
 c.close()
+
+print("""
+#
+# gh-6257 Watchers
+#
+""")
+def watch(key):
+    print("# Watch key '{}'".format(key))
+    send_request({IPROTO_CODE: REQUEST_TYPE_WATCH}, {IPROTO_EVENT_KEY: key})
+
+def unwatch(key):
+    print("# Unwatch key '{}'".format(key))
+    send_request({IPROTO_CODE: REQUEST_TYPE_UNWATCH}, {IPROTO_EVENT_KEY: key})
+
+def receive_event():
+    print("# Recieve event")
+    resp = receive_response()
+    code = resp["header"].get(IPROTO_CODE)
+    if code is None:
+        print("<no event received>")
+        return
+    if code == REQUEST_TYPE_EVENT:
+        print("key='{}', value={}".format(
+            resp["body"].get(IPROTO_EVENT_KEY, '').decode('utf-8'),
+            resp["body"].get(IPROTO_EVENT_DATA)))
+    else:
+        print("Unexpected packet: {}".format(resp))
+
+def check_no_event():
+    s.settimeout(0.01)
+    receive_event()
+    s.settimeout(None)
+
+c = Connection("localhost", server.iproto.port)
+c.connect()
+s = c._socket
+
+print("# Missing key")
+resp = test_request({IPROTO_CODE: REQUEST_TYPE_WATCH}, {})
+print(resp_status(resp))
+
+print("# Invalid key type")
+resp = test_request({IPROTO_CODE: REQUEST_TYPE_WATCH},
+                    {IPROTO_EVENT_KEY: 123})
+print(resp_status(resp))
+
+# Register a watcher
+watch("foo")
+receive_event()
+
+# Register a watcher for another key
+watch("bar")
+receive_event()
+
+# Unregister and register watcher
+unwatch("bar")
+watch("bar")
+receive_event()
+
+# No notification without ack
+admin("box.broadcast('foo', {1, 2, 3})")
+check_no_event()
+
+# Notification after ack
+watch("foo")
+receive_event()
+watch("bar")
+admin("box.broadcast('bar', 123)")
+receive_event()
+
+# No notification after unregister
+admin("box.broadcast('bar', 456)")
+unwatch("bar")
+check_no_event()
+
+# Cleanup
+c.close()
+admin("box.broadcast('foo', nil)")
+admin("box.broadcast('bar', nil)")
diff --git a/test/box/net.box_iproto_id.result b/test/box/net.box_iproto_id.result
index 92b491d87d..c71c46b151 100644
--- a/test/box/net.box_iproto_id.result
+++ b/test/box/net.box_iproto_id.result
@@ -15,11 +15,12 @@ c = net.connect(box.cfg.listen)
  | ...
 c.peer_protocol_version
  | ---
- | - 2
+ | - 3
  | ...
 c.peer_protocol_features
  | ---
  | - transactions: true
+ |   watchers: true
  |   error_extension: true
  |   streams: true
  | ...
@@ -46,6 +47,7 @@ c.peer_protocol_version
 c.peer_protocol_features
  | ---
  | - transactions: false
+ |   watchers: false
  |   error_extension: false
  |   streams: false
  | ...
@@ -93,6 +95,7 @@ c.peer_protocol_version
 c.peer_protocol_features
  | ---
  | - transactions: true
+ |   watchers: true
  |   error_extension: true
  |   streams: true
  | ...
@@ -140,11 +143,12 @@ c.error -- error
  | ...
 c.peer_protocol_version
  | ---
- | - 2
+ | - 3
  | ...
 c.peer_protocol_features
  | ---
  | - transactions: false
+ |   watchers: true
  |   error_extension: true
  |   streams: true
  | ...
@@ -165,11 +169,12 @@ c.error -- error
  | ...
 c.peer_protocol_version
  | ---
- | - 2
+ | - 3
  | ...
 c.peer_protocol_features
  | ---
  | - transactions: true
+ |   watchers: true
  |   error_extension: true
  |   streams: true
  | ...
-- 
GitLab