Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work-package submission #290

Merged
merged 16 commits into from
Feb 20, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public enum RuntimeEvents {
public let block: BlockRef
}

// New WorkPackagesReceived from network
public struct WorkPackagesReceived: Event {
public let items: [WorkPackage]

public init(items: [WorkPackage]) {
self.items = items
}
}

// New WorkPackagesGenerated by Guaranteeing Service
public struct WorkPackagesGenerated: Event {
public let items: [WorkPackage]
Expand Down
10 changes: 8 additions & 2 deletions Blockchain/Sources/Blockchain/Types/RefinementContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import Utils

// A refinement context, denoted by the set X, describes the context of the chain
// at the point that the report’s corresponding work-package was evaluated.
public struct RefinementContext: Comparable, Sendable, Equatable, Codable {
public struct Anchor: Comparable, Sendable, Equatable, Codable {
public struct RefinementContext: Comparable, Sendable, Equatable, Codable, Hashable {
public struct Anchor: Comparable, Sendable, Equatable, Codable, Hashable {
// a
public var headerHash: Data32
// s
Expand Down Expand Up @@ -74,6 +74,12 @@ public struct RefinementContext: Comparable, Sendable, Equatable, Codable {
}
return lhs.lookupAnchor < rhs.lookupAnchor
}

public func hash(into hasher: inout Hasher) {
hasher.combine(anchor)
hasher.combine(lookupAnchor)
hasher.combine(prerequisiteWorkPackages)
}
}

extension RefinementContext: Dummy {
Expand Down
6 changes: 3 additions & 3 deletions Blockchain/Sources/Blockchain/Types/WorkItem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import Foundation
import Utils

// I
public struct WorkItem: Sendable, Equatable, Codable {
public struct ImportedDataSegment: Sendable, Equatable, Codable {
public enum DataSegmentRootKind: Sendable, Equatable {
public struct WorkItem: Sendable, Equatable, Codable, Hashable {
public struct ImportedDataSegment: Sendable, Equatable, Codable, Hashable {
public enum DataSegmentRootKind: Sendable, Equatable, Hashable {
case segmentRoot(Data32)
case workPackageHash(Data32)
}
Expand Down
12 changes: 12 additions & 0 deletions Blockchain/Sources/Blockchain/Types/WorkPackage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public struct WorkPackage: Comparable, Sendable, Equatable, Codable {
}
}

extension WorkPackage: Hashable {
public func hash(into hasher: inout Hasher) {
hasher.combine(authorizationToken)
hasher.combine(authorizationServiceIndex)
hasher.combine(authorizationCodeHash)
hasher.combine(parameterizationBlob)
hasher.combine(context)
hasher.combine(workItems.count)
workItems.forEach { hasher.combine($0) }
}
}

extension WorkPackage {
public func hash() -> Data32 {
try! JamEncoder.encode(self).blake2b256hash()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ public final class WorkPackagePoolService: ServiceBase, @unchecked Sendable {
await subscribe(RuntimeEvents.WorkPackagesGenerated.self, id: "WorkPackagePool.WorkPackagesGenerated") { [weak self] event in
try await self?.on(workPackagesGenerated: event)
}
await subscribe(RuntimeEvents.WorkPackagesReceived.self, id: "WorkPackagePool.WorkPackagesReceived") { [weak self] event in
try await self?.on(workPackagesReceived: event)
}
// TODO: add remove subscribe
// TODO: add receive subscribe?
}

private func on(workPackagesGenerated event: RuntimeEvents.WorkPackagesGenerated) async throws {
Expand All @@ -74,6 +76,12 @@ public final class WorkPackagePoolService: ServiceBase, @unchecked Sendable {
await storage.add(packages: event.items, config: config)
}

private func on(workPackagesReceived event: RuntimeEvents.WorkPackagesReceived) async throws {
let state = try await dataProvider.getBestState()
try await storage.update(state: state, config: config)
await storage.add(packages: event.items, config: config)
}

public func update(state: StateRef, config: ProtocolConfigRef) async throws {
try await storage.update(state: state, config: config)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,26 @@ struct WorkPackagePoolServiceTests {
allWorkPackages.append(workpackage)
}
await services.eventBus.publish(RuntimeEvents.WorkPackagesGenerated(items: allWorkPackages))
let workPackages = await workPackagecPoolService.getWorkPackages()
var workPackages = await workPackagecPoolService.getWorkPackages()
#expect(workPackages.array == Array(allWorkPackages).sorted())
let workpackage = WorkPackage.dummy(config: services.config)
try await workPackagecPoolService.addWorkPackages(packages: [workpackage])
try await workPackagecPoolService.removeWorkPackages(packages: [workpackage])
#expect(workPackages.array.count == services.config.value.totalNumberOfCores)
let event = RuntimeEvents.WorkPackagesReceived(items: [workpackage])
await services.eventBus.publish(event)
await services.eventBus.publish(event) // duplicate
// Wait for the event to be processed
await services.storeMiddleware.wait()
workPackages = await workPackagecPoolService.getWorkPackages()
#expect(workPackages.array.count > services.config.value.totalNumberOfCores)
await services.blockchain.publish(event: RuntimeEvents.WorkPackagesReceived(
items: [
workpackage,
]
))
await services.storeMiddleware.wait()
workPackages = await workPackagecPoolService.getWorkPackages()
#expect(workPackages.array.count > services.config.value.totalNumberOfCores)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
case blockRequest(BlockRequest)
case safroleTicket1(SafroleTicketMessage)
case safroleTicket2(SafroleTicketMessage)
case workPackageSubmission(WorkPackageMessage)
}

extension CERequest: RequestProtocol {
Expand All @@ -20,6 +21,8 @@
try JamEncoder.encode(message)
case let .safroleTicket2(message):
try JamEncoder.encode(message)
case let .workPackageSubmission(message):
try JamEncoder.encode(message)

Check warning on line 25 in Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift

View check run for this annotation

Codecov / codecov/patch

Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift#L25

Added line #L25 was not covered by tests
}
}

Expand All @@ -31,6 +34,8 @@
.safroleTicket1
case .safroleTicket2:
.safroleTicket2
case .workPackageSubmission:
.workPackageSubmission

Check warning on line 38 in Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift

View check run for this annotation

Codecov / codecov/patch

Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift#L38

Added line #L38 was not covered by tests
}
}

Expand All @@ -42,6 +47,8 @@
SafroleTicketMessage.self
case .safroleTicket2:
SafroleTicketMessage.self
case .workPackageSubmission:
WorkPackageMessage.self

Check warning on line 51 in Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift

View check run for this annotation

Codecov / codecov/patch

Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift#L51

Added line #L51 was not covered by tests
default:
fatalError("unimplemented")
}
Expand All @@ -64,6 +71,9 @@
return nil
}
return .safroleTicket2(message)
case .workPackageSubmission:
guard let message = data as? WorkPackageMessage else { return nil }
return .workPackageSubmission(message)

Check warning on line 76 in Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift

View check run for this annotation

Codecov / codecov/patch

Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift#L75-L76

Added lines #L75 - L76 were not covered by tests
default:
fatalError("unimplemented")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import Blockchain
import Codec
import Foundation

public struct WorkPackageMessage: Codable, Sendable, Equatable, Hashable {
/// The core index associated with the work-package.
public var coreIndex: CoreIndex

/// The work-package data.
public var workPackage: WorkPackage

/// The extrinsic data referenced by the work-package.
public var extrinsics: [Data]

public init(coreIndex: CoreIndex, workPackage: WorkPackage, extrinsics: [Data]) {
self.coreIndex = coreIndex
self.workPackage = workPackage
self.extrinsics = extrinsics
}
}
7 changes: 7 additions & 0 deletions Node/Sources/Node/NetworkingProtocol/NetworkManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@
]
))
return []
case let .workPackageSubmission(message):
blockchain.publish(event: RuntimeEvents.WorkPackagesReceived(
items: [
message.workPackage,
]
))
return []

Check warning on line 235 in Node/Sources/Node/NetworkingProtocol/NetworkManager.swift

View check run for this annotation

Codecov / codecov/patch

Node/Sources/Node/NetworkingProtocol/NetworkManager.swift#L230-L235

Added lines #L230 - L235 were not covered by tests
}
}

Expand Down
28 changes: 28 additions & 0 deletions Node/Tests/NodeTests/NetworkManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,34 @@ struct NetworkManagerTests {
)
}

