Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve peer test #288

Merged
merged 1 commit into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
case connecting(continuations: [CheckedContinuation<Void, Error>])
case connected(publicKey: Data)
case closed
case closing
case reconnect(publicKey: Data)
}

Expand All @@ -51,7 +52,7 @@
nil
case let .connected(publicKey):
publicKey
case .closed:
case .closed, .closing:
nil
case let .reconnect(publicKey):
publicKey
Expand Down Expand Up @@ -99,30 +100,41 @@
for continuation in continuations {
continuation.resume(throwing: ConnectionError.closed)
}
state = .closed
}
state = .closed
}
}

func closing() {
state.write { state in
if case let .connecting(continuations) = state {
for continuation in continuations {
continuation.resume(throwing: ConnectionError.closed)
}
}

Check warning on line 114 in Networking/Sources/Networking/Connection.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Connection.swift#L111-L114

Added lines #L111 - L114 were not covered by tests
state = .closing
}
}

func reconnect(publicKey: Data) {
state.write { state in
if case let .connecting(continuations) = state {
for continuation in continuations {
continuation.resume(throwing: ConnectionError.reconnect)
}
state = .reconnect(publicKey: publicKey)
}
state = .reconnect(publicKey: publicKey)
}
}

public var isClosed: Bool {
state.read {
if case .closed = $0 {
return true
switch $0 {
case .closed, .closing:
true
case .connected, .reconnect, .connecting:
false
}
return false
}
}

Expand All @@ -140,11 +152,7 @@
switch $0 {
case .connecting:
false
case .connected:
true
case .closed:
true
case .reconnect:
case .connected, .closed, .closing, .reconnect:
true
}
}
Expand All @@ -165,6 +173,7 @@
}

public func close(abort: Bool = false) {
closing()
try? connection.shutdown(errorCode: abort ? 1 : 0) // TODO: define some error code
}

Expand Down
12 changes: 8 additions & 4 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,17 @@
public func broadcast(
kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message
) {
let connections = impl.connections.read { connections in
connections.byId.values
}
guard let messageData = try? message.encode() else {
impl.logger.warning("Failed to encode message: \(message)")
return
}
let connections = impl.connections.read { connections in
connections.byId.values
}
for connection in connections {
if connection.isClosed {
continue

Check warning on line 189 in Networking/Sources/Networking/Peer.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Peer.swift#L189

Added line #L189 was not covered by tests
}
if let stream = try? connection.createPreistentStream(kind: kind) {
Task {
let res = await Result {
Expand All @@ -210,8 +213,9 @@
}

// there should be only one connection per peer
// exlcude closed connections
public var peersCount: Int {
impl.connections.read { $0.byId.count }
impl.connections.read { $0.byId.count { $0.value.isClosed == false } }
}

public var peersRole: PeerRole {
Expand Down
9 changes: 5 additions & 4 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ struct PeerTests {
let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder)
try await con.ready()
}
// Simulate close connections 1~3s
try? await Task.sleep(for: .milliseconds(1000))

#expect(centerPeer.peersCount == 3)

centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8)))
try? await Task.sleep(for: .milliseconds(100))
var receivedCount = 0
Expand Down Expand Up @@ -546,7 +547,7 @@ struct PeerTests {
#expect(receivedData == messageData + Data(" response".utf8))
try? await Task.sleep(for: .milliseconds(100))
// Simulate abnormal shutdown of connections
connection.close(abort: true)
try connection.connection.shutdown(errorCode: 1)
// Wait to simulate downtime & reconnected 3~5s
try? await Task.sleep(for: .milliseconds(3000))
peer1.broadcast(
Expand Down Expand Up @@ -650,7 +651,7 @@ struct PeerTests {
#expect(receivedData == messageData + Data(" response".utf8))
try? await Task.sleep(for: .milliseconds(100))
// Simulate a peer failure by disconnecting one peer
connection.close(abort: false)
try connection.connection.shutdown()
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
// Reconnect the failing peer
Expand Down