Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Play25 ws #23

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
177991e
upgrade to play 2.5.1 WIP
ornicar Mar 30, 2016
1d6f6a0
Merge branch 'master' of github.com:ornicar/lila into play25
ornicar Mar 31, 2016
af189d1
only recompile messages when needed
ornicar Mar 31, 2016
1eb2fcd
play 2.5 WS migration WIP
ornicar Apr 1, 2016
a5e299c
Merge branch 'play25-ws' of github.com:ornicar/lila into play25-ws
ornicar Apr 1, 2016
aa63948
rewrite LilaSocket
ornicar Apr 1, 2016
d0656c8
play 2.5 WIP
ornicar Apr 2, 2016
54516a3
Merge branch 'killPRM' of github.com:ornicar/lila into play25
ornicar Apr 2, 2016
a8e5040
Merge branch 'killPRM' of github.com:ornicar/lila into play25
ornicar Apr 2, 2016
cecc37b
Merge branch 'killPRM' of github.com:ornicar/lila into play25
ornicar Apr 3, 2016
5c72663
Merge branch 'master' of github.com:ornicar/lila into play25
ornicar Apr 3, 2016
71a4402
play 2.5 migration WIP
ornicar Apr 3, 2016
9b1d82b
Global is deprecated, rewrite event handling
ornicar Apr 3, 2016
34fb45c
remove debug
ornicar Apr 3, 2016
9b6090c
Merge branch 'master' of github.com:ornicar/lila into play25
ornicar Apr 3, 2016
84b790d
explicit config
ornicar Apr 3, 2016
c8a0a18
fix forum DB queries
ornicar Apr 3, 2016
1d5cc0f
fix user.sha512 flag mapping
ornicar Apr 3, 2016
301cc43
tweak WS config
ornicar Apr 3, 2016
421c93f
tweak UserRepo
ornicar Apr 3, 2016
1295e3b
Merge branch 'play25' into play25-ws
ornicar Apr 3, 2016
91ffacb
fix build-deps
ornicar Apr 3, 2016
49f8fc9
remove deprecated tests
ornicar Apr 3, 2016
9e184f2
make PimpedJson an AnyVal
ornicar Apr 3, 2016
838e2f1
extract future extensions and make them an AnyVal
ornicar Apr 3, 2016
c562234
site actor compiles
ornicar Apr 4, 2016
61ce0ba
upgrade akka
ornicar Apr 4, 2016
91391d5
more rewrite of websocket code for akka streams
ornicar Apr 4, 2016
118bac6
implement WS rate limits with akka throttle flow
ornicar Apr 4, 2016
2651eb9
fix base.conf
ornicar Apr 4, 2016
9fe1571
simplify socket flow creation
ornicar Apr 4, 2016
553ce53
fix lobby burst
ornicar Apr 4, 2016
6f1dc19
rewrite event sources WIP
ornicar Apr 4, 2016
4975f6c
more fighting with akka streams
ornicar Apr 6, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 2 additions & 43 deletions app/Global.scala
Original file line number Diff line number Diff line change
@@ -1,51 +1,10 @@
package lila.app

import lila.common.HTTPRequest
import play.api.mvc._
import play.api.mvc.Results._
import play.api.{ Application, GlobalSettings, Mode }
import play.api.{ Application, GlobalSettings }

object Global extends GlobalSettings {

override def onStart(app: Application) {
kamon.Kamon.start()
lila.app.Env.current
lila.app.Env.current // preload modules
}

override def onStop(app: Application) {
kamon.Kamon.shutdown()
}

override def onRouteRequest(req: RequestHeader): Option[Handler] = {
lila.mon.http.request.all()
Env.i18n.requestHandler(req) orElse super.onRouteRequest(req)
}

private def niceError(req: RequestHeader): Boolean =
req.method == "GET" &&
HTTPRequest.isSynchronousHttp(req) &&
!HTTPRequest.hasFileExtension(req)

override def onHandlerNotFound(req: RequestHeader) =
if (niceError(req)) controllers.Main.notFound(req)
else fuccess(NotFound("404 - Resource not found"))

override def onBadRequest(req: RequestHeader, error: String) =
if (error startsWith "Illegal character in path") fuccess(Redirect("/"))
else if (error startsWith "Cannot parse parameter") onHandlerNotFound(req)
else if (niceError(req)) {
lila.mon.http.response.code400()
controllers.Lobby.handleStatus(req, Results.BadRequest)
}
else fuccess(BadRequest(error))

override def onError(req: RequestHeader, ex: Throwable) =
if (niceError(req)) {
if (lila.common.PlayApp.isProd) {
lila.mon.http.response.code500()
fuccess(InternalServerError(views.html.base.errorPage(ex)(lila.api.Context(req))))
}
else super.onError(req, ex)
}
else fuccess(InternalServerError(ex.getMessage))
}
76 changes: 76 additions & 0 deletions app/LilaPlayStuff.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package lila.app

