diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index d41a8d654a8bc58e195d992825dcece80fc5b701..78a51942d4eaf51714bdd78916a9a711ebb85818 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -41,7 +41,7 @@ #include "box/tuple.h" #include "box/execute.h" #include "box/error.h" -#include "box/mp_error.h" +#include "box/schema_def.h" #include "lua/msgpack.h" #include <base64.h> @@ -53,6 +53,7 @@ #include "lua/fiber.h" #include "mpstream/mpstream.h" #include "misc.h" /* lbox_check_tuple_format() */ +#include "version.h" #define cfg luaL_msgpack_default @@ -145,6 +146,13 @@ static const char netbox_request_typename[] = "net.box.request"; */ static int luaT_netbox_request_iterator_next_ref; +/** Passed to mpstream_init() to set a boolean flag on error. */ +static void +mpstream_error_handler(void *error_ctx) +{ + *(bool *)error_ctx = true; +} + static void netbox_request_destroy(struct netbox_request *request) { @@ -380,30 +388,22 @@ netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream, netbox_end_encode(stream, svp); } +/** + * Encodes an authorization request and writes it to the provided buffer. + * Returns -1 on memory allocation error. + */ static int -netbox_encode_auth(lua_State *L) +netbox_encode_auth(struct ibuf *ibuf, uint64_t sync, + const char *user, size_t user_len, + const char *password, size_t password_len, + const char *salt) { - if (lua_gettop(L) < 5) { - return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, " - "user, password, greeting)"); - } - struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 1); - uint64_t sync = luaL_touint64(L, 2); - + bool is_error = false; struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, - luamp_error, L); + mpstream_error_handler, &is_error); size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH); - size_t user_len; - const char *user = lua_tolstring(L, 3, &user_len); - size_t password_len; - const char *password = lua_tolstring(L, 4, &password_len); - size_t salt_len; - const char *salt = lua_tolstring(L, 5, &salt_len); - if (salt_len < SCRAMBLE_SIZE) - return luaL_error(L, "Invalid salt"); - /* Adapted from xrow_encode_auth() */ mpstream_encode_map(&stream, password != NULL ? 2 : 1); mpstream_encode_uint(&stream, IPROTO_USER_NAME); @@ -418,7 +418,30 @@ netbox_encode_auth(lua_State *L) } netbox_end_encode(&stream, svp); - return 0; + return is_error ? -1 : 0; +} + +/** + * Encodes a SELECT(*) request and writes it to the provided buffer. + * Returns -1 on memory allocation error. + */ +static int +netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id) +{ + bool is_error = false; + struct mpstream stream; + mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, + mpstream_error_handler, &is_error); + size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT); + mpstream_encode_map(&stream, 3); + mpstream_encode_uint(&stream, IPROTO_SPACE_ID); + mpstream_encode_uint(&stream, space_id); + mpstream_encode_uint(&stream, IPROTO_LIMIT); + mpstream_encode_uint(&stream, UINT32_MAX); + mpstream_encode_uint(&stream, IPROTO_KEY); + mpstream_encode_array(&stream, 0); + netbox_end_encode(&stream, svp); + return is_error ? -1 : 0; } static void @@ -762,16 +785,13 @@ netbox_communicate(int fd, struct ibuf *send_buf, struct ibuf *recv_buf, /** * Sends and receives data over an iproto connection. - * Takes socket fd, send_buf (ibuf), recv_buf (ibuf). - * On success returns header (table), body_rpos (char *), body_end (char *). - * On error returns nil, error. + * Returns 0 and a decoded response header on success. + * On error returns -1. */ -static int -netbox_send_and_recv_iproto(lua_State *L) +int +netbox_send_and_recv_iproto(int fd, struct ibuf *send_buf, + struct ibuf *recv_buf, struct xrow_header *hdr) { - int fd = lua_tointeger(L, 1); - struct ibuf *send_buf = (struct ibuf *)lua_topointer(L, 2); - struct ibuf *recv_buf = (struct ibuf *)lua_topointer(L, 3); while (true) { size_t required; size_t data_len = ibuf_used(recv_buf); @@ -779,29 +799,23 @@ netbox_send_and_recv_iproto(lua_State *L) if (data_len < fixheader_size) { required = fixheader_size; } else { - /* PWN! insufficient input validation */ const char *bufpos = recv_buf->rpos; const char *rpos = bufpos; size_t len = mp_decode_uint(&rpos); required = (rpos - bufpos) + len; if (data_len >= required) { const char *body_end = rpos + len; - const char *body_rpos = rpos; - luamp_decode(L, cfg, &body_rpos); - *(const char **)luaL_pushcdata( - L, CTID_CONST_CHAR_PTR) = body_rpos; - *(const char **)luaL_pushcdata( - L, CTID_CONST_CHAR_PTR) = body_end; recv_buf->rpos = (char *)body_end; - return 3; + return xrow_header_decode( + hdr, &rpos, body_end, + /*end_is_exact=*/true); } } size_t unused; if (netbox_communicate(fd, send_buf, recv_buf, /*limit=*/required, /*boundary=*/NULL, /*boundary_len=*/0, &unused) != 0) { - luaL_testcancel(L); - return luaT_push_nil_and_error(L); + return -1; } } } @@ -1314,58 +1328,6 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method, method_decoder[method](L, data, data_end, format); } -/** - * Decodes an error from raw data. On success returns the decoded error object - * with ref counter incremented. On failure returns NULL. - */ -static struct error * -netbox_decode_error(const char **data, uint32_t errcode) -{ - struct error *error = NULL; - assert(mp_typeof(**data) == MP_MAP); - uint32_t map_size = mp_decode_map(data); - for (uint32_t i = 0; i < map_size; ++i) { - uint32_t key = mp_decode_uint(data); - if (key == IPROTO_ERROR) { - if (error != NULL) - error_unref(error); - error = error_unpack_unsafe(data); - if (error == NULL) - return NULL; - error_ref(error); - /* - * IPROTO_ERROR comprises error encoded with - * IPROTO_ERROR_24, so we may ignore content - * of that key. - */ - break; - } else if (key == IPROTO_ERROR_24) { - if (error != NULL) - error_unref(error); - const char *reason = ""; - uint32_t reason_len = 0; - if (mp_typeof(**data) == MP_STR) - reason = mp_decode_str(data, &reason_len); - box_error_raise(errcode, "%.*s", reason_len, reason); - error = box_error_last(); - error_ref(error); - continue; - } - /* Skip value. */ - mp_next(data); - } - if (error == NULL) { - /* - * Error body is missing in the response. - * Set the error code without a 'reason' message - */ - box_error_raise(errcode, ""); - error = box_error_last(); - error_ref(error); - } - return error; -} - static inline struct netbox_registry * luaT_check_netbox_registry(struct lua_State *L, int idx) { @@ -1658,37 +1620,35 @@ netbox_new_request(struct lua_State *L) } /** - * Given a request registry, request id (sync), status, and a pointer to a - * response body, decodes the response and either completes the request or - * invokes the on-push trigger, depending on the status. + * Given a request registry and a response header, decodes the response and + * either completes the request or invokes the on-push trigger, depending on + * the status. + * + * Lua stack is used for temporarily storing the response table before getting + * a reference to it and executing the on-push trigger. */ -static int -netbox_dispatch_response_iproto(struct lua_State *L) +static void +netbox_dispatch_response_iproto(struct lua_State *L, + struct netbox_registry *registry, + struct xrow_header *hdr) { - struct netbox_registry *registry = luaT_check_netbox_registry(L, 1); - uint64_t sync = luaL_touint64(L, 2); - enum iproto_type status = lua_tointeger(L, 3); - uint32_t ctypeid; - const char *data = *(const char **)luaL_checkcdata(L, 4, &ctypeid); - assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); - const char *data_end = *(const char **)luaL_checkcdata(L, 5, &ctypeid); - assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); - struct netbox_request *request = netbox_registry_lookup(registry, sync); + struct netbox_request *request = netbox_registry_lookup(registry, + hdr->sync); if (request == NULL) { /* Nobody is waiting for the response. */ - return 0; + return; } + enum iproto_type status = hdr->type; if (status > IPROTO_CHUNK) { /* Handle errors. */ - struct error *error = netbox_decode_error( - &data, status & (IPROTO_TYPE_ERROR - 1)); - if (error == NULL) - return luaT_error(L); + xrow_decode_error(hdr); + struct error *error = box_error_last(); netbox_request_set_error(request, error); - error_unref(error); netbox_request_complete(request); - return 0; + return; } + const char *data = hdr->body[0].iov_base; + const char *data_end = data + hdr->body[0].iov_len; if (request->buffer != NULL) { /* Copy xrow.body to user-provided buffer. */ if (request->skip_header) @@ -1728,7 +1688,6 @@ netbox_dispatch_response_iproto(struct lua_State *L) lua_call(L, 2, 0); netbox_request_signal(request); } - return 0; } /** @@ -1754,6 +1713,181 @@ netbox_dispatch_response_console(struct lua_State *L, netbox_request_complete(request); } +/** + * Performs an authorization request for an iproto connection. + * Takes user, password, salt, request registry, socket fd, + * send_buf (ibuf), recv_buf (ibuf). + * Returns schema_version on success, nil and error on failure. + */ +static int +netbox_iproto_auth(struct lua_State *L) +{ + size_t user_len; + const char *user = lua_tolstring(L, 1, &user_len); + size_t password_len; + const char *password = lua_tolstring(L, 2, &password_len); + size_t salt_len; + const char *salt = lua_tolstring(L, 3, &salt_len); + if (salt_len < SCRAMBLE_SIZE) + return luaL_error(L, "Invalid salt"); + struct netbox_registry *registry = luaT_check_netbox_registry(L, 4); + int fd = lua_tointeger(L, 5); + struct ibuf *send_buf = (struct ibuf *)lua_topointer(L, 6); + struct ibuf *recv_buf = (struct ibuf *)lua_topointer(L, 7); + if (netbox_encode_auth(send_buf, registry->next_sync++, user, user_len, + password, password_len, salt) != 0) { + goto error; + } + struct xrow_header hdr; + if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, &hdr) != 0) { + goto error; + } + if (hdr.type != IPROTO_OK) { + xrow_decode_error(&hdr); + goto error; + } + lua_pushinteger(L, hdr.schema_version); + return 1; +error: + return luaT_push_nil_and_error(L); +} + +/** + * Fetches schema over an iproto connection. While waiting for the schema, + * processes other requests in a loop, like netbox_iproto_loop(). + * Takes peer_version_id, request registry, socket fd, send_buf (ibuf), + * recv_buf (ibuf). + * Returns schema_version and a table with the following fields: + * [VSPACE_ID] = <spaces> + * [VINDEX_ID] = <indexes> + * [VCOLLATION_ID] = <collations> + * On failure returns nil, error. + */ +static int +netbox_iproto_schema(struct lua_State *L) +{ + uint32_t peer_version_id = lua_tointeger(L, 1); + struct netbox_registry *registry = luaT_check_netbox_registry(L, 2); + int fd = lua_tointeger(L, 3); + struct ibuf *send_buf = (struct ibuf *)lua_topointer(L, 4); + struct ibuf *recv_buf = (struct ibuf *)lua_topointer(L, 5); + /* _vcollation view was added in 2.2.0-389-g3e3ef182f */ + bool peer_has_vcollation = peer_version_id >= version_id(2, 2, 1); +restart: + lua_newtable(L); + uint64_t vspace_sync = registry->next_sync++; + if (netbox_encode_select_all(send_buf, vspace_sync, + BOX_VSPACE_ID) != 0) { + return luaT_error(L); + } + uint64_t vindex_sync = registry->next_sync++; + if (netbox_encode_select_all(send_buf, vindex_sync, + BOX_VINDEX_ID) != 0) { + return luaT_error(L); + } + uint64_t vcollation_sync = registry->next_sync++; + if (peer_has_vcollation && + netbox_encode_select_all(send_buf, vcollation_sync, + BOX_VCOLLATION_ID) != 0) { + return luaT_error(L); + } + bool got_vspace = false; + bool got_vindex = false; + bool got_vcollation = false; + uint32_t schema_version = 0; + do { + struct xrow_header hdr; + if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, + &hdr) != 0) { + luaL_testcancel(L); + return luaT_push_nil_and_error(L); + } + netbox_dispatch_response_iproto(L, registry, &hdr); + if (hdr.sync != vspace_sync && + hdr.sync != vindex_sync && + hdr.sync != vcollation_sync) { + continue; + } + if (iproto_type_is_error(hdr.type)) { + uint32_t errcode = hdr.type & (IPROTO_TYPE_ERROR - 1); + if (errcode == ER_NO_SUCH_SPACE && + hdr.sync == vcollation_sync) { + /* + * No _vcollation space + * (server has old schema version). + */ + peer_has_vcollation = false; + continue; + } + xrow_decode_error(&hdr); + return luaT_push_nil_and_error(L); + } + if (schema_version == 0) { + schema_version = hdr.schema_version; + } else if (schema_version != hdr.schema_version) { + /* + * Schema changed while fetching schema. + * Restart loader. + */ + lua_pop(L, 1); + goto restart; + } + const char *data = hdr.body[0].iov_base; + const char *data_end = data + hdr.body[0].iov_len; + int key; + if (hdr.sync == vspace_sync) { + key = BOX_VSPACE_ID; + got_vspace = true; + } else if (hdr.sync == vindex_sync) { + key = BOX_VINDEX_ID; + got_vindex = true; + } else if (hdr.sync == vcollation_sync) { + key = BOX_VCOLLATION_ID; + got_vcollation = true; + } else { + unreachable(); + } + lua_pushinteger(L, key); + netbox_decode_table(L, &data, data_end, tuple_format_runtime); + lua_settable(L, -3); + } while (!(got_vspace && got_vindex && + (got_vcollation || !peer_has_vcollation))); + lua_pushinteger(L, schema_version); + lua_insert(L, -2); + return 2; +} + +/** + * Processes iproto requests in a loop until an error or a schema change. + * Takes schema_version, request registry, socket fd, send_buf (ibuf), + * recv_buf (ibuf). + * Returns schema_version if the loop was broken because of a schema change. + * If the loop was broken by an error, returns nil and the error. + */ +static int +netbox_iproto_loop(struct lua_State *L) +{ + uint32_t schema_version = lua_tointeger(L, 1); + struct netbox_registry *registry = luaT_check_netbox_registry(L, 2); + int fd = lua_tointeger(L, 3); + struct ibuf *send_buf = (struct ibuf *)lua_topointer(L, 4); + struct ibuf *recv_buf = (struct ibuf *)lua_topointer(L, 5); + while (true) { + struct xrow_header hdr; + if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, + &hdr) != 0) { + luaL_testcancel(L); + return luaT_push_nil_and_error(L); + } + netbox_dispatch_response_iproto(L, registry, &hdr); + if (hdr.schema_version > 0 && + hdr.schema_version != schema_version) { + lua_pushinteger(L, hdr.schema_version); + return 1; + } + } +} + /** * Sets up console delimiter. Should be called before serving any requests. * Takes socket fd, send_buf (ibuf), recv_buf (ibuf). @@ -1845,13 +1979,13 @@ luaopen_net_box(struct lua_State *L) luaL_register_type(L, netbox_request_typename, netbox_request_meta); static const luaL_Reg net_box_lib[] = { - { "encode_auth", netbox_encode_auth }, { "encode_method", netbox_encode_method }, { "decode_greeting",netbox_decode_greeting }, - { "send_and_recv_iproto", netbox_send_and_recv_iproto }, { "new_registry", netbox_new_registry }, { "new_request", netbox_new_request }, - { "dispatch_response_iproto", netbox_dispatch_response_iproto }, + { "iproto_auth", netbox_iproto_auth }, + { "iproto_schema", netbox_iproto_schema }, + { "iproto_loop", netbox_iproto_loop }, { "console_setup", netbox_console_setup }, { "console_loop", netbox_console_loop }, { NULL, NULL} diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 545abe363e71adf327421aef6e0919077cf327e7..1e0119babdda3ef9dc231db546c1646b30f9fa76 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -9,18 +9,15 @@ local urilib = require('uri') local internal = require('net.box.lib') local trigger = require('internal.trigger') -local band = bit.band local max = math.max local fiber_clock = fiber.clock local fiber_self = fiber.self -local decode = msgpack.decode_unchecked local check_iterator_type = box.internal.check_iterator_type local check_index_arg = box.internal.check_index_arg local check_space_arg = box.internal.check_space_arg local check_primary_index = box.internal.check_primary_index -local encode_auth = internal.encode_auth local encode_method = internal.encode_method local decode_greeting = internal.decode_greeting @@ -29,20 +26,12 @@ local VSPACE_ID = 281 local VINDEX_ID = 289 local VCOLLATION_ID = 277 local DEFAULT_CONNECT_TIMEOUT = 10 - -local IPROTO_STATUS_KEY = 0x00 -local IPROTO_ERRNO_MASK = 0x7FFF -local IPROTO_SYNC_KEY = 0x01 -local IPROTO_SCHEMA_VERSION_KEY = 0x05 -local IPROTO_DATA_KEY = 0x30 -local IPROTO_ERROR_24 = 0x31 local IPROTO_GREETING_SIZE = 128 -- select errors from box.error local E_UNKNOWN = box.error.UNKNOWN local E_NO_CONNECTION = box.error.NO_CONNECTION local E_PROC_LUA = box.error.PROC_LUA -local E_NO_SUCH_SPACE = box.error.NO_SUCH_SPACE -- Method types used internally by net.box. local M_PING = 0 @@ -68,14 +57,6 @@ local M_INJECT = 17 -- utility tables local is_final_state = {closed = 1, error = 1} -local function version_id(major, minor, patch) - return bit.bor(bit.lshift(major, 16), bit.lshift(minor, 8), patch) -end - -local function version_at_least(peer_version_id, major, minor, patch) - return peer_version_id >= version_id(major, minor, patch) -end - -- -- Connect to a remote server, do handshake. -- @param host Hostname. @@ -314,24 +295,6 @@ local function create_transport(host, port, user, password, callback, return request:wait_result(timeout) end - local function dispatch_response_iproto(hdr, body_rpos, body_end) - local id = hdr[IPROTO_SYNC_KEY] - local status = hdr[IPROTO_STATUS_KEY] - internal.dispatch_response_iproto(requests, id, status, - body_rpos, body_end) - end - - -- IO (WORKER FIBER) -- - local function send_and_recv_iproto() - local hdr, body_rpos, body_end = internal.send_and_recv_iproto( - connection:fd(), send_buf, recv_buf) - if not hdr then - local err = body_rpos - return err.code, err.message - end - return nil, hdr, body_rpos, body_end - end - -- PROTOCOL STATE MACHINE (WORKER FIBER) -- -- -- The sm is implemented as a collection of functions performing @@ -390,17 +353,13 @@ local function create_transport(host, port, user, password, callback, set_state('fetch_schema') return iproto_schema_sm() end - encode_auth(send_buf, requests:new_id(), user, password, salt) - local err, hdr, body_rpos = send_and_recv_iproto() - if err then - return error_sm(err, hdr) - end - if hdr[IPROTO_STATUS_KEY] ~= 0 then - local body = decode(body_rpos) - return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24]) + local schema_version, err = internal.iproto_auth( + user, password, salt, requests, connection:fd(), send_buf, recv_buf) + if not schema_version then + return error_sm(err.code, err.message) end set_state('fetch_schema') - return iproto_schema_sm(hdr[IPROTO_SCHEMA_VERSION_KEY]) + return iproto_schema_sm(schema_version) end iproto_schema_sm = function(schema_version) @@ -408,82 +367,28 @@ local function create_transport(host, port, user, password, callback, set_state('active') return iproto_sm(schema_version) end - -- _vcollation view was added in 2.2.0-389-g3e3ef182f - local peer_has_vcollation = version_at_least(greeting.version_id, - 2, 2, 1) - local select1_id = requests:new_id() - local select2_id = requests:new_id() - local select3_id - local response = {} - -- fetch everything from space _vspace, 2 = ITER_ALL - encode_method(M_SELECT, send_buf, select1_id, VSPACE_ID, 0, 2, 0, - 0xFFFFFFFF, nil) - -- fetch everything from space _vindex, 2 = ITER_ALL - encode_method(M_SELECT, send_buf, select2_id, VINDEX_ID, 0, 2, 0, - 0xFFFFFFFF, nil) - -- fetch everything from space _vcollation, 2 = ITER_ALL - if peer_has_vcollation then - select3_id = requests:new_id() - encode_method(M_SELECT, send_buf, select3_id, VCOLLATION_ID, - 0, 2, 0, 0xFFFFFFFF, nil) + local schema_version, schema = internal.iproto_schema( + greeting.version_id, requests, connection:fd(), send_buf, recv_buf) + if not schema_version then + local err = schema + return error_sm(err.code, err.message) end - - schema_version = nil -- any schema_version will do provided that - -- it is consistent across responses - repeat - local err, hdr, body_rpos, body_end = send_and_recv_iproto() - if err then return error_sm(err, hdr) end - dispatch_response_iproto(hdr, body_rpos, body_end) - local id = hdr[IPROTO_SYNC_KEY] - -- trick: omit check for peer_has_vcollation: id is - -- not nil - if id == select1_id or id == select2_id or id == select3_id then - -- response to a schema query we've submitted - local status = hdr[IPROTO_STATUS_KEY] - local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY] - if status ~= 0 then - -- No _vcollation space (server has an old - -- schema version). - local errno = band(status, IPROTO_ERRNO_MASK) - if id == select3_id and errno == E_NO_SUCH_SPACE then - peer_has_vcollation = false - goto continue - end - local body = decode(body_rpos) - return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24]) - end - if schema_version == nil then - schema_version = response_schema_version - elseif schema_version ~= response_schema_version then - -- schema changed while fetching schema; restart loader - return iproto_schema_sm() - end - local body = decode(body_rpos) - response[id] = body[IPROTO_DATA_KEY] - end - ::continue:: - until response[select1_id] and response[select2_id] and - (not peer_has_vcollation or response[select3_id]) - -- trick: response[select3_id] is nil when the key is nil - callback('did_fetch_schema', schema_version, response[select1_id], - response[select2_id], response[select3_id]) + callback('did_fetch_schema', schema_version, schema[VSPACE_ID], + schema[VINDEX_ID], schema[VCOLLATION_ID]) set_state('active') return iproto_sm(schema_version) end iproto_sm = function(schema_version) - local err, hdr, body_rpos, body_end = send_and_recv_iproto() - if err then return error_sm(err, hdr) end - dispatch_response_iproto(hdr, body_rpos, body_end) - local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY] - if response_schema_version > 0 and - response_schema_version ~= schema_version then - -- schema_version has been changed - start to load a new version. - -- Sic: self.schema_version will be updated only after reload. - set_state('fetch_schema') - return iproto_schema_sm(schema_version) + local schema_version, err = internal.iproto_loop( + schema_version, requests, connection:fd(), send_buf, recv_buf) + if not schema_version then + return error_sm(err.code, err.message) end - return iproto_sm(schema_version) + -- schema_version has been changed - start to load a new version. + -- Sic: self.schema_version will be updated only after reload. + set_state('fetch_schema') + return iproto_schema_sm(schema_version) end error_sm = function(err, msg)