test: add success e2e tests for stream processing commands MONGOSH-2127 #2459
Merged
4 commits
d2ffa06 test: add success e2e tests for stream processing commands MONGOSH-2127 (malexandert)
fb5ceca add opts to executeLine; increase waitForPrompt timeouts in tests (malexandert)
263d9cb Merge branch 'main' into MONGOSH-2127 (addaleax)
db857b8 move .process()/.sample() prompt inputs and assertions into eventually() (malexandert)
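Note on commit db857b8: eventually() is the polling helper the spec imports from ../../../testing/eventually. Wrapping the .process()/.sample() assertions in it retries them until the nondeterministic stream output actually appears, instead of failing on the first read. A minimal sketch of such a helper, assuming a retry-until-timeout contract — the real implementation may differ in naming, options, and backoff:

// Hypothetical sketch of a polling helper with the shape the tests rely on:
// retry the callback until it stops throwing, or fail with the last error
// once the timeout elapses. Not the actual testing/eventually implementation.
async function eventuallySketch(
  fn: () => void | Promise<void>,
  {
    timeout = 10_000,
    interval = 100,
  }: { timeout?: number; interval?: number } = {}
): Promise<void> {
  const deadline = Date.now() + timeout;
  let lastErr: unknown;
  for (;;) {
    try {
      await fn(); // assertion passed; the awaited condition holds
      return;
    } catch (err) {
      lastErr = err; // remember the most recent failure for reporting
    }
    if (Date.now() >= deadline) {
      throw lastErr ?? new Error('condition not met before timeout');
    }
    await new Promise((resolve) => setTimeout(resolve, interval));
  }
}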
@@ -0,0 +1,328 @@
import { bson } from '@mongosh/service-provider-core';
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
import { MongoClient } from 'mongodb';
import { expect } from 'chai';
import type { TestShell } from './test-shell';
import { sleep } from './util-helpers';
import { eventually } from '../../../testing/eventually';

const {
  STREAMS_E2E_SPI_CONNECTION_STRING = '',
  STREAMS_E2E_DB_USER = '',
  STREAMS_E2E_DB_PASSWORD = '',
  STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
} = process.env;

describe('e2e Streams', function () {
  this.timeout(60_000);
  let shell: TestShell;

  before(function () {
    if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
      console.error(
        'Stream Instance connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
      console.error(
        'Cluster connection string not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_USER) {
      console.error(
        'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }

    if (!STREAMS_E2E_DB_PASSWORD) {
      console.error(
        'Password for Atlas database user not found - skipping Streams E2E tests...'
      );
      return this.skip();
    }
  });

  describe('basic stream processor operations', function () {
    let processorName = '';
    let db: Db;
    let collection: Collection<Document>;
    let client: MongoClient;

    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });

      processorName = `spi${new bson.ObjectId().toHexString()}`;
      client = await MongoClient.connect(
        STREAMS_E2E_CLUSTER_CONNECTION_STRING,
        {}
      );
      db = client.db(processorName);
      const collectionName = 'processedData';
      collection = db.collection(collectionName);

      // this stream processor reads from the sample stream and inserts documents into an Atlas database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collectionName,
          },
        },
      };

      const aggPipeline = [sourceStage, mergeStage];

      const createResult = await shell.executeLine(
        `sp.createStreamProcessor("${processorName}", ${JSON.stringify(
          aggPipeline
        )})`
      );
      expect(createResult).to.include(
        `Atlas Stream Processor: ${processorName}`
      );
    });

    afterEach(async function () {
      try {
        await db.dropDatabase();
        await client.close();

        const result = await shell.executeLine(`sp.${processorName}.drop()`);
        expect(result).to.include(`{ ok: 1 }`);
      } catch (err: any) {
        console.error(
          `Could not clean up stream processor ${processorName}:`,
          err
        );
      }
    });

    it('can list stream processors', async function () {
      const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
      // make sure the processor created in the beforeEach is present
      expect(listResult).to.include(`name: '${processorName}'`);
    });

    it('can start and stop a stream processor', async function () {
      // this should be a unique collection for this test run, so no data to start
      const initialDocsCount = await collection.countDocuments();
      expect(initialDocsCount).to.eq(0);

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor do stuff
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const updatedDocCount = await collection.countDocuments();
      expect(updatedDocCount).to.be.greaterThan(0);

      // sleep again to make sure the processor isn't doing any more inserts
      await sleep(500);

      const countAfterStopping = await collection.countDocuments();
      expect(countAfterStopping).to.eq(updatedDocCount);
    });

    it(`can modify an existing stream processor's pipeline`, async function () {
      // this field is not present on any docs emitted by the stream processor
      // created in the beforeEach
      const newField = 'newField';

      const startResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(startResult).to.include('{ ok: 1 }');

      // sleep for a bit to let the processor do stuff
      await sleep(500);

      const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
      expect(stopResult).to.include('{ ok: 1 }');

      const initialDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(initialDocsWithNewField).to.eq(0);

      // define a new pipeline that will append our newField to the docs the stream
      // processor inserts into the database
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const addFieldStage = {
        $addFields: {
          newField,
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: processorName,
            coll: collection.collectionName,
          },
        },
      };

      const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];

      const modifyResult = await shell.executeLine(
        `sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
      );
      expect(modifyResult).to.include('{ ok: 1 }');

      const secondStartResult = await shell.executeLine(
        `sp.${processorName}.start()`
      );
      expect(secondStartResult).to.include('{ ok: 1 }');

      // sleep again to let the processor work again with the updated pipeline
      await sleep(500);

      const updatedDocsWithNewField = await collection.countDocuments({
        [newField]: { $exists: true },
      });
      expect(updatedDocsWithNewField).to.be.greaterThan(0);
    });

    it('can view stats for a stream processor', async function () {
      const statsResult = await shell.executeLine(
        `sp.${processorName}.stats()`
      );
      expect(statsResult).to.include(`name: '${processorName}'`);
      expect(statsResult).to.include(`state: 'CREATED'`);
      expect(statsResult).to.include('stats: {');
      expect(statsResult).to.include(`pipeline: [`);
      expect(statsResult).to.include(
        `{ '$source': { connectionName: 'sample_stream_solar' } },`
      );
    });
  });

  describe('sampling from a running stream processor', function () {
    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });
    });

    it('should output streamed documents to the shell', async function () {
      // this processor is pre-defined on the cloud-dev test project
      // it reads from sample solar stream, appends a field with the processor name to each doc, and
      // inserts the docs into an Atlas collection
      const immortalProcessorName = 'immortalProcessor';

      shell.writeInputLine(`sp.${immortalProcessorName}.sample()`);
      // data from the sample solar stream isn't deterministic, so just assert that
      // the processorName field appears in the shell output after sampling
      await eventually(() => {
        shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
      });
    });
  });

  describe('creating an interactive stream processor with .process()', function () {
    let interactiveId = '';
    const collectionName = 'processedData';

    beforeEach(async function () {
      shell = this.startTestShell({
        args: [
          STREAMS_E2E_SPI_CONNECTION_STRING,
          '--tls',
          '--authenticationDatabase admin',
          '--username',
          STREAMS_E2E_DB_USER,
          '--password',
          STREAMS_E2E_DB_PASSWORD,
        ],
        removeSigintListeners: true,
      });
      await shell.waitForPromptOrExit({ timeout: 60_000 });

      interactiveId = new bson.ObjectId().toHexString();
    });

    it('should output streamed documents to the shell', async function () {
      // the pipeline for our interactive processor reads from sample solar stream, adds a
      // unique test id to each document, and inserts it into an Atlas collection
      const sourceStage = {
        $source: {
          connectionName: 'sample_stream_solar',
        },
      };

      const addFieldStage = {
        $addFields: {
          interactiveId,
        },
      };

      const mergeStage = {
        $merge: {
          into: {
            connectionName: 'testClusterConnection',
            db: interactiveId,
            coll: collectionName,
          },
        },
      };

      const aggPipeline = [sourceStage, addFieldStage, mergeStage];

      shell.writeInputLine(`sp.process(${JSON.stringify(aggPipeline)})`);
      // data from the sample solar stream isn't deterministic, so just assert that
      // the interactiveId field appears in the shell output after sampling
      await eventually(
        () => {
          shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
        },
        { timeout: 60_000 }
      );
    });
  });
});
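The start/stop and modify tests above wait with a fixed sleep(500) before counting documents, which trades test time against flakiness on a slow processor. As an illustration only — waitForInsertedDocs and eventuallySketch are hypothetical names, not part of this PR — the same check could poll the merge target instead:

// Illustrative alternative to the fixed sleep(500): poll the target
// collection until at least one document has been merged in, reusing the
// eventuallySketch helper outlined above and the spec's existing
// Collection/Document/expect imports. Hypothetical, not PR code.
async function waitForInsertedDocs(
  collection: Collection<Document>,
  timeout = 60_000
): Promise<number> {
  let count = 0;
  await eventuallySketch(async () => {
    count = await collection.countDocuments(); // re-read on every attempt
    expect(count).to.be.greaterThan(0); // throws until data arrives
  }, { timeout });
  return count;
}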