import com.google.inject.AbstractModule
import com.google.inject.name.Names
import javax.inject._
import lila.common.HTTPRequest
import play.api._
import play.api.http._
import play.api.inject.ApplicationLifecycle
import play.api.mvc._
import play.api.mvc.RequestHeader
import play.api.mvc.Results._
import play.api.routing.Router
import scala.concurrent._

class LilaHttpRequestHandler @Inject() (errorHandler: HttpErrorHandler,
configuration: HttpConfiguration, filters: HttpFilters,
router: Router) extends DefaultHttpRequestHandler(router, errorHandler, configuration, filters) {

override def routeRequest(req: RequestHeader) = {
lila.mon.http.request.all()
Env.i18n.requestHandler(req) orElse super.routeRequest(req)
}
}

class LilaHttpErrorHandler @Inject() (
env: Environment,
config: Configuration,
sourceMapper: OptionalSourceMapper,
router: Provider[Router]) extends DefaultHttpErrorHandler(env, config, sourceMapper, router) {

private def niceError(req: RequestHeader): Boolean =
req.method == "GET" &&
HTTPRequest.isSynchronousHttp(req) &&
!HTTPRequest.hasFileExtension(req)

def onHandlerNotFound(req: RequestHeader) =
if (niceError(req)) controllers.Main.notFound(req)
else fuccess(NotFound("404 - Resource not found"))

override def onBadRequest(req: RequestHeader, error: String) =
if (error startsWith "Illegal character in path") fuccess(Redirect("/"))
else if (error startsWith "Cannot parse parameter") onHandlerNotFound(req)
else if (niceError(req)) {
lila.mon.http.response.code400()
controllers.Lobby.handleStatus(req, Results.BadRequest)
}
else fuccess(BadRequest(error))

override def onProdServerError(req: RequestHeader, ex: UsefulException): Future[Result] = {
lila.mon.http.response.code500()
fuccess(InternalServerError(views.html.base.errorPage(ex)(lila.api.Context(req))))
}
}

@Singleton
final class LilaLifecycle @Inject() (lifecycle: ApplicationLifecycle) {

play.api.Logger("boot").info("Lifecycle bindings")

lifecycle.addStopHook { () =>
play.api.Logger("play").info("LilaLifecycle shutdown")
kamon.Kamon.shutdown()
funit
}
}

final class LilaModule extends AbstractModule {

play.api.Logger("boot").info("Kamon start")
kamon.Kamon.start()

def configure() = {
bind(classOf[LilaLifecycle]).asEagerSingleton
}
}
2 changes: 1 addition & 1 deletion app/controllers/Challenge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ object Challenge extends LilaController {
}
}

