Skip to content
Snippets Groups Projects
Commit 81661f0c authored by Maksim Kaitmazian's avatar Maksim Kaitmazian Committed by Maksim Kaitmazian
Browse files

fix: a few bugs

terminating caused by 'SELECT COUNT(*) FROM T':
make server continue working in case of non fatal error.

sending CommandComplete before the operation is committed:
temporarily add fiber_sleep after dipatch_query, the better solution
will be added after discussion.
parent 07ca0504
No related branches found
No related tags found
1 merge request!920pgproto module
...@@ -143,10 +143,15 @@ dispatch_query_wrapped(const char *query, size_t query_len) ...@@ -143,10 +143,15 @@ dispatch_query_wrapped(const char *query, size_t query_len)
port_c_create(&out); port_c_create(&out);
struct box_function_ctx ctx = { &out }; struct box_function_ctx ctx = { &out };
int rc = dispatch_query(&ctx, args, args_end); int rc = dispatch_query(&ctx, args, args_end);
if (rc != 0) /**
return NULL; * Wait for the commit.
* @todo: fix me
*/
fiber_sleep(0.01);
const char *response = NULL;
uint32_t response_size; uint32_t response_size;
const char *response = port_get_msgpack(&out, &response_size); if (rc == 0)
response = port_get_msgpack(&out, &response_size);
port_destroy(&out); port_destroy(&out);
return response; return response;
} }
...@@ -288,7 +293,11 @@ send_data_rows(struct pg_port *port, const char **data, ...@@ -288,7 +293,11 @@ send_data_rows(struct pg_port *port, const char **data,
return row_count; return row_count;
} }
static uint32_t /**
* Parse and send query response.
* Returns -1 in case of error,
*/
static int64_t
process_query_response(struct pg_port *port, const char **response) process_query_response(struct pg_port *port, const char **response)
{ {
size_t row_count = 0; size_t row_count = 0;
...@@ -335,6 +344,11 @@ process_query_response(struct pg_port *port, const char **response) ...@@ -335,6 +344,11 @@ process_query_response(struct pg_port *port, const char **response)
return row_count; return row_count;
} }
/**
* Process a pending simple query message.
* Allocates on box region.
* Returns 0 if the query cycle can be continued, -1 otherwise.
*/
static int static int
process_simple_query_impl(struct pg_port *port) process_simple_query_impl(struct pg_port *port)
{ {
...@@ -343,6 +357,10 @@ process_simple_query_impl(struct pg_port *port) ...@@ -343,6 +357,10 @@ process_simple_query_impl(struct pg_port *port)
if (query == NULL) { if (query == NULL) {
pg_error(port, ERRCODE_INTERNAL_ERROR, pg_error(port, ERRCODE_INTERNAL_ERROR,
"failed to read a query message"); "failed to read a query message");
/**
* We can't restore the message borders,
* so the cycle should be stopped.
*/
return -1; return -1;
} }
...@@ -354,19 +372,18 @@ process_simple_query_impl(struct pg_port *port) ...@@ -354,19 +372,18 @@ process_simple_query_impl(struct pg_port *port)
"failed to execute query \'%s\': %s", "failed to execute query \'%s\': %s",
query, box_error_message(box_error_last())); query, box_error_message(box_error_last()));
/** /**
* Got client or memory error. * The error was properly handled,
* The error code should be considered to decide what to return. * so we can continue the query cycle.
* In case of sql error 0 should be returned and pg_error should
* report the code, otherwise -1 should be returned.
* @todo: Handle it smarter.
*/ */
return 0; return 0;
} }
bool display_row_count; bool display_row_count;
const char *command_tag = get_command_tag(query, &display_row_count); const char *command_tag = get_command_tag(query, &display_row_count);
size_t row_count = process_query_response(port, &response); int64_t row_count = process_query_response(port, &response);
send_command_complete(port, command_tag, display_row_count, row_count); /** Send CommandComplete only if no error happened. */
if (row_count != -1)
send_command_complete(port, command_tag, display_row_count, row_count);
return 0; return 0;
} }
...@@ -457,7 +474,7 @@ postgres_main(struct iostream *iostream) ...@@ -457,7 +474,7 @@ postgres_main(struct iostream *iostream)
pg_send_parameter_status(&port, "client_encoding", "UTF8"); pg_send_parameter_status(&port, "client_encoding", "UTF8");
if (start_query_cycle(&port) != 0) { if (start_query_cycle(&port) != 0) {
ret = -1; ret = -1;
goto close_connection; goto close_connection;
} }
......
get_all_targets(SOURCE_TARGETS ${PROJECT_SOURCE_DIR}/src) get_all_targets(SOURCE_TARGETS ${PROJECT_SOURCE_DIR}/src)
set(LUA_CPATH "${PROJECT_BINARY_DIR}/src/server/?.so") list(APPEND LUA_CPATH "${PROJECT_BINARY_DIR}/src/server/?.so")
list(APPEND LUA_CPATH "${PROJECT_BINARY_DIR}/src/server/?.dylib")
list(JOIN LUA_CPATH "\;" LUA_CPATH)
add_custom_target(test add_custom_target(test
DEPENDS ${SOURCE_TARGETS} DEPENDS ${SOURCE_TARGETS}
......
...@@ -69,7 +69,7 @@ def test_simple_flow_session(cluster: Cluster): ...@@ -69,7 +69,7 @@ def test_simple_flow_session(cluster: Cluster):
user = 'admin' user = 'admin'
password = 'password' password = 'password'
i1.eval("box.cfg{auth_type='md5', log_level=7}") i1.eval("box.cfg{auth_type='md5'}")
i1.eval(f"box.schema.user.passwd('{user}', '{password}')") i1.eval(f"box.schema.user.passwd('{user}', '{password}')")
os.environ['PGSSLMODE'] = 'disable' os.environ['PGSSLMODE'] = 'disable'
...@@ -105,4 +105,50 @@ def test_simple_flow_session(cluster: Cluster): ...@@ -105,4 +105,50 @@ def test_simple_flow_session(cluster: Cluster):
assert [2, 'to', False, 0.2] in tuples assert [2, 'to', False, 0.2] in tuples
assert [4, 'for', True, 0.4] in tuples assert [4, 'for', True, 0.4] in tuples
cur.execute("""
DROP TABLE "tall";
""")
stop_pg_server(i1) stop_pg_server(i1)
# Aggregates return value type is decimal, which is currently not supported,
# so an error is expected
def test_aggregate_error(cluster: Cluster):
cluster.deploy(instance_count=1)
i1 = cluster.instances[0]
host = '127.0.0.1'
service = '54321'
start_pg_server(i1, host, service)
user = 'admin'
password = 'password'
i1.eval("box.cfg{auth_type='md5'}")
i1.eval(f"box.schema.user.passwd('{user}', '{password}')")
os.environ['PGSSLMODE'] = 'disable'
conn = pg.Connection(user, password=password, host=host, port=int(service))
conn.autocommit = True
cur = conn.cursor()
cur.execute("""
create table "tall" (
"id" integer not null,
"str" string,
"bool" boolean,
"real" double,
primary key ("id")
)
using memtx distributed by ("id")
option (timeout = 3);
""")
with pytest.raises(pg.DatabaseError, match="can't parse attributes description"):
cur.execute("""
SELECT COUNT(*) FROM "tall";
""")
with pytest.raises(pg.DatabaseError, match="can't parse attributes description"):
cur.execute("""
SELECT SUM("id") FROM "tall";
""")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment