diff --git a/README.md b/README.md index 9258dd5..993851e 100644 --- a/README.md +++ b/README.md @@ -899,6 +899,45 @@ main(); ``` ![img.png](images/unaryTest.png) + +## Specifications + +### Throwing Timeouts + +By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so: + +```ts +class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONValue, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise => { + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + resolveCtxP(ctx); + abortProm.resolveP(ctx.signal.reason); + }); + throw await abortProm.p; + }; +} +``` + +### Timeout Middleware + +The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out. + +This case can be seen in the first diagram, where the server is able to stop the processing of the handler, and close the associated stream of the RPC call based on the shorter timeout sent by the client: + +![RPCServer sets timeout based on RPCClient](images/timeoutMiddlewareClientTimeout.svg) + +Where the `RPCClient` sends a timeout that is longer than that set on the `RPCServer`, it will be rejected. This is as the timeout of the client should never be expected to exceed that of the server, so that the server's timeout is an absolute limit. + +![RPCServer rejects longer timeout sent by RPCClient](images/timeoutMiddlewareServerTimeout.svg) + +The `timeoutMiddleware` is enabled by default, and uses the `.metadata.timeout` property on a JSON-RPC request object for the client to send it's timeout. + ## Development Run `nix-shell`, and once you're inside, you can use: diff --git a/images/timeoutMiddlewareClientTimeout.svg b/images/timeoutMiddlewareClientTimeout.svg new file mode 100644 index 0000000..f15e650 --- /dev/null +++ b/images/timeoutMiddlewareClientTimeout.svg @@ -0,0 +1,17 @@ + + + eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1cXGtX4shcdTAwMTb93r+C63y5d63pmno/5ptcdTAwMGZcdTAwMWNpn+2jsfveWb1cIkSIhFx1MDAwNENcdTAwMTRwVv/3e1xuhYQkIIraOEP6sSRVIfU4e+9zTlX514dSaS1cdTAwMWV03LXfS2tuv+b4Xj1yemu/2vu3btT1wlx1MDAwMIro8HM3vIlqw5rNOO50f//tt+RcdFRcdTAwMGLb90+5vtt2g7hcdTAwMGL1/lx1MDAwYp9Lpb+G/6feXHUwMDEzubXYXHRcdTAwMWG+O3xgWJS8ilOdvXtcdTAwMTBcdTAwMDbD11x1MDAxYSypVnhcXOx1t+BlsVuHskvH77pJib21tl5ePz51r/f7l9dOMPh83j+4pF7yzkvP90/igT9sUzeEfiRl3Thcbltu1avHzVH3U/enPVx1MDAxNYU3jWbgdm3fyfhu2HFqXjyw93DS+PtcdTAwMDH4vZTc6cMnZpBcdTAwMTCUXHUwMDEyo1x1MDAxNKZEXGJcIsbF9lx1MDAwYj5cdTAwMTJBXHUwMDE4UoJcdTAwMWGKXHUwMDE11ZSZTMs2Qz+MbMt+Ia79k7Ttwqm1XHUwMDFh0MCgPq5cdTAwMTNHTtDtOFx1MDAxMcxWUq836jPTiKUuKsdVmq7XaMa2PzxpfddcdTAwMWTOXHUwMDAzkVx1MDAxYXMtXHUwMDE4TcbMvrVTqVx1MDAwZlxy4s9k9COn7VbsI8GN76eHMKg/XGbhyHBcdTAwMTLTYVx1MDAwZnd+JN2y9ctZk0ubXcog9su1fUJcdTAwMGbVLjvyTXOnsXHgOJ/GXZ+wUSeKwt7auOTHr8XfO6pcdTAwMWW7/Xjyi+7f2Lz+cnJS/n7ufFx1MDAxNOWr7ajt3e6r5uNfe/9w2Vx1MDAwYvt78V10XHUwMDE2Ylx1MDAxNW1Fe/HtXHUwMDFmzf58zX34KVx1MDAxOe6bTt2JXHUwMDFmZshowFx1MDAxMcdcXHE2Lve9oJWdXHUwMDBiP6y1XHUwMDEycH1INThcdTAwMDfpif6n0MyMmoZmXCJcdTAwMTiXRNGUYT1cdTAwMDbo4tFcXGpAK4kk0VorI6mSXHUwMDA0Z/HMpX5cdTAwMWI8XHUwMDEzyVx1MDAxMTFwScM15Sl8juEs8mjWWFx1MDAxMU6Z0IujeaIgXHUwMDBi24nCxW01aVdcdTAwMTjEJ96dnVx1MDAxN4on7m47bc9cdTAwMWZMzOvQiGFcdTAwMWM3fVx1MDAwZpr1v+Dfp17bXHJv4uE8t7v/WZuoue57XHJr4ms1qOxGXHUwMDEz1lx1MDAxZnugi+Ncbm2vXk+LXVxyXu94gVx1MDAxYlXmkakw8lx1MDAxYV7g+Kdzt1x1MDAwZcbH3Vx1MDAxOXM0olwiZTRd15ZanWUzXHUwMDAxPVujXHUwMDA1yd5cdTAwMWSjWmEtOFx1MDAwM6WeXHUwMDFi1Vx1MDAwN8fXzV1nvbI3wKHfXFzfb5+pO7HkqMZcdTAwMDIppaXmWmLNZUJyXHUwMDBmKs0w4lx1MDAxYcDDMDZcblx1MDAxYpZp2U+XaYqZ0UqaXHUwMDE05P/JMu1q9+Drpjb9XouY1q709/qVL/PK9M632qAuW3pHN+rlO7F1cVlVvfcl01xc0KmAJlx1MDAxNFx1MDAxM6oxmd/vLlx1MDAxZc3lXHUwMDA2NGdIcMFcdTAwMDTGmEjGczKtxVx1MDAxYlx1MDAwMZpIijS0XHUwMDAyMyyIXHUwMDEySufxnNdpXHUwMDBlT1BoPf8nyfSJXHUwMDFiga1OXGKheEWZfkSmsjL9eOtcdTAwMTaR6XlcdTAwMDKEXGbrTNg7XHUwMDExXG5cdTAwMDJNoZk0hEgsU2Z2b/BMXG5EJZOUYm0kpypnqMxGqlxuKirAXHUwMDA1XHUwMDE2mLhcdTAwMDCTXHUwMDAyY6VcdTAwMWFJKrXiUlx1MDAwMLCImYHEl8LSXHUwMDBiMEzd6TbdZ1LMXHUwMDBiKinNKenIM6dcdTAwMDRoXHUwMDE5Qlx1MDAwNpk26FG6hpvs3Vx1MDAxMZ2DM0+1TS7MweZvhf5O6GW9guSnUmJcdTAwMWbDXHUwMDBm45///LWw9seZdmmvnEUmX5nTY9/pxpthu+3F0Ncj285sn7qxXHUwMDEzxVx1MDAxYl5Q94LG5Fx1MDAxND4kwuZx8odkV7uxY4CRVVx1MDAxOGG0kGT412CdqtZwOlBJg1x1MDAxOFx1MDAxOVxuw860gMCS5KzEXHLqSZvy7V23jNB0nZw1wnPZslx1MDAxOfwz2/OZxT9cdTAwMTCBXCJcdTAwMDPdZJJcdTAwMTipXHUwMDA1TuzmgX5cdTAwMTRFXHUwMDEwL1x1MDAxYow1xJ+pLNiKfZaCfVxiN1hKzXK5WvtcdTAwMTSdXHUwMDFlXHUwMDFkSlx1MDAwNXFcdTAwMDdMaDJVK/p5XHUwMDBi+pntvKTo56PlXHUwMDFmzJkyhDFsQGbA96epivdcdTAwMDRENKKEXHUwMDEwRYlcdTAwMTHccNCUn0RBx73TmlPnjfLdYbNnPuGjO+9LOJ9cdTAwMGIkgUWBRDmXkkFXWI6DMEbAwlx1MDAxNOhYXHRiUlx1MDAxObNcdTAwMTFcdEnOXHUwMDExVpxQRjhcdTAwMTdYXHUwMDE3xN6zXHUwMDAyjyVcIpxcdTAwMDVcdTAwMDKqt+BcdTAwMWIlJYReJFx1MDAxNYMmdEPyLtCYbriBXHRcdTAwMTZUPIdupqQoXHUwMDFlSyV8LVfONq+/qb0yo43jvW5nI7imr1x1MDAxYvK/XHUwMDBlk0017szTi5NWXHUwMDE2/i9BXHUwMDFigFx1MDAxNO9cdTAwMWVxM8ijeK5y5DGZNbHcwYlARjLGsVx1MDAwMFx1MDAwN0wokc1cdTAwMTcwwlx1MDAxMJvJXHUwMDFkYJuI2IxcdTAwMWRcdTAwMDE/TlNNXG5cdTAwMTLyqZBwxVx1MDAxZaVcdTAwMTl5ipFbXHUwMDAy/lwiXHUwMDA3ReW5bJZ1XHUwMDA1p9NcdTAwMDTjSnOh51mIeiunZJTkmEzyp2btqVx1MDAxOZPn50FS2Vxikvh7mezIIzqczY5M61YuJzJcdTAwMDO9cfss7le2XHUwMDA0XHUwMDE53JxtyYObi/PD+vZcXNKvjEHKMFx1MDAwNdGzUlx1MDAxNKKoSfRqw1x1MDAxMSNcdTAwMTCbcM1cdIZ/eeFcdTAwMDf/h1x1MDAxOUWFUEBcdTAwMDPpKV5cdP/LXHUwMDA2XHUwMDFhmFGQIMqL0lx1MDAxY1xmT1x1MDAwZjSYwVx1MDAxNEJccvOGym86V5Xm5Wdvt/fJ51x1MDAwNydb14pcdTAwMWZcdTAwMWa9R+X/ONW8M4+/Y+kvnqx5pJ9JxChcdTAwMDZ6oHaXXHUwMDBippm1P4iLkZ5JXHUwMDFlcFx1MDAxN2LEVIY/qbJS/ucqP5bCXHUwMDE4iOJcblx1MDAxM1x1MDAxMphn7yabUFx1MDAwNDhxMu2gLZf210uHN/G/llf3XHUwMDFmXHUwMDEx4Vwi3c926Sma3zo9v8TVr41rWY33vt3uVredznVcdTAwMWW2w2ZPwJYwjexSpjbKXHUwMDAw4jBOMoH3mi9cdTAwMTSCUFx1MDAxMbxcdTAwMDFcYnlcdTAwMTQ2SfFcYrZcdTAwMTBcdTAwMGVcdTAwMDFcdTAwMTlKKShcdTAwMDZj41xmXHUwMDE3pVx1MDAxYlx1MDAxOeJcbnDNwfU3XHUwMDFjp1Jcbv9cdTAwMDRcdTAwMTQnst5v7Fx1MDAxZrk7p/xL0O95xlx1MDAxYsS7zqk7ksLX8lx1MDAxNaRcdTAwMDTBolJcdTAwMTQlXHRcblx1MDAxY4hcdTAwMTFcdTAwMDUwqYA2RMrEfz5cdTAwMDO8qJxPN1xce+VMdtnl/ZGEYLB/XHUwMDE1ft9p0aB21Ypx6G2V9c58XGahICDHWEiiJLbLZInFPDBcdTAwMDRFSnK75EZs7jYv7FxcXCKloUByQ4FJXHUwMDEyXG5cdTAwMTnzw9RcdTAwMWErenh1emCCaVx0kV9cdTAwMTE9XHUwMDAwOqbRgzaAZkDN33fFYppR2itb9s7JXHUwMDAxX3WqXHUwMDFiZ6fOac/boVx1MDAxN3d7tGq2XHUwMDBiXHUwMDE2LPPkoDFG4CVcdTAwMDJcdTAwMDdcdTAwMDBTXHUwMDAyPWS5QVx1MDAwMndIYzinnCuGV97DXCL04EpPb3bqt1efN1x1MDAwN9y9qlx1MDAwNF/Xo9Zr01x1MDAwM9GaXHUwMDEyYrgpiiBcdTAwMTienjykRCmB+VJFXHUwMDEwK//h2Vx1MDAxNNGilYa3sYnPy61B3I/j1l7wrTlcdTAwMTdFcIKkJoxyKpU1iixFcCSZXHUwMDA2+4IohGu5clx1MDAxZt5cdTAwMTc/MLByJaQqclx1MDAxZviMJUiCNbXbRpdpcWHlPzyTXHUwMDFjKuefXHUwMDFj/2h/z/f8iH6/Ozwk+1tcdTAwMDXBRcGSg6EwXHUwMDEylHAuqcTwIbffUlx1MDAxMKRccnhcdTAwMTbCcJ3a8pVkXHJcdTAwMTFhUkvNXHUwMDE0fI0kXHUwMDEyM1x1MDAxN54q4Fxiblx1MDAxMMNgdVx1MDAwMioytkokvlxi+lxyxFx1MDAwZVx1MDAwMk/Z76Sm7raUIIqcqrn2zr/QKsRldZ1s3tKtavekWv18cuFv7bVaT1+FSNzTn8Yrsy3eXjlbX3Z6mWtponhcdTAwMDZzJJNfmtBGgy9mlDJcXILNseymXHUwMDA0Qlx1MDAwNNJcdTAwMWF8OEZcdTAwMDSWJlx1MDAxNfOOSEZIxLXdlCC0ktwuYeRcdGbFKE9bmrArQTajXFxEXHUwMDFkeuqxXHUwMDFiXG5WXHJhJn9cdTAwMTZ1zOE3PFx1MDAwYt+jhYkl34vwiEpn1yRcdTAwMTbYgtDtd7zO+fppJVx1MDAxNFXeaVx1MDAxZcp6d5fMt/tcdTAwMTAmXHUwMDE3XHUwMDE5aijDlCpAbmJcdCOHgCFcbqTGNYZygGtcdTAwMGWsyVCPsam0QEZpZlx1MDAxNFx1MDAwN1x1MDAwMuApuVpBdVx1MDAwMddcdTAwMWZzXGZ+PCvMXHUwMDFjXG6WvTtcdTAwMTZ/aKdcdTAwMTbPSlx1MDAxYz5T+1XvvOmpyHRa9V5YOf60qU+ci/eo/Vx1MDAxM7VzRv23UPniuZpD5Vx0JuD1aIhFOTFcbpPUdN1vQFBcdTAwMTQpxond20ypzlx1MDAxZp1cdTAwMTBcdTAwMDJhK/JcdTAwMTTbpVx1MDAwZSVcbk5cdTAwMTiuNP6J21x1MDAwZuxcdTAwMTk6SVnhNiUyXeVcdTAwMTW3u89eLTmwmMiLpVx1MDAxNvlHpDcn8uLZXCL/JdjuMG/nrnG88+2P5jXpb/s35/MgVVx1MDAxOIFcdTAwMDSE+1JQyidm+UHibXaESXvai1x1MDAwMpJZXHUwMDFlqlx1MDAxNKBcdTAwMGXMx2A2XHUwMDA1uIJEXHUwMDE0nDBYYfWJWGX2wFx1MDAxOda6XHUwMDEwq9NPTlx1MDAxMqBTZXfDvVx1MDAwMlZ1+mTa83dcdJe6blxcisOX2yvsu5fxXGbgxmHnaaid6NOUPcFTOvFcdTAwMTTEKtnZ3Tg6XHUwMDAzXHUwMDFkP1x1MDAxNkHUXHUwMDFlNK88XnAoKI9YyTRSVFJjUz2Cm0xcdTAwMDRtOLFbXHUwMDA0jOCY2tXAvE9OpEQgXHUwMDA2UmpQZqKIXonrwoCVylx1MDAxZeMhxbk3NjXzLikmSstXkFbBXGZeaGPf/e9cdTAwMDYoxcPNcGDy71x1MDAwZarTO/BcdTAwMTSY9ncxLzdqjS/NwffKMVx0vt1cXFU/zlx1MDAwNVNhXHUwMDEwXHUwMDA218lcdTAwMTBAXHUwMDE5XHUwMDA2rE3CXHUwMDE0sIdcZrAqsesjWFx1MDAxN+zAZVx1MDAxOGG2ktWXPXsjIDYzsvBcdTAwMTdcdTAwMTJcYjxdVjk1jFx1MDAwM6G+/Po5xN50MV3d9MOuWzqJI9dpvzuQXHUwMDE2N/4pXHUwMDAwdWpm/bJRvjjwj9q+U78+/1x1MDAxY1c/zVx1MDAwM1BcdTAwMDNcdTAwMDBcdTAwMTVMXHUwMDFiXHUwMDAx8TpWXHUwMDE0Z1a7QECRkFx1MDAwMmadYGpPS+dcdTAwMTDKiN1sJ+2WTqPsKkqCxllcYiVIKGzPLFCp7DqaUFx1MDAxOdisIFtKIGtcdTAwMTNcdTAwMDBcdTAwMTiLXCJdJTOO1kjJXHRhZK7f8PZEZWXWrVpcdTAwMDSw1fDGr5eazq2biFPJiUsvXHUwMDE2zL4pgufszVx1MDAxNEh/eEiMrTmdzklcZiO9Nsp6rt16bm8jb/C/XFxcdTAwMGUvm7ZcdTAwMWFcdTAwMTKCNXZ3mCz98eHH/1x1MDAwMdb2tjIifQ== + + + + + Client(Timeout: 100ms)Server(Timeout: 150ms)Timeout: 100msTimed Out!100ms150msTimeout set to 100msServer timed outClose StreamWould have timed out at 150ms \ No newline at end of file diff --git a/images/timeoutMiddlewareServerTimeout.svg b/images/timeoutMiddlewareServerTimeout.svg new file mode 100644 index 0000000..ae9eeb1 --- /dev/null +++ b/images/timeoutMiddlewareServerTimeout.svg @@ -0,0 +1,17 @@ + + + eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1cXGtz2khcdTAwMTb9nl/Bej7sTlWs6fcjXHUwMDFmtsrG4LdNTFx1MDAxY8femXLJIEBGIFx1MDAwMsJcdTAwMDZP5b/vbTBISFx1MDAwMlx1MDAwM4bEmUBSLlvdQv2459xzb3fr73eZzFbQbzlbXHUwMDFmMltOr2R7brltP269N9dcdTAwMWacdsf1m1BEXHUwMDA2f3f8brs0qFlcdTAwMGKCVufDXHUwMDFmf4R3WCW/MbzL8ZyG01xmOlDvf/B3JvP34GfkOW2nXHUwMDE02M2q51xmblx1MDAxOFx1MDAxNIWP4kjFr575zcFjMdOUcoVcdTAwMTFcdTAwMTnXcDt78LzAKUNxxfY6TlhiLm1cdTAwMWTvfz4s5ulDtveku7fnupH7ut1cdTAwMGZcdTAwMWZbcT2vXHUwMDE49L1Bszo+dCUs61x1MDAwNG2/7lxcueWgNlx1MDAxYYHI9Wl3tf1utdZ0Oqb7eHzVb9klN+iba1xija9cdTAwMGXH4EMmvNIzNVx1MDAwNEZcdTAwMTZcIkoyijlcdTAwMTMs/Fx1MDAxNnP/NuZcdTAwMThbUiEkKKWKRIqHXHLL+p7fNlxy+1xyO+Zf2LQ7u1SvQvua5XGdoG03Oy27XHLzXHUwMDE11ntcdTAwMWN1mSqLRj5EjKvUXHUwMDFjt1pcdTAwMGJcdTAwMDZTXHUwMDEyNr7jXGamXHUwMDAxY0k4UYKHJeaprcPywCT+XG5cdTAwMDe/bTecQ3NLs+t50Vx1MDAxMWyWn0dwZDqh8dDnK9/Cbpn6ubjRRVxyL2JcdTAwMGanudIpJufymFx1MDAxNjxdO6juntn20bjrXHUwMDEzVmq32/7j1rjk2/tZ33tzr0mDo/xx8bBWL+vGfeP406dXfe+oeuD0gskvXHUwMDFhPvGgfZS/a9xl0Vx1MDAxOUbl6uVtr/Pw1Z63uVx1MDAxN8V69WRf5++vT89cdTAwMGUve1kn16g05mvu82/hNHZbZXtcYkAstFx1MDAxMkQxqpTm43LPbdbjc+z5pXqI2XeRXHUwMDA2J8hiov9cdTAwMTGeYFxuTeNcdC2YIFxcUjU3TaRcdTAwMGbmXHUwMDFip1x0Ji2pOUOKwE+iVIwnmFx1MDAxMN+HJ7AgXHUwMDE2PFx1MDAwNyGKOJZcXKokTfBcdTAwMDRLcMyohGl6PUdMXHUwMDE0xMlgojBpqYhcdTAwMDHLLmCpYbv8ZlB0n8y0XHUwMDEwNHE1bzdcXK8/Ma1cdTAwMDNcdTAwMTOGUcx6LjTrz+Z/PrlccsfvXHUwMDA2UIejRuf3rYmaO55bNVx1MDAwNr5VgspOe8L2XHUwMDAzXHUwMDE3/O24QsMtl6NOtFx1MDAwNI+33abTPpzH9/ltt+o2be/T3K2D8XFcdTAwMGXGzG9cdTAwMTFcdTAwMWUxmY5jSlxyLulMOM/2/XQqpinnSmEqQlx1MDAxZvRcdTAwMTKmi5dlXFw4P3m64rnr+ol+vMDty97bxjQhUsCwclx1MDAwMphmnMjwyc+unyiLw0/MXHUwMDA11lx1MDAxMmP21nw/XHUwMDAz0qE42rB/vuvPn330XHUwMDFhh1xc79zmu9tcdTAwMWR+sqvx9ce1uv6j3fPWaSF/d1x1MDAxOewwXHUwMDEy2PWgkK98WcxFL0p8S7loTsU0OIOGl1x1MDAxMktF2dx4Tu/128YzxdSiXHUwMDA0KynA2ylwjnFcdTAwMWat6HdcdTAwMDI0yFx1MDAwNVx1MDAwYmv4XGLNgFxcXGJJ4jnppCUjXHUwMDEwjVxi8kt56aLTXHUwMDA2Y53wg2iNXvpcdTAwMDU3XHUwMDE199Ivt+41XnqeYCbGZpOiVIKHQpRQgYhcdTAwMTbml5jBU0Etolx1MDAxMeBBgCkyplx1MDAxMoZcboBcdTAwMDDPXHUwMDA3XHUwMDE1pVx1MDAwMlHJXHUwMDExdrYjsFxijVx1MDAxNVx1MDAxZSSIgChZcEFcdTAwMTnWM6C4KiytgGLKdqfmLMkxK3SlJOFKR7JcXIAu14IkiNvYXGaSU+mcXG5cZpOOZdja6XT+vdDf8t24LFxif8uE9jH4Y/z7X+9Ta2/PtEvzSVhk+JVcdIfs2Z0g6zdcdTAwMWFuXHUwMDAwfS2Ydsb71Fx07Haw6zbLbrM6OYXP+bV5NP6A7EpdM1x1MDAwNshCSEuuXHUwMDE1zLD5L3WkUtVuQVx1MDAxNWUxpVx0XGY6VVx1MDAxY0xcdTAwMDAnbMRplsNcdTAwMTYlW7tj+KDm2Fx0W4T74mUz2Ge2nprFPoRqboG1IK65wqCl4+QjkUVcdTAwMTFcdTAwMThcdTAwMTRl4Fx1MDAwMXFozVx1MDAxYu55XHUwMDBi3KM4U0qlMVx1MDAwZlxyrTXBPJwwKVx1MDAxMXDFhnm+K/PM1i1cdTAwMTHm2TbUXHUwMDAzmJNcdTAwMWFTijQmXGaD/OSRmkP6wcpcItgkj7HmTDNK1Fx1MDAwZlwioKdcdTAwMWLU+nh6XHUwMDFlXHUwMDFjod2Pt4F78fT0XHUwMDE5teeUP1x1MDAxMDZcdTAwMWKFLVx1MDAxNNhcdTAwMGVnJKSYIVx1MDAwM1x1MDAwMbtaSlHKIb7mWEesa0RBoIksJFx1MDAxOXhTzFx1MDAxOEcqJe6eXHUwMDE1dbwhunlFNPU92IaapVx1MDAxNS50XCLjNJjHXHUwMDE5a1BcXEEkh/A8geuc+YmX4v39gDw+trtuXHUwMDE2d4rdXHUwMDFl2surQ3293nh/PVQ21bhjd7+eteLwX1x1MDAwNW1cdTAwMDBS3CHiZpBH+lxcJchjMmUy4Fx1MDAwZVxy3KCR4Fx1MDAxNIONXHQp4ut+cN2iM7lcdTAwMDOodVwixo9cYpMxeUTCwVxye2Rm5ChGq5mgL1x1MDAxMKiSyEpFhCfoVF1iXHUwMDE2ZeA+uVxmTaxLlowyXHUwMDFj0Vx1MDAwNH9k0lx1MDAxNk2WLJ9cdTAwMDKJJFwicMi0scTIXHUwMDBibjieXHUwMDE4Se9UXCJcdTAwMTkyXHUwMDAzuntVfnVg7z3dOvv3u9VWnmR7lYc5XHUwMDAzXHUwMDBmiCyIRFx1MDAxNFx0XHUwMDA0QTCO5e0hqrIg5OBAfVx1MDAxY1x1MDAwM+uFxWOvXHUwMDBm4odqSTjEZYJGp3fj9Vfq9UFCa41cYk74d6NcYqavKCtcZreBYJhn9WlFTn+n3HNbj6S8V+idnvb3XHUwMDFih8fdW/IzOv3tqcZcdTAwMWS7/Sf2+umTNYfXJ1xiXHRcdTAwMGKGXHUwMDA1/D1cZo1cdTAwMDBxOUlcdTAwMWRcdTAwMWExS82kXHUwMDBluFxu8WFk9T2E2cbpL+v0lcRcdTAwMTJcdTAwMDDP0pJcdTAwMTFEJaTAiCaEXHUwMDAwrYDRXFyLWj/C6Zcz593gX2/X5b/ggdNcXH68S4s4/M97t1x1MDAwZv7Rg78vXHUwMDBmene5fOFLVd+laPVBsye1ujSgXHUwMDAzVVxiolxcXHUwMDExQnks06g4tyghYEWUT8iBXHUwMDExaCFcdTAwMGVcdTAwMDIqXHUwMDE0glx1MDAxYvxzRlFalpFazEhcblxiXHUwMDA2iGaIhrP/K2A4dOk7l7tcdTAwMWb3r5yTXCK5rLHK7Z3XzPcvRo5wXTpcdTAwMDHwXHUwMDA2dKojUX5EJ0RWtVx1MDAxMsvamHMqJf/HroNMt1xc80nY7Fv37i+kXHUwMDAyK9d33Ms+XHUwMDFkXHUwMDE2UaX28ODsXlx1MDAxNI+jsJrFXHUwMDEwTFtcXFx1MDAwMk9wzJhi8d15iiNw+1x1MDAxODw7IZJcdJlCXHUwMDExZvee4FgwTbgkKaH81Fx1MDAxYVx1MDAxYn5YOz+A5TOmOU+LI1x1MDAxOJvKXHUwMDBmXGZTpiXlb0kgrFbsTzNK84mX/eTk8Hl326596bBivvT1dP/Gr51XjrrzkFx1MDAwM6FcdTAwMTJbMG9UcqE0iu2RUEJaWFx1MDAwM3NcYsWZpMk030Y8zE9cdTAwMGVudc9cdTAwMTHXxTJ5XGaebm92zndumm26bnJcdTAwMDCQXHUwMDAz71Ol07NcZtPZgUusXHUwMDA1WSrL8FOww68lXHUwMDFl1KnjniG3ivzHQl5cbrl/fZS15+JcdTAwMDeGiYUhlGdywJaRePKZIYhcdTAwMDWDXGLBpoTRUmojXHUwMDFlfjp+UMoscKelXHUwMDE3OJpcdTAwMWVdSKlcdTAwMTChXG7/c/c6/ELyoVx1MDAxMFB1f5VcdTAwMTPbn75cdTAwMWNcdTAwMWTKR7vwcFkrJ+khbblcdTAwMDE0pJFcYlpowbhZXHUwMDEwnORcdTAwMDcjMi1NKFZcblx1MDAxNKqOnpdcdTAwMWNnXHItTFx1MDAwNShcZohVXHUwMDA3W3xcdTAwMTF14LZcdTAwMTSagG+iXGKDO4OKlG5cdTAwMTKJq8kucMkpkzo1u0Bx/OpcdTAwMTj/XHUwMDE4XFwmMIBcXFx1MDAwNv9LLkNcYvTUv+42c53LWu5+r36Qr599zS2+XGZcdTAwMTH29IdRy2yTN5+Esb91hplrbVwifVx1MDAwNlx1MDAxMzyTsjbBhLI0XHUwMDE3XHUwMDE0XHUwMDExZVx1MDAwZSmEXHUwMDFhYcgyXHUwMDE4U0sjKTWXwmzAS+6n5MJiylx1MDAxY1x1MDAxZuLmXHUwMDA0hGAqJVLZUMpia1x1MDAxM6CMXHTimKXuR1x1MDAxMFNXMLGiXHUwMDA05CRZU2yxXHUwMDE0vkdLXHUwMDEzg1x1MDAxM1xmb3dV4lx1MDAwNUdcdTAwMWRflYj1ZpFcdTAwMDWJ2ceyZ+48XHUwMDE0ypxcdTAwMDbWjFxuhlx1MDAxOVx1MDAwNkjGJVx1MDAwMTfPRkwjQUFFhlhcdTAwMWVhNVx1MDAxY+kxNKXilpaKaskkRixynnqD1FeofyWRQWJqdoDpXHUwMDE5R+ZcdTAwMTAhmi61uLik8691/du9fr3Sujo8oaWD3nlud/tmve9cdTAwMDJYj/OfqJ0w61W6+Vx1MDAxNVx1MDAxY9RcdTAwMDBlXGJcdTAwMTCV0ESCNdbhJuLMaKs0QVx1MDAxNrhjiIGk2VhNQFx1MDAxMSaM7XsqjHQzmUNhYCFcdTAwMTCwkpRcdTAwMTKZnY3xfVNaXCJLXG5MYFx1MDAxMNBgNTUpMLiFjMAgZtuVkDzl1Vx1MDAwM1x1MDAxYn2xoL7gSilGUrc+zFj5XHUwMDE0RFxuzpeKTF6QXHUwMDE3VImI2FlGXvA3LS9ecPpcdHnBl5ZcdTAwMTfo8uSytdMuoapT71dQ4S5Lvzbnilx1MDAwNEBSWFxipIXWSoG4SEk4XHUwMDAwXHUwMDEwqTRcdTAwMDc+TdykkkgljFnCQJVRjrjielx1MDAxM1xuvH6bXHUwMDEyloRQjFPPMFx1MDAxMDI9j6AhbNNC6NUvQ3JuiPw1aH3ex5tcdTAwMDE31e9k7CCzsujAcyrBXGbwXHUwMDA2fmsx5E70asp25KndWFx1MDAwNLdP5UK93Kw5jYPbo1x1MDAxZLn7xf10XZxzd6FcdTAwMTCW4IRrsFx1MDAxMlxi/3TiPJLkXHUwMDE2TFx1MDAxOdFMQlxmx3RcdTAwMTK3XHUwMDE0I4tcdTAwMDFswcg0Nlx1MDAxOcckbCObXTawnVx1MDAwN7aIULP8ldxHaMabTz/qSFx1MDAxOMhVTtdw1JGDaUTOwy5cdTAwMDHbPb/5b7B2J8hcdTAwMDTPllx1MDAxZvjDXHUwMDE3XGJl7M6fzdLg5ULjsv9CTfNcdTAwMWWD0YVcdTAwMWZcYu/IqsdcdTAwMDLwXlV3XHUwMDE3oYGr69xVzrvob3dv/Vx1MDAwM3lyf5o9qFx1MDAxY8wlsyUnlqBUasIxjr7LZyizqbZcdTAwMTjCyMRCiKnIS1tC5y0soJLBmqNmmKGUU4lcdTAwMWLnvejBXCJAMoNcdTAwMDAvzXlTPt15S2FU2Fp2XHRoXHUwMDEwektcdTAwMDXrI1x1MDAxNrhwSo774JSj9p75sfhezn2/3JFFkJs9/dz8LHKV4lx1MDAxNT3Ou/T44pjfePMhV1xm03ZMQ5BcdTAwMWPfXHUwMDA2wJWlQEyDi0aIXGJcdTAwMTXO3Fx1MDAxOLdcdTAwMWNcdTAwMGZcdTAwMTPwkoHPYTTy/qFccm6Xxy1cdTAwMTFmPSPNezMy9eAwwUhQXHUwMDAw2Vxcb6VcXNB7Y0EjIddcdTAwMTK4LbSdhlx1MDAxZHTbjtfPlDy/44BubTt246fD7ctcdTAwMWSZgtt3z1m/LbvVKlx1MDAwNjC+W6NcXN7Wg+s87iZccv23yuBjXHUwMDEyY1x1MDAwM9RcdTAwMWIjd1x1MDAwNrngb+++/Vx1MDAxZnWwXHUwMDAx4iJ9 + + + + + Client(Timeout: 150ms)Server(Timeout: 100ms)Timeout: 150Timed Out!100ms150msTimeout stays at 100msDon't set timeout to 150ms asclient timeout > server timeoutReceived server time outPrematurely close stream \ No newline at end of file diff --git a/src/RPCServer.ts b/src/RPCServer.ts index fbf9161..8226009 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -287,7 +287,9 @@ class RPCServer { // Input generator derived from the forward stream const inputGen = async function* (): AsyncIterable { for await (const data of forwardStream) { - ctx.timer.refresh(); + if (ctx.timer.status !== 'settled') { + ctx.timer.refresh(); + } yield data.params as I; } }; @@ -296,7 +298,9 @@ class RPCServer { timer: ctx.timer, }); for await (const response of handlerG) { - ctx.timer.refresh(); + if (ctx.timer.status !== 'settled') { + ctx.timer.refresh(); + } const responseMessage: JSONRPCResponseResult = { jsonrpc: '2.0', result: response, @@ -570,13 +574,16 @@ class RPCServer { } // Setting up Timeout logic const timeout = this.defaultTimeoutMap.get(method); - if (timeout != null && timeout < this.handlerTimeoutTime) { - // Reset timeout with new delay if it is less than the default - timer.reset(timeout); - } else { - // Otherwise refresh - timer.refresh(); + if (timer.status !== 'settled') { + if (timeout != null && timeout < this.handlerTimeoutTime) { + // Reset timeout with new delay if it is less than the default + timer.reset(timeout); + } else { + // Otherwise refresh + timer.refresh(); + } } + this.logger.info(`Handling stream with method (${method})`); let handlerResult: [JSONValue | undefined, ReadableStream]; const headerWriter = rpcStream.writable.getWriter(); diff --git a/src/errors.ts b/src/errors.ts index ae8958a..4a1fd4b 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -148,6 +148,13 @@ class ErrorRPCStreamEnded extends ErrorRPCProtocol { class ErrorRPCTimedOut extends ErrorRPCProtocol { static description = 'RPC handler has timed out'; code = JSONRPCErrorCode.RPCTimedOut; + public toJSON(): JSONRPCError { + const json = super.toJSON(); + if (typeof json === 'object' && !Array.isArray(json)) { + (json as POJO).type = this.constructor.name; + } + return json; + } } class ErrorUtilsUndefinedBehaviour extends ErrorRPCProtocol { diff --git a/src/middleware.ts b/src/middleware.ts index 25c12af..d7cbb0b 100644 --- a/src/middleware.ts +++ b/src/middleware.ts @@ -4,7 +4,11 @@ import type { JSONRPCResponse, JSONRPCResponseResult, MiddlewareFactory, + JSONValue, + JSONRPCRequestMetadata, + JSONRPCResponseMetadata, } from './types'; +import type { ContextTimed } from '@matrixai/contexts'; import { TransformStream } from 'stream/web'; import { JSONParser } from '@streamparser/json'; import * as utils from './utils'; @@ -75,6 +79,80 @@ function jsonMessageToBinaryStream(): TransformStream< }); } +function timeoutMiddlewareServer( + ctx: ContextTimed, + _cancel: (reason?: any) => void, + _meta: Record | undefined, +) { + const currentTimeout = ctx.timer.delay; + // Flags for tracking if the first message has been processed + let forwardFirst = true; + return { + forward: new TransformStream< + JSONRPCRequest, + JSONRPCRequest + >({ + transform: (chunk, controller) => { + controller.enqueue(chunk); + if (forwardFirst) { + forwardFirst = false; + let clientTimeout = chunk.metadata?.timeout; + if (clientTimeout === undefined) return; + if (clientTimeout === null) clientTimeout = Infinity; + if (clientTimeout < currentTimeout) ctx.timer.reset(clientTimeout); + } + }, + }), + reverse: new TransformStream< + JSONRPCResponse, + JSONRPCResponse + >({ + transform: (chunk, controller) => { + // Passthrough chunk, no need for server to send ctx.timeout + controller.enqueue(chunk); + }, + }), + }; +} + +/** + * This adds its own timeout to the forward metadata and updates it's timeout + * based on the reverse metadata. + * @param ctx + * @param _cancel + * @param _meta + */ +function timeoutMiddlewareClient( + ctx: ContextTimed, + _cancel: (reason?: any) => void, + _meta: Record | undefined, +) { + const currentTimeout = ctx.timer.delay; + // Flags for tracking if the first message has been processed + let forwardFirst = true; + return { + forward: new TransformStream({ + transform: (chunk, controller) => { + if (forwardFirst) { + forwardFirst = false; + if (chunk == null) chunk = { jsonrpc: '2.0', method: '' }; + if (chunk.metadata == null) chunk.metadata = {}; + (chunk.metadata as any).timeout = currentTimeout; + } + controller.enqueue(chunk); + }, + }), + reverse: new TransformStream< + JSONRPCResponse, + JSONRPCResponse + >({ + transform: (chunk, controller) => { + controller.enqueue(chunk); // Passthrough chunk, no need for client to set ctx.timeout + }, + }), + }; +} + /** * This function is a factory for creating a pass-through streamPair. It is used * as the default middleware for the middleware wrappers. @@ -116,12 +194,14 @@ function defaultServerMiddlewareWrapper( >(); const middleMiddleware = middlewareFactory(ctx, cancel, meta); + const timeoutMiddleware = timeoutMiddlewareServer(ctx, cancel, meta); - const forwardReadable = inputTransformStream.readable.pipeThrough( - middleMiddleware.forward, - ); // Usual middleware here + const forwardReadable = inputTransformStream.readable + .pipeThrough(timeoutMiddleware.forward) // Timeout middleware here + .pipeThrough(middleMiddleware.forward); // Usual middleware here const reverseReadable = outputTransformStream.readable .pipeThrough(middleMiddleware.reverse) // Usual middleware here + .pipeThrough(timeoutMiddleware.reverse) // Timeout middleware here .pipeThrough(jsonMessageToBinaryStream()); return { @@ -172,13 +252,15 @@ const defaultClientMiddlewareWrapper = ( JSONRPCRequest >(); + const timeoutMiddleware = timeoutMiddlewareClient(ctx, cancel, meta); const middleMiddleware = middleware(ctx, cancel, meta); const forwardReadable = inputTransformStream.readable + .pipeThrough(timeoutMiddleware.forward) .pipeThrough(middleMiddleware.forward) // Usual middleware here .pipeThrough(jsonMessageToBinaryStream()); - const reverseReadable = outputTransformStream.readable.pipeThrough( - middleMiddleware.reverse, - ); // Usual middleware here + const reverseReadable = outputTransformStream.readable + .pipeThrough(middleMiddleware.reverse) + .pipeThrough(timeoutMiddleware.reverse); // Usual middleware here return { forward: { @@ -196,6 +278,8 @@ const defaultClientMiddlewareWrapper = ( export { binaryToJsonMessageStream, jsonMessageToBinaryStream, + timeoutMiddlewareClient, + timeoutMiddlewareServer, defaultMiddleware, defaultServerMiddlewareWrapper, defaultClientMiddlewareWrapper, diff --git a/src/types.ts b/src/types.ts index e67a60a..904184a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -38,7 +38,7 @@ type JSONRPCRequestMessage = { * SHOULD NOT contain fractional parts [2] */ id: string | number | null; -}; +} & JSONRPCRequestMetadata; /** * This is the JSON RPC notification object. this is used for a request that @@ -60,7 +60,7 @@ type JSONRPCRequestNotification = { * This member MAY be omitted. */ params?: T; -}; +} & JSONRPCRequestMetadata; /** * This is the JSON RPC response result object. It contains the response data for a @@ -84,7 +84,7 @@ type JSONRPCResponseResult = { * it MUST be Null. */ id: string | number | null; -}; +} & JSONRPCResponseMetadata; /** * This is the JSON RPC response Error object. It contains any errors that have @@ -110,6 +110,34 @@ type JSONRPCResponseError = { id: string | number | null; }; +/** + * Used when an empty object is needed. + * Defined here with a linter override to avoid a false positive. + */ +// eslint-disable-next-line +type ObjectEmpty = {}; + +// Prevent overwriting the metadata type with `Omit<>` +type JSONRPCRequestMetadata = ObjectEmpty> = + { + metadata?: { + [Key: string]: JSONValue; + } & Partial<{ + timeout: number | null; + }>; + } & Omit; + +// Prevent overwriting the metadata type with `Omit<>` +type JSONRPCResponseMetadata< + T extends Record = ObjectEmpty, +> = { + metadata?: { + [Key: string]: JSONValue; + } & Partial<{ + timeout: number | null; + }>; +} & Omit; + /** * This is a JSON RPC error object, it encodes the error data for the JSONRPCResponseError object. */ @@ -357,6 +385,8 @@ export type { JSONRPCRequestNotification, JSONRPCResponseResult, JSONRPCResponseError, + JSONRPCRequestMetadata, + JSONRPCResponseMetadata, JSONRPCError, JSONRPCRequest, JSONRPCResponse, diff --git a/src/utils.ts b/src/utils.ts index 82168db..6b0fedf 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -278,6 +278,7 @@ const standardErrors: { URIError, AggregateError, AbstractError, + ErrorRPCTimedOut: errors.ErrorRPCTimedOut, }; /** @@ -342,6 +343,7 @@ function toError( let e: Error; switch (eClass) { case AbstractError: + case errors.ErrorRPCTimedOut: e = eClass.fromJSON(errorData); break; case AggregateError: diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index c7937f8..849728b 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -870,7 +870,126 @@ describe('RPC', () => { }, { numRuns: 1 }, ); + test('RPC server times out using client timeout', async () => { + // Setup server and client communication pairs + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + const { p: ctxP, resolveP: resolveCtxP } = utils.promise(); + class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONValue, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise => { + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + resolveCtxP(ctx); + abortProm.resolveP(ctx.signal.reason); + }); + throw await abortProm.p; + }; + } + // Set up a client and server with matching timeout settings + const rpcServer = new RPCServer({ + logger, + idGen, + handlerTimeoutTime: 150, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + const rpcClient = new RPCClient({ + manifest: { + testMethod: new UnaryCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, + }; + }, + logger, + idGen, + }); + await expect(rpcClient.methods.testMethod({}, { timer: 100 })).toReject(); + await expect(ctxP).resolves.toHaveProperty(['timer', 'delay'], 100); + + await rpcServer.stop({ force: true }); + }); + testProp( + 'RPC client times out and server is able to ignore exception', + [fc.string()], + async (message) => { + // Setup server and client communication pairs + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + const { p: ctxP, resolveP: resolveCtxP } = utils.promise(); + class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONValue, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise => { + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + resolveCtxP(ctx); + abortProm.resolveP(ctx.signal.reason); + }); + await abortProm.p; + return input; + }; + } + // Set up a client and server with matching timeout settings + const rpcServer = new RPCServer({ + logger, + idGen, + handlerTimeoutTime: 150, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + + const rpcClient = new RPCClient({ + manifest: { + testMethod: new UnaryCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, + }; + }, + logger, + idGen, + }); + await expect( + rpcClient.methods.testMethod(message, { timer: 100 }), + ).resolves.toBe(message); + await expect(ctxP).resolves.toHaveProperty(['timer', 'delay'], 100); + + await rpcServer.stop({ force: true }); + }, + { numRuns: 1 }, + ); testProp( 'RPC Serializes and Deserializes Error', [rpcTestUtils.errorArb(rpcTestUtils.errorArb())], diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index d518616..59c3b13 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -123,18 +123,20 @@ describe(`${RPCClient.name}`, () => { } await writable.close(); - const expectedMessages: Array = messages.map((v) => { - const request: JSONRPCRequestMessage = { + const expectedMessages: Array = messages.map( + (v, i) => ({ jsonrpc: '2.0', method: methodName, id: null, ...(v.result === undefined ? {} : { params: v.result }), - }; - return request; - }); + ...(i === 0 ? { metadata: { timeout: null } } : {}), + }), + ); + const outputMessages = (await outputResult).map((v) => JSON.parse(v.toString()), ); + expect(outputMessages).toStrictEqual(expectedMessages); }); testProp( @@ -171,6 +173,9 @@ describe(`${RPCClient.name}`, () => { jsonrpc: '2.0', id: null, params, + metadata: { + timeout: null, + }, }), ); }, @@ -207,14 +212,16 @@ describe(`${RPCClient.name}`, () => { } await writer.close(); expect(await output).toStrictEqual(message.result); - const expectedOutput = params.map((v) => + const expectedOutput = params.map((v, i) => JSON.stringify({ method: methodName, jsonrpc: '2.0', id: null, params: v, + ...(i === 0 ? { metadata: { timeout: null } } : {}), }), ); + expect((await outputResult).map((v) => v.toString())).toStrictEqual( expectedOutput, ); @@ -249,6 +256,7 @@ describe(`${RPCClient.name}`, () => { jsonrpc: '2.0', id: null, params: params, + metadata: { timeout: null }, }), ); }, @@ -423,19 +431,19 @@ describe(`${RPCClient.name}`, () => { } const expectedMessages: Array = messages.map( - () => { - const request: JSONRPCRequestMessage = { - jsonrpc: '2.0', - method: methodName, - id: null, - params: 'one', - }; - return request; - }, + (_, i) => ({ + jsonrpc: '2.0', + method: methodName, + id: null, + params: 'one', + ...(i === 0 ? { metadata: { timeout: null } } : {}), + }), ); + const outputMessages = (await outputResult).map((v) => JSON.parse(v.toString()), ); + expect(outputMessages).toStrictEqual(expectedMessages); }, ); @@ -527,6 +535,7 @@ describe(`${RPCClient.name}`, () => { jsonrpc: '2.0', id: null, params, + metadata: { timeout: null }, }), ); }, @@ -562,12 +571,13 @@ describe(`${RPCClient.name}`, () => { } expect(await output).toStrictEqual(message.result); await writer.close(); - const expectedOutput = params.map((v) => + const expectedOutput = params.map((v, i) => JSON.stringify({ method: 'client', jsonrpc: '2.0', id: null, params: v, + ...(i === 0 ? { metadata: { timeout: null } } : {}), }), ); expect((await outputResult).map((v) => v.toString())).toStrictEqual( @@ -603,6 +613,7 @@ describe(`${RPCClient.name}`, () => { jsonrpc: '2.0', id: null, params: params, + metadata: { timeout: null }, }), ); }, diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index acb4ed1..bd8d22a 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -689,68 +689,75 @@ describe(`${RPCServer.name}`, () => { rpcServer.handleStream(readWriteStream); const out = await outputResult; expect(out.map((v) => v!.toString())).toStrictEqual( - messages.map(() => { - return JSON.stringify({ + messages.map(() => + JSON.stringify({ jsonrpc: '2.0', result: 1, id: null, - }); - }), - ); - await rpcServer.stop({ force: true }); - }); - testProp('reverse middlewares', [specificMessageArb], async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - yield* input; - }; - } - const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { - return { - forward: new TransformStream(), - reverse: new TransformStream({ - transform: (chunk, controller) => { - if ('result' in chunk) chunk.result = 1; - controller.enqueue(chunk); - }, }), - }; - }); - const rpcServer = new RPCServer({ - middlewareFactory: middleware, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const out = await outputResult; - expect(out.map((v) => v!.toString())).toStrictEqual( - messages.map(() => { - return JSON.stringify({ - jsonrpc: '2.0', - result: 1, - id: null, - }); - }), + ), ); await rpcServer.stop({ force: true }); }); + testProp( + 'reverse middlewares', + [specificMessageArb], + async (messages) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + yield* input; + }; + } + const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( + () => { + return { + forward: new TransformStream(), + reverse: new TransformStream({ + transform: (chunk, controller) => { + if ('result' in chunk) chunk.result = 1; + controller.enqueue(chunk); + }, + }), + }; + }, + ); + const rpcServer = new RPCServer({ + middlewareFactory: middleware, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + const out = await outputResult; + expect(out.map((v) => v!.toString())).toStrictEqual( + messages.map(() => + JSON.stringify({ + jsonrpc: '2.0', + result: 1, + id: null, + }), + ), + ); + await rpcServer.stop({ force: true }); + }, + { numRuns: 1 }, + ); testProp( 'forward middleware authentication', [invalidTokenMessageArb], diff --git a/tests/middleware.test.ts b/tests/middleware.test.ts index 3954fd1..f108e4a 100644 --- a/tests/middleware.test.ts +++ b/tests/middleware.test.ts @@ -1,5 +1,6 @@ import { fc, testProp } from '@fast-check/jest'; import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable'; +import { Timer } from '@matrixai/timer'; import * as rpcUtils from '@/utils'; import 'ix/add/asynciterable-operators/toarray'; import * as rpcErrors from '@/errors'; @@ -99,4 +100,140 @@ describe('Middleware tests', () => { }, { numRuns: 1000 }, ); + testProp( + 'timeoutMiddlewareServer should set ctx.timeout if timeout is lower', + [rpcTestUtils.jsonMessagesArb, fc.integer({ min: 0 })], + async (messages, timeout) => { + messages[0].metadata = { ...messages[0].metadata, timeout }; + const abortController = new AbortController(); + const timer = new Timer(undefined, Infinity); + const ctx = { + signal: abortController.signal, + timer, + }; + const timeoutMiddleware = rpcUtilsMiddleware.timeoutMiddlewareServer( + ctx, + () => {}, + {}, + ); + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ) // Converting back. + .pipeThrough(timeoutMiddleware.forward); + + const asd = await AsyncIterable.as(parsedStream).toArray(); + expect(asd).toEqual(messages); + expect(timer.delay).toBe(timeout); + timer.cancel(); + await timer.catch(() => {}); + }, + ); + testProp( + 'timeoutMiddlewareServer wont set ctx.timeout if timeout is higher', + [rpcTestUtils.jsonMessagesArb, fc.integer({ min: 1 })], + async (messages, timeout) => { + messages[0].metadata = { ...messages[0].metadata, timeout }; + const abortController = new AbortController(); + const timer = new Timer(undefined, 0); + const ctx = { + signal: abortController.signal, + timer, + }; + const timeoutMiddleware = rpcUtilsMiddleware.timeoutMiddlewareServer( + ctx, + () => {}, + {}, + ); + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ) // Converting back. + .pipeThrough(timeoutMiddleware.forward); + + const asd = await AsyncIterable.as(parsedStream).toArray(); + expect(asd).toEqual(messages); + expect(timer.delay).toBe(0); + timer.cancel(); + await timer.catch(() => {}); + }, + ); + testProp( + 'timeoutMiddlewareServer should set ctx.timeout if timeout is infinity/null', + [rpcTestUtils.jsonMessagesArb], + async (messages) => { + messages[0].metadata = { ...messages[0].metadata, timeout: Infinity }; + const abortController = new AbortController(); + const timer = new Timer(undefined, Infinity); + const ctx = { + signal: abortController.signal, + timer, + }; + const timeoutMiddleware = rpcUtilsMiddleware.timeoutMiddlewareServer( + ctx, + () => {}, + {}, + ); + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ) + .pipeThrough(timeoutMiddleware.forward); // Converting back. + + const expectedMessages = [...messages]; + if (expectedMessages[0].metadata != null) { + expectedMessages[0].metadata.timeout = null; + } + const asd = await AsyncIterable.as(parsedStream).toArray(); + expect(asd).toEqual(expectedMessages); + expect(timer.delay).toBe(Infinity); + timer.cancel(); + await timer.catch(() => {}); + }, + ); + testProp( + 'timeoutMiddlewareClient can encode ctx.timeout', + [rpcTestUtils.jsonMessagesArb, fc.integer({ min: 0 })], + async (messages, timeout) => { + const abortController = new AbortController(); + const timer = new Timer(undefined, timeout); + const ctx = { + signal: abortController.signal, + timer, + }; + const timeoutMiddleware = rpcUtilsMiddleware.timeoutMiddlewareClient( + ctx, + () => {}, + {}, + ); + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ) + .pipeThrough(timeoutMiddleware.forward); // Converting back. + + const expectedMessages = [...messages]; + expectedMessages[0].metadata = { + ...expectedMessages[0].metadata, + timeout, + }; + const asd = await AsyncIterable.as(parsedStream).toArray(); + expect(asd).toEqual(expectedMessages); + expect(timer.delay).toBe(timeout); + timer.cancel(); + await timer.catch(() => {}); + }, + ); });