In the scala spray structure, how can I create several http clients that have different configurations (for example, timeouts, retries)

I have two spray client clients, for example:

  val pipelineFoo: HttpRequest => Future[Foo] = (
    sendReceive
    ~> unmarshal[Message.Foo])

  val pipelineBar: HttpRequest => Future[Bar] = (
    sendReceive
    ~> unmarshal[Message.Bar])

  def execFoo(h: String, p: Int): Future[Foo] = {
    val uri = Uri.from(scheme = "http", host = h, port = p, path = "/foo")
    pipelineFoo(Get(uri))
  }

  def execBar(h: String, p: Int): Future[Bar] = {
    val uri = Uri.from(scheme = "http", host = h, port = p, path = "/bar")
    pipelineBar(Get(uri))
  }

I would like to retry foo several times with a long timeout and ask the bar not to retry and have a short timeout (say 1 second). How can I achieve this in a spray (sorry if it’s somewhere in the documentation, but I couldn’t find it), I only found documentation on setting such configuration parameters around the world).

+4
source share
2 answers

It should not be too complicated. sendReceive can actually take more parameters. For example, here is a signature for one of the alternatives:

def sendReceive(transport: ActorRef)(implicit ec: ExecutionContext, futureTimeout: Timeout): SendReceive

, -, .

, :

lazy val pipeline: HttpRequest => Future[HttpResponse] = (
addCredentials(BasicHttpCredentials(clientConnection.credentials._1, clientConnection.credentials._2))
  ~> addHeader(`User-Agent`(ProductVersion("<YOUR NAME HERE>", "<YOUR VERSION HERE>", "http://github.com/<WHEREVER YOUR PROJECT IS>"), ProductVersion("spray-client", "1.3.1", "http://spray.io")))
  ~> logRequest(log)
  ~> sendReceive(clientConnection.connection)(clientConnection.context, clientConnection.timeout)
  ~> decode(Deflate)
  ~> decode(Gzip)
)

ClientConnection . , , , , application.conf

+1

2 , , , . , / Spray.

import akka.actor.{ActorRef, ActorSystem}
import akka.io.IO
import akka.pattern.ask
import com.typesafe.config.Config
import spray.can.Http
import spray.can.Http.HostConnectorSetup
import spray.can.client.HostConnectorSettings
import spray.client.pipelining.sendReceive
import spray.http.Uri.Host
import spray.http.{HttpRequest, HttpResponse, Uri}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}

case class HttpCustomSettings(
  requestTimeout: Duration,
  maxRetries:     Int,
  maxConnections: Int
)

/**
 * Implement a new HTTP client on top of akka IO and spray HTTP
 * to provide a way for caller to set client parameters on request basis instead
 * of globally in application.conf
 *
 * This client defaults all its configuration with the one set in spray.conf
 * see spray.can.client and spray.can.host-connector
 * But you can override some of them on demand
 * - maxRetries
 * - requestTimeout
 * - maxConnections
 */
class HttpClient(actorSystem: ActorSystem, config: Config) {
  private implicit val context: ActorSystem = actorSystem
  private implicit val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher

  private val HTTP = "http"
  private val HTTPS = "https"

  private val defaultSettings: HostConnectorSettings =
    HostConnectorSettings.fromSubConfig(config.getConfig("spray.can"))

  //not configurable since this timeout has little to no use practically
  //this timeout DOES NOT kill the open connection
  //http://kamon.io/teamblog/2014/11/02/understanding-spray-client-timeout-settings/
  private implicit val clientFutureTimeout: akka.util.Timeout = 5.seconds

  def send(
    request:        HttpRequest,
    customSettings: Option[HttpCustomSettings] = None
  ): Future[HttpResponse] = {
    val pipeline: Future[HttpRequest ⇒ Future[HttpResponse]] =
      pipelineForUri(request.uri, customSettings)

    pipeline.flatMap(send ⇒ send(request))
  }

  /**
   * To understand more this method
   * @see http://kamon.io/assets/img/diagrams/spray-client-actors.png
   * @see [[spray.can.HttpManager]]
   * @see [[spray.can.client.HttpHostConnector]]
   * @see [[spray.can.Http]]
   */
  private def pipelineForUri(
    uri:            Uri,
    customSettings: Option[HttpCustomSettings]
  ): Future[HttpRequest ⇒ Future[HttpResponse]] = {
    for { 
      Http.HostConnectorInfo(connector, _) ← IO(Http) ? connectorSetup(uri, customSettings) 
} yield sendReceive(connector)
  }

  private def connectorSetup(
    uri:            Uri,
    customSettings: Option[HttpCustomSettings]
  ): HostConnectorSetup = {
    require(
      uri.scheme == HTTP || uri.scheme == HTTPS,
      s"Not a valid $HTTP URI scheme: '${uri.scheme}' in '$uri'. (Did you forget $HTTP:// ?)"
    )

    val connector: HostConnectorSetup = HostConnectorSetup(
      uri.authority.host.toString,
      uri.effectivePort,
      sslEncryption = uri.scheme == HTTPS
    )

    customSettings match {
      case Some(custom) ⇒ connector.copy(settings = Option(mapCustomSettings(defaultSettings, custom)))
      case None         ⇒ connector.copy(settings = Option(defaultSettings))
    }
  }

  private def mapCustomSettings(
    settings:       HostConnectorSettings,
    customSettings: HttpCustomSettings
  ): HostConnectorSettings = {
    settings.copy(
      maxRetries = customSettings.maxRetries,
      maxConnections = customSettings.maxConnections,
      connectionSettings = settings.connectionSettings.copy(requestTimeout = customSettings.requestTimeout)
    )
  }

}
0

Source: https://habr.com/ru/post/1544800/


All Articles