From d4b5b738eff71a5003e5aefe1e57d5732244b42d Mon Sep 17 00:00:00 2001
From: Vartan Babayan <v.babayan@picodata.io>
Date: Mon, 10 Feb 2025 23:08:34 +0300
Subject: [PATCH] feat!: add connection type column to _pico_peer_address and
 store pg_proto address in _pico_peer_address

---
 CHANGELOG.md             |  8 +++--
 src/bootstrap_entries.rs | 13 +++++++
 src/cli/status.rs        |  3 +-
 src/config.rs            | 13 +++----
 src/discovery.rs         |  2 +-
 src/governor/mod.rs      |  3 +-
 src/http_server.rs       |  7 ++--
 src/info.rs              |  3 +-
 src/lib.rs               | 20 +++++++++--
 src/rpc/join.rs          | 24 +++++++++++--
 src/rpc/mod.rs           |  8 +++--
 src/storage.rs           | 74 ++++++++++++++++++++++++++++++----------
 src/storage/snapshot.rs  | 35 ++++++++++++++-----
 src/sync.rs              |  2 +-
 src/traft/mod.rs         | 12 +++++++
 src/traft/network.rs     | 14 ++++----
 src/vshard.rs            |  2 ++
 test/conftest.py         |  7 +++-
 test/int/test_basics.py  | 20 +++++++++--
 test/int/test_joining.py |  2 ++
 test/int/test_sql.py     |  1 +
 21 files changed, 212 insertions(+), 61 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index eef649d21b..224b0d6cfd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -56,9 +56,6 @@ with the `YY.MINOR.MICRO` scheme.
 
 - `listen`, `advertise` parameters are renamed to `iproto_listen`, `iproto_advertise`
 
-- New field `_pico_property.system_catalog_version` representing version of a system catalog.
-  It may not be changed at every release, so this is not autoincrementing value.
-
 - Added scopes to all parameters from `_pico_db_config`. There are two scopesright now - `tier`
 and `global`. Parameters with scope `tier`
 can be different on different tiers.
@@ -97,11 +94,16 @@ to 2 and 3.
 
 - New special command `\set delimiter enter` to change the default delimiter to EOL (End Of Line). Introduced a new inner prompt prefix to indicate when input is waiting for a delimiter. EOF is now treated as a delimiter when reading files.
 
+- New field `_pico_property.system_catalog_version` representing version of a system catalog.
+  It may not be changed at every release, so this is not autoincrementing value.
+
 - From now on, when joining a cluster, an instance's version must be the same as the cluster's version or one minor version higher. For example, if the cluster's version is 25.1, only instances with versions 25.1 or 25.2 can join.
   - In the `_pico_property` table, there is a new field called `cluster_version`, which shows the global version of the cluster.
   - In the `_pico_instance` table, there is a new field called `picodata_version` that displays the version of the executable running on the instance.
   - The global `cluster_version` is updated by the governor only when every instance in the cluster has been upgraded to the new minor version.
 
+- System table `_pico_peer_address` has a new column `conncetion_type` that indicates the connection type of the peer. It can be `iproto` or `pgproto`.
+
 - Global rename
   - Config File Changes:
     - `data_dir` renamed to `instance_dir`
diff --git a/src/bootstrap_entries.rs b/src/bootstrap_entries.rs
index ba494c51ab..f8c28faab6 100644
--- a/src/bootstrap_entries.rs
+++ b/src/bootstrap_entries.rs
@@ -41,6 +41,19 @@ pub(super) fn prepare(
             &traft::PeerAddress {
                 raft_id: instance.raft_id,
                 address: config.instance.iproto_advertise().to_host_port(),
+                connection_type: traft::ConnectionType::Iproto,
+            },
+            ADMIN_ID,
+        )
+        .expect("serialization cannot fail"),
+    );
+    ops.push(
+        op::Dml::replace(
+            storage::PeerAddresses::TABLE_ID,
+            &traft::PeerAddress {
+                raft_id: instance.raft_id,
+                address: config.instance.pg.listen().to_host_port(),
+                connection_type: traft::ConnectionType::Pgproto,
             },
             ADMIN_ID,
         )
diff --git a/src/cli/status.rs b/src/cli/status.rs
index 0e1ff08541..9cf429254e 100644
--- a/src/cli/status.rs
+++ b/src/cli/status.rs
@@ -101,7 +101,8 @@ fn main_impl(args: args::Status) -> Result<(), Box<dyn std::error::Error>> {
                        PI.replicaset_uuid,
                        PI.tier, PA.address uri
                 FROM _pico_peer_address PA
-                JOIN _pico_instance PI ON PA.raft_id = PI.raft_id)
+                JOIN _pico_instance PI ON PA.raft_id = PI.raft_id
+                WHERE connection_type = 'iproto')
             ORDER BY instance_name"#,
             Vec::<()>::new(),
         ),
