Skip to content
Snippets Groups Projects
Commit 24fb6d3d authored by Maksim Kaitmazian's avatar Maksim Kaitmazian Committed by Maksim Kaitmazian
Browse files

feat: support portal encoding

parent 4e21d5a3
No related branches found
No related tags found
1 merge request!920pgproto module
......@@ -86,11 +86,13 @@ pub fn process_bind_message(
let params = mem::take(bind.parameters_mut());
let formats = bind.parameter_format_codes();
let params = decode_parameter_values(params, &describe.param_oids, formats)?;
let result_format = bind.result_column_format_codes();
manager.bind(
bind.statement_name().as_deref(),
bind.portal_name().as_deref(),
params,
result_format,
)?;
stream.write_message_noflush(messages::bind_complete())?;
Ok(())
......
......@@ -229,8 +229,7 @@ impl Entrypoints {
let bind = LuaFunction::load(
tarantool::lua_state(),
"
local client_id, statement, portal, params = ...
local res, err = pico.pg_bind(client_id, statement, portal, params, {})
local res, err = pico.pg_bind(...)
if res == nil then
error(err)
end
......@@ -351,9 +350,10 @@ impl Entrypoints {
statement: &str,
portal: &str,
params: Vec<PgValue>,
result_format: &[i16],
) -> PgResult<()> {
self.bind
.call_with_args((id, statement, portal, params))
.call_with_args((id, statement, portal, params, result_format))
.map_err(|e| PgError::TarantoolError(e.into()))
}
......
......@@ -68,6 +68,7 @@ impl StorageManager {
statement: Option<&str>,
portal: Option<&str>,
params: Vec<PgValue>,
result_format: &[i16],
) -> PgResult<()> {
PG_ENTRYPOINTS.with(|entrypoints| {
entrypoints.borrow().bind(
......@@ -75,6 +76,7 @@ impl StorageManager {
statement.unwrap_or(""),
portal.unwrap_or(""),
params,
result_format,
)
})
}
......
......@@ -55,6 +55,10 @@ impl PortalDescribe {
pub fn command_tag(&self) -> &CommandTag {
self.describe.command_tag()
}
pub fn output_format(&self) -> &[Format] {
&self.output_format
}
}
#[derive(Debug, Deserialize, PartialEq, Eq, Clone)]
......
use super::{
describe::{CommandTag, PortalDescribe, QueryType},
value::PgValue,
value::{Format, PgValue},
};
use crate::error::PgResult;
use bytes::BytesMut;
use pgwire::messages::data::{DataRow, RowDescription};
use std::iter::zip;
use std::vec::IntoIter;
fn encode_row(values: Vec<PgValue>, buf: &mut BytesMut) -> DataRow {
let row = values.into_iter().map(|v| v.encode(buf).unwrap()).collect();
fn encode_row(values: Vec<PgValue>, formats: &[Format], buf: &mut BytesMut) -> DataRow {
let row = zip(values, formats)
.map(|(v, f)| v.encode(f, buf).unwrap())
.collect();
DataRow::new(row)
}
......@@ -71,7 +74,7 @@ impl Iterator for ExecuteResult {
fn next(&mut self) -> Option<DataRow> {
self.values_stream.next().map(|row| {
let row = encode_row(row, &mut self.buf);
let row = encode_row(row, self.describe.output_format(), &mut self.buf);
self.buf.clear();
self.row_count += 1;
row
......
use bytes::{BufMut, Bytes, BytesMut};
use pgwire::types::ToSqlText;
use postgres_types::FromSql;
use postgres_types::Type;
use postgres_types::{FromSql, ToSql};
use postgres_types::{IsNull, Oid};
use serde_json::Value;
use serde_repr::Deserialize_repr;
use std::error::Error;
use std::str;
use tarantool::tlua::{AsLua, Nil, PushInto};
......@@ -94,27 +95,43 @@ fn decode_text_as_bool(s: &str) -> PgResult<bool> {
}
}
fn bool_to_sql_text(val: bool, buf: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Send + Sync>> {
// There are many representations of bool in text, but some connectors do not support all of them,
// for instance, pg8000 doesn't recognize "true"/"false" as valid boolean values.
// It seems that "t"/"f" variants are likely to be supported, because they are more efficient and
// at least they work with psql, psycopg and pg8000.
buf.put_u8(if val { b't' } else { b'f' });
Ok(IsNull::No)
}
impl PgValue {
pub fn encode(&self, buf: &mut BytesMut) -> PgResult<Option<Bytes>> {
let do_encode = |buf: &mut BytesMut| match &self {
PgValue::Boolean(val) => {
buf.put_u8(if *val { b't' } else { b'f' });
Ok(IsNull::No)
}
PgValue::Integer(number) => {
number.to_sql_text(&Type::INT8, buf)?;
Ok(IsNull::No)
}
PgValue::Float(number) => {
number.to_sql_text(&Type::FLOAT8, buf)?;
Ok(IsNull::No)
}
fn encode_text(&self, buf: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Send + Sync>> {
match &self {
PgValue::Boolean(val) => bool_to_sql_text(*val, buf),
PgValue::Integer(number) => number.to_sql_text(&Type::INT8, buf),
PgValue::Float(number) => number.to_sql_text(&Type::FLOAT8, buf),
PgValue::Text(string) => string.to_sql_text(&Type::TEXT, buf),
PgValue::Null => Ok(IsNull::Yes),
};
}
}
fn encode_binary(&self, buf: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Send + Sync>> {
match &self {
PgValue::Boolean(val) => val.to_sql(&Type::BOOL, buf),
PgValue::Integer(number) => number.to_sql(&Type::INT8, buf),
PgValue::Float(number) => number.to_sql(&Type::FLOAT8, buf),
PgValue::Text(string) => string.to_sql(&Type::TEXT, buf),
PgValue::Null => Ok(IsNull::Yes),
}
}
pub fn encode(&self, format: &Format, buf: &mut BytesMut) -> PgResult<Option<Bytes>> {
let len = buf.len();
let is_null = do_encode(buf).map_err(|e| PgError::EncodingError(e))?;
let is_null = match format {
Format::Text => self.encode_text(buf),
Format::Binary => self.encode_binary(buf),
}
.map_err(|e| PgError::EncodingError(e))?;
if let IsNull::No = is_null {
Ok(Some(buf.split_off(len).freeze()))
} else {
......
......@@ -164,3 +164,13 @@ def test_parameterized_queries(postgres: Postgres):
)
rows = cur.fetchall()
assert sorted([params1, params2]) == sorted(rows)
cur = conn.execute(
"""
SELECT * FROM "tall";
""",
binary=True,
)
rows = cur.fetchall()
assert sorted([params1, params2]) == sorted(rows)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment