-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathtest_db_api.py
More file actions
59 lines (48 loc) · 1.7 KB
/
test_db_api.py
File metadata and controls
59 lines (48 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import pytest
from pyspark.sql import SparkSession
from sqlmesh.engines.spark.db_api import errors
from sqlmesh.engines.spark.db_api import spark_session as spark_session_db
# NOTE: These tests are deliberately not marked with 'spark' so that they can
# run separately from the Spark integration tests. Running both at the same
# time mutates some global state in the SparkSession, which breaks these tests.
pytestmark = [pytest.mark.slow, pytest.mark.pyspark]
def test_spark_session_cursor(spark_session: SparkSession):
    """Walk a cursor through a seven-row result using every fetch method.

    The sequence checked is: fetching before any ``execute`` raises
    ``ProgrammingError``; ``fetchone``/``fetchmany``/``fetchall`` hand back the
    rows in order; once the rows are used up they return ``None`` / ``[]``;
    and a negative ``fetchmany`` size raises ``ProgrammingError``.
    """
    conn = spark_session_db.connection(spark_session)
    cur = conn.cursor()

    # Nothing has been executed yet, so fetching must be rejected.
    with pytest.raises(errors.ProgrammingError):
        cur.fetchone()

    # The statement text is kept character-for-character, hence the
    # flush-left lines inside the literal.
    query = """
SELECT *
FROM
VALUES ('key1', 1),
('key2', 2),
('key3', 3),
('key4', 4),
('key5', 5),
('key6', 6),
('key7', 7) AS data(key, value)
"""
    cur.execute(query)

    # Each fetch call continues from where the previous one stopped.
    first_row = cur.fetchone()
    assert first_row == ("key1", 1)

    next_pair = cur.fetchmany(size=2)
    assert next_pair == [("key2", 2), ("key3", 3)]

    fourth_row = cur.fetchone()
    assert fourth_row == ("key4", 4)

    remaining = cur.fetchall()
    assert remaining == [("key5", 5), ("key6", 6), ("key7", 7)]

    # With all seven rows consumed, every fetch variant reports "no more rows".
    assert cur.fetchone() is None
    assert cur.fetchmany(size=2) == []
    assert cur.fetchall() == []

    # A negative batch size is an error rather than an empty result.
    with pytest.raises(errors.ProgrammingError):
        cur.fetchmany(size=-1)
def test_spark_session_cursor_execute_parameters_not_supported(
    spark_session: SparkSession,
):
    """Check that ``execute`` with bind parameters raises ``NotSupportedError``."""
    conn = spark_session_db.connection(spark_session)
    statement = "SELECT 1 WHERE a = ?"
    bind_values = ["value"]

    # The cursor is created inside the guarded block, so the same calls are
    # covered by the expectation as before.
    with pytest.raises(errors.NotSupportedError):
        cur = conn.cursor()
        cur.execute(statement, bind_values)