diff --git a/core/fiber.c b/core/fiber.c index 1e9b74136eb0da590d350a7f3b0446bb20bb1618..328e4d0253b6f5dee10c6e2c598c96ad76d2ef89 100644 --- a/core/fiber.c +++ b/core/fiber.c @@ -109,6 +109,24 @@ fiber_call(struct fiber *callee) coro_transfer(&caller->coro.ctx, &callee->coro.ctx); } +void +fiber_raise(struct fiber *callee, jmp_buf exc, int value) +{ + struct fiber *caller = fiber; + + assert(sp - call_stack < 8); + assert(caller); + + fiber = callee; + *sp++ = caller; + +#if CORO_ASM + save_rbp(&caller->rbp); +#endif + callee->csw++; + coro_save_and_longjmp(&caller->coro.ctx, exc, value); +} + void yield(void) { diff --git a/core/log_io_remote.c b/core/log_io_remote.c index fc8713702ef8ff31a3d0abf353f6a8565dd87910..e3d347f96f7d793ad3d0a3055f331f71f6827cda 100644 --- a/core/log_io_remote.c +++ b/core/log_io_remote.c @@ -38,11 +38,6 @@ #include <log_io.h> #include <pickle.h> -struct remote_state { - struct recovery_state *r; - int (*handler) (struct recovery_state * r, struct tbuf *row); -}; - static u32 row_v11_len(struct tbuf *r) { @@ -137,8 +132,17 @@ pull_from_remote(void *state) struct remote_state *h = state; struct tbuf *row; - if (setjmp(fiber->exc) != 0) - fiber_close(); + switch (setjmp(fiber->exc)) { + case 0: + break; + + case FIBER_EXIT: + fiber_close(); + return; + + default: + fiber_close(); + } for (;;) { row = remote_read_row(h->r->confirmed_lsn + 1); diff --git a/include/fiber.h b/include/fiber.h index 63e21ad58a2a8441908b3048eecf392cc76cd206..eb68fc13db93d9f47382b6fe697a1f89822989d1 100644 --- a/include/fiber.h +++ b/include/fiber.h @@ -40,6 +40,8 @@ #include <say.h> #include <coro.h> +#define FIBER_EXIT -1 + struct msg { uint32_t sender_fid; struct tbuf *msg; @@ -156,6 +158,7 @@ ssize_t fiber_flush_output(void); void fiber_cleanup(void); void fiber_gc(void); void fiber_call(struct fiber *callee); +void fiber_raise(struct fiber *callee, jmp_buf exc, int value); int fiber_connect(struct sockaddr_in *addr); void fiber_sleep(ev_tstamp s); void fiber_info(struct tbuf *out); diff --git a/include/log_io.h b/include/log_io.h index 58df0173457282de1e1eb06a826225f541a71da5..25e7246f16cc9ffa8f559e75806b10883623ae1b 100644 --- a/include/log_io.h +++ b/include/log_io.h @@ -96,6 +96,11 @@ struct recovery_state { void *data; }; +struct remote_state { + struct recovery_state *r; + int (*handler) (struct recovery_state * r, struct tbuf *row); +}; + struct wal_write_request { i64 lsn; u32 len; diff --git a/third_party/coro/coro.c b/third_party/coro/coro.c index 43c8824d625fdee6cfd5cef937dd3a1a4078a412..3546628c1957af4d08769a67e203e459affa29ff 100644 --- a/third_party/coro/coro.c +++ b/third_party/coro/coro.c @@ -150,6 +150,40 @@ trampoline (int sig) #endif "\tret\n" ); + asm ( + ".text\n" + ".globl coro_save_and_longjmp\n" + ".type coro_save_and_longjmp, @function\n" + "coro_save_and_longjmp:\n" + #if __amd64 + #define NUM_SAVED 6 + "\tpush %rbp\n" + "\tpush %rbx\n" + "\tpush %r12\n" + "\tpush %r13\n" + "\tpush %r14\n" + "\tpush %r15\n" + "\tmov %rsp, (%rdi)\n" + "\tmovq %rsi, %rdi\n" + "\tsubq $8, %rsp\n" + "\tmovl %edx, %esi\n" + "\tcall longjmp\n" + #elif __i386 + #define NUM_SAVED 4 + "\tpush %ebp\n" + "\tpush %ebx\n" + "\tpush %esi\n" + "\tpush %edi\n" + "\tmov %esp, (%eax)\n" + "\tsubl $0x28,%esp\n" + "\tmovl %ecx,0x4(%esp)\n" + "\tmovl %edx,(%esp)\n" + "\tcall longjmp\n" + #else + #error unsupported architecture + #endif + "\tret\n" + ); # endif diff --git a/third_party/coro/coro.h b/third_party/coro/coro.h index bc84c0a0985c0a8a507325079b38cf54e27b7a21..e931718a14c7206ed9fb4f7ff64285b7a9797c09 100644 --- a/third_party/coro/coro.h +++ b/third_party/coro/coro.h @@ -262,16 +262,21 @@ struct coro_context { }; # define coro_transfer(p,n) do { if (!coro_setjmp ((p)->env)) coro_longjmp ((n)->env); } while (0) +# define coro_save_and_longjmp(p,j,v) do { if (!coro_setjmp ((p)->env)) longjmp (j,v); } while (0) # define coro_destroy(ctx) (void *)(ctx) #elif CORO_ASM +# include <setjmp.h> /* for jmp_buf */ + struct coro_context { void **sp; /* must be at offset 0 */ }; void __attribute__ ((noinline, regparm(2))) coro_transfer (coro_context *prev, coro_context *next); +void __attribute__ ((noinline, regparm(3))) +coro_save_and_longjmp (coro_context *prev, jmp_buf jump, int value); # define coro_destroy(ctx) (void *)(ctx)