
Commit d2ffa06

test: add success e2e tests for stream processing commands MONGOSH-2127
Parent: 5238d59

File tree

2 files changed: +336 -0 lines


.evergreen.yml

Lines changed: 8 additions & 0 deletions
@@ -214,6 +214,10 @@ functions:
           MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           TASK_NAME: ${task_name}
     - command: s3.put
       params:
@@ -3772,6 +3776,10 @@ functions:
           NODE_JS_VERSION: ${node_js_version}
           AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
           AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
+          STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
+          STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
+          STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
+          STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
           DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
           TASK_NAME: ${task_name}
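Each ${...} placeholder above is expanded by Evergreen from the project's configured expansions and handed to the test process as an environment variable. For illustration only (not part of this commit), a preflight check that fails fast when any of the four new variables is unset could look like the sketch below; the spec file added in this commit instead skips its suites when they are missing.

// Hypothetical preflight check (illustrative only, not part of this commit):
// verify that the Streams e2e variables Evergreen is expected to export are set.
const requiredVars = [
  'STREAMS_E2E_SPI_CONNECTION_STRING',
  'STREAMS_E2E_DB_USER',
  'STREAMS_E2E_DB_PASSWORD',
  'STREAMS_E2E_CLUSTER_CONNECTION_STRING',
] as const;

const missingVars = requiredVars.filter((name) => !process.env[name]);
if (missingVars.length > 0) {
  // The spec file below skips its suites instead of failing; either policy works.
  console.error(`Missing Streams e2e variables: ${missingVars.join(', ')}`);
  process.exitCode = 1;
}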

Lines changed: 328 additions & 0 deletions
@@ -0,0 +1,328 @@
import { bson } from '@mongosh/service-provider-core';
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
import { MongoClient } from 'mongodb';
import { expect } from 'chai';
import type { TestShell } from './test-shell';
import { sleep } from './util-helpers';
import { eventually } from '../../../testing/eventually';

const {
  STREAMS_E2E_SPI_CONNECTION_STRING = '',
  STREAMS_E2E_DB_USER = '',
  STREAMS_E2E_DB_PASSWORD = '',
  STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
} = process.env;

describe('e2e Streams', function () {
  this.timeout(60_000);
  let shell: TestShell;

  before(function () {
    if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
      console.error(
        'Stream Instance connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
      console.error(
        'Cluster connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_USER) {
      console.error(
        'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_PASSWORD) {
      console.error(
        'Password for Atlas database user not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }
  });

  describe('basic stream processor operations', function () {
    let processorName = '';
    let db: Db;
    let collection: Collection<Document>;
    let client: MongoClient;

    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });

      processorName = `spi${new bson.ObjectId().toHexString()}`;
      client = await MongoClient.connect(
        STREAMS_E2E_CLUSTER_CONNECTION_STRING,
        {}
      );
      db = client.db(processorName);
      const collectionName = 'processedData';
      collection = db.collection(collectionName);

      // this stream processor reads from the sample stream and inserts documents into an Atlas database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collectionName,
          },
        },
      };

      const aggPipeline = [sourceStage, mergeStage];

      const createResult = await shell.executeLine(
        `sp.createStreamProcessor("${processorName}", ${JSON.stringify(
          aggPipeline
        )})`
      );
      expect(createResult).to.include(
        `Atlas Stream Processor: ${processorName}`
      );
    });

    afterEach(async function () {
      try {
        await db.dropDatabase();
        await client.close();

        const result = await shell.executeLine(`sp.${processorName}.drop()`);
        expect(result).to.include(`{ ok: 1 }`);
      } catch (err: any) {
        console.error(
          `Could not clean up stream processor ${processorName}:`,
          err
        );
      }
    });

    it('can list stream processors', async function () {
      const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
      // make sure the processor created in the beforeEach is present
      expect(listResult).to.include(`name: '${processorName}'`);
    });

    it('can start and stop a stream processor', async function () {
      // this should be a unique collection for this test run, so no data to start
      const initialDocsCount = await collection.countDocuments();
      expect(initialDocsCount).to.eq(0);

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor do stuff
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const updatedDocCount = await collection.countDocuments();
      expect(updatedDocCount).to.be.greaterThan(0);

      // sleep again to make sure the processor isn't doing any more inserts
      await sleep(500);

      const countAfterStopping = await collection.countDocuments();
      expect(countAfterStopping).to.eq(updatedDocCount);
    });

    it(`can modify an existing stream processor's pipeline`, async function () {
      // this field is not present on any docs emitted by the stream processor
      // created in the beforeEach
      const newField = 'newField';

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor do stuff
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const initialDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(initialDocsWithNewField).to.eq(0);

      // define a new pipeline that will append our newField to the docs the stream
      // processor inserts into the database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const addFieldStage = {
        $addFields: {
          newField,
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collection.collectionName,
          },
        },
      };

      const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];

      const modifyResult = await shell.executeLine(
        `sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
      );
      expect(modifyResult).to.include('{ ok: 1 }');

      const secondStartResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(secondStartResult).to.include('{ ok: 1 }');

      // sleep again to let the processor work again with the updated pipeline
      await sleep(500);

      const updatedDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(updatedDocsWithNewField).to.be.greaterThan(0);
    });

    it('can view stats for a stream processor', async function () {
      const statsResult = await shell.executeLine(
        `sp.${processorName}.stats()`
      );
      expect(statsResult).to.include(`name: '${processorName}'`);
      expect(statsResult).to.include(`state: 'CREATED'`);
      expect(statsResult).to.include('stats: {');
      expect(statsResult).to.include(`pipeline: [`);
      expect(statsResult).to.include(
        `{ '$source': { connectionName: 'sample_stream_solar' } },`
      );
    });
  });

  describe('sampling from a running stream processor', function () {
    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });
    });

    it('should output streamed documents to the shell', async function () {
      // this processor is pre-defined on the cloud-dev test project
      // it reads from the sample solar stream, appends a field with the processor name to each doc, and
      // inserts the docs into an Atlas collection
      const immortalProcessorName = 'immortalProcessor';

      shell.writeInputLine(`sp.${immortalProcessorName}.sample()`);
      // data from the sample solar stream isn't deterministic, so just assert that
      // the processorName field appears in the shell output after sampling
      await eventually(() => {
        shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
      });
    });
  });

  describe('creating an interactive stream processor with .process()', function () {
    let interactiveId = '';
    const collectionName = 'processedData';

    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });

      interactiveId = new bson.ObjectId().toHexString();
    });

    it('should output streamed documents to the shell', async function () {
      // the pipeline for our interactive processor reads from the sample solar stream, adds a
      // unique test id to each document, and inserts it into an Atlas collection
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const addFieldStage = {
        $addFields: {
          interactiveId,
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: interactiveId,
            coll: collectionName,
          },
        },
      };

      const aggPipeline = [sourceStage, addFieldStage, mergeStage];

      shell.writeInputLine(`sp.process(${JSON.stringify(aggPipeline)})`);
      // data from the sample solar stream isn't deterministic, so just assert that
      // the interactiveId field appears in the shell output after sampling
      await eventually(
        () => {
          shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
        },
        { timeout: 60_000 }
      );
    });
  });
});
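The spec relies on two helpers imported at the top of the file: sleep from ./util-helpers and eventually from ../../../testing/eventually, which retries an assertion until it passes or a timeout elapses. Their real implementations live elsewhere in the repository; the following is only a minimal sketch of a helper with the same call shape as the eventually usages above, under the assumption that it simply polls the callback.

// Minimal sketch only: a retry helper matching the call shape used above,
// i.e. eventually(fn) and eventually(fn, { timeout }). This is an assumption
// about its behavior, not the implementation in ../../../testing/eventually.
async function eventuallySketch(
  fn: () => void | Promise<void>,
  { timeout = 10_000, interval = 100 }: { timeout?: number; interval?: number } = {}
): Promise<void> {
  const deadline = Date.now() + timeout;
  for (;;) {
    try {
      // Re-run the assertion; return as soon as it stops throwing.
      await fn();
      return;
    } catch (err) {
      // Give up once the deadline has passed, otherwise wait and retry.
      if (Date.now() >= deadline) throw err;
      await new Promise((resolve) => setTimeout(resolve, interval));
    }
  }
}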