diff --git a/src/config.rs b/src/config.rs
index 032aafea73..4b98f38005 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -6,7 +6,6 @@ use crate::instance::InstanceName;
 use crate::introspection::leaf_field_paths;
 use crate::introspection::FieldInfo;
 use crate::introspection::Introspection;
-use crate::pgproto;
 use crate::replicaset::ReplicasetName;
 use crate::schema::ADMIN_ID;
 use crate::sql::value_type_str;
@@ -23,6 +22,7 @@ use crate::traft::RaftSpaceAccess;
 use crate::util::edit_distance;
 use crate::util::file_exists;
 use crate::{config_parameter_path, sql};
+use crate::{pgproto, traft};
 use sbroad::ir::relation::DerivedType;
 use sbroad::ir::value::{EncodedValue, Value};
 use serde_yaml::Value as YamlValue;
@@ -629,8 +629,7 @@ Using configuration file '{args_path}'.");
         todo!()
     }
 
-    /// Does validation of configuration parameters which are persisted in the
-    /// storage.
+    /// Does validation of configuration parameters which are persisted in the storage.
     pub fn validate_storage(
         &self,
         storage: &storage::Clusterwide,
@@ -646,7 +645,7 @@ Using configuration file '{args_path}'.");
             _ => {}
         }
 
-        // Instance id
+        // Instance name
         let mut instance_name = None;
         match (raft_storage.instance_name()?, &self.instance.name) {
             (Some(from_storage), Some(from_config)) if from_storage != from_config => {
@@ -660,7 +659,7 @@ Using configuration file '{args_path}'.");
             _ => {}
         }
 
-        // Replicaset id
+        // Replicaset name
         if let Some(instance_name) = &instance_name {
             if let Ok(instance_info) = storage.instances.get(instance_name) {
                 match (
@@ -690,7 +689,9 @@ Using configuration file '{args_path}'.");
         // Advertise address
         if let Some(raft_id) = raft_storage.raft_id()? {
             match (
-                storage.peer_addresses.get(raft_id)?,
+                storage
+                    .peer_addresses
+                    .get(raft_id, &traft::ConnectionType::Iproto)?,
                 &self.instance.iproto_advertise,
             ) {
                 (Some(from_storage), Some(from_config))
diff --git a/src/discovery.rs b/src/discovery.rs
index 6fa420af35..443d15410c 100644
--- a/src/discovery.rs
+++ b/src/discovery.rs
@@ -225,7 +225,7 @@ fn proc_discover<'a>(request: Request, request_to: Address) -> Result<Response,
             .map(|leader_id| (&node.storage.peer_addresses, leader_id, status.id))
     });
     if let Some((peers_addresses, leader_id, id)) = ready_ids {
-        let leader_address = peers_addresses.try_get(leader_id)?;
+        let leader_address = peers_addresses.try_get(leader_id, &traft::ConnectionType::Iproto)?;
         Ok(Response::Done(Role::new(leader_address, leader_id == id)))
     } else {
         let mut discovery = discovery();
diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 5f95d982d3..2514e9c393 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -41,7 +41,7 @@ use crate::traft::node::Status;
 use crate::traft::op::Dml;
 use crate::traft::op::PluginRaftOp;
 use crate::traft::raft_storage::RaftSpaceAccess;
-use crate::traft::Result;
+use crate::traft::{ConnectionType, Result};
 use crate::unwrap_ok_or;
 use futures::future::try_join;
 use futures::future::try_join_all;
@@ -100,6 +100,7 @@ impl Loop {
             .peer_addresses
             .iter()
             .unwrap()
+            .filter(|peer| peer.connection_type == ConnectionType::Iproto)
             .map(|pa| (pa.raft_id, pa.address))
             .collect();
         let voters = raft_storage.voters().expect("storage should never fail");
diff --git a/src/http_server.rs b/src/http_server.rs
index 78690d4b9a..892c119d54 100644
--- a/src/http_server.rs
+++ b/src/http_server.rs
@@ -5,7 +5,7 @@ use crate::storage::Clusterwide;
 use crate::storage::ToEntryIter as _;
 use crate::tier::Tier;
 use crate::traft::network::ConnectionPool;
-use crate::traft::Result;
+use crate::traft::{ConnectionType, Result};
 use crate::util::Uppercase;
 use crate::{has_states, tlog, unwrap_ok_or};
 use futures::future::join_all;
@@ -174,7 +174,10 @@ fn get_peer_addresses(
         })
         .map(|item| (item.raft_id, true))
         .collect();
-    let i = storage.peer_addresses.iter()?;
+    let i = storage
+        .peer_addresses
+        .iter()?
+        .filter(|peer| peer.connection_type == ConnectionType::Iproto);
     Ok(i.filter(|pa| leaders.get(&pa.raft_id) == Some(&true))
         .map(|pa| (pa.raft_id, pa.address))
         .collect())
diff --git a/src/info.rs b/src/info.rs
index 319e6dc19b..bcdb43fb81 100644
--- a/src/info.rs
+++ b/src/info.rs
@@ -3,6 +3,7 @@ use crate::instance::InstanceName;
 use crate::instance::State;
 use crate::replicaset::ReplicasetName;
 use crate::tlua;
+use crate::traft;
 use crate::traft::error::Error;
 use crate::traft::node;
 use crate::traft::RaftId;
@@ -126,7 +127,7 @@ impl InstanceInfo {
         let peer_address = node
             .storage
             .peer_addresses
-            .get(instance.raft_id)?
+            .get(instance.raft_id, &traft::ConnectionType::Iproto)?
             .unwrap_or_else(|| "<unknown>".into());
 
         let cluster_name = node.raft_storage.cluster_name()?;
diff --git a/src/lib.rs b/src/lib.rs
index 574c36460c..374c0d5489 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -925,6 +925,7 @@ fn start_join(config: &PicodataConfig, instance_address: String) -> Result<(), E
         instance_name: config.instance.name().map(From::from),
         replicaset_name: config.instance.replicaset_name().map(From::from),
         advertise_address: config.instance.iproto_advertise().to_host_port(),
+        pgproto_advertise_address: config.instance.pg.listen().to_host_port(),
         failure_domain: config.instance.failure_domain().clone(),
         tier: config.instance.tier().into(),
         picodata_version: version,
@@ -975,8 +976,16 @@ fn start_join(config: &PicodataConfig, instance_address: String) -> Result<(), E
     let raft_id = resp.instance.raft_id;
     transaction(|| -> Result<(), TntError> {
         storage.instances.put(&resp.instance).unwrap();
-        for traft::PeerAddress { raft_id, address } in resp.peer_addresses {
-            storage.peer_addresses.put(raft_id, &address).unwrap();
+        for traft::PeerAddress {
+            raft_id,
+            address,
+            connection_type,
+        } in resp.peer_addresses
+        {
+            storage
+                .peer_addresses
+                .put(raft_id, &address, &connection_type)
+                .unwrap();
         }
         raft_storage.persist_raft_id(raft_id).unwrap();
         raft_storage
@@ -1126,7 +1135,12 @@ fn postjoin(
             .expect("storage should never fail");
         // Doesn't have to be leader - can be any online peer
         let leader_id = node.status().leader_id;
-        let leader_address = leader_id.and_then(|id| storage.peer_addresses.try_get(id).ok());
+        let leader_address = leader_id.and_then(|id| {
+            storage
+                .peer_addresses
+                .try_get(id, &traft::ConnectionType::Iproto)
+                .ok()
+        });
         let Some(leader_address) = leader_address else {
             // FIXME: don't hard code timeout
             let timeout = Duration::from_millis(250);
diff --git a/src/rpc/join.rs b/src/rpc/join.rs
index 44b7b75c48..62aae0c4ef 100644
--- a/src/rpc/join.rs
+++ b/src/rpc/join.rs
@@ -48,6 +48,7 @@ crate::define_rpc_request! {
         pub instance_name: Option<InstanceName>,
         pub replicaset_name: Option<ReplicasetName>,
         pub advertise_address: String,
+        pub pgproto_advertise_address: String,
         pub failure_domain: FailureDomain,
         pub tier: String,
         pub picodata_version: String,
@@ -131,13 +132,27 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
         let peer_address = traft::PeerAddress {
             raft_id: instance.raft_id,
             address: req.advertise_address.clone(),
+            connection_type: traft::ConnectionType::Iproto,
+        };
+        let pgproto_peer_address = traft::PeerAddress {
+            raft_id: instance.raft_id,
+            address: req.pgproto_advertise_address.clone(),
+            connection_type: traft::ConnectionType::Pgproto,
         };
 
-        let mut ops = Vec::with_capacity(3);
+        let mut ops = Vec::with_capacity(4);
         ops.push(
             Dml::replace(storage::PeerAddresses::TABLE_ID, &peer_address, ADMIN_ID)
                 .expect("encoding should not fail"),
         );
+        ops.push(
+            Dml::replace(
+                storage::PeerAddresses::TABLE_ID,
+                &pgproto_peer_address,
+                ADMIN_ID,
+            )
+            .expect("encoding should not fail"),
+        );
         ops.push(
             Dml::replace(storage::Instances::TABLE_ID, &instance, ADMIN_ID)
                 .expect("encoding should not fail"),
@@ -178,7 +193,12 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
 
         // A joined instance needs to communicate with other nodes.
         // TODO: limit the number of entries sent to reduce response size.
-        let peer_addresses = node.storage.peer_addresses.iter()?.collect();
+        let peer_addresses = node
+            .storage
+            .peer_addresses
+            .iter()?
+            .filter(|peer| peer.connection_type == traft::ConnectionType::Iproto)
+            .collect();
         let replicas = storage
             .instances
             .replicaset_instances(&instance.replicaset_name)
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
index 01ec9e3fdf..61de60c9ab 100644
--- a/src/rpc/mod.rs
+++ b/src/rpc/mod.rs
@@ -14,8 +14,7 @@ use crate::replicaset::ReplicasetName;
 use crate::schema::PICO_SERVICE_USER_NAME;
 use crate::tlog;
 use crate::traft::error::Error;
-use crate::traft::node;
-use crate::traft::Result;
+use crate::traft::{node, ConnectionType, Result};
 
 use std::collections::HashMap;
 use std::fmt::Debug;
@@ -182,7 +181,10 @@ where
 {
     let node = node::global()?;
     let leader_id = node.status().leader_id.ok_or(Error::LeaderUnknown)?;
-    let leader_address = node.storage.peer_addresses.try_get(leader_id)?;
+    let leader_address = node
+        .storage
+        .peer_addresses
+        .try_get(leader_id, &ConnectionType::Iproto)?;
     let resp = network_call(&leader_address, proc_name, request).await?;
     Ok(resp)
 }
diff --git a/src/storage.rs b/src/storage.rs
index a140f3ccce..231c724f4d 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -734,6 +734,7 @@ impl TClusterwideTable for PeerAddresses {
         vec![
             Field::from(("raft_id", FieldType::Unsigned)),
             Field::from(("address", FieldType::String)),
+            Field::from(("connection_type", FieldType::String)),
         ]
     }
 
@@ -745,7 +746,10 @@ impl TClusterwideTable for PeerAddresses {
             name: "_pico_peer_address_raft_id".into(),
             ty: IndexType::Tree,
             opts: vec![IndexOption::Unique(true)],
-            parts: vec![Part::from(("raft_id", IndexFieldType::Unsigned)).is_nullable(false)],
+            parts: vec![
+                Part::from(("raft_id", IndexFieldType::Unsigned)).is_nullable(false),
+                Part::from(("connection_type", IndexFieldType::String)).is_nullable(false),
+            ],
             operable: true,
             // This means the local schema is already up to date and main loop doesn't need to do anything
             schema_version: INITIAL_SCHEMA_VERSION,
@@ -766,6 +770,7 @@ impl PeerAddresses {
             .index_builder("_pico_peer_address_raft_id")
             .unique(true)
             .part("raft_id")
+            .part("connection_type")
             .if_not_exists(true)
             .create()?;
 
@@ -773,29 +778,46 @@ impl PeerAddresses {
     }
 
     #[inline]
-    pub fn put(&self, raft_id: RaftId, address: &traft::Address) -> tarantool::Result<()> {
-        self.space.replace(&(raft_id, address))?;
+    pub fn put(
+        &self,
+        raft_id: RaftId,
+        address: &traft::Address,
+        connection_type: &traft::ConnectionType,
+    ) -> tarantool::Result<()> {
+        self.space.replace(&(raft_id, address, connection_type))?;
         Ok(())
     }
 
     #[allow(dead_code)]
     #[inline]
-    pub fn delete(&self, raft_id: RaftId) -> tarantool::Result<()> {
-        self.space.delete(&[raft_id])?;
+    pub fn delete(
+        &self,
+        raft_id: RaftId,
+        connection_type: &traft::ConnectionType,
+    ) -> tarantool::Result<()> {
+        self.space.delete(&(raft_id, connection_type))?;
         Ok(())
     }
 
     #[inline(always)]
-    pub fn get(&self, raft_id: RaftId) -> Result<Option<traft::Address>> {
-        let Some(tuple) = self.space.get(&[raft_id])? else {
+    pub fn get(
+        &self,
+        raft_id: RaftId,
+        connection_type: &traft::ConnectionType,
+    ) -> Result<Option<traft::Address>> {
+        let Some(tuple) = self.space.get(&(raft_id, connection_type))? else {
             return Ok(None);
         };
         tuple.field(1).map_err(Into::into)
     }
 
     #[inline(always)]
-    pub fn try_get(&self, raft_id: RaftId) -> Result<traft::Address> {
-        self.get(raft_id)?
+    pub fn try_get(
+        &self,
+        raft_id: RaftId,
+        connection_type: &traft::ConnectionType,
+    ) -> Result<traft::Address> {
+        self.get(raft_id, connection_type)?
             .ok_or(Error::AddressUnknownForRaftId(raft_id))
     }
 
@@ -804,7 +826,9 @@ impl PeerAddresses {
         &self,
         ids: impl IntoIterator<Item = RaftId>,
     ) -> Result<HashSet<traft::Address>> {
-        ids.into_iter().map(|id| self.try_get(id)).collect()
+        ids.into_iter()
+            .map(|id| self.try_get(id, &traft::ConnectionType::Iproto))
+            .collect()
     }
 }
 
@@ -3361,7 +3385,7 @@ mod tests {
         ] {
             space_by_name(Instances::TABLE_NAME).unwrap().put(&instance).unwrap();
             let (_, _, raft_id, ..) = instance;
-            space_peer_addresses.put(&(raft_id, format!("addr:{raft_id}"))).unwrap();
+            space_peer_addresses.put(&(raft_id, format!("addr:{raft_id}"), &traft::ConnectionType::Iproto)).unwrap();
         }
 
         let instance = storage.instances.all_instances().unwrap();
@@ -3400,8 +3424,8 @@ mod tests {
         {
             // Ensure traft storage doesn't impose restrictions
             // on peer_address uniqueness.
-            storage_peer_addresses.put(10, &traft::Address::from("addr:collision")).unwrap();
-            storage_peer_addresses.put(11, &traft::Address::from("addr:collision")).unwrap();
+            storage_peer_addresses.put(10, &traft::Address::from("addr:collision"), &traft::ConnectionType::Iproto).unwrap();
+            storage_peer_addresses.put(11, &traft::Address::from("addr:collision"), &traft::ConnectionType::Iproto).unwrap();
         }
 
         {
@@ -3429,7 +3453,7 @@ mod tests {
 
         let box_replication = |replicaset_name: &str| -> Vec<traft::Address> {
             storage.instances.replicaset_instances(replicaset_name).unwrap()
-                .map(|instance| storage_peer_addresses.try_get(instance.raft_id).unwrap())
+                .map(|instance| storage_peer_addresses.try_get(instance.raft_id, &traft::ConnectionType::Iproto).unwrap())
                 .collect::<Vec<_>>()
         };
 
@@ -3458,15 +3482,29 @@ mod tests {
 
         space_by_name(PeerAddresses::TABLE_NAME)
             .unwrap()
-            .insert(&(1, "foo"))
+            .insert(&(1, "foo", &traft::ConnectionType::Iproto))
             .unwrap();
         space_by_name(PeerAddresses::TABLE_NAME)
             .unwrap()
-            .insert(&(2, "bar"))
+            .insert(&(2, "bar", &traft::ConnectionType::Pgproto))
             .unwrap();
 
-        assert_eq!(storage.peer_addresses.get(1).unwrap().unwrap(), "foo");
-        assert_eq!(storage.peer_addresses.get(2).unwrap().unwrap(), "bar");
+        assert_eq!(
+            storage
+                .peer_addresses
+                .get(1, &traft::ConnectionType::Iproto)
+                .unwrap()
+                .unwrap(),
+            "foo"
+        );
+        assert_eq!(
+            storage
+                .peer_addresses
+                .get(2, &traft::ConnectionType::Pgproto)
+                .unwrap()
+                .unwrap(),
+            "bar"
+        );
 
         let inst = |raft_id, instance_name: &str| Instance {
             raft_id,
diff --git a/src/storage/snapshot.rs b/src/storage/snapshot.rs
index 8a70b37811..38074eb147 100644
--- a/src/storage/snapshot.rs
+++ b/src/storage/snapshot.rs
@@ -870,7 +870,12 @@ mod tests {
             tuples,
         });
 
-        let tuples = [(1, "google.com"), (2, "ya.ru")].to_tuple_buffer().unwrap();
+        let tuples = [
+            (1, "google.com", traft::ConnectionType::Iproto),
+            (2, "ya.ru", traft::ConnectionType::Pgproto),
+        ]
+        .to_tuple_buffer()
+        .unwrap();
         data.space_dumps.push(SpaceDump {
             space_id: PeerAddresses::TABLE_ID,
             tuples,
@@ -902,10 +907,18 @@ mod tests {
         assert_eq!(instance, i);
 
         assert_eq!(storage.peer_addresses.space.len().unwrap(), 2);
-        let addr = storage.peer_addresses.get(1).unwrap().unwrap();
-        assert_eq!(addr, "google.com");
-        let addr = storage.peer_addresses.get(2).unwrap().unwrap();
-        assert_eq!(addr, "ya.ru");
+        let addr = storage
+            .peer_addresses
+            .get(1, &traft::ConnectionType::Iproto)
+            .unwrap()
+            .unwrap();
+        assert_eq!(addr, "google.com", "iproto");
+        let addr = storage
+            .peer_addresses
+            .get(2, &traft::ConnectionType::Pgproto)
+            .unwrap()
+            .unwrap();
+        assert_eq!(addr, "ya.ru", "pgproto");
 
         assert_eq!(storage.replicasets.space.len().unwrap(), 1);
         let replicaset = storage.replicasets.get("r1").unwrap().unwrap();
@@ -926,9 +939,13 @@ mod tests {
         storage
             .peer_addresses
             .space
-            .insert(&(1, "google.com"))
+            .insert(&(1, "google.com", traft::ConnectionType::Iproto))
+            .unwrap();
+        storage
+            .peer_addresses
+            .space
+            .insert(&(2, "ya.ru", traft::ConnectionType::Pgproto))
             .unwrap();
-        storage.peer_addresses.space.insert(&(2, "ya.ru")).unwrap();
 
         storage.properties.space.insert(&("foo", "bar")).unwrap();
 
@@ -955,11 +972,11 @@ mod tests {
                 }
 
                 s if s == PeerAddresses::TABLE_ID => {
-                    let addrs: [(i32, String); 2] =
+                    let addrs: [(i32, &str, &str); 2] =
                         Decode::decode(space_dump.tuples.as_ref()).unwrap();
                     assert_eq!(
                         addrs,
-                        [(1, "google.com".to_string()), (2, "ya.ru".to_string())]
+                        [(1, "google.com", "iproto"), (2, "ya.ru", "pgproto")]
                     );
                 }
 
diff --git a/src/sync.rs b/src/sync.rs
index e5e573c934..7ca1dc34f5 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -297,7 +297,7 @@ mod tests {
         storage.instances.put(&instance).unwrap();
         storage
             .peer_addresses
-            .put(instance.raft_id, &listen)
+            .put(instance.raft_id, &listen, &traft::ConnectionType::Iproto)
             .unwrap();
         crate::init_handlers();
 
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 30832861c7..d3b797b303 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -31,6 +31,14 @@ pub const INIT_RAFT_TERM: RaftTerm = 1;
 
 pub type Result<T, E = error::Error> = std::result::Result<T, E>;
 
+::tarantool::define_str_enum! {
+    /// An enumeration of all connection types that picodata supports.
+    pub enum ConnectionType {
+        Iproto = "iproto",
+        Pgproto = "pgproto",
+    }
+}
+
 //////////////////////////////////////////////////////////////////////////////////////////
 // RaftEntryId
 //////////////////////////////////////////////////////////////////////////////////////////
@@ -132,6 +140,10 @@ pub struct PeerAddress {
     /// Inbound address used for communication with the node.
     /// Not to be confused with listen address.
     pub address: Address,
+
+    /// Used for identifying the connection type.
+    /// For example "iproto", "pgproto".
+    pub connection_type: ConnectionType,
 }
 impl Encode for PeerAddress {}
 
diff --git a/src/traft/network.rs b/src/traft/network.rs
index 2ba537d288..394ad756f8 100644
--- a/src/traft/network.rs
+++ b/src/traft/network.rs
@@ -132,7 +132,7 @@ impl PoolWorker {
         let (stop_sender, stop_receiver) = oneshot::channel();
         let (inbox_ready_sender, inbox_ready_receiver) = watch::channel(());
         let instance_name = instance_name.into();
-        let full_address = storage.try_get(raft_id)?;
+        let full_address = storage.try_get(raft_id, &traft::ConnectionType::Iproto)?;
         let (address, port) = full_address
             .rsplit_once(':')
             .ok_or_else(|| Error::AddressParseFailure(full_address.clone()))?;
@@ -450,7 +450,9 @@ impl ConnectionPool {
         // Check if address of this peer is known.
         // No need to store the result,
         // because it will be updated in the loop
-        let _ = self.peer_addresses.try_get(raft_id)?;
+        let _ = self
+            .peer_addresses
+            .try_get(raft_id, &traft::ConnectionType::Iproto)?;
         let worker = PoolWorker::run(
             raft_id,
             instance_name.clone(),
@@ -666,7 +668,7 @@ mod tests {
         storage.instances.put(&instance).unwrap();
         storage
             .peer_addresses
-            .put(instance.raft_id, &listen)
+            .put(instance.raft_id, &listen, &traft::ConnectionType::Iproto)
             .unwrap();
 
         let result: u32 = fiber::block_on(
@@ -716,7 +718,7 @@ mod tests {
         storage.instances.put(&instance).unwrap();
         storage
             .peer_addresses
-            .put(instance.raft_id, &listen)
+            .put(instance.raft_id, &listen, &traft::ConnectionType::Iproto)
             .unwrap();
         tlog!(Info, "TEST: connecting {listen}");
         // pool.connect(1337, listen);
@@ -802,7 +804,7 @@ mod tests {
         storage.instances.put(&instance).unwrap();
         storage
             .peer_addresses
-            .put(instance.raft_id, &listen)
+            .put(instance.raft_id, &listen, &traft::ConnectionType::Iproto)
             .unwrap();
         tlog!(Info, "TEST: connecting {listen}");
 
@@ -880,7 +882,7 @@ mod tests {
         storage.instances.put(&instance).unwrap();
         storage
             .peer_addresses
-            .put(instance.raft_id, &listen)
+            .put(instance.raft_id, &listen, &traft::ConnectionType::Iproto)
             .unwrap();
         tlog!(Info, "TEST: connecting {listen}");
 
diff --git a/src/vshard.rs b/src/vshard.rs
index 40481b939c..5b9635d79b 100644
--- a/src/vshard.rs
+++ b/src/vshard.rs
@@ -11,6 +11,7 @@ use crate::storage::ToEntryIter as _;
 use crate::storage::TABLE_ID_BUCKET;
 use crate::traft::error::Error;
 use crate::traft::node;
+use crate::traft::ConnectionType;
 use crate::traft::RaftId;
 use crate::traft::Result;
 use sbroad::executor::engine::Vshard;
@@ -139,6 +140,7 @@ impl VshardConfig {
         let peer_addresses: HashMap<_, _> = storage
             .peer_addresses
             .iter()?
+            .filter(|peer| peer.connection_type == ConnectionType::Iproto)
             .map(|pa| (pa.raft_id, pa.address))
             .collect();
         let replicasets: Vec<_> = storage.replicasets.iter()?.collect();
diff --git a/test/conftest.py b/test/conftest.py
index 4f7aabc3de..33e869def9 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -2014,8 +2014,13 @@ class Cluster:
         if raft_info["state"] == "Leader":
             return self.peer
 
+        connection_type = "iproto"
         leader_id = raft_info["leader_id"]
-        [[leader_address]] = self.peer.sql("SELECT address FROM _pico_peer_address WHERE raft_id = ?", leader_id)
+        [[leader_address]] = self.peer.sql(
+            "SELECT address FROM _pico_peer_address WHERE raft_id = ? and connection_type = ?",
+            leader_id,
+            connection_type,
+        )
         self.peer = self.get_instance_by_address(leader_address)
         return self.peer
 
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index 2284af2ae5..005481c184 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -335,7 +335,8 @@ def test_raft_log(instance: Instance):
 |index|term|contents|
 +-----+----+--------+
 |  0  | 1  |BatchDml(
-Replace(_pico_peer_address, [1,"127.0.0.1:{p}"]),
+Replace(_pico_peer_address, [1,"127.0.0.1:{p}","iproto"]),
+Replace(_pico_peer_address, [1,"127.0.0.1:{pg_port}","pgproto"]),
 Insert(_pico_instance, ["default_1_1","{i1_uuid}",1,"default_1","{r1_uuid}",["Offline",0],["Offline",0],{b},"default","{picodata_version}"]),
 Insert(_pico_replicaset, ["default_1","{r1_uuid}","default_1_1","default_1_1","default",0.0,"auto","not-ready",0,0,{{}}]))|
 |  0  | 1  |BatchDml(Insert(_pico_tier, ["default",1,true,0,0,false,3000]))|
@@ -401,8 +402,8 @@ Insert(_pico_index, [{_pico_table},2,"_pico_table_owner_id","tree",[{{"unique":f
 Insert(_pico_table, [{_pico_index},"_pico_index",{{"Global":null}},[{{"field_type":"unsigned","is_nullable":false,"name":"table_id"}},{{"field_type":"unsigned","is_nullable":false,"name":"id"}},{{"field_type":"string","is_nullable":false,"name":"name"}},{{"field_type":"string","is_nullable":false,"name":"type"}},{{"field_type":"array","is_nullable":false,"name":"opts"}},{{"field_type":"array","is_nullable":false,"name":"parts"}},{{"field_type":"boolean","is_nullable":false,"name":"operable"}},{{"field_type":"unsigned","is_nullable":false,"name":"schema_version"}}],0,true,"memtx",1,""]),
 Insert(_pico_index, [{_pico_index},0,"_pico_index_id","tree",[{{"unique":true}}],[["table_id","unsigned",null,false,null],["id","unsigned",null,false,null]],true,0]),
 Insert(_pico_index, [{_pico_index},1,"_pico_index_name","tree",[{{"unique":true}}],[["name","string",null,false,null]],true,0]),
-Insert(_pico_table, [{_pico_peer_address},"_pico_peer_address",{{"Global":null}},[{{"field_type":"unsigned","is_nullable":false,"name":"raft_id"}},{{"field_type":"string","is_nullable":false,"name":"address"}}],0,true,"memtx",1,""]),
-Insert(_pico_index, [{_pico_peer_address},0,"_pico_peer_address_raft_id","tree",[{{"unique":true}}],[["raft_id","unsigned",null,false,null]],true,0]),
+Insert(_pico_table, [{_pico_peer_address},"_pico_peer_address",{{"Global":null}},[{{"field_type":"unsigned","is_nullable":false,"name":"raft_id"}},{{"field_type":"string","is_nullable":false,"name":"address"}},{{"field_type":"string","is_nullable":false,"name":"connection_type"}}],0,true,"memtx",1,""]),
+Insert(_pico_index, [{_pico_peer_address},0,"_pico_peer_address_raft_id","tree",[{{"unique":true}}],[["raft_id","unsigned",null,false,null],["connection_type","string",null,false,null]],true,0]),
 Insert(_pico_table, [{_pico_instance},"_pico_instance",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"name"}},{{"field_type":"string","is_nullable":false,"name":"uuid"}},{{"field_type":"unsigned","is_nullable":false,"name":"raft_id"}},{{"field_type":"string","is_nullable":false,"name":"replicaset_name"}},{{"field_type":"string","is_nullable":false,"name":"replicaset_uuid"}},{{"field_type":"array","is_nullable":false,"name":"current_state"}},{{"field_type":"array","is_nullable":false,"name":"target_state"}},{{"field_type":"map","is_nullable":false,"name":"failure_domain"}},{{"field_type":"string","is_nullable":false,"name":"tier"}},{{"field_type":"string","is_nullable":false,"name":"picodata_version"}}],0,true,"memtx",1,""]),
 Insert(_pico_index, [{_pico_instance},0,"_pico_instance_name","tree",[{{"unique":true}}],[["name","string",null,false,null]],true,0]),
 Insert(_pico_index, [{_pico_instance},1,"_pico_instance_uuid","tree",[{{"unique":true}}],[["uuid","string",null,false,null]],true,0]),
@@ -455,6 +456,7 @@ Update(_pico_tier, ["default"], [["=","target_vshard_config_version",1]])
 +-----+----+--------+
 """.format(  # noqa: E501
         p=instance.port,
+        pg_port=instance.pg_port,
         b="{}",
         i1_uuid=instance.uuid(),
         r1_uuid=instance.replicaset_uuid(),
@@ -798,3 +800,15 @@ def test_pico_service_password_security_warning(cluster: Cluster):
     i1.start()
     i1.wait_online()
     assert not lc.matched
+
+
+def test_peer_connection_type(cluster: Cluster):
+    i1, _ = cluster.deploy(instance_count=2)
+
+    iproto = "iproto"
+    pgproto = "pgproto"
+    addr_list = i1.sql(""" SELECT address FROM _pico_peer_address WHERE connection_type = ? """, iproto)
+    pg_addr_list = i1.sql(""" SELECT address FROM _pico_peer_address WHERE connection_type = ? """, pgproto)
+
+    # each instance has one iproto address and one pg adress, their length should be equal
+    assert len(addr_list) == len(pg_addr_list)
diff --git a/test/int/test_joining.py b/test/int/test_joining.py
index 0f0050ce60..01bb2fe8c7 100644
--- a/test/int/test_joining.py
+++ b/test/int/test_joining.py
@@ -36,6 +36,7 @@ def raft_join(
     # invalid address format to eliminate blocking DNS requests.
     # See https://git.picodata.io/picodata/picodata/tarantool-module/-/issues/81
     address = f"nowhere/{instance_name}"
+    pg_address = f"pg_nowhere/{instance_name}"
     picodata_version = instance.call(".proc_version_info")["picodata_version"]
     return instance.call(
         ".proc_raft_join",
@@ -43,6 +44,7 @@ def raft_join(
         instance_name,
         replicaset_name,
         address,
+        pg_address,
         failure_domain,
         instance.tier if instance.tier is not None else "default",
         picodata_version,
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index 6f8c2c16dd..cecd32ee19 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -4523,6 +4523,7 @@ def test_order_by(cluster: Cluster):
                             PI.tier
                         FROM _pico_peer_address PA
                         JOIN _pico_instance PI ON PA.raft_id = PI.raft_id
+                        WHERE connection_type = 'iproto'
                         ORDER BY instance_name
  """
     )
-- 
GitLab