forked from AudaxHealthInc/alpakka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReferenceSpec.scala
141 lines (112 loc) · 3.99 KB
/
ReferenceSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.scaladsl
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.alpakka.reference.scaladsl.Reference
import akka.stream.alpakka.reference._
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import scala.collection.immutable
import scala.concurrent.Future
/**
 * Append "Spec" to every Scala test suite.
 *
 * Exercises the reference connector's Scala DSL: the first three cases only need to
 * type-check (the explicit type annotations act as the assertion), the remaining cases
 * materialize streams and inspect the elements they emit.
 *
 * A single actor system is created for the whole suite and terminated in `afterAll`.
 */
class ReferenceSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures with Matchers {

  // Implicits carry explicit types so that implicit resolution never depends on inference.
  implicit val sys: ActorSystem = ActorSystem("ReferenceSpec")
  implicit val mat: Materializer = ActorMaterializer()

  final val ClientId = "test-client-id"

  "reference connector" should {

    /**
     * Type annotations are not generally needed on local variables.
     * However, they make it possible to check that the types are really what we want.
     */
    "compile settings" in {
      val providedAuth: Authentication.Provided =
        Authentication.Provided().withVerifier(_ => true)

      val noAuth: Authentication.None =
        Authentication.None

      val settings: SourceSettings = SourceSettings(ClientId)

      // Results are intentionally discarded: this case only has to compile.
      settings.withAuthentication(providedAuth)
      settings.withAuthentication(noAuth)
    }

    "compile source" in {
      // #source
      val settings: SourceSettings = SourceSettings(ClientId)

      val source: Source[ReferenceReadResult, Future[Done]] =
        Reference.source(settings)
      // #source
    }

    "compile flow" in {
      // #flow
      val flow: Flow[ReferenceWriteMessage, ReferenceWriteResult, NotUsed] =
        Reference.flow()
      // #flow

      // Second variant is built with an execution context in implicit scope.
      implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
      val flow2: Flow[ReferenceWriteMessage, ReferenceWriteResult, NotUsed] =
        Reference.flow()
    }

    "run source" in {
      val source = Reference.source(SourceSettings(ClientId))

      // Only the first emitted element is inspected.
      val msg = source.runWith(Sink.head).futureValue

      msg.data should contain theSameElementsAs Seq(ByteString("one"))
    }

    "run flow" in {
      val flow = Reference.flow()

      // Three write messages; only the first one carries metrics.
      val source = Source(
        immutable.Seq(
          ReferenceWriteMessage()
            .withData(immutable.Seq(ByteString("one")))
            .withMetrics(Map("rps" -> 20L, "rpm" -> 30L)),
          ReferenceWriteMessage().withData(
            immutable.Seq(
              ByteString("two"),
              ByteString("three"),
              ByteString("four")
            )
          ),
          ReferenceWriteMessage().withData(
            immutable.Seq(
              ByteString("five"),
              ByteString("six"),
              ByteString("seven")
            )
          )
        )
      )

      val result = source.via(flow).runWith(Sink.seq).futureValue

      // Every data element of every input message must come out again (order is not checked).
      result.flatMap(_.message.data) should contain theSameElementsAs Seq(
        "one",
        "two",
        "three",
        "four",
        "five",
        "six",
        "seven"
      ).map(ByteString.apply)

      // The result for the first message is expected to expose a "total" metric of 50.
      result.head.metrics.get("total") should contain(50L)
    }

    "resolve resource from application config" in {
      val result = Source
        .single(ReferenceWriteMessage().withData(immutable.Seq(ByteString("one"))))
        .via(Reference.flowWithResource())
        .runWith(Sink.seq)

      result.futureValue.flatMap(_.message.data).map(_.utf8String) shouldBe Seq("one default msg")
    }

    "use resource from attributes" in {
      val resource = Resource(ResourceSettings("attributes msg"))

      // The resource is handed to the flow through stream attributes.
      val result = Source
        .single(ReferenceWriteMessage().withData(immutable.Seq(ByteString("one"))))
        .via(Reference.flowWithResource().withAttributes(ReferenceAttributes.resource(resource)))
        .runWith(Sink.seq)

      result.futureValue.flatMap(_.message.data).map(_.utf8String) shouldBe Seq("one attributes msg")
    }
  }

  /** Shuts down the suite's actor system once all tests have run. */
  override def afterAll(): Unit =
    TestKit.shutdownActorSystem(sys)
}