Skip to content
Snippets Groups Projects
  • Denis Smirnov's avatar
    6ea360f7
    feat!(sql): implement drop index · 6ea360f7
    Denis Smirnov authored
    BREAKING CHANGE: "name" index in the _pico_index has changed:
    previously it guarantees uniqueness for a pair of (space_id,
    index_name). Now index name must be unique among the cluster.
    So, all the global indexes for _pico tables were renamed.
    Verified
    6ea360f7
    History
    feat!(sql): implement drop index
    Denis Smirnov authored
    BREAKING CHANGE: "name" index in the _pico_index has changed:
    previously it guarantees uniqueness for a pair of (space_id,
    index_name). Now index name must be unique among the cluster.
    So, all the global indexes for _pico tables were renamed.
luamod.lua 39.96 KiB
local pico = {}
local help = {}
setmetatable(pico, help)

local fiber = require 'fiber'
local log = require 'log'
local utils = require('internal.utils')

local check_param = utils.check_param
local check_param_table = utils.check_param_table

local intro = [[
pico.help([topic])
==================

Show built-in Picodata reference for the given topic.

Full Picodata documentation:

    https://docs.picodata.io/picodata/

Params:

    1. topic (optional string)

Returns:

    (string)
    or
    (nil) if topic not found

Example:

    picodata> pico.help("help")
    -- Shows this message

Topics:

]]

function pico.help(topic)
    if topic == nil or topic == "help" then
        local topics = {"    - help"}
        for k, _ in pairs(help) do
            table.insert(topics, "    - " .. k)
        end
        table.sort(topics)
        return intro .. table.concat(topics, "\n") .. "\n"
    else
        return help[topic]
    end
end

local TIMEOUT_INFINITY = 100 * 365 * 24 * 60 * 60

local function mandatory_param(value, name)
    if value == nil then
        box.error(box.error.ILLEGAL_PARAMS, name .. ' is mandatory')
    end
end

-- Get next id unoccupied by a user or a role. Tarantool stores both
-- users and roles in the same space, so they share the same set of ids.
local function get_next_grantee_id()
    -- TODO: if id overflows start filling the holes
    local max_user = box.space._pico_user.index[0]:max()
    if max_user then
        return max_user.id + 1
    end
    -- There are always builtin tarantool users
    local tt_user = box.space._user.index[0]:max()
    return tt_user.id + 1
end

local function next_schema_version()
    local t = box.space._pico_property:get("next_schema_version")
    -- This shouldn't be necessary if this code is ran after raft node is
    -- initialized, but currently we can call this from the code passed via
    -- `--script` option which is called before node initialization
    if t ~= nil then
        return t.value
    end
    return 1
end

local function has_pending_schema_change()
    return box.space._pico_property:get("pending_schema_change") ~= nil
end

local function is_retriable_error(error)
    if type(error) ~= 'string' then
        -- TODO
        return false
    end

    return pico._is_retriable_error_message(error)
end

-- Performs a reenterable schema change CaS request. On success returns an index
-- of the proposed raft entry.
--
-- Params:
--
--     1. deadline (number), time by which the request should complete,
--          or an error is returned.
--
--     2. make_op_if_needed (function), callback which should check if the
--          corresponding schema entity is already in the desired state or else
--          return a raft operation for CaS request. Should return `nil` if no
--          action is needed, a table representing a raft operation if request
--          should proceed. May throw an error if conflict is detected.
--          It is called after raft_read_index and after any pending schema
--          change has been finalized.
--
-- Returns:
--
--      (number) raft index
--      or
--      (nil, error) in case of an error
--
local function reenterable_schema_change_request(deadline, make_op_if_needed)
    while true do
        ::retry::

        if fiber.clock() >= deadline then
            return nil, box.error.new(box.error.TIMEOUT)
        end

        local index, err = pico.raft_read_index(deadline - fiber.clock())
        if index == nil then
            return nil, err
        end

        if has_pending_schema_change() then
            -- Wait until applied index changes (or timeout) and then retry.
            pico.raft_wait_index(index + 1, deadline - fiber.clock())
            goto retry
        end
        local ok, op = pcall(make_op_if_needed)
        if not ok then
            local err = op
            return nil, err
        elseif op == nil then
            -- Request is satisfied at current index
            return index
        end

        local index, term = pico._schema_change_cas_request(op, index, deadline - fiber.clock())
        if index == nil then
            local err = term
            if is_retriable_error(err) then
                goto retry
            else
                return nil, err
            end
        end

        local res = pico.raft_wait_index(index, deadline - fiber.clock())
        if res == nil then
            return nil, box.error.new(box.error.TIMEOUT)
        end

        if pico.raft_term(index) ~= term then
            -- Leader has changed and the entry got rolled back, retry.
            goto retry
        end

        return index
    end
