@@ -93,6 +93,7 @@ const {
9393 ArrayBufferViewGetByteOffset,
9494 ArrayBufferGetByteLength,
9595 AsyncIterator,
96+ cloneAsUint8Array,
9697 copyArrayBuffer,
9798 customInspect,
9899 dequeueValue,
@@ -211,6 +212,7 @@ class ReadableStream {
211212 throw new ERR_INVALID_ARG_VALUE ( 'source' , 'Object' , source ) ;
212213 this [ kState ] = {
213214 disturbed : false ,
215+ reader : undefined ,
214216 state : 'readable' ,
215217 storedError : undefined ,
216218 stream : undefined ,
@@ -1103,7 +1105,6 @@ class ReadableByteStreamController {
11031105 chunk ) ;
11041106 }
11051107 const chunkByteLength = ArrayBufferViewGetByteLength ( chunk ) ;
1106- const chunkByteOffset = ArrayBufferViewGetByteOffset ( chunk ) ;
11071108 const chunkBuffer = ArrayBufferViewGetBuffer ( chunk ) ;
11081109 const chunkBufferByteLength = ArrayBufferGetByteLength ( chunkBuffer ) ;
11091110 if ( chunkByteLength === 0 || chunkBufferByteLength === 0 ) {
@@ -1114,11 +1115,7 @@ class ReadableByteStreamController {
11141115 throw new ERR_INVALID_STATE . TypeError ( 'Controller is already closed' ) ;
11151116 if ( this [ kState ] . stream [ kState ] . state !== 'readable' )
11161117 throw new ERR_INVALID_STATE . TypeError ( 'ReadableStream is already closed' ) ;
1117- readableByteStreamControllerEnqueue (
1118- this ,
1119- chunkBuffer ,
1120- chunkByteLength ,
1121- chunkByteOffset ) ;
1118+ readableByteStreamControllerEnqueue ( this , chunk ) ;
11221119 }
11231120
11241121 /**
@@ -1416,6 +1413,13 @@ function readableStreamPipeTo(
14161413}
14171414
14181415function readableStreamTee ( stream , cloneForBranch2 ) {
1416+ if ( isReadableByteStreamController ( stream [ kState ] . controller ) ) {
1417+ return readableByteStreamTee ( stream ) ;
1418+ }
1419+ return readableStreamDefaultTee ( stream , cloneForBranch2 ) ;
1420+ }
1421+
1422+ function readableStreamDefaultTee ( stream , cloneForBranch2 ) {
14191423 const reader = new ReadableStreamDefaultReader ( stream ) ;
14201424 let reading = false ;
14211425 let canceled1 = false ;
@@ -1510,6 +1514,284 @@ function readableStreamTee(stream, cloneForBranch2) {
15101514 return [ branch1 , branch2 ] ;
15111515}
15121516
1517+ function readableByteStreamTee ( stream ) {
1518+ assert ( isReadableStream ( stream ) ) ;
1519+ assert ( isReadableByteStreamController ( stream [ kState ] . controller ) ) ;
1520+
1521+ let reader = new ReadableStreamDefaultReader ( stream ) ;
1522+ let reading = false ;
1523+ let readAgainForBranch1 = false ;
1524+ let readAgainForBranch2 = false ;
1525+ let canceled1 = false ;
1526+ let canceled2 = false ;
1527+ let reason1 ;
1528+ let reason2 ;
1529+ let branch1 ;
1530+ let branch2 ;
1531+ const cancelDeferred = createDeferredPromise ( ) ;
1532+
1533+ function forwardReaderError ( thisReader ) {
1534+ PromisePrototypeThen (
1535+ thisReader [ kState ] . close . promise ,
1536+ undefined ,
1537+ ( error ) => {
1538+ if ( thisReader !== reader ) {
1539+ return ;
1540+ }
1541+ readableStreamDefaultControllerError ( branch1 [ kState ] . controller , error ) ;
1542+ readableStreamDefaultControllerError ( branch2 [ kState ] . controller , error ) ;
1543+ if ( ! canceled1 || ! canceled2 ) {
1544+ cancelDeferred . resolve ( ) ;
1545+ }
1546+ }
1547+ ) ;
1548+ }
1549+
1550+ function pullWithDefaultReader ( ) {
1551+ if ( isReadableStreamBYOBReader ( reader ) ) {
1552+ readableStreamBYOBReaderRelease ( reader ) ;
1553+ reader = new ReadableStreamDefaultReader ( stream ) ;
1554+ forwardReaderError ( reader ) ;
1555+ }
1556+
1557+ const readRequest = {
1558+ [ kChunk ] ( chunk ) {
1559+ queueMicrotask ( ( ) => {
1560+ readAgainForBranch1 = false ;
1561+ readAgainForBranch2 = false ;
1562+ const chunk1 = chunk ;
1563+ let chunk2 = chunk ;
1564+
1565+ if ( ! canceled1 && ! canceled2 ) {
1566+ try {
1567+ chunk2 = cloneAsUint8Array ( chunk ) ;
1568+ } catch ( error ) {
1569+ readableByteStreamControllerError (
1570+ branch1 [ kState ] . controller ,
1571+ error
1572+ ) ;
1573+ readableByteStreamControllerError (
1574+ branch2 [ kState ] . controller ,
1575+ error
1576+ ) ;
1577+ cancelDeferred . resolve ( readableStreamCancel ( stream , error ) ) ;
1578+ return ;
1579+ }
1580+ }
1581+ if ( ! canceled1 ) {
1582+ readableByteStreamControllerEnqueue (
1583+ branch1 [ kState ] . controller ,
1584+ chunk1
1585+ ) ;
1586+ }
1587+ if ( ! canceled2 ) {
1588+ readableByteStreamControllerEnqueue (
1589+ branch2 [ kState ] . controller ,
1590+ chunk2
1591+ ) ;
1592+ }
1593+ reading = false ;
1594+
1595+ if ( readAgainForBranch1 ) {
1596+ pull1Algorithm ( ) ;
1597+ } else if ( readAgainForBranch2 ) {
1598+ pull2Algorithm ( ) ;
1599+ }
1600+ } ) ;
1601+ } ,
1602+ [ kClose ] ( ) {
1603+ reading = false ;
1604+
1605+ if ( ! canceled1 ) {
1606+ readableByteStreamControllerClose ( branch1 [ kState ] . controller ) ;
1607+ }
1608+ if ( ! canceled2 ) {
1609+ readableByteStreamControllerClose ( branch2 [ kState ] . controller ) ;
1610+ }
1611+ if ( branch1 [ kState ] . controller [ kState ] . pendingPullIntos . length > 0 ) {
1612+ readableByteStreamControllerRespond ( branch1 [ kState ] . controller , 0 ) ;
1613+ }
1614+ if ( branch2 [ kState ] . controller [ kState ] . pendingPullIntos . length > 0 ) {
1615+ readableByteStreamControllerRespond ( branch2 [ kState ] . controller , 0 ) ;
1616+ }
1617+ if ( ! canceled1 || ! canceled2 ) {
1618+ cancelDeferred . resolve ( ) ;
1619+ }
1620+ } ,
1621+ [ kError ] ( ) {
1622+ reading = false ;
1623+ } ,
1624+ } ;
1625+
1626+ readableStreamDefaultReaderRead ( reader , readRequest ) ;
1627+ }
1628+
1629+ function pullWithBYOBReader ( view , forBranch2 ) {
1630+ if ( isReadableStreamDefaultReader ( reader ) ) {
1631+ readableStreamDefaultReaderRelease ( reader ) ;
1632+ reader = new ReadableStreamBYOBReader ( stream ) ;
1633+ forwardReaderError ( reader ) ;
1634+ }
1635+
1636+ const byobBranch = forBranch2 === true ? branch2 : branch1 ;
1637+ const otherBranch = forBranch2 === false ? branch2 : branch1 ;
1638+ const readIntoRequest = {
1639+ [ kChunk ] ( chunk ) {
1640+ queueMicrotask ( ( ) => {
1641+ readAgainForBranch1 = false ;
1642+ readAgainForBranch2 = false ;
1643+ const byobCanceled = forBranch2 === true ? canceled2 : canceled1 ;
1644+ const otherCanceled = forBranch2 === false ? canceled2 : canceled1 ;
1645+
1646+ if ( ! otherCanceled ) {
1647+ let clonedChunk ;
1648+
1649+ try {
1650+ clonedChunk = cloneAsUint8Array ( chunk ) ;
1651+ } catch ( error ) {
1652+ readableByteStreamControllerError (
1653+ byobBranch [ kState ] . controller ,
1654+ error
1655+ ) ;
1656+ readableByteStreamControllerError (
1657+ otherBranch [ kState ] . controller ,
1658+ error
1659+ ) ;
1660+ cancelDeferred . resolve ( readableStreamCancel ( stream , error ) ) ;
1661+ return ;
1662+ }
1663+ if ( ! byobCanceled ) {
1664+ readableByteStreamControllerRespondWithNewView (
1665+ byobBranch [ kState ] . controller ,
1666+ chunk
1667+ ) ;
1668+ }
1669+
1670+ readableByteStreamControllerEnqueue (
1671+ otherBranch [ kState ] . controller ,
1672+ clonedChunk
1673+ ) ;
1674+ } else if ( ! byobCanceled ) {
1675+ readableByteStreamControllerRespondWithNewView (
1676+ byobBranch [ kState ] . controller ,
1677+ chunk
1678+ ) ;
1679+ }
1680+ reading = false ;
1681+
1682+ if ( readAgainForBranch1 ) {
1683+ pull1Algorithm ( ) ;
1684+ } else if ( readAgainForBranch2 ) {
1685+ pull2Algorithm ( ) ;
1686+ }
1687+ } ) ;
1688+ } ,
1689+ [ kClose ] ( chunk ) {
1690+ reading = false ;
1691+
1692+ const byobCanceled = forBranch2 === true ? canceled2 : canceled1 ;
1693+ const otherCanceled = forBranch2 === false ? canceled2 : canceled1 ;
1694+
1695+ if ( ! byobCanceled ) {
1696+ readableByteStreamControllerClose ( byobBranch [ kState ] . controller ) ;
1697+ }
1698+ if ( ! otherCanceled ) {
1699+ readableByteStreamControllerClose ( otherBranch [ kState ] . controller ) ;
1700+ }
1701+ if ( chunk !== undefined ) {
1702+ if ( ! byobCanceled ) {
1703+ readableByteStreamControllerRespondWithNewView (
1704+ byobBranch [ kState ] . controller ,
1705+ chunk
1706+ ) ;
1707+ }
1708+ if (
1709+ ! otherCanceled &&
1710+ otherBranch [ kState ] . controller [ kState ] . pendingPullIntos . length > 0
1711+ ) {
1712+ readableByteStreamControllerRespond (
1713+ otherBranch [ kState ] . controller ,
1714+ 0
1715+ ) ;
1716+ }
1717+ }
1718+ if ( ! byobCanceled || ! otherCanceled ) {
1719+ cancelDeferred . resolve ( ) ;
1720+ }
1721+ } ,
1722+ [ kError ] ( ) {
1723+ reading = false ;
1724+ } ,
1725+ } ;
1726+ readableStreamBYOBReaderRead ( reader , view , readIntoRequest ) ;
1727+ }
1728+
1729+ function pull1Algorithm ( ) {
1730+ if ( reading ) {
1731+ readAgainForBranch1 = true ;
1732+ return PromiseResolve ( ) ;
1733+ }
1734+ reading = true ;
1735+
1736+ const byobRequest = branch1 [ kState ] . controller . byobRequest ;
1737+ if ( byobRequest === null ) {
1738+ pullWithDefaultReader ( ) ;
1739+ } else {
1740+ pullWithBYOBReader ( byobRequest [ kState ] . view , false ) ;
1741+ }
1742+ return PromiseResolve ( ) ;
1743+ }
1744+
1745+ function pull2Algorithm ( ) {
1746+ if ( reading ) {
1747+ readAgainForBranch2 = true ;
1748+ return PromiseResolve ( ) ;
1749+ }
1750+ reading = true ;
1751+
1752+ const byobRequest = branch2 [ kState ] . controller . byobRequest ;
1753+ if ( byobRequest === null ) {
1754+ pullWithDefaultReader ( ) ;
1755+ } else {
1756+ pullWithBYOBReader ( byobRequest [ kState ] . view , true ) ;
1757+ }
1758+ return PromiseResolve ( ) ;
1759+ }
1760+
1761+ function cancel1Algorithm ( reason ) {
1762+ canceled1 = true ;
1763+ reason1 = reason ;
1764+ if ( canceled2 ) {
1765+ cancelDeferred . resolve ( readableStreamCancel ( stream , [ reason1 , reason2 ] ) ) ;
1766+ }
1767+ return cancelDeferred . promise ;
1768+ }
1769+
1770+ function cancel2Algorithm ( reason ) {
1771+ canceled2 = true ;
1772+ reason2 = reason ;
1773+ if ( canceled1 ) {
1774+ cancelDeferred . resolve ( readableStreamCancel ( stream , [ reason1 , reason2 ] ) ) ;
1775+ }
1776+ return cancelDeferred . promise ;
1777+ }
1778+
1779+ branch1 = new ReadableStream ( {
1780+ type : 'bytes' ,
1781+ pull : pull1Algorithm ,
1782+ cancel : cancel1Algorithm ,
1783+ } ) ;
1784+ branch2 = new ReadableStream ( {
1785+ type : 'bytes' ,
1786+ pull : pull2Algorithm ,
1787+ cancel : cancel2Algorithm ,
1788+ } ) ;
1789+
1790+ forwardReaderError ( reader ) ;
1791+
1792+ return [ branch1 , branch2 ] ;
1793+ }
1794+
15131795function readableByteStreamControllerConvertPullIntoDescriptor ( desc ) {
15141796 const {
15151797 buffer,
@@ -2273,18 +2555,18 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
22732555 desc . bytesFilled += size ;
22742556}
22752557
2276- function readableByteStreamControllerEnqueue (
2277- controller ,
2278- buffer ,
2279- byteLength ,
2280- byteOffset ) {
2558+ function readableByteStreamControllerEnqueue ( controller , chunk ) {
22812559 const {
22822560 closeRequested,
22832561 pendingPullIntos,
22842562 queue,
22852563 stream,
22862564 } = controller [ kState ] ;
22872565
2566+ const buffer = ArrayBufferViewGetBuffer ( chunk ) ;
2567+ const byteOffset = ArrayBufferViewGetByteOffset ( chunk ) ;
2568+ const byteLength = ArrayBufferViewGetByteLength ( chunk ) ;
2569+
22882570 if ( closeRequested || stream [ kState ] . state !== 'readable' )
22892571 return ;
22902572
0 commit comments