Skip to content
This repository was archived by the owner on May 2, 2023. It is now read-only.

Commit ad15c0c

Browse files
committed
Fixes for BigQuery, REPL
1 parent ffc4afe commit ad15c0c

3 files changed

Lines changed: 14 additions & 10 deletions

File tree

sqeleton/databases/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ def _normalize_table_path(self, path: DbPath) -> DbPath:
487487
def parse_table_name(self, name: str) -> DbPath:
488488
return parse_table_name(name)
489489

490-
def _query_cursor(self, c, sql_code: str):
490+
def _query_cursor(self, c, sql_code: str) -> QueryResult:
491491
assert isinstance(sql_code, str), sql_code
492492
try:
493493
c.execute(sql_code)
@@ -499,7 +499,7 @@ def _query_cursor(self, c, sql_code: str):
499499
# logger.error(f'Caused by SQL: {sql_code}')
500500
raise
501501

502-
def _query_conn(self, conn, sql_code: Union[str, ThreadLocalInterpreter]) -> list:
502+
def _query_conn(self, conn, sql_code: Union[str, ThreadLocalInterpreter]) -> QueryResult:
503503
c = conn.cursor()
504504
callback = partial(self._query_cursor, c)
505505
return apply_query(callback, sql_code)
@@ -542,7 +542,7 @@ def set_conn(self):
542542
except Exception as e:
543543
self._init_error = e
544544

545-
def _query(self, sql_code: Union[str, ThreadLocalInterpreter]):
545+
def _query(self, sql_code: Union[str, ThreadLocalInterpreter]) -> QueryResult:
546546
r = self._queue.submit(self._query_in_worker, sql_code)
547547
return r.result()
548548

sqeleton/databases/bigquery.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
)
2020
from ..abcs import Compilable
2121
from ..queries import this, table, SKIP, code
22-
from .base import BaseDialect, Database, import_helper, parse_table_name, ConnectError, apply_query
22+
from .base import BaseDialect, Database, import_helper, parse_table_name, ConnectError, apply_query, QueryResult
2323
from .base import TIMESTAMP_PRECISION_POS, ThreadLocalInterpreter, Mixin_RandomSample
2424

2525

@@ -161,16 +161,18 @@ def _query_atom(self, sql_code: str):
161161
from google.cloud import bigquery
162162

163163
try:
164-
res = list(self._client.query(sql_code))
164+
result = self._client.query(sql_code).result()
165+
columns = [c.name for c in result.schema]
166+
rows = list(result)
165167
except Exception as e:
166168
msg = "Exception when trying to execute SQL code:\n %s\n\nGot error: %s"
167169
raise ConnectError(msg % (sql_code, e))
168170

169-
if res and isinstance(res[0], bigquery.table.Row):
170-
res = [tuple(self._normalize_returned_value(v) for v in row.values()) for row in res]
171-
return res
171+
if rows and isinstance(rows[0], bigquery.table.Row):
172+
rows = [tuple(self._normalize_returned_value(v) for v in row.values()) for row in rows]
173+
return QueryResult(rows, columns)
172174

173-
def _query(self, sql_code: Union[str, ThreadLocalInterpreter]):
175+
def _query(self, sql_code: Union[str, ThreadLocalInterpreter]) -> QueryResult:
174176
return apply_query(self._query_atom, sql_code)
175177

176178
def close(self):

sqeleton/repl.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ def repl(uri):
4747
help()
4848
continue
4949
try:
50-
schema = db.query_table_schema((table_name,))
50+
path = db.parse_table_name(table_name)
51+
print('->', path)
52+
schema = db.query_table_schema(path)
5153
except Exception as e:
5254
logging.error(e)
5355
else:

0 commit comments

Comments
 (0)