Skip to content

Commit

Permalink
scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jtjeferreira committed Oct 1, 2024
1 parent 8fcc920 commit 4d49c30
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import scala.util.control.NoStackTrace
* Mixed into the Http2ServerDemux graph logic.
*/
@InternalApi
//noinspection ConvertibleToMethodValue,ScalaWeakerAccess,ScalaUnusedSymbol
private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper { self =>
// required API from demux
def isServer: Boolean
Expand Down Expand Up @@ -446,9 +445,10 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
case _: WindowUpdateFrame =>
// We're not planning on sending any data on this stream anymore, so we don't care about window updates.
this
case rst@RstStreamFrame(streamId, _) =>
val headers = ParsedHeadersFrame(streamId, endStream = false, Seq((":status", "429")), None)
dispatchSubstream(headers, Right(Source.failed(new PeerClosedStreamException(rst.streamId, rst.errorCode))), correlationAttributes)
case rst: RstStreamFrame =>
val headers = ParsedHeadersFrame(rst.streamId, endStream = false, Seq((":status", "429")), None)
dispatchSubstream(headers, Right(Source.failed(new PeerClosedStreamException(rst.streamId, rst.errorCode))),
correlationAttributes)
Closed
case _ =>
expectIncomingStream(event, Closed, HalfClosedLocal(_), correlationAttributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import scala.collection.immutable
* * if applicable: provide response frames
* * validate the produced application-level responses
*/
//noinspection TypeAnnotation
class Http2ClientSpec extends PekkoSpecWithMaterializer("""
pekko.http.client.remote-address-header = on
pekko.http.client.http2.log-frames = on
Expand Down Expand Up @@ -292,23 +291,24 @@ class Http2ClientSpec extends PekkoSpecWithMaterializer("""
val dynamicTableUpdateTo8192 = ByteString(63, 225, 63)
headerPayload.take(3) shouldBe dynamicTableUpdateTo8192
})
"close stream if peer sends RST_STREAM frame with REFUSED_STREAM".inAssertAllStagesStopped(new TestSetup with NetProbes {
val data = ByteString("abcd")
user.emitRequest(Post("/", HttpEntity(data)))
val TheStreamId = network.expect[HeadersFrame]().streamId
network.expectDATA(TheStreamId, endStream = true, data)
"close stream if peer sends RST_STREAM frame with REFUSED_STREAM".inAssertAllStagesStopped(
new TestSetup with NetProbes {
val data = ByteString("abcd")
user.emitRequest(Post("/", HttpEntity(data)))
val TheStreamId = network.expect[HeadersFrame]().streamId
network.expectDATA(TheStreamId, endStream = true, data)

network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM)
network.sendRST_STREAM(TheStreamId, ErrorCode.REFUSED_STREAM)

val response = user.expectResponse()
response.status should be(StatusCodes.TooManyRequests)
val response = user.expectResponse()
response.status should be(StatusCodes.TooManyRequests)

val entityDataIn = ByteStringSinkProbe(response.entity.dataBytes)
val error = entityDataIn.expectError()
error.getMessage shouldBe "Stream with ID [1] was closed by peer with code REFUSED_STREAM(0x07)"
val entityDataIn = ByteStringSinkProbe(response.entity.dataBytes)
val error = entityDataIn.expectError()
error.getMessage shouldBe "Stream with ID [1] was closed by peer with code REFUSED_STREAM(0x07)"

connectionShouldStillBeUsable()
})
connectionShouldStillBeUsable()
})
}

"support stream for response data" should {
Expand Down

0 comments on commit 4d49c30

Please sign in to comment.