@@ -19,7 +19,6 @@ use sqlx_core::database::Database;
1919use sqlx_core:: describe:: Describe ;
2020use sqlx_core:: executor:: Executor ;
2121use sqlx_core:: transaction:: TransactionManager ;
22- use std:: pin:: pin;
2322use std:: sync:: Arc ;
2423
2524sqlx_core:: declare_driver_with_optional_migrate!( DRIVER = Sqlite ) ;
@@ -86,21 +85,7 @@ impl AnyConnectionBackend for SqliteConnection {
8685 persistent : bool ,
8786 arguments : Option < AnyArguments > ,
8887 ) -> BoxStream < ' _ , sqlx_core:: Result < Either < AnyQueryResult , AnyRow > > > {
89- let persistent = persistent && arguments. is_some ( ) ;
90- let args = arguments. map ( map_arguments) ;
91-
92- Box :: pin (
93- self . worker
94- . execute ( query, args, self . row_channel_size , persistent, None )
95- . map_ok ( flume:: Receiver :: into_stream)
96- . try_flatten_stream ( )
97- . map (
98- move |res : sqlx_core:: Result < Either < SqliteQueryResult , SqliteRow > > | match res? {
99- Either :: Left ( result) => Ok ( Either :: Left ( map_result ( result) ) ) ,
100- Either :: Right ( row) => Ok ( Either :: Right ( AnyRow :: try_from ( & row) ?) ) ,
101- } ,
102- ) ,
103- )
88+ self . fetch_with_limit ( query, persistent, arguments, None )
10489 }
10590
10691 fn fetch_optional (
@@ -109,19 +94,13 @@ impl AnyConnectionBackend for SqliteConnection {
10994 persistent : bool ,
11095 arguments : Option < AnyArguments > ,
11196 ) -> BoxFuture < ' _ , sqlx_core:: Result < Option < AnyRow > > > {
112- let persistent = persistent && arguments. is_some ( ) ;
113- let args = arguments. map ( map_arguments) ;
97+ let mut stream = self . fetch_with_limit ( query, persistent, arguments, Some ( 1 ) ) ;
11498
11599 Box :: pin ( async move {
116- let mut stream = pin ! (
117- self . worker
118- . execute( query, args, self . row_channel_size, persistent, Some ( 1 ) )
119- . map_ok( flume:: Receiver :: into_stream)
120- . await ?
121- ) ;
122-
123- if let Some ( Either :: Right ( row) ) = stream. try_next ( ) . await ? {
124- return Ok ( Some ( AnyRow :: try_from ( & row) ?) ) ;
100+ while let Some ( result) = stream. try_next ( ) . await ? {
101+ if let Either :: Right ( row) = result {
102+ return Ok ( Some ( row) ) ;
103+ }
125104 }
126105
127106 Ok ( None )
@@ -145,6 +124,32 @@ impl AnyConnectionBackend for SqliteConnection {
145124 }
146125}
147126
127+ impl SqliteConnection {
128+ fn fetch_with_limit (
129+ & mut self ,
130+ query : SqlStr ,
131+ persistent : bool ,
132+ arguments : Option < AnyArguments > ,
133+ limit : Option < usize > ,
134+ ) -> BoxStream < ' _ , sqlx_core:: Result < Either < AnyQueryResult , AnyRow > > > {
135+ let persistent = persistent && arguments. is_some ( ) ;
136+ let args = arguments. map ( map_arguments) ;
137+
138+ Box :: pin (
139+ self . worker
140+ . execute ( query, args, self . row_channel_size , persistent, limit)
141+ . map_ok ( flume:: Receiver :: into_stream)
142+ . try_flatten_stream ( )
143+ . map (
144+ move |res : sqlx_core:: Result < Either < SqliteQueryResult , SqliteRow > > | match res? {
145+ Either :: Left ( result) => Ok ( Either :: Left ( map_result ( result) ) ) ,
146+ Either :: Right ( row) => Ok ( Either :: Right ( AnyRow :: try_from ( & row) ?) ) ,
147+ } ,
148+ ) ,
149+ )
150+ }
151+ }
152+
148153impl < ' a > TryFrom < & ' a SqliteTypeInfo > for AnyTypeInfo {
149154 type Error = sqlx_core:: Error ;
150155
0 commit comments