@Test
func testWorkPackagesReceived() async throws {
// Create dummy work packages
let workPackages = [
WorkPackage.dummy(config: services.config),
]

// Publish WorkPackagesReceived event
await services.eventBus.publish(RuntimeEvents.WorkPackagesReceived(items: workPackages))

// Wait for event processing
await storeMiddleware.wait()

#expect(workPackages.first?.hash() != nil)
#expect(workPackages.first?.hashValue != nil)
// Verify network calls
#expect(
network.contain(calls: [
.init(function: "connect", parameters: ["address": devPeers.first!, "role": PeerRole.validator]),
.init(function: "sendToPeer", parameters: [
"message": CERequest.workPackageSubmission(
WorkPackageMessage(coreIndex: 0, workPackage: workPackages[0], extrinsics: [])
),
]),
])
)
}

@Test
func testBlockBroadcast() async throws {
// Import a block
Expand Down
2 changes: 1 addition & 1 deletion Utils/Sources/Utils/SaturatingNumber.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Codec
import Numerics

public struct SaturatingNumber<T: FixedWidthInteger & Sendable>: Sendable {
public struct SaturatingNumber<T: FixedWidthInteger & Sendable>: Sendable, Hashable {
public private(set) var value: T

public static var max: SaturatingNumber {
Expand Down