end

local function check_password_min_length(password, auth_type)
    if string.lower(auth_type) == 'ldap' then
        -- LDAP doesn't need password for authentication
        return
    end

    local password_min_length = box.space._pico_property:get("password_min_length")
    if password_min_length == nil then
        password_min_length = 0
    else
        password_min_length = password_min_length[2]
    end

    local password_len = string.len(password)
    if password_len < password_min_length then
        box.error(box.error.ILLEGAL_PARAMS,
                    "password is too short: expected at least " .. password_min_length .. ", got " .. password_len)
    end
end

help.create_user = [[
pico.create_user(user, password, [opts])
========================================

Creates a user on each instance of the cluster.

Proposes a raft entry which when applied on an instance creates a user on it.
Waits for opts.timeout seconds for the entry to be applied locally.
On success returns a raft index at which the user should exist.
Skips the request if the user already exists.

The function respects `password_min_length` parameter from `_pico_property` table.
(by default set to be at least 8 characters).

NOTE: If this function returns a timeout error the request is NOT cancelled and
the change may still be applied some time later. For this reason it is always
safe to call the same function with the same arguments again. And if the change
is finalized in between calls, the subsequent calls return the corresponding
result.

Params:

    1. user (string), username
    2. password (string)
    3. opts (optional table)
        - auth_type (optional string), authentication method name,
            defaults to box.cfg.auth_type value
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error
]]
function pico.create_user(user, password, opts)
    local auth_type = box.cfg.auth_type
    -- Set the creator/owner of the user being created to current user
    local owner = box.session.euid()

    local ok, err = pcall(function()
        check_param_table(opts, {
            timeout = 'number',
            auth_type = 'string',
        })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end
        if opts.auth_type then
            auth_type = opts.auth_type
        end

        check_param(user, 'user', 'string')
        check_param(password, 'password', 'string')

        check_password_min_length(password, auth_type)
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local auth_data = box.internal.prepare_auth(auth_type, password, user)
    local function make_op_if_needed()
        local grantee_def = box.space._user.index.name:get(user)
        if grantee_def ~= nil then
            if grantee_def.type == "role" then
                box.error(box.error.ROLE_EXISTS, user)
            else
                -- TODO: check auth is the same
                -- User already exists, request is satisfied, no op needed
                return nil
            end
        end

        -- Doesn't exist yet, need to send request
        return {
            kind = 'acl',
            op_kind = 'create_user',
            user_def = {
                id = get_next_grantee_id(),
                name = user,
                schema_version = next_schema_version(),
                auth = {
                    method = auth_type,
                    data = auth_data,
                },
                owner = owner,
                type = "user"
            }
        }
    end

    return reenterable_schema_change_request(deadline, make_op_if_needed)
end

help.change_password = [[
pico.change_password(user, password, [opts])
========================================

Change the user's password on each instance of the cluster.

Proposes a raft entry which when applied on an instance changes the user's password on it.
Waits for opts.timeout seconds for the entry to be applied locally.
On success returns an index of the corresponding raft entry.
Skips the request if the password matches the current one.

The function respects `password_min_length` parameter from `_pico_property` table
(by default set to be at least 8 characters).

NOTE: If this function returns a timeout error the request is NOT cancelled and
the change may still be applied some time later. For this reason it is always
safe to call the same function with the same arguments again. And if the change
is finalized in between calls, the subsequent calls return the corresponding
result.

Params:

    1. user (string), username
    2. password (string)
    3. opts (optional table)
        - auth_type (optional string), authentication method name,
            defaults to box.cfg.auth_type value
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error
]]
function pico.change_password(user, password, opts)
    local auth_type = box.cfg.auth_type

    local ok, err = pcall(function()
        check_param_table(opts, {
            timeout = 'number',
            auth_type = 'string',
        })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end
        if opts.auth_type then
            auth_type = opts.auth_type
        end

        check_param(user, 'user', 'string')
        check_param(password, 'password', 'string')

        check_password_min_length(password, auth_type)
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local auth_data = box.internal.prepare_auth(auth_type, password, user)
    local function make_op_if_needed()
        -- TODO: allow `user` to be a user id instead of name
        local user_def = box.space._pico_user.index._pico_user_name:get(user)
        if user_def == nil then
            box.error(box.error.NO_SUCH_USER, user)
        end

        if table.equals(user_def.auth, { [auth_type] = auth_data }) then
            -- Password is already the one given, no op needed
            return nil
        end

        return {
            kind = 'acl',
            op_kind = 'change_auth',
            user_id = user_def.id,
            schema_version = next_schema_version(),
            auth = {
                method = auth_type,
                data = auth_data,
            },
            initiator = box.session.euid(),
        }
    end

    return reenterable_schema_change_request(deadline, make_op_if_needed)
end

help.drop_user = [[
pico.drop_user(user, [opts])
========================================

Drop the user and any entities owned by them on each instance of the cluster.

Proposes a raft entry which when applied on an instance drops the user on it.
Waits for opts.timeout seconds for the entry to be applied locally.
On success returns a raft index at which the user should no longer exist.
Skips the request if the user doesn't exist.

NOTE: If this function returns a timeout error the request is NOT cancelled and
the change may still be applied some time later. For this reason it is always
safe to call the same function with the same arguments again. And if the change
is finalized in between calls, the subsequent calls return the corresponding
result.

Params:

    1. user (string), username
    2. opts (optional table)
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error
]]
function pico.drop_user(user, opts)
    local ok, err = pcall(function()
        check_param(user, 'user', 'string')
        check_param_table(opts, { timeout = 'number' })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local function make_op_if_needed()
        local user_def = box.space._pico_user.index._pico_user_name:get(user)
        if user_def == nil then
            -- User doesn't exists, request is satisfied, no op needed
            return nil
        end

        -- Does still exist, need to send request
        return {
            kind = 'acl',
            op_kind = 'drop_user',
            user_id = user_def.id,
            schema_version = next_schema_version(),
            initiator = box.session.euid(),
        }
    end

    return reenterable_schema_change_request(deadline, make_op_if_needed)
end

help.create_role = [[
pico.create_role(name, [opts])
========================================

Creates a role on each instance of the cluster.

Proposes a raft entry which when applied on an instance creates a role on it.
Waits for opts.timeout seconds for the entry to be applied locally.
On success returns an index of the corresponding raft entry.
Skips the request if the role already exists.

NOTE: If this function returns a timeout error the request is NOT cancelled and
the change may still be applied some time later. For this reason it is always
safe to call the same function with the same arguments again. And if the change
is finalized in between calls, the subsequent calls return the corresponding
result.

Params:

    1. name (string), role name
    2. opts (optional table)
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error
]]
function pico.create_role(role, opts)
    -- Set the creator/owner user of the role being created to current user
    local owner = box.session.euid()

    local ok, err = pcall(function()
        check_param(role, 'role', 'string')
        check_param_table(opts, { timeout = 'number' })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local function make_op_if_needed()
        local grantee_def = box.space._user.index.name:get(role)
        if grantee_def ~= nil then
            if grantee_def.type == "user" then
                box.error(box.error.USER_EXISTS, role)
            else
                -- Role exists at current index, no op needed
                return nil
            end
        end

        return {
            kind = 'acl',
            op_kind = 'create_role',
            role_def = {
                id = get_next_grantee_id(),
                name = role,
                schema_version = next_schema_version(),
                owner = owner,
                -- Dummy auth table specially for role as user_def object
                auth = {
                    method = 'ldap',
                    data = '',
                },
                type = "role",
            }
        }
    end

    return reenterable_schema_change_request(deadline, make_op_if_needed)
end

help.drop_role = [[
pico.drop_role(role, [opts])
========================================

Drop the role and any entities owned by them on each instance of the cluster.

Proposes a raft entry which when applied on an instance drops the role on it.
Waits for opts.timeout seconds for the entry to be applied locally.
On success returns a raft index at which the role should no longer exist.
Skips the request if the role doesn't exist.

NOTE: If this function returns a timeout error the request is NOT cancelled and
the change may still be applied some time later. For this reason it is always
safe to call the same function with the same arguments again. And if the change
is finalized in between calls, the subsequent calls return the corresponding
result.

Params:

    1. role (string), role name
    2. opts (optional table)
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error
]]
function pico.drop_role(role, opts)
    local ok, err = pcall(function()
        check_param(role, 'role', 'string')
        check_param_table(opts, { timeout = 'number' })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local function make_op_if_needed()
        local role_def = box.space._pico_user.index._pico_user_name:get(role)
        if role_def == nil then
            -- Role doesn't exists, request is satisfied, no op needed
            return nil
        end

        return {
            kind = 'acl',
            op_kind = 'drop_role',
            role_id = role_def.id,
            schema_version = next_schema_version(),
            initiator = box.session.euid(),
        }
    end

    return reenterable_schema_change_request(deadline, make_op_if_needed)
end

-- A lookup map
local supported_priveleges = {
    read = true,
    write = true,
    execute = true,

    -- temporal hack, as lua API is deprecated
    login = true,
    session = true,
    usage = true,

    create = true,
    drop = true,
    alter = true,
    insert = true,
    update = true,
    delete = true,
    grant = true,
    revoke = true,
}

-- Implementation is based on function privilege_check
-- from tarantool-sys/src/box/lua/schema.lua
local function privilege_check(privilege, object_type, entrypoint)
    if type(privilege) ~= 'string' then
        box.error(box.error.ILLEGAL_PARAMS, 'privilege must be a string')
    end

    if supported_priveleges[privilege] == nil then
        box.error(box.error.ILLEGAL_PARAMS, string.format(
            "unsupported privilege '%s', see pico.help('%s') for details",
            privilege, entrypoint
        ))
    end

    if type(object_type) ~= 'string' then
        box.error(box.error.ILLEGAL_PARAMS, 'object_type must be a string')
    end

    if object_type == 'universe' then
        return
    end

    if object_type == 'table' then
        local black_list = {
            session = true,
            revoke = true,
            grant = true,
            execute = true,
        }
        if black_list[privilege] then
            box.error(box.error.UNSUPPORTED_PRIV, object_type, privilege)
        else
            return
        end
    end

    local white_lists = {
        ['sequence'] = {
            read = true,
            write = true,
            usage = true,
            create = true,
            drop = true,
        },
        ['function'] = {
            execute = true,
            usage = true,
            create = true,
            drop = true,
        },
        ['role'] = {
            execute = true,
            usage = true,
            create = true,
            drop = true,
        },
        ['user'] = {
            create = true,
            drop = true,
            alter = true,
        },
    }

    local white_list = white_lists[object_type]
    if white_list == nil then
        box.error(box.error.UNKNOWN_SCHEMA_OBJECT, object_type)
    end

    if not white_list[privilege] then
        box.error(box.error.UNSUPPORTED_PRIV, object_type, privilege)
    end
end

-- Copy-pasted from tarantool-sys/src/box/lua/schema.lua
-- TODO: patch tarantool-sys to export this function and use it here directly
local function object_resolve(object_type, object_name)
    if object_name ~= nil and type(object_name) ~= 'string'
            and type(object_name) ~= 'number' then
        box.error(box.error.ILLEGAL_PARAMS, "wrong object name type")
    end
    if object_type == 'universe' then
        return 0
    end
    if object_type == 'table' then
        if object_name == '' then
            return ''
        end
        local space = box.space[object_name]
        if  space == nil then
            box.error(box.error.NO_SUCH_SPACE, object_name)
        end
        return space.id
    end
    if object_type == 'function' then
        if object_name == '' then
            return ''
        end
        local _vfunc = box.space[box.schema.VFUNC_ID]
        local func
        if type(object_name) == 'string' then
            func = _vfunc.index.name:get{object_name}
        else
            func = _vfunc:get{object_name}
        end
        if func then
            return func.id
        else
            box.error(box.error.NO_SUCH_FUNCTION, object_name)
        end
    end
    if object_type == 'sequence' then
        if object_name == '' then
            return ''
        end
        local seq = sequence_resolve(object_name)
        if seq == nil then
            box.error(box.error.NO_SUCH_SEQUENCE, object_name)
        end
        return seq
    end
    if object_type == 'role' or object_type == 'user' then
        if object_name == '' then
            return ''
        end
        local _vuser = box.space[box.schema.VUSER_ID]
        local role_or_user
        if type(object_name) == 'string' then
            role_or_user = _vuser.index.name:get{object_name}
        else
            role_or_user = _vuser:get{object_name}
        end
        if role_or_user and role_or_user.type == object_type then
            return role_or_user.id
        elseif object_type == 'role' then
            box.error(box.error.NO_SUCH_ROLE, object_name)
        else
            box.error(box.error.NO_SUCH_USER, object_name)
        end
    end

    box.error(box.error.UNKNOWN_SCHEMA_OBJECT, object_type)
end

help.grant_privilege = [[
pico.grant_privilege(grantee, privilege, object_type, [object_name], [opts])
========================================

Grant some privilege to a user or role on each instance of the cluster.

Proposes a raft entry which when applied on an instance grants the grantee the
specified privilege on it.
Waits for opts.timeout seconds for the entry to be applied locally.
On success returns an index of the corresponding raft entry.
Skips the request if the privilege is already granted.

NOTE: If this function returns a timeout error the request is NOT cancelled and
the change may still be applied some time later. For this reason it is always
safe to call the same function with the same arguments again. And if the change
is finalized in between calls, the subsequent calls return the corresponding
result.

Params:

    1. grantee (string), name of user or role

    2. privilege (string), one of
        'read' | 'write' | 'execute' | 'login' | 'create' | 'drop' |
        'alter'

    3. object_type (string), one of
        'universe' | 'table' | 'sequence' | 'function' | 'role' | 'user'

    4. object_name (optional string), can be omitted when privilege concerns an
        entire class of entities, see examples below.

    5. opts (optional table)
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error

Examples:

    -- Enable the newly created user 'Dave' to perform essential actions,
    -- eg. create a table, write to it, read data from it.
    pico.grant_privilege('Dave', 'create', 'table')

    -- Grant read access to table 'Fruit' for user 'Dave'.
    pico.grant_privilege('Dave', 'read', 'table', 'Fruit')

    -- Grant user 'Dave' privilege to execute arbitrary lua code.
    pico.grant_privilege('Dave', 'execute', 'universe')

    -- Grant user 'Dave' privilege to create new users.
    pico.grant_privilege('Dave', 'create', 'user')

    -- Grant write access to table 'Junk' for role 'Maintainer'.
    pico.grant_privilege('Maintainer', 'write', 'table', 'Junk')

    -- Assign role 'Maintainer' to user 'Dave'.
    pico.grant_privilege('Dave', 'execute', 'role', 'Maintainer')

Notice:

    Only database admin can grant global privileges.
]]
function pico.grant_privilege(grantee, privilege, object_type, object_name, opts)
    local ok, err = pcall(function()
        check_param(grantee, 'grantee', 'string')
        -- temporal hack, as lua API is deprecated
        if privilege ~= 'login' then
            check_param(privilege, 'privilege', 'string')
        end
        check_param(object_type, 'object_type', 'string')
        object_name = object_name ~= nil and object_name or ''
        -- `object_name` is optional, thus it might contain `opts` instead
        if type(object_name) == 'table' and opts == nil then
            opts = object_name
            object_name = ''
        end
        check_param(object_name, 'object_name', 'string')
        check_param_table(opts, { timeout = 'number' })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end

        privilege_check(privilege, object_type, 'grant_privilege')
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local function make_op_if_needed()
        -- This is being checked after raft_read_index, so enough time has
        -- passed for any required entities to be created
        local grantee_def = box.space._pico_user.index._pico_user_name:get(grantee)
        if grantee_def == nil then
            box.error(box.error.NO_SUCH_USER, grantee)
        end

        -- Throws error if object doesn't exist
        local object_id = object_resolve(object_type, object_name)
        if object_id == "" then
            object_id = -1
        end

        if box.space._pico_privilege:get{grantee_def.id, object_type, object_id, privilege} ~= nil then
            -- Privilege is already granted, no op needed
            return nil
        end

        return {
            kind = 'acl',
            op_kind = 'grant_privilege',
            priv_def = {
                privilege = privilege,
                object_type = object_type,
                object_id = object_id,
                grantee_id = grantee_def.id,
                grantor_id = box.session.uid(),
                schema_version = next_schema_version(),
            },
        }
    end

    return reenterable_schema_change_request(deadline, make_op_if_needed)
end

help.revoke_privilege = [[
pico.revoke_privilege(grantee, privilege, object_type, [object_name], [opts])
========================================

Revoke some privilege from the user or role on each instance of the cluster.

Proposes a raft entry which when applied on an instance revokes the specified
privilege from the grantee on it.
Waits for opts.timeout seconds for the entry to be applied locally.
On success returns an index of the corresponding raft entry.
Skips the request if the privilege is not yet granted.

NOTE: If this function returns a timeout error the request is NOT cancelled and
the change may still be applied some time later. For this reason it is always
safe to call the same function with the same arguments again. And if the change
is finalized in between calls, the subsequent calls return the corresponding
result.

Params:

    1. grantee (string), name of user or role

    2. privilege (string), one of
        'read' | 'write' | 'execute' | 'session' | 'usage' | 'create' | 'drop' |
        'alter' | 'insert' | 'update' | 'delete'

    3. object_type (string), one of
        'universe' | 'table' | 'sequence' | 'function' | 'role' | 'user'

    4. object_name (optional string), can be omitted when privilege concerns an
        entire class of entities, see pico.help("pico.grant_privilege") for details.

    5. opts (optional table)
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error
]]
function pico.revoke_privilege(grantee, privilege, object_type, object_name, opts)
    local ok, err = pcall(function()
        check_param(grantee, 'grantee', 'string')
        check_param(privilege, 'privilege', 'string')
        check_param(object_type, 'object_type', 'string')
        object_name = object_name ~= nil and object_name or ''
        check_param(object_name, 'object_name', 'string')
        check_param_table(opts, { timeout = 'number' })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end

        privilege_check(privilege, object_type, 'revoke_privilege')
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local function make_op_if_needed()
        -- This is being checked after raft_read_index, so enough time has
        -- passed for any required entities to be created
        local grantee_def = box.space._pico_user.index._pico_user_name:get(grantee)
        if grantee_def == nil then
            box.error(box.error.NO_SUCH_USER, grantee)
        end

        -- Throws error if object doesn't exist
        local object_id = object_resolve(object_type, object_name)
        if object_id == "" then
            object_id = -1
        end

        local priv = box.space._pico_privilege:get{grantee_def.id, object_type, object_id, privilege}
        if priv == nil then
            -- Privilege is not yet granted, no op needed
            return nil
        end

        return {
            kind = 'acl',
            op_kind = 'revoke_privilege',
            priv_def = {
                privilege = privilege,
                object_type = object_type,
                object_id = object_id,
                grantee_id = grantee_def.id,
                grantor_id = priv.grantor_id,
                schema_version = next_schema_version(),
            },
            initiator = box.session.euid(),
        }
    end

    return reenterable_schema_change_request(deadline, make_op_if_needed)
end

help.create_table = [[
pico.create_table(opts)
=======================

Creates a table.

Returns when the table is created globally and becomes operable on the
current instance. Returns the index of the corresponding Op::DdlCommit
raft entry. It's necessary for syncing with other instances.
Skips the request if the table already exists.

NOTE: This operation requires updating state on multiple instances of the cluster
and it will wait until a response is received from each one of them. If the timeout
option is small enough it may happen that timeout is reached while only a part
of instances have replied successfully. In this case a timeout error is returned
to the user, but the requested change may still finalize some time later.
For this reason it is always safe to retry calling the same function with the
same arguments in case of a timeout error. If the change is finalized at some
point the corresponding result is returned on each subsequent call.

Params:

    1. opts (table)
        - name (string)
        - format (table {table TableField,...}), see pico.help('table TableField')
        - primary_key (table {string,...}), with field names
        - id (optional number), default: implicitly generated
        - distribution (string), one of 'global' | 'sharded'
            in case it's sharded, either `by_field` (for explicit sharding)
            or `sharding_key`+`sharding_fn` (for implicit sharding) options
            must be supplied.
        - by_field (optional string), usually 'bucket_id'
        - sharding_key (optional table {string,...}) with field names
        - sharding_fn (optional string), only default 'murmur3' is supported for now
        - engine (optional string), one of 'memtx' | 'vinyl', default: 'memtx'
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, string) in case of an error

Example:

    -- Creates a global memtx table 'friends_of_peppa' with two fields:
    -- id (unsigned) and name (string).
    pico.create_table({
        name = 'friends_of_peppa',
        format = {
            {name = 'id', type = 'unsigned', is_nullable = false},
            {name = 'name', type = 'string', is_nullable = false},
        },
        primary_key = {'id'},
        distribution = 'global',
        timeout = 3,
    })

    -- Global tables are updated with compare-and-swap, see pico.help('cas')
    pico.cas({
        kind = 'insert',
        table = 'friends_of_peppa',
        tuple = {1, 'Suzy'},
    })

    -- Global tables are read with Tarantool `box.space` API for now.
    box.space.friends_of_peppa:fselect()

    -- Creates an implicitly sharded table 'wonderland' with two fields:
    -- property (string) and value (any).
    pico.create_table({
        name = 'wonderland',
        format = {
            {name = 'property', type = 'string', is_nullable = false},
            {name = 'value', type = 'integer', is_nullable = true}
        },
        primary_key = {'property'},
        distribution = 'sharded',
        sharding_key = {'property'},
        timeout = 3,
    })

    -- Calculate an SQL-compatible hash for the bucket id.
    local key = require('key_def').new({{fieldno = 1, type = 'string'}})
    local tuple = box.tuple.new({'unicorns'})
    local bucket_id = key:hash(tuple) % vshard.router.bucket_count()

    -- Sharded tables are updated via vshard api, see [1]
    vshard.router.callrw(bucket_id, 'box.space.wonderland:insert', {{'unicorns', 12}})

See also:

    [1]: https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_router/
]]
function pico.create_table(opts)
    -- Set the creator/owner user of the table being created to current user
    opts.owner = box.session.euid()

    local ok, err = pcall(function()
        check_param_table(opts, {
            name = 'string',
            format = 'table',
            primary_key = 'table',
            id = 'number',
            distribution = 'string',
            by_field = 'string',
            sharding_key = 'table',
            sharding_fn = 'string',
            engine = 'string',
            timeout = 'number',
            owner = 'number,'
        })
        mandatory_param(opts, 'opts')
        mandatory_param(opts.name, 'opts.name')
        mandatory_param(opts.format, 'opts.format')
        mandatory_param(opts.primary_key, 'opts.primary_key')
        mandatory_param(opts.distribution, 'opts.distribution')
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end

        local ok, err = pico._check_create_table_opts(opts)
        if not ok then
            box.error(box.error.ILLEGAL_PARAMS, err)
        end
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    local should_wait_for_ddl_fin = true

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local function make_op_if_needed()
        local op, err = pico._make_create_table_op_if_needed(opts)
        if op ~= nil then
            return op
        elseif err ~= nil then
            error(err)
        else
            -- No op needed
            should_wait_for_ddl_fin = false
            return
        end
    end

    local index, err = reenterable_schema_change_request(deadline, make_op_if_needed)
    if index == nil then
        return nil, err
    end

    if not should_wait_for_ddl_fin then
        return index
    end

    local fin_index, err = pico.wait_ddl_finalize(index, { timeout = deadline - fiber.clock() })
    if fin_index == nil then
        return nil, err
    end

    return fin_index
end

help.drop_table = [[
pico.drop_table(table, [opts])
======================

Drops a table on each instance of the cluster.

Waits for the table to be dropped globally or returns an error if the timeout is
reached before that.
Skips the request if the table doesn't exist.

NOTE: This operation requires updating state on multiple instances of the cluster
and it will wait until a response is received from each one of them. If the timeout
option is small enough it may happen that timeout is reached while only a part
of instances have replied successfully. In this case a timeout error is returned
to the user, but the requested change may still finalize some time later.
For this reason it is always safe to retry calling the same function with the
same arguments in case of a timeout error. If the change is finalized at some
point the corresponding result is returned on each subsequent call.

Params:

    1. table (number | string), clusterwide table id or name

    2. opts (optional table)
        - timeout (optional number), in seconds, default: infinity

Returns:

    (number) raft index
    or
    (nil, error) in case of an error
]]
function pico.drop_table(table, opts)
    local ok, err = pcall(function()
        if type(table) ~= 'string' and type(table) ~= 'number' then
            box.error(box.error.ILLEGAL_PARAMS, 'table should be a number or a string')
        end
        check_param_table(opts, { timeout = 'number' })
        opts = opts or {}
        if not opts.timeout then
            opts.timeout = TIMEOUT_INFINITY
        end
    end)
    if not ok then
        return nil, err
    end

    local deadline = fiber.clock() + opts.timeout

    local should_wait_for_ddl_fin = true

    -- XXX: we construct this closure every time the function is called,
    -- which is bad for performance/jit. Refactor if problems are discovered.
    local function make_op_if_needed()
        local space_def = nil
        if type(table) == 'string' then
            space_def = box.space._pico_table.index._pico_table_name:get(table)
        elseif type(table) == 'number' then
            space_def = box.space._pico_table:get(table)
        end
        if space_def == nil then
            -- Table doesn't exist yet, no op needed
            should_wait_for_ddl_fin = false
            return nil
        end

        return {
            kind = 'ddl_prepare',
            schema_version = next_schema_version(),
            ddl = {
                kind = 'drop_table',
                id = space_def.id,
                initiator = box.session.euid(),
            }
        }
    end

    local index, err = reenterable_schema_change_request(deadline, make_op_if_needed)
    if index == nil then
        return nil, err
    end

    if not should_wait_for_ddl_fin then
        return index
    end

    local fin_index, err = pico.wait_ddl_finalize(index, { timeout = deadline - fiber.clock() })
    if fin_index == nil then
        return nil, err
    end

    return fin_index
end

help.update_plugin_config = [[
pico.drop_table(plugin_name, service_name, new_config, [opts])
======================

Update a plugin service configuration.

Params:
        1. plugin_name - plugin name

        2. service_name - service name

        3. new_config - new configuration

        4. opts (optional table)
            - timeout (optional number), in seconds, default: 10
]]
function pico.update_plugin_config(plugin_name, service_name, new_config, opts)
    local raw_new_config = require'msgpack'.encode(new_config)
    return pico._update_plugin_config(plugin_name, service_name, raw_new_config, opts)
end


_G.pico = pico
package.loaded.pico = pico