@@ -1017,43 +1017,65 @@ impl LiftIntrinsic {
10171017 Intrinsic :: AsyncStream ( AsyncStreamIntrinsic :: ExternalStreamClass ) . name ( ) ;
10181018 let lift_flat_stream_fn = self . name ( ) ;
10191019 let global_stream_table_map = AsyncStreamIntrinsic :: GlobalStreamTableMap . name ( ) ;
1020+ let lift_u32 = Self :: LiftFlatU32 . name ( ) ;
10201021
10211022 output. push_str ( & format ! ( r#"
1022- function {lift_flat_stream_fn}(streamTableIdx, ctx) {{
1023- {debug_log_fn}('[{lift_flat_stream_fn}()] args', {{ streamTableIdx, ctx }});
1024- const {{ memory, useDirectParams, params }} = ctx;
1023+ function {lift_flat_stream_fn}(meta) {{
1024+ const {{
1025+ streamTableIdx,
1026+ componentIdx,
1027+ isBorrowedType,
1028+ isNoneType,
1029+ isNumericTypeJs,
1030+ }} = meta;
1031+
1032+ return function {lift_flat_stream_fn}Inner(ctx) {{
1033+ {debug_log_fn}('[{lift_flat_stream_fn}()] args', {{ ctx }});
1034+
1035+ const streamMeta = {global_stream_table_map}[streamTableIdx];
1036+ if (streamMeta.componentIdx !== componentIdx) {{
1037+ throw new Error('unexpectedly mismatched component idx');
1038+ }}
1039+ const {{ table }} = streamMeta;
1040+ if (componentIdx === undefined || !table) {{
1041+ throw new Error(`invalid global stream table state for table [${{tableIdx}}]`);
1042+ }}
10251043
1026- const {{ table, componentIdx }} = {global_stream_table_map}[streamTableIdx];
1027- if (componentIdx === undefined || !table) {{
1028- throw new Error(`invalid global stream table state for table [${{tableIdx}}]`);
1029- }}
1044+ let streamEndWaitableIdx;
1045+ if (ctx.useDirectParams) {{
1046+ streamEndWaitableIdx = ctx.params[0];
1047+ ctx.params = ctx.params.slice(1);
1048+ }} else {{
1049+ const [waitableIdx, newCtx] = {lift_u32}(ctx);
1050+ ctx = newCtx;
1051+ streamEndWaitableIdx = waitableIdx;
1052+ }}
10301053
1031- const cstate = {get_or_create_async_state_fn}(componentIdx);
1032- if (!cstate) {{ throw new Error(`missing async state for component [${{componentIdx}}]`); }}
1054+ if (!streamEndWaitableIdx) {{ throw new Error('missing stream idx'); }}
10331055
1034- const streamEndWaitableIdx = params[0] ;
1035- if (!streamEndWaitableIdx ) {{ throw new Error(' missing stream idx' ); }}
1056+ const cstate = {get_or_create_async_state_fn}(componentIdx) ;
1057+ if (!cstate ) {{ throw new Error(` missing async state for component [${{componentIdx}}]` ); }}
10361058
1037- const streamEnd = cstate.getStreamEnd({{ tableIdx: streamTableIdx, streamEndWaitableIdx }});
1038- if (!streamEnd) {{
1039- throw new Error(`missing stream end [${{streamEndWaitableIdx}}] (table [${{streamTableIdx}}]) in component [${{componentIdx}}] during lift`);
1040- }}
1059+ const streamEnd = cstate.getStreamEnd({{ tableIdx: streamTableIdx, streamEndWaitableIdx }});
1060+ if (!streamEnd) {{
1061+ throw new Error(`missing stream end [${{streamEndWaitableIdx}}] (table [${{streamTableIdx}}]) in component [${{componentIdx}}] during lift`);
1062+ }}
10411063
1042- // TODO: check for borrowed type
1043- // TODO: check for readable only
1044- // TODO: confirm shared type matches tyep for lift
1045- // TODO: check for IDLE state
1046-
1047- const stream = new {external_stream_class}({{
1048- globalRep : streamEnd.globalStreamMapRep (),
1049- isReadable : streamEnd.isReadable (),
1050- isWritable: streamEnd.isWritable() ,
1051- writeFn : (v ) => {{ return streamEnd.write(v ); }},
1052- readFn : () => {{ return streamEnd.read (); }},
1053- dropFn: () => {{ return streamEnd.drop(); }},
1054- }});
1055-
1056- return [ stream, ctx ];
1064+ if (ctx.isBorrowed) {{ throw new Error('cannot lift flat stream of borrowed type'); }}
1065+ if (streamEnd.isWritable()) {{ throw new Error(' only readable streams can be lifted'); }}
1066+ if (!streamEnd.isIdle()) {{ throw new Error('streams must be in idle state'); }}
1067+
1068+ const stream = new {external_stream_class}({{
1069+ globalRep: streamEnd.globalStreamMapRep(),
1070+ isReadable : streamEnd.isReadable (),
1071+ isWritable : streamEnd.isWritable (),
1072+ writeFn: (v) => {{ return streamEnd.write(v); }} ,
1073+ readFn : () => {{ return streamEnd.read( ); }},
1074+ dropFn : () => {{ return streamEnd.drop (); }},
1075+ }});
1076+
1077+ return [ stream, ctx ];
1078+ }}
10571079 }}
10581080 "# ) ) ;
10591081 }
0 commit comments