Skip to content
Snippets Groups Projects
Commit 3cb2052c authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

chore: name pool workers fibers

parent c84d3342
No related branches found
No related tags found
1 merge request!374Name fibers for easier debugging
......@@ -52,29 +52,44 @@ type Callback = Box<dyn FnOnce(::tarantool::Result<RawByteBuf>)>;
type Queue = Mailbox<(Callback, &'static str, TupleBuffer)>;
pub struct PoolWorker {
id: RaftId,
// Despite instances are usually identified by `instance_id` in
// picodata, raft commutication relies on `raft_id`, so it is
// primary for worker.
raft_id: RaftId,
// Store instance_id for the debugging purposes only.
instance_id: InstanceId,
inbox: Queue,
fiber: fiber::LuaUnitJoinHandle<'static>,
fiber: fiber::UnitJoinHandle<'static>,
cond: Rc<fiber::Cond>,
stop_flag: Rc<Cell<bool>>,
handler_name: &'static str,
}
impl PoolWorker {
pub fn run(id: RaftId, storage: PeerStorage, opts: WorkerOptions) -> PoolWorker {
pub fn run(
raft_id: RaftId,
instance_id: InstanceId,
storage: PeerStorage,
opts: WorkerOptions,
) -> PoolWorker {
let cond = Rc::new(fiber::Cond::new());
let inbox = Mailbox::with_cond(cond.clone());
let stop_flag = Rc::new(Cell::default());
let handler_name = opts.handler_name;
let fiber = fiber::defer_proc({
let cond = cond.clone();
let inbox = inbox.clone();
let stop_flag = stop_flag.clone();
move || Self::worker_loop(id, storage, cond, inbox, stop_flag, &opts)
});
let fiber = fiber::Builder::new()
.name(format!("to:{instance_id}"))
.proc({
let cond = cond.clone();
let inbox = inbox.clone();
let stop_flag = stop_flag.clone();
move || Self::worker_loop(raft_id, storage, cond, inbox, stop_flag, &opts)
})
.start()
.unwrap();
Self {
id,
raft_id,
instance_id,
fiber,
cond,
inbox,
......@@ -299,7 +314,10 @@ impl PoolWorker {
impl std::fmt::Debug for PoolWorker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PoolWorker").field("id", &self.id).finish()
f.debug_struct("PoolWorker")
.field("raft_id", &self.raft_id)
.field("instance_id", &self.instance_id)
.finish()
}
}
......@@ -379,9 +397,12 @@ impl ConnectionPool {
.storage
.peer_field::<peer_field::InstanceId>(&raft_id)
.map_err(|_| Error::NoPeerWithRaftId(raft_id))?;
let storage = self.storage.clone();
let worker_options = self.worker_options.clone();
let worker = PoolWorker::run(raft_id, storage, worker_options);
let worker = PoolWorker::run(
raft_id,
instance_id.clone(),
self.storage.clone(),
self.worker_options.clone(),
);
self.raft_ids.insert(instance_id, raft_id);
Ok(entry.insert(worker))
}
......@@ -403,8 +424,12 @@ impl ConnectionPool {
.storage
.peer_field::<peer_field::RaftId>(instance_id)
.map_err(|_| Error::NoPeerWithInstanceId(instance_id.clone()))?;
let worker =
PoolWorker::run(raft_id, self.storage.clone(), self.worker_options.clone());
let worker = PoolWorker::run(
raft_id,
instance_id.clone(),
self.storage.clone(),
self.worker_options.clone(),
);
entry.insert(raft_id);
Ok(self.workers.entry(raft_id).or_insert(worker))
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment