From 81661f0cb47ba1c7ce2d21a70e0f180350a5ccb7 Mon Sep 17 00:00:00 2001 From: Kaitmazian Maksim <m.kaitmazian@picodata.io> Date: Thu, 10 Aug 2023 20:22:04 +0300 Subject: [PATCH] fix: a few bugs terminating caused by 'SELECT COUNT(*) FROM T': make server continue working in case of non fatal error. sending CommandComplete before the operation is committed: temporarily add fiber_sleep after dipatch_query, the better solution will be added after discussion. --- pgproto/src/postgres/postgres.c | 41 ++++++++++++++++++-------- pgproto/test/CMakeLists.txt | 4 ++- pgproto/test/simple_query_test.py | 48 ++++++++++++++++++++++++++++++- 3 files changed, 79 insertions(+), 14 deletions(-) diff --git a/pgproto/src/postgres/postgres.c b/pgproto/src/postgres/postgres.c index 1aeca7db5f..1bf8f6041b 100644 --- a/pgproto/src/postgres/postgres.c +++ b/pgproto/src/postgres/postgres.c @@ -143,10 +143,15 @@ dispatch_query_wrapped(const char *query, size_t query_len) port_c_create(&out); struct box_function_ctx ctx = { &out }; int rc = dispatch_query(&ctx, args, args_end); - if (rc != 0) - return NULL; + /** + * Wait for the commit. + * @todo: fix me + */ + fiber_sleep(0.01); + const char *response = NULL; uint32_t response_size; - const char *response = port_get_msgpack(&out, &response_size); + if (rc == 0) + response = port_get_msgpack(&out, &response_size); port_destroy(&out); return response; } @@ -288,7 +293,11 @@ send_data_rows(struct pg_port *port, const char **data, return row_count; } -static uint32_t +/** + * Parse and send query response. + * Returns -1 in case of error, + */ +static int64_t process_query_response(struct pg_port *port, const char **response) { size_t row_count = 0; @@ -335,6 +344,11 @@ process_query_response(struct pg_port *port, const char **response) return row_count; } +/** + * Process a pending simple query message. + * Allocates on box region. + * Returns 0 if the query cycle can be continued, -1 otherwise. + */ static int process_simple_query_impl(struct pg_port *port) { @@ -343,6 +357,10 @@ process_simple_query_impl(struct pg_port *port) if (query == NULL) { pg_error(port, ERRCODE_INTERNAL_ERROR, "failed to read a query message"); + /** + * We can't restore the message borders, + * so the cycle should be stopped. + */ return -1; } @@ -354,19 +372,18 @@ process_simple_query_impl(struct pg_port *port) "failed to execute query \'%s\': %s", query, box_error_message(box_error_last())); /** - * Got client or memory error. - * The error code should be considered to decide what to return. - * In case of sql error 0 should be returned and pg_error should - * report the code, otherwise -1 should be returned. - * @todo: Handle it smarter. + * The error was properly handled, + * so we can continue the query cycle. */ return 0; } bool display_row_count; const char *command_tag = get_command_tag(query, &display_row_count); - size_t row_count = process_query_response(port, &response); - send_command_complete(port, command_tag, display_row_count, row_count); + int64_t row_count = process_query_response(port, &response); + /** Send CommandComplete only if no error happened. */ + if (row_count != -1) + send_command_complete(port, command_tag, display_row_count, row_count); return 0; } @@ -457,7 +474,7 @@ postgres_main(struct iostream *iostream) pg_send_parameter_status(&port, "client_encoding", "UTF8"); - if (start_query_cycle(&port) != 0) { + if (start_query_cycle(&port) != 0) { ret = -1; goto close_connection; } diff --git a/pgproto/test/CMakeLists.txt b/pgproto/test/CMakeLists.txt index c5932d4558..09caac42d2 100644 --- a/pgproto/test/CMakeLists.txt +++ b/pgproto/test/CMakeLists.txt @@ -1,6 +1,8 @@ get_all_targets(SOURCE_TARGETS ${PROJECT_SOURCE_DIR}/src) -set(LUA_CPATH "${PROJECT_BINARY_DIR}/src/server/?.so") +list(APPEND LUA_CPATH "${PROJECT_BINARY_DIR}/src/server/?.so") +list(APPEND LUA_CPATH "${PROJECT_BINARY_DIR}/src/server/?.dylib") +list(JOIN LUA_CPATH "\;" LUA_CPATH) add_custom_target(test DEPENDS ${SOURCE_TARGETS} diff --git a/pgproto/test/simple_query_test.py b/pgproto/test/simple_query_test.py index 6638208a7a..7ef6973978 100644 --- a/pgproto/test/simple_query_test.py +++ b/pgproto/test/simple_query_test.py @@ -69,7 +69,7 @@ def test_simple_flow_session(cluster: Cluster): user = 'admin' password = 'password' - i1.eval("box.cfg{auth_type='md5', log_level=7}") + i1.eval("box.cfg{auth_type='md5'}") i1.eval(f"box.schema.user.passwd('{user}', '{password}')") os.environ['PGSSLMODE'] = 'disable' @@ -105,4 +105,50 @@ def test_simple_flow_session(cluster: Cluster): assert [2, 'to', False, 0.2] in tuples assert [4, 'for', True, 0.4] in tuples + cur.execute(""" + DROP TABLE "tall"; + """) + stop_pg_server(i1) + +# Aggregates return value type is decimal, which is currently not supported, +# so an error is expected +def test_aggregate_error(cluster: Cluster): + cluster.deploy(instance_count=1) + i1 = cluster.instances[0] + + host = '127.0.0.1' + service = '54321' + start_pg_server(i1, host, service) + + user = 'admin' + password = 'password' + i1.eval("box.cfg{auth_type='md5'}") + i1.eval(f"box.schema.user.passwd('{user}', '{password}')") + + os.environ['PGSSLMODE'] = 'disable' + conn = pg.Connection(user, password=password, host=host, port=int(service)) + conn.autocommit = True + cur = conn.cursor() + + cur.execute(""" + create table "tall" ( + "id" integer not null, + "str" string, + "bool" boolean, + "real" double, + primary key ("id") + ) + using memtx distributed by ("id") + option (timeout = 3); + """) + + with pytest.raises(pg.DatabaseError, match="can't parse attributes description"): + cur.execute(""" + SELECT COUNT(*) FROM "tall"; + """) + + with pytest.raises(pg.DatabaseError, match="can't parse attributes description"): + cur.execute(""" + SELECT SUM("id") FROM "tall"; + """) -- GitLab