diff --git a/commands/activation.go b/commands/activation.go index dcf1ffa94..9c0fe2076 100644 --- a/commands/activation.go +++ b/commands/activation.go @@ -387,6 +387,9 @@ var activationPollCmd = &cobra.Command{ printJSON(activation.Logs) reported[activation.ActivationID] = true } + if activation.Start > pollSince { + pollSince = activation.Start + } } time.Sleep(time.Second * 2) } diff --git a/tests/src/test/scala/system/basic/HttpProxy.scala b/tests/src/test/scala/system/basic/HttpProxy.scala new file mode 100644 index 000000000..c0aa0e899 --- /dev/null +++ b/tests/src/test/scala/system/basic/HttpProxy.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package system.basic +import java.net.ServerSocket + +import akka.http.scaladsl.{Http, HttpsConnectionContext} +import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} +import akka.http.scaladsl.model.Uri.Authority +import akka.http.scaladsl.server.Route +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import com.typesafe.sslconfig.akka.AkkaSSLConfig +import common.{WskActorSystem, WskProps} +import common.rest.{AcceptAllHostNameVerifier, SSL} +import javax.net.ssl.HostnameVerifier +import org.scalatest.Suite +import org.scalatest.concurrent.ScalaFutures + +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration._ + +/** + * A minimal reverse proxy implementation for test purpose which intercepts the + * request and responses and then make them available to test for validation. + * + * It also allows connecting to https endpoint while still expose a http endpoint + * to local client + */ +trait HttpProxy extends WskActorSystem with ScalaFutures { + self: Suite => + + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val testConfig: PatienceConfig = PatienceConfig(1.minute) + + def withProxy(check: (WskProps, ListBuffer[(HttpRequest, HttpResponse)]) => Unit)(implicit wp: WskProps): Unit = { + val uri = getTargetUri(wp) + val requests = new ListBuffer[(HttpRequest, HttpResponse)] + val port = freePort() + val proxy = Route { context => + val request = context.request + val handler = Source + .single(proxyRequest(request, uri)) + .via(makeHttpFlow(uri)) + .runWith(Sink.head) + .map { response => + requests += ((request, response)) + response + } + .flatMap(context.complete(_)) + handler + } + + val binding = Http(actorSystem).bindAndHandle(handler = proxy, interface = "localhost", port = port) + binding.map { b => + val proxyProps = wp.copy(apihost = s"http://localhost:$port") + check(proxyProps, requests) + b.unbind() + }.futureValue + } + + private def getTargetUri(wp: WskProps) = { + // startsWith(http) includes https + if (wp.apihost.startsWith("http")) { + Uri(wp.apihost) + } else { + Uri().withScheme("https").withHost(wp.apihost) + } + } + + private def makeHttpFlow(uri: Uri) = { + if (uri.scheme == "https") { + //Use ssl config which does not validate anything + Http(actorSystem).outgoingConnectionHttps( + uri.authority.host.address(), + uri.effectivePort, + connectionContext = httpsConnectionContext()) + } else { + Http(actorSystem).outgoingConnection(uri.authority.host.address(), uri.effectivePort) + } + } + + private def httpsConnectionContext() = { + val sslConfig = AkkaSSLConfig().mapSettings { s => + s.withHostnameVerifierClass(classOf[AcceptAllHostNameVerifier].asInstanceOf[Class[HostnameVerifier]]) + } + //SSL.httpsConnectionContext initializes config which is not there in cli test + //So inline the flow as we do not need client auth for this case + new HttpsConnectionContext(SSL.nonValidatingContext(false), Some(sslConfig)) + } + + private def proxyRequest(req: HttpRequest, uri: Uri): HttpRequest = { + //https://github.com/akka/akka-http/issues/64 + req + .copy(headers = req.headers.filterNot(h => h.is("timeout-access"))) + .copy(uri = req.uri.copy(scheme = "", authority = Authority.Empty)) //Strip the authority as it refers to proxy + } + + private def freePort(): Int = { + val socket = new ServerSocket(0) + try socket.getLocalPort + finally if (socket != null) socket.close() + } +} diff --git a/tests/src/test/scala/system/basic/WskCliActivationTests.scala b/tests/src/test/scala/system/basic/WskCliActivationTests.scala new file mode 100644 index 000000000..3cd407a39 --- /dev/null +++ b/tests/src/test/scala/system/basic/WskCliActivationTests.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package system.basic + +import akka.stream.scaladsl.{Sink, Source} +import common.rest.WskRestOperations +import common.{TestHelpers, TestUtils, Wsk, WskProps, WskTestHelpers} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import spray.json.DefaultJsonProtocol._ +import spray.json._ + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.Future + +@RunWith(classOf[JUnitRunner]) +class WskCliActivationTests extends TestHelpers with WskTestHelpers with HttpProxy { + val wsk = new Wsk + val wskRest = new WskRestOperations + val defaultAction = Some(TestUtils.getTestActionFilename("hello.js")) + + behavior of "Wsk poll" + + implicit val wskprops: WskProps = WskProps() + + it should "change the since time as it polls" in withAssetCleaner(wskprops) { + val name = "pollTest" + (wp, assetHelper) => + assetHelper.withCleaner(wsk.action, name) { (action, _) => + action.create(name, Some(TestUtils.getTestActionFilename("hello.js"))) + } + + val args = Map("payload" -> "test".toJson) + //This test spin up 2 parallel tasks + // 1. Perform blocking invocations with 1 second interval + // 2. Perform poll to pick up those activation results + // For poll it inserts a proxy which intercepts the request sent to server + // and then it asserts if the request sent have there `since` time getting changed or not + withProxy { (proxyProps, requests) => + //It may taken some time for the activations to show up in poll result + //based on view lag. So keep a bit longer time span for the poll + val pollDuration = 10.seconds + println(s"Running poll for $pollDuration") + + val consoleFuture = Future { + //pass the `proxyProps` such that calls from poll cli command go via our proxy + wsk.activation.console(pollDuration, actionName = Some(name))(proxyProps) + } + + val runsFuture = Source(1 to 5) + .map { _ => + val r = wskRest.action.invoke(name, args, blocking = true) + Thread.sleep(2.second.toMillis) + r + } + .runWith(Sink.seq) + + val f = for { + rr <- runsFuture + cr <- consoleFuture + } yield (cr, rr) + + val (consoleResult, runResult) = Await.result(f, 1.minute) + + val activations = runResult.filter(_.statusCode.isSuccess()).map(_.respData.parseJson.asJsObject) + val ids = activations.flatMap(_.fields.get("activationId").map(_.convertTo[String])) + val idsInPoll = ids.filter(consoleResult.stdout.contains(_)) + + //There should be more than 1 activationId in common between poll output + //and actual invoked actions output + //This is required to ensure that since time can change which would only + //happen if more than one activation result is picked up in poll + withClue( + s"activations received ${activations.mkString("\n")}, console output $consoleResult. Expecting" + + s"more than one matching activation between these 2") { + idsInPoll.size should be > 1 + + //Collect the 'since' value passed during poll requests + val sinceTimes = requests.map(_._1.uri.query()).flatMap(_.get("since")).toSet + + withClue(s"value of 'since' $sinceTimes should have changed") { + sinceTimes.size should be > 1 + } + } + } + } + +}