Skip to content

Add alternative provider retrieval measurement #571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/bin/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ assert(DEAL_INGESTER_TOKEN, 'DEAL_INGESTER_TOKEN is required')
const client = new pg.Pool({
connectionString: DATABASE_URL,
// allow the pool to close all connections and become empty
min: 0,
// min: 0,
// this values should correlate with service concurrency hard_limit configured in fly.toml
// and must take into account the connection limit of our PG server, see
// https://fly.io/docs/postgres/managing/configuration-tuning/
Expand Down
36 changes: 33 additions & 3 deletions api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ const createMeasurement = async (req, res, client) => {
validate(measurement, 'stationId', { type: 'string', required: true })
assert(measurement.stationId.match(/^[0-9a-fA-F]{88}$/), 400, 'Invalid Station ID')

if (measurement.alternativeProviderCheck) {
validate(measurement, 'alternativeProviderCheck', { type: 'object', required: false })
validate(measurement.alternativeProviderCheck, 'statusCode', { type: 'number', required: false })
validate(measurement.alternativeProviderCheck, 'timeout', { type: 'boolean', required: false })
validate(measurement.alternativeProviderCheck, 'carTooLarge', { type: 'boolean', required: false })
validate(measurement.alternativeProviderCheck, 'endAt', { type: 'date', required: false })
validate(measurement.alternativeProviderCheck, 'protocol', { type: 'string', required: false })
validate(measurement.alternativeProviderCheck, 'providerId', { type: 'string', required: false })
}

const inetGroup = await mapRequestToInetGroup(client, req)
logNetworkInfo(req.headers, measurement.stationId, recordNetworkInfoTelemetry)

Expand All @@ -124,10 +134,16 @@ const createMeasurement = async (req, res, client) => {
indexer_result,
miner_id,
provider_id,
alternative_provider_check_status_code,
alternative_provider_check_timeout,
alternative_provider_check_car_too_large,
alternative_provider_check_end_at,
alternative_provider_check_protocol,
alternative_provider_check_provider_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing this out there: We could alternatively implement this in such a way that after the alternative provider check has been completed, two measurements will have been created. One would link to the other. This would save us from having to duplicate the measurement schema inside itself. I don't think this is worth it yet though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No that suggestion is really not important since spark-api's business is just buffering measurements until it flushes them again. If anything, we should discuss this in a repo that's further down the data processing pipeline

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this a bit and you are right. What started of as adding one field (status code for alternative retrieval) turned up into duplicating code a lot. We might be better off with adding a relationship between columns in the measurements table between regular and alternative provider check. That way we could avoid duplicating code down the processing and evaluation pipeline.

completed_at_round
)
SELECT
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21,
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27
id as completed_at_round
FROM spark_rounds
ORDER BY id DESC
Expand All @@ -154,7 +170,13 @@ const createMeasurement = async (req, res, client) => {
measurement.carChecksum,
measurement.indexerResult,
measurement.minerId,
measurement.providerId
measurement.providerId,
measurement.alternativeProviderCheck?.statusCode,
measurement.alternativeProviderCheck?.timeout,
measurement.alternativeProviderCheck?.carTooLarge ?? false,
measurement.alternativeProviderCheck?.endAt,
measurement.alternativeProviderCheck?.protocol,
measurement.alternativeProviderCheck?.providerId
])
json(res, { id: rows[0].id })
}
Expand Down Expand Up @@ -190,7 +212,15 @@ const getMeasurement = async (req, res, client, measurementId) => {
endAt: resultRow.end_at,
byteLength: resultRow.byte_length,
carTooLarge: resultRow.car_too_large,
attestation: resultRow.attestation
attestation: resultRow.attestation,
alternativeProviderCheck: {
statusCode: resultRow.alternative_provider_check_status_code,
timeout: resultRow.alternative_provider_check_timeout,
carTooLarge: resultRow.alternative_provider_check_car_too_large,
endAt: resultRow.alternative_provider_check_end_at,
protocol: resultRow.alternative_provider_check_protocol,
providerId: resultRow.alternative_provider_check_provider_id
}
})
}

Expand Down
19 changes: 18 additions & 1 deletion api/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@ const VALID_MEASUREMENT = {
carChecksum: 'somehash',
minerId: 'f02abc',
providerId: 'provider-pubkey',
indexerResult: 'OK'
indexerResult: 'OK',
alternativeProviderCheck: {
statusCode: 200,
timeout: false,
carTooLarge: false,
endAt: new Date(),
protocol: 'graphsync',
providerId: 'alt-provider-pubkey'
}
}

const assertResponseStatus = async (res, status) => {
Expand Down Expand Up @@ -194,6 +202,15 @@ describe('Routes', () => {
assert.strictEqual(measurementRow.miner_id, measurement.minerId)
assert.strictEqual(measurementRow.provider_id, measurement.providerId)
assert.strictEqual(measurementRow.station_id, measurement.stationId)
assert.strictEqual(measurementRow.alternative_provider_check_status_code, measurement.alternativeProviderCheck.statusCode)
assert.strictEqual(measurementRow.alternative_provider_check_timeout, measurement.alternativeProviderCheck.timeout)
assert.strictEqual(measurementRow.alternative_provider_check_car_too_large, measurement.alternativeProviderCheck.carTooLarge)
assert.strictEqual(
measurementRow.alternative_provider_check_end_at.toJSON(),
measurement.alternativeProviderCheck.endAt.toJSON()
)
assert.strictEqual(measurementRow.alternative_provider_check_protocol, measurement.alternativeProviderCheck.protocol)
assert.strictEqual(measurementRow.alternative_provider_check_provider_id, measurement.alternativeProviderCheck.providerId)
})

it('allows older format with walletAddress', async () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ALTER TABLE measurements
ADD COLUMN alternative_provider_check_status_code INTEGER,
ADD COLUMN alternative_provider_check_timeout BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN alternative_provider_check_car_too_large BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN alternative_provider_check_end_at TIMESTAMPTZ,
ADD COLUMN alternative_provider_check_protocol protocol,
ADD COLUMN alternative_provider_check_provider_id TEXT;
8 changes: 7 additions & 1 deletion publish/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ export const publish = async ({
provider_id,
cid,
provider_address,
protocol
protocol,
alternative_provider_check_status_code,
alternative_provider_check_timeout,
alternative_provider_check_car_too_large,
alternative_provider_check_end_at,
alternative_provider_check_protocol,
alternative_provider_check_provider_id
FROM measurements
LIMIT $1
`, [
Expand Down
13 changes: 11 additions & 2 deletions publish/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,17 @@ describe('integration', () => {
carTooLarge: true,
minerId: 'f02abc',
providerId: 'provider-pubkey',
round: 42
}]
round: 42,
alternativeProviderCheck: {
statusCode: 200,
timeout: false,
carTooLarge: false,
endAt: new Date(),
protocol: 'graphsync',
providerId: 'alt-provider-pubkey'
}
}
]

for (const measurement of measurements) {
await insertMeasurement(client, measurement)
Expand Down
Loading