Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions commands/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ var activationPollCmd = &cobra.Command{
printJSON(activation.Logs)
reported[activation.ActivationID] = true
}
if activation.Start > pollSince {
pollSince = activation.Start
}
}
time.Sleep(time.Second * 2)
}
Expand Down
117 changes: 117 additions & 0 deletions tests/src/test/scala/system/basic/HttpProxy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package system.basic
import java.net.ServerSocket

import akka.http.scaladsl.{Http, HttpsConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.model.Uri.Authority
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import common.{WskActorSystem, WskProps}
import common.rest.{AcceptAllHostNameVerifier, SSL}
import javax.net.ssl.HostnameVerifier
import org.scalatest.Suite
import org.scalatest.concurrent.ScalaFutures

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

/**
* A minimal reverse proxy implementation for test purpose which intercepts the
* request and responses and then make them available to test for validation.
*
* It also allows connecting to https endpoint while still expose a http endpoint
* to local client
*/
trait HttpProxy extends WskActorSystem with ScalaFutures {
self: Suite =>

implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val testConfig: PatienceConfig = PatienceConfig(1.minute)

def withProxy(check: (WskProps, ListBuffer[(HttpRequest, HttpResponse)]) => Unit)(implicit wp: WskProps): Unit = {
val uri = getTargetUri(wp)
val requests = new ListBuffer[(HttpRequest, HttpResponse)]
val port = freePort()
val proxy = Route { context =>
val request = context.request
val handler = Source
.single(proxyRequest(request, uri))
.via(makeHttpFlow(uri))
.runWith(Sink.head)
.map { response =>
requests += ((request, response))
response
}
.flatMap(context.complete(_))
handler
}

val binding = Http(actorSystem).bindAndHandle(handler = proxy, interface = "localhost", port = port)
binding.map { b =>
val proxyProps = wp.copy(apihost = s"http://localhost:$port")
check(proxyProps, requests)
b.unbind()
}.futureValue
}

private def getTargetUri(wp: WskProps) = {
// startsWith(http) includes https
if (wp.apihost.startsWith("http")) {
Uri(wp.apihost)
} else {
Uri().withScheme("https").withHost(wp.apihost)
}
}

private def makeHttpFlow(uri: Uri) = {
if (uri.scheme == "https") {
//Use ssl config which does not validate anything
Http(actorSystem).outgoingConnectionHttps(
uri.authority.host.address(),
uri.effectivePort,
connectionContext = httpsConnectionContext())
} else {
Http(actorSystem).outgoingConnection(uri.authority.host.address(), uri.effectivePort)
}
}

private def httpsConnectionContext() = {
val sslConfig = AkkaSSLConfig().mapSettings { s =>
s.withHostnameVerifierClass(classOf[AcceptAllHostNameVerifier].asInstanceOf[Class[HostnameVerifier]])
}
//SSL.httpsConnectionContext initializes config which is not there in cli test
//So inline the flow as we do not need client auth for this case
new HttpsConnectionContext(SSL.nonValidatingContext(false), Some(sslConfig))
}

private def proxyRequest(req: HttpRequest, uri: Uri): HttpRequest = {
//https://github.com/akka/akka-http/issues/64
req
.copy(headers = req.headers.filterNot(h => h.is("timeout-access")))
.copy(uri = req.uri.copy(scheme = "", authority = Authority.Empty)) //Strip the authority as it refers to proxy
}

private def freePort(): Int = {
val socket = new ServerSocket(0)
try socket.getLocalPort
finally if (socket != null) socket.close()
}
}
103 changes: 103 additions & 0 deletions tests/src/test/scala/system/basic/WskCliActivationTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package system.basic

import akka.stream.scaladsl.{Sink, Source}
import common.rest.WskRestOperations
import common.{TestHelpers, TestUtils, Wsk, WskProps, WskTestHelpers}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.Future

@RunWith(classOf[JUnitRunner])
class WskCliActivationTests extends TestHelpers with WskTestHelpers with HttpProxy {
val wsk = new Wsk
val wskRest = new WskRestOperations
val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))

behavior of "Wsk poll"

implicit val wskprops: WskProps = WskProps()

it should "change the since time as it polls" in withAssetCleaner(wskprops) {
val name = "pollTest"
(wp, assetHelper) =>
assetHelper.withCleaner(wsk.action, name) { (action, _) =>
action.create(name, Some(TestUtils.getTestActionFilename("hello.js")))
}

val args = Map("payload" -> "test".toJson)
//This test spin up 2 parallel tasks
// 1. Perform blocking invocations with 1 second interval
// 2. Perform poll to pick up those activation results
// For poll it inserts a proxy which intercepts the request sent to server
// and then it asserts if the request sent have there `since` time getting changed or not
withProxy { (proxyProps, requests) =>
//It may taken some time for the activations to show up in poll result
//based on view lag. So keep a bit longer time span for the poll
val pollDuration = 10.seconds
println(s"Running poll for $pollDuration")

val consoleFuture = Future {
//pass the `proxyProps` such that calls from poll cli command go via our proxy
wsk.activation.console(pollDuration, actionName = Some(name))(proxyProps)
}

val runsFuture = Source(1 to 5)
.map { _ =>
val r = wskRest.action.invoke(name, args, blocking = true)
Thread.sleep(2.second.toMillis)
r
}
.runWith(Sink.seq)

val f = for {
rr <- runsFuture
cr <- consoleFuture
} yield (cr, rr)

val (consoleResult, runResult) = Await.result(f, 1.minute)

val activations = runResult.filter(_.statusCode.isSuccess()).map(_.respData.parseJson.asJsObject)
val ids = activations.flatMap(_.fields.get("activationId").map(_.convertTo[String]))
val idsInPoll = ids.filter(consoleResult.stdout.contains(_))

//There should be more than 1 activationId in common between poll output
//and actual invoked actions output
//This is required to ensure that since time can change which would only
//happen if more than one activation result is picked up in poll
withClue(
s"activations received ${activations.mkString("\n")}, console output $consoleResult. Expecting" +
s"more than one matching activation between these 2") {
idsInPoll.size should be > 1

//Collect the 'since' value passed during poll requests
val sinceTimes = requests.map(_._1.uri.query()).flatMap(_.get("since")).toSet

withClue(s"value of 'since' $sinceTimes should have changed") {
sinceTimes.size should be > 1
}
}
}
}

}