-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathcloudfetch.test.ts
More file actions
135 lines (114 loc) · 5.38 KB
/
cloudfetch.test.ts
File metadata and controls
135 lines (114 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import { expect } from 'chai';
import sinon from 'sinon';
import { DBSQLClient } from '../../lib';
import { ClientConfig } from '../../lib/contracts/IClientContext';
import CloudFetchResultHandler from '../../lib/result/CloudFetchResultHandler';
import ArrowResultConverter from '../../lib/result/ArrowResultConverter';
import ResultSlicer from '../../lib/result/ResultSlicer';
import config from './utils/config';
async function openSession(customConfig: Partial<ClientConfig> = {}) {
const client = new DBSQLClient();
const clientConfig = client.getConfig();
sinon.stub(client, 'getConfig').returns({
...clientConfig,
...customConfig,
});
const connection = await client.connect({
host: config.host,
path: config.path,
token: config.token,
});
return connection.openSession({
initialCatalog: config.catalog,
initialSchema: config.schema,
});
}
// This suite takes a while to execute, and in this case it's expected.
// If one day it starts to fail with timeouts - you may consider to just increase timeout for it
describe('CloudFetch', () => {
it('should fetch data', async () => {
const cloudFetchConcurrentDownloads = 5;
const session = await openSession({
cloudFetchConcurrentDownloads,
useLZ4Compression: false,
});
const queriedRowsCount = 10000000; // result has to be quite big to enable CloudFetch
const operation = await session.executeStatement(
`
SELECT *
FROM range(0, ${queriedRowsCount}) AS t1
LEFT JOIN (SELECT 1) AS t2
`,
{
maxRows: null, // disable DirectResults
useCloudFetch: true, // tell server that we would like to use CloudFetch
},
);
// We're going to examine some internals of operation, so explicitly wait for completion
await operation.finished();
// Check if we're actually getting data via CloudFetch
// @ts-expect-error TS2339: Property getResultHandler does not exist on type IOperation
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ResultSlicer);
expect(resultHandler.source).to.be.instanceof(ArrowResultConverter);
expect(resultHandler.source.source).to.be.instanceOf(CloudFetchResultHandler);
const cfResultHandler = resultHandler.source.source;
// Fetch first chunk and check if result handler behaves properly.
// With the count of rows we queried, there should be at least one row set,
// containing 8 result links. After fetching the first chunk,
// result handler should download 5 of them and schedule the rest
expect(await cfResultHandler.hasMore()).to.be.true;
expect(cfResultHandler.pendingLinks.length).to.be.equal(0);
expect(cfResultHandler.downloadTasks.length).to.be.equal(0);
// @ts-expect-error TS2339: Property _data does not exist on type IOperation
sinon.spy(operation._data, 'fetchNext');
const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
// Count links returned from server
// @ts-expect-error TS2339: Property _data does not exist on type IOperation
const resultSet = await operation._data.fetchNext.firstCall.returnValue;
const resultLinksCount = resultSet?.resultLinks?.length ?? 0;
expect(await cfResultHandler.hasMore()).to.be.true;
// expected batches minus first 5 already fetched
expect(cfResultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
expect(cfResultHandler.downloadTasks.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
let fetchedRowCount = chunk.length;
// eslint-disable-next-line no-await-in-loop
while (await operation.hasMoreRows()) {
// eslint-disable-next-line no-await-in-loop
const ch = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
fetchedRowCount += ch.length;
}
expect(fetchedRowCount).to.be.equal(queriedRowsCount);
});
it('should not use LZ4 compression with cloud fetch', async () => {
const cloudFetchConcurrentDownloads = 5;
const session = await openSession({
cloudFetchConcurrentDownloads,
useLZ4Compression: true, // This is ignored when cloud fetch is enabled
});
const queriedRowsCount = 10000000; // result has to be quite big to enable CloudFetch
const operation = await session.executeStatement(
`
SELECT *
FROM range(0, ${queriedRowsCount}) AS t1
LEFT JOIN (SELECT 1) AS t2
`,
{
maxRows: null, // disable DirectResults
useCloudFetch: true, // tell server that we would like to use CloudFetch
},
);
// We're going to examine some internals of operation, so explicitly wait for completion
await operation.finished();
// Check if we're actually getting data via CloudFetch
// @ts-expect-error TS2339: Property getResultHandler does not exist on type IOperation
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ResultSlicer);
expect(resultHandler.source).to.be.instanceof(ArrowResultConverter);
expect(resultHandler.source.source).to.be.instanceOf(CloudFetchResultHandler);
// LZ4 compression should not be enabled with cloud fetch
expect(resultHandler.source.source.isLZ4Compressed).to.be.false;
const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
expect(chunk.length).to.be.gt(0);
});
});