def websocket(id: String, apiVersion: Int) = SocketOption[JsValue] { implicit ctx =>
def websocket(id: String, apiVersion: Int) = SocketOption { implicit ctx =>
env.api byId id flatMap {
_ ?? { c =>
get("sri") ?? { uid =>
Expand Down
28 changes: 11 additions & 17 deletions app/controllers/LilaController.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package controllers

import akka.stream.scaladsl._
import ornicar.scalalib.Zero
import play.api.data.Form
import play.api.http._
import play.api.libs.iteratee.{ Iteratee, Enumerator }
import play.api.libs.json.{ Json, JsValue, JsObject, JsArray, Writes }
import play.api.mvc._, Results._
import play.api.mvc.WebSocket.FrameFormatter
import play.twirl.api.Html
import scalaz.Monoid

import lila.api.{ PageData, Context, HeaderContext, BodyContext, TokenBucket }
import lila.app._
Expand Down Expand Up @@ -48,23 +46,19 @@ private[controllers] trait LilaController
CACHE_CONTROL -> "no-cache, no-store, must-revalidate", EXPIRES -> "0"
)

protected def Socket[A: FrameFormatter](f: Context => Fu[(Iteratee[A, _], Enumerator[A])]) =
WebSocket.tryAccept[A] { req => reqToCtx(req) flatMap f map scala.util.Right.apply }

protected def SocketEither[A: FrameFormatter](f: Context => Fu[Either[Result, (Iteratee[A, _], Enumerator[A])]]) =
WebSocket.tryAccept[A] { req => reqToCtx(req) flatMap f }
protected def Socket(f: Context => Fu[JsFlow]) =
WebSocket.acceptOrResult[JsObject, JsObject] { req =>
reqToCtx(req) flatMap f map Right.apply
}

protected def SocketOption[A: FrameFormatter](f: Context => Fu[Option[(Iteratee[A, _], Enumerator[A])]]) =
WebSocket.tryAccept[A] { req =>
reqToCtx(req) flatMap f map {
case None => Left(NotFound(jsonError("socket resource not found")))
case Some(pair) => Right(pair)
}
protected def SocketEither(f: Context => Fu[Either[Result, JsFlow]]) =
WebSocket.acceptOrResult[JsObject, JsObject] { req =>
reqToCtx(req) flatMap f
}

protected def SocketOptionLimited[A: FrameFormatter](consumer: TokenBucket.Consumer, name: String)(f: Context => Fu[Option[(Iteratee[A, _], Enumerator[A])]]) =
rateLimitedSocket[A](consumer, name) { ctx =>
f(ctx) map {
protected def SocketOption(f: Context => Fu[Option[JsFlow]]): WebSocket =
WebSocket.acceptOrResult[JsObject, JsObject] { req =>
reqToCtx(req) flatMap f map {
case None => Left(NotFound(jsonError("socket resource not found")))
case Some(pair) => Right(pair)
}
Expand Down
54 changes: 23 additions & 31 deletions app/controllers/LilaSocket.scala
Original file line number Diff line number Diff line change
@@ -1,47 +1,39 @@
package controllers

import akka.stream.scaladsl._
import play.api.http._
import play.api.libs.iteratee._
import play.api.libs.json._
import play.api.mvc._, Results._
import play.api.mvc.WebSocket.FrameFormatter
import play.api.mvc.WebSocket.MessageFlowTransformer
import scala.concurrent.duration._

import lila.api.{ Context, TokenBucket }
import lila.app._
import lila.common.HTTPRequest

trait LilaSocket { self: LilaController =>

private type AcceptType[A] = Context => Fu[Either[Result, (Iteratee[A, _], Enumerator[A])]]

private val logger = lila.log("ratelimit")
protected implicit val jsonMessageFlowTransformer: MessageFlowTransformer[JsObject, JsObject] = {
import scala.util.control.NonFatal
import play.api.libs.streams.AkkaStreams
import websocket._
def closeOnException[T](block: => Option[T]) = try {
Left(block getOrElse {
sys error "Not a JsObject"
})
}
catch {
case NonFatal(e) => Right(CloseMessage(Some(CloseCodes.Unacceptable),
"Unable to parse json message"))
}

def rateLimitedSocket[A: FrameFormatter](consumer: TokenBucket.Consumer, name: String)(f: AcceptType[A]): WebSocket[A, A] =
WebSocket[A, A] { req =>
reqToCtx(req) flatMap { ctx =>
val ip = HTTPRequest lastRemoteAddress req
def userInfo = {
val sri = get("sri", req) | "none"
val username = ctx.usernameOrAnon
s"user:$username sri:$sri"
}
// logger.debug(s"socket:$name socket connect $ip $userInfo")
f(ctx).map { resultOrSocket =>
resultOrSocket.right.map {
case (readIn, writeOut) => (e, i) => {
writeOut |>> i
e &> Enumeratee.mapInputM { in =>
consumer(ip).map { credit =>
if (credit >= 0) in
else {
logger.info(s"socket:$name socket close $ip $userInfo $in")
Input.EOF
}
}
} |>> readIn
}
}
}
new MessageFlowTransformer[JsObject, JsObject] {
def transform(flow: Flow[JsObject, JsObject, _]) = {
AkkaStreams.bypassWith[Message, JsObject, Message](Flow[Message].collect {
case BinaryMessage(data) => closeOnException(Json.parse(data.iterator.asInputStream).asOpt[JsObject])
case TextMessage(text) => closeOnException(Json.parse(text).asOpt[JsObject])
})(flow map { json => TextMessage(Json.stringify(json)) })
}
}
}
}
14 changes: 12 additions & 2 deletions app/controllers/Lobby.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,22 @@ object Lobby extends LilaController {
size = 10,
rate = 6)

def socket(apiVersion: Int) = SocketOptionLimited[JsValue](socketConsumer, "lobby") { implicit ctx =>
import akka.stream.scaladsl.Flow
import scala.concurrent.duration._
private val wsThrottler = Flow[JsObject].throttle(
elements = 10,
per = 5.second,
maximumBurst = 10,
mode = akka.stream.ThrottleMode.Enforcing)

def socket(apiVersion: Int) = SocketOption { implicit ctx =>
get("sri") ?? { uid =>
Env.lobby.socketHandler(
uid = uid,
user = ctx.me,
mobile = getBool("mobile")) map some
mobile = getBool("mobile")) map { flow =>
wsThrottler.via(flow).some
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/controllers/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object Main extends LilaController {

def websocket = SocketOption { implicit ctx =>
get("sri") ?? { uid =>
Env.site.socketHandler(uid, ctx.userId, get("flag")) map some
fuccess(Env.site.socketHandler(uid, ctx.userId, get("flag")).some)
}
}

Expand Down
10 changes: 7 additions & 3 deletions app/controllers/Prismic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ object Prismic {
case _ => logger info message
}

import play.api.libs.ws._
import play.api.Play.current
private val httpClient: WSClient = WS.client

private val fetchPrismicApi = AsyncCache.single[PrismicApi](
f = PrismicApi.get(Env.api.PrismicApiUrl, logger = prismicLogger),
f = PrismicApi.get(httpClient)(Env.api.PrismicApiUrl, logger = prismicLogger),
timeToLive = 1 minute)

def prismicApi = fetchPrismicApi(true)
Expand All @@ -33,7 +37,7 @@ object Prismic {
api.forms("everything")
.query(s"""[[:d = at(document.id, "$id")]]""")
.ref(api.master.ref)
.submit() map {
.submit(httpClient) map {
_.results.headOption
}
}
Expand All @@ -52,7 +56,7 @@ object Prismic {
api.forms("variant")
.query(s"""[[:d = at(my.variant.key, "${variant.key}")]]""")
.ref(api.master.ref)
.submit() map {
.submit(httpClient) map {
_.results.headOption map (_ -> makeLinkResolver(api))
}
}
Expand Down
4 changes: 2 additions & 2 deletions app/controllers/Round.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object Round extends LilaController with TheftPrevention {
private def bookmarkApi = Env.bookmark.api
private def analyser = Env.analyse.analyser

def websocketWatcher(gameId: String, color: String) = SocketOption[JsValue] { implicit ctx =>
def websocketWatcher(gameId: String, color: String) = SocketOption { implicit ctx =>
get("sri") ?? { uid =>
env.socketHandler.watcher(
gameId = gameId,
Expand All @@ -35,7 +35,7 @@ object Round extends LilaController with TheftPrevention {
}
}

def websocketPlayer(fullId: String, apiVersion: Int) = SocketEither[JsValue] { implicit ctx =>
def websocketPlayer(fullId: String, apiVersion: Int) = SocketEither { implicit ctx =>
GameRepo pov fullId flatMap {
case Some(pov) =>
if (isTheft(pov)) fuccess(Left(theftResponse))
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/Simul.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object Simul extends LilaController {
}
}

def websocket(id: String, apiVersion: Int) = SocketOption[JsValue] { implicit ctx =>
def websocket(id: String, apiVersion: Int) = SocketOption { implicit ctx =>
get("sri") ?? { uid =>
env.socketHandler.join(id, uid, ctx.me)
}
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/Tournament.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object Tournament extends LilaController {
}
}

def websocket(id: String, apiVersion: Int) = SocketOption[JsValue] { implicit ctx =>
def websocket(id: String, apiVersion: Int) = SocketOption { implicit ctx =>
get("sri") ?? { uid =>
env.socketHandler.join(id, uid, ctx.me)
}
Expand Down
9 changes: 5 additions & 4 deletions app/controllers/Tv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ object Tv extends LilaController {
import akka.pattern.ask
import lila.round.TvBroadcast
import play.api.libs.EventSource
implicit val encoder = play.api.libs.Comet.CometMessage.jsonMessages
Env.round.tvBroadcast ? TvBroadcast.GetEnumerator mapTo
manifest[TvBroadcast.EnumeratorType] map { enum =>
Ok.chunked(enum &> EventSource()).as("text/event-stream")
import akka.stream.scaladsl.Source
Env.round.tvBroadcast ? TvBroadcast.GetPublisher mapTo
manifest[TvBroadcast.PublisherType] map { publisher =>
val source = Source fromPublisher publisher
Ok.chunked(source via EventSource.flow).as("text/event-stream")
}
}

Expand Down
6 changes: 2 additions & 4 deletions app/controllers/WorldMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ object WorldMap extends LilaController {
}

def stream = Action.async {
Env.worldMap.getStream map { stream =>
Ok.chunked(
stream &> EventSource()
) as "text/event-stream"
Env.worldMap.getSource map { source =>
Ok.chunked(source via EventSource.flow).as("text/event-stream")
}
}
}
Loading