From bdb0814248cd8ebe49abf9de9d60ea52a0a933e6 Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Thu, 21 Dec 2023 15:35:15 +0000 Subject: [PATCH 1/6] feat: emit metrics from CRT HTTP engine --- .../e7c3c7ab-749e-4371-8b25-42ea76aa870d.json | 8 +++ .../http/engine/crt/ConnectionManager.kt | 39 ++++++++++--- .../runtime/http/engine/crt/CrtHttpEngine.kt | 15 +++-- .../runtime/http/engine/crt/RequestUtil.kt | 26 ++++++--- .../engine/crt/SdkStreamResponseHandler.kt | 58 ++++++++++++++++++- .../engine/crt/io/ReportingByteReadChannel.kt | 21 +++++++ .../http/engine/crt/io/ReportingSource.kt | 17 ++++++ .../http/engine/crt/RequestConversionTest.kt | 16 +++-- .../crt/SdkStreamResponseHandlerTest.kt | 21 +++++-- .../http/engine/crt/SendChunkedBodyTest.kt | 12 ++-- .../http/engine/okhttp/OkHttpEngine.kt | 4 +- .../runtime/http/engine/EngineAttributes.kt | 5 +- .../http/engine/internal/HttpClientMetrics.kt | 14 +++++ runtime/runtime-core/api/runtime-core.api | 2 + .../aws/smithy/kotlin/runtime/time/Instant.kt | 10 ++++ .../smithy/kotlin/runtime/time/InstantJVM.kt | 13 +++++ .../kotlin/runtime/time/InstantNative.kt | 9 +++ 17 files changed, 250 insertions(+), 40 deletions(-) create mode 100644 .changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json create mode 100644 runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingByteReadChannel.kt create mode 100644 runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingSource.kt diff --git a/.changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json b/.changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json new file mode 100644 index 000000000..4684338eb --- /dev/null +++ b/.changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json @@ -0,0 +1,8 @@ +{ + "id": "e7c3c7ab-749e-4371-8b25-42ea76aa870d", + "type": "feature", + "description": "Emit metrics from CRT HTTP engine", + "issues": [ + "https://github.com/awslabs/smithy-kotlin/issues/893" + ] +} \ No newline at end of file diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt index 03d94ee6f..add72ece8 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt @@ -13,9 +13,12 @@ import aws.smithy.kotlin.runtime.crt.SdkDefaultIO import aws.smithy.kotlin.runtime.http.HttpErrorCode import aws.smithy.kotlin.runtime.http.HttpException import aws.smithy.kotlin.runtime.http.engine.ProxyConfig +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.Closeable import aws.smithy.kotlin.runtime.net.TlsVersion +import aws.smithy.kotlin.runtime.telemetry.metrics.measureSeconds +import kotlinx.atomicfu.atomic import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore @@ -27,8 +30,10 @@ import aws.smithy.kotlin.runtime.net.TlsVersion as SdkTlsVersion internal class ConnectionManager( private val config: CrtHttpEngineConfig, + private val metrics: HttpClientMetrics, ) : Closeable { private val leases = Semaphore(config.maxConnections.toInt()) + private val pending = atomic(0L) private val crtTlsContext: TlsContext = TlsContextOptionsBuilder() .apply { @@ -61,16 +66,22 @@ internal class ConnectionManager( val manager = getManagerForUri(request.uri, proxyConfig) var leaseAcquired = false + metrics.queuedRequests = pending.incrementAndGet() + return try { - // wait for an actual connection - val conn = withTimeout(config.connectionAcquireTimeout) { - // get a permit to acquire a connection (limits overall connections since managers are per/host) - leases.acquire() - leaseAcquired = true - manager.acquireConnection() - } + metrics.requestsQueuedDuration.measureSeconds { + // wait for an actual connection + val conn = withTimeout(config.connectionAcquireTimeout) { + // get a permit to acquire a connection (limits overall connections since managers are per/host) + leases.acquire() + leaseAcquired = true + metrics.connectionAcquireDuration.measureSeconds { + manager.acquireConnection() + } + } - LeasedConnection(conn) + LeasedConnection(conn) + } } catch (ex: Exception) { if (leaseAcquired) { leases.release() @@ -82,8 +93,18 @@ internal class ConnectionManager( } throw httpEx + } finally { + metrics.queuedRequests = pending.decrementAndGet() + emitConnections() } } + + private fun emitConnections() { + val idleConnections = leases.availablePermits.toLong() + metrics.idleConnections = idleConnections + metrics.acquiredConnections = config.maxConnections.toLong() - idleConnections + } + private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock { connManagers.getOrPut(uri.authority) { val connOpts = options.apply { @@ -105,6 +126,7 @@ internal class ConnectionManager( HttpClientConnectionManager(connOpts) } } + override fun close() { connManagers.forEach { entry -> entry.value.close() } crtTlsContext.close() @@ -117,6 +139,7 @@ internal class ConnectionManager( delegate.close() } finally { leases.release() + emitConnections() } } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 838b7e05f..e51c7139d 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -10,6 +10,7 @@ import aws.smithy.kotlin.runtime.http.config.EngineFactory import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase import aws.smithy.kotlin.runtime.http.engine.callContext +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers import aws.smithy.kotlin.runtime.operation.ExecutionContext @@ -22,6 +23,8 @@ import kotlinx.coroutines.sync.withPermit internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024 internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024 +private const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.crt" + /** * [HttpClientEngine] based on the AWS Common Runtime HTTP client */ @@ -51,7 +54,11 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht // } private val requestLimiter = Semaphore(config.maxConcurrency.toInt()) - private val connectionManager = ConnectionManager(config) + private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider).apply { + connectionsLimit = config.maxConnections.toLong() + requestConcurrencyLimit = config.maxConcurrency.toLong() + } + private val connectionManager = ConnectionManager(config, metrics) override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall = requestLimiter.withPermit { val callContext = callContext() @@ -63,7 +70,7 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht val conn = connectionManager.acquire(request) logger.trace { "Acquired connection ${conn.id}" } - val respHandler = SdkStreamResponseHandler(conn, callContext) + val respHandler = SdkStreamResponseHandler(conn, callContext, context, metrics) callContext.job.invokeOnCompletion { logger.trace { "completing handler; cause=$it" } // ensures the stream is driven to completion regardless of what the downstream consumer does @@ -71,7 +78,7 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht } val reqTime = Instant.now() - val engineRequest = request.toCrtRequest(callContext) + val engineRequest = request.toCrtRequest(callContext, metrics) val stream = mapCrtException { conn.makeRequest(engineRequest, respHandler).also { stream -> @@ -81,7 +88,7 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht if (request.isChunked) { withContext(SdkDispatchers.IO) { - stream.sendChunkedBody(request.body) + stream.sendChunkedBody(request.body, metrics) } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index e367bdea5..254a603bb 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -18,10 +18,13 @@ import aws.smithy.kotlin.runtime.crt.SdkSourceBodyStream import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.HttpErrorCode import aws.smithy.kotlin.runtime.http.HttpException +import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.io.buffer import aws.smithy.kotlin.runtime.io.readToByteArray +import aws.smithy.kotlin.runtime.io.source import kotlinx.coroutines.job import kotlin.coroutines.CoroutineContext @@ -42,7 +45,10 @@ internal val HttpRequest.uri: Uri } } -internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest { +internal fun HttpRequest.toCrtRequest( + callContext: CoroutineContext, + metrics: HttpClientMetrics, +): aws.sdk.kotlin.crt.http.HttpRequest { val body = this.body check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" } val bodyStream = if (isChunked) { @@ -50,10 +56,16 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko } else { when (body) { is HttpBody.Empty -> null - is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) - is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) + is HttpBody.Bytes -> { + val source = body.bytes().source().reportingTo(metrics.bytesSent) + SdkSourceBodyStream(source) + } + is HttpBody.ChannelContent -> { + val source = body.readFrom().reportingTo(metrics.bytesSent) + ReadChannelBodyStream(source, callContext) + } is HttpBody.SourceContent -> { - val source = body.readFrom() + val source = body.readFrom().reportingTo(metrics.bytesSent) callContext.job.invokeOnCompletion { source.close() } @@ -85,10 +97,10 @@ internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.Sourc * Send a chunked body using the CRT writeChunk bindings. * @param body an HTTP body that has a chunked content encoding. Must be [HttpBody.SourceContent] or [HttpBody.ChannelContent] */ -internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { +internal suspend fun HttpStream.sendChunkedBody(body: HttpBody, metrics: HttpClientMetrics) { when (body) { is HttpBody.SourceContent -> { - val source = body.readFrom() + val source = body.readFrom().reportingTo(metrics.bytesSent) val bufferedSource = source.buffer() while (!bufferedSource.exhausted()) { @@ -97,7 +109,7 @@ internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { } } is HttpBody.ChannelContent -> { - val chan = body.readFrom() + val chan = body.readFrom().reportingTo(metrics.bytesSent) var buffer = SdkBuffer() val nextBuffer = SdkBuffer() var sentFirstChunk = false diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt index cd327364f..63d2a13a8 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt @@ -10,16 +10,27 @@ import aws.sdk.kotlin.crt.io.Buffer import aws.smithy.kotlin.runtime.http.* import aws.smithy.kotlin.runtime.http.HeadersBuilder import aws.smithy.kotlin.runtime.http.HttpException +import aws.smithy.kotlin.runtime.http.engine.EngineAttributes +import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.response.HttpResponse +import aws.smithy.kotlin.runtime.http.response.copy import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.io.SdkByteChannel import aws.smithy.kotlin.runtime.io.SdkByteReadChannel +import aws.smithy.kotlin.runtime.io.source +import aws.smithy.kotlin.runtime.operation.ExecutionContext import aws.smithy.kotlin.runtime.telemetry.logging.logger +import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds +import aws.smithy.kotlin.runtime.time.Instant +import aws.smithy.kotlin.runtime.time.fromEpochNanoseconds import aws.smithy.kotlin.runtime.util.derivedName import kotlinx.atomicfu.locks.reentrantLock import kotlinx.atomicfu.locks.withLock -import kotlinx.coroutines.* +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch import kotlin.coroutines.CoroutineContext /** @@ -30,6 +41,8 @@ import kotlin.coroutines.CoroutineContext internal class SdkStreamResponseHandler( private val conn: HttpClientConnection, private val callContext: CoroutineContext, + private val execContext: ExecutionContext, + private val clientMetrics: HttpClientMetrics, ) : HttpStreamResponseHandler { // TODO - need to cancel the stream when the body is closed from the caller side early. // There is no great way to do that currently without either (1) closing the connection or (2) throwing an @@ -57,6 +70,10 @@ internal class SdkStreamResponseHandler( private var streamCompleted = false + init { + clientMetrics.incrementInflightRequests() + } + /** * Called by the response read channel as data is consumed * @param size the number of bytes consumed @@ -184,7 +201,27 @@ internal class SdkStreamResponseHandler( } internal suspend fun waitForResponse(): HttpResponse = - responseReady.receive() + responseReady.receive().wrapBody() + + private fun HttpResponse.wrapBody(): HttpResponse { + val wrappedBody = when (val originalBody = body) { + is HttpBody.Empty -> return this + is HttpBody.Bytes -> originalBody + .bytes() + .source() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + is HttpBody.SourceContent -> originalBody + .readFrom() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + is HttpBody.ChannelContent -> originalBody + .readFrom() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + } + return copy(body = wrappedBody) + } /** * Invoked only after the consumer is finished with the response and it is safe to cleanup resources @@ -197,6 +234,8 @@ internal class SdkStreamResponseHandler( // and more data is pending arrival). It can also happen if the coroutine for this request is cancelled // before onResponseComplete fires. lock.withLock { + clientMetrics.decrementInflightRequests() + val forceClose = !streamCompleted if (forceClose) { @@ -210,4 +249,19 @@ internal class SdkStreamResponseHandler( conn.close() } } + + override fun onMetrics(stream: HttpStream, metrics: HttpStreamMetrics) { + val sendEnd = positiveInstantOrNull(metrics.sendEndTimestampNs) + val receiveStart = positiveInstantOrNull(metrics.receiveStartTimestampNs) + + if (sendEnd != null && receiveStart != null) { + val ttfb = receiveStart - sendEnd + if (ttfb.isPositive()) { + clientMetrics.timeToFirstByteDuration.recordSeconds(ttfb) + execContext[EngineAttributes.TimeToFirstByte] = ttfb + } + } + } } + +private fun positiveInstantOrNull(ns: Long): Instant? = if (ns > 0) Instant.fromEpochNanoseconds(ns) else null diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingByteReadChannel.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingByteReadChannel.kt new file mode 100644 index 000000000..af3958dfe --- /dev/null +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingByteReadChannel.kt @@ -0,0 +1,21 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package aws.smithy.kotlin.runtime.http.engine.crt.io + +import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.SdkByteReadChannel +import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter + +private class ReportingByteReadChannel( + val delegate: SdkByteReadChannel, + val metric: MonotonicCounter, +) : SdkByteReadChannel by delegate { + override suspend fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also { + if (it > 0) metric.add(it) + } +} + +internal fun SdkByteReadChannel.reportingTo(metric: MonotonicCounter): SdkByteReadChannel = + ReportingByteReadChannel(this, metric) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingSource.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingSource.kt new file mode 100644 index 000000000..4ebb614b6 --- /dev/null +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingSource.kt @@ -0,0 +1,17 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package aws.smithy.kotlin.runtime.http.engine.crt.io + +import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.SdkSource +import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter + +private class ReportingSource(val delegate: SdkSource, val metric: MonotonicCounter) : SdkSource by delegate { + override fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also { + if (it > 0) metric.add(it) + } +} + +internal fun SdkSource.reportingTo(metric: MonotonicCounter): SdkSource = ReportingSource(this, metric) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt index 88e88561e..7efaccc17 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt @@ -8,16 +8,20 @@ package aws.smithy.kotlin.runtime.http.engine.crt import aws.smithy.kotlin.runtime.content.ByteStream import aws.smithy.kotlin.runtime.crt.ReadChannelBodyStream import aws.smithy.kotlin.runtime.http.* +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.SdkByteReadChannel import aws.smithy.kotlin.runtime.io.SdkSource import aws.smithy.kotlin.runtime.io.source import aws.smithy.kotlin.runtime.net.url.Url +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlin.coroutines.EmptyCoroutineContext import kotlin.test.* +private val metrics = HttpClientMetrics("", TelemetryProvider.None) + class RequestConversionTest { private fun byteStreamFromContents(contents: String): ByteStream = object : ByteStream.ChannelStream() { @@ -48,7 +52,7 @@ class RequestConversionTest { body, ) - val crtRequest = request.toCrtRequest(EmptyCoroutineContext) + val crtRequest = request.toCrtRequest(EmptyCoroutineContext, metrics) assertEquals("POST", crtRequest.method) assertFalse(crtRequest.body is ReadChannelBodyStream) } @@ -65,7 +69,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertEquals("POST", crtRequest.method) val crtBody = crtRequest.body as ReadChannelBodyStream crtBody.cancel() @@ -83,7 +87,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertEquals("6", crtRequest.headers["Content-Length"]) val crtBody = crtRequest.body as ReadChannelBodyStream @@ -100,7 +104,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertEquals("0", crtRequest.headers["Content-Length"]) } @@ -119,7 +123,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertNotNull(request.body) assertNull(crtRequest.body) } @@ -139,7 +143,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertNotNull(request.body) assertNull(crtRequest.body) } diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt index 568acb4b4..22a01ecfa 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt @@ -3,17 +3,23 @@ * SPDX-License-Identifier: Apache-2.0 */ +@file:OptIn(ExperimentalApi::class) + package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.http.* import aws.sdk.kotlin.crt.io.byteArrayBuffer +import aws.smithy.kotlin.runtime.ExperimentalApi import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.HttpErrorCode import aws.smithy.kotlin.runtime.http.HttpException import aws.smithy.kotlin.runtime.http.HttpStatusCode +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.io.SdkSink import aws.smithy.kotlin.runtime.io.readAll import aws.smithy.kotlin.runtime.io.readToBuffer +import aws.smithy.kotlin.runtime.operation.ExecutionContext +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider import io.kotest.matchers.string.shouldContain import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -39,10 +45,13 @@ class SdkStreamResponseHandlerTest { } private val mockConn = MockHttpClientConnection() + + private val execContext = ExecutionContext() + private val metrics = HttpClientMetrics("", TelemetryProvider.None) @Test fun testWaitSuccessResponse() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { val headers = listOf( @@ -67,7 +76,7 @@ class SdkStreamResponseHandlerTest { @Test fun testWaitNoHeaders() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { handler.onResponseComplete(stream, 0) @@ -79,7 +88,7 @@ class SdkStreamResponseHandlerTest { @Test fun testWaitFailedResponse() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { handler.onResponseComplete(stream, -1) @@ -93,7 +102,7 @@ class SdkStreamResponseHandlerTest { @Test fun testRespBodyCreated() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { val headers = listOf( @@ -120,7 +129,7 @@ class SdkStreamResponseHandlerTest { @Test fun testRespBody() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) val data = "Fool of a Took! Throw yourself in next time and rid us of your stupidity!" launch { @@ -148,7 +157,7 @@ class SdkStreamResponseHandlerTest { @Test fun testStreamError() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) val data = "foo bar" val socketClosedEc = 1051 diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt index 8c0f8a05d..dab925ea5 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt @@ -6,13 +6,17 @@ package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.http.HttpStream +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.toHttpBody import aws.smithy.kotlin.runtime.io.SdkByteReadChannel import aws.smithy.kotlin.runtime.io.readToByteArray import aws.smithy.kotlin.runtime.io.source +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider import kotlinx.coroutines.test.runTest import kotlin.test.* +private val metrics = HttpClientMetrics("", TelemetryProvider.None) + class SendChunkedBodyTest { private class MockHttpStream(override val responseStatusCode: Int) : HttpStream { var closed: Boolean = false @@ -33,7 +37,7 @@ class SendChunkedBodyTest { val source = chunkedBytes.source() - stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()), metrics) // source should be fully consumed with 1 chunk written assertEquals(0, source.readToByteArray().size) @@ -52,7 +56,7 @@ class SendChunkedBodyTest { val source = chunkedBytes.source() - stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()), metrics) // source should be fully consumed assertEquals(0, source.readToByteArray().size) @@ -71,7 +75,7 @@ class SendChunkedBodyTest { val channel = SdkByteReadChannel(chunkedBytes) - stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()), metrics) // channel should be fully consumed with 1 chunk written assertEquals(0, channel.availableForRead) @@ -91,7 +95,7 @@ class SendChunkedBodyTest { val channel = SdkByteReadChannel(chunkedBytes) - stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()), metrics) // source should be fully consumed assertEquals(0, channel.availableForRead) diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index ea8c8773d..abe928b99 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -40,7 +40,9 @@ public class OkHttpEngine( override val engineConstructor: (OkHttpEngineConfig.Builder.() -> Unit) -> OkHttpEngine = ::invoke } - private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider) + private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider).apply { + requestConcurrencyLimit = config.maxConcurrency.toLong() + } private val client = config.buildClient(metrics) override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt index b8eb7ea18..6e03bc88f 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt @@ -7,6 +7,7 @@ package aws.smithy.kotlin.runtime.http.engine import aws.smithy.kotlin.runtime.InternalApi import aws.smithy.kotlin.runtime.collections.AttributeKey +import aws.smithy.kotlin.runtime.time.Instant import kotlin.time.Duration /** @@ -15,8 +16,8 @@ import kotlin.time.Duration @InternalApi public object EngineAttributes { /** - * The time between sending the request completely and receiving the first byte of the response. This effectively - * measures the time spent waiting on a response. + * The interval between sending the request completely and receiving the first byte of the response. This + * effectively measures the time spent waiting on a response. */ public val TimeToFirstByte: AttributeKey = AttributeKey("aws.smithy.kotlin#TimeToFirstByte") } diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt index a193a81dc..4871f4838 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt @@ -176,6 +176,20 @@ public class HttpClientMetrics( _inFlightRequests.update { value } } + /** + * Atomically increase the number of inflight requests by 1. + */ + public fun incrementInflightRequests() { + _inFlightRequests.incrementAndGet() + } + + /** + * Atomically decrease the number of inflight requests by 1. + */ + public fun decrementInflightRequests() { + _inFlightRequests.decrementAndGet() + } + private fun recordRequestsState(measurement: LongAsyncMeasurement) { measurement.record(inFlightRequests, HttpClientMetricAttributes.InFlightRequest) measurement.record(queuedRequests, HttpClientMetricAttributes.QueuedRequest) diff --git a/runtime/runtime-core/api/runtime-core.api b/runtime/runtime-core/api/runtime-core.api index 767ecd7f5..8c29b4224 100644 --- a/runtime/runtime-core/api/runtime-core.api +++ b/runtime/runtime-core/api/runtime-core.api @@ -1537,6 +1537,7 @@ public final class aws/smithy/kotlin/runtime/time/Instant : java/lang/Comparable public final fun getEpochSeconds ()J public final fun getNanosecondsOfSecond ()I public fun hashCode ()I + public final fun minus-5sfh64U (Laws/smithy/kotlin/runtime/time/Instant;)J public final fun minus-LRDsOJo (J)Laws/smithy/kotlin/runtime/time/Instant; public final fun plus-LRDsOJo (J)Laws/smithy/kotlin/runtime/time/Instant; public fun toString ()Ljava/lang/String; @@ -1555,6 +1556,7 @@ public final class aws/smithy/kotlin/runtime/time/Instant$Companion { public final class aws/smithy/kotlin/runtime/time/InstantKt { public static final fun fromEpochMilliseconds (Laws/smithy/kotlin/runtime/time/Instant$Companion;J)Laws/smithy/kotlin/runtime/time/Instant; + public static final fun fromEpochNanoseconds (Laws/smithy/kotlin/runtime/time/Instant$Companion;J)Laws/smithy/kotlin/runtime/time/Instant; public static final fun getEpochMilliseconds (Laws/smithy/kotlin/runtime/time/Instant;)J public static final fun toEpochDouble (Laws/smithy/kotlin/runtime/time/Instant;)D public static final fun until (Laws/smithy/kotlin/runtime/time/Instant;Laws/smithy/kotlin/runtime/time/Instant;)J diff --git a/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt b/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt index f9ffd6395..458f19c95 100644 --- a/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt +++ b/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt @@ -44,6 +44,13 @@ public expect class Instant : Comparable { */ public operator fun minus(duration: Duration): Instant + /** + * Returns a duration representing the amount of time between this and [other]. If [other] is before this instant, + * the resulting duration will be negative. + * @param other The [Instant] marking the end of the duration + */ + public operator fun minus(other: Instant): Duration + public companion object { /** * Parse an ISO-8601 formatted string into an [Instant] @@ -102,4 +109,7 @@ public fun Instant.Companion.fromEpochMilliseconds(milliseconds: Long): Instant return fromEpochSeconds(secs, ns.toInt()) } +public fun Instant.Companion.fromEpochNanoseconds(ns: Long): Instant = + fromEpochSeconds(ns / NS_PER_SEC, ns.mod(NS_PER_SEC)) + public fun Instant.until(other: Instant): Duration = (other.epochMilliseconds - epochMilliseconds).milliseconds diff --git a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt index 54f32fc30..6af064f84 100644 --- a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt +++ b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt @@ -22,6 +22,7 @@ import java.time.format.SignStyle import java.time.temporal.ChronoField import java.time.temporal.ChronoUnit import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds import java.time.Instant as jtInstant public actual class Instant(internal val value: jtInstant) : Comparable { @@ -57,6 +58,18 @@ public actual class Instant(internal val value: jtInstant) : Comparable */ public actual operator fun minus(duration: Duration): Instant = plus(-duration) + private val ns: Long get() = value.epochSecond * NS_PER_SEC + value.nano + + /** + * Returns a duration representing the amount of time between this and [other]. If [other] is before this instant, + * the resulting duration will be negative. + * @param other The [Instant] marking the end of the duration + */ + public actual operator fun minus(other: Instant): Duration { + val delta = this.ns - other.ns + return delta.nanoseconds + } + /** * Encode the [Instant] as a string into the format specified by [TimestampFormat] */ diff --git a/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt b/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt index 671cae6e3..434559a91 100644 --- a/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt +++ b/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt @@ -40,6 +40,15 @@ public actual class Instant : Comparable { TODO("Not yet implemented") } + /** + * Returns a duration representing the amount of time between this and [other]. If [other] is before this instant, + * the resulting duration will be negative. + * @param other The [Instant] marking the end of the duration + */ + public actual operator fun minus(other: Instant): Duration { + TODO("Not yet implemented") + } + public actual companion object { /** * Parse an ISO-8601 formatted string into an [Instant] From 2174a835e5bf088418db88368d4f888204ed9008 Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Thu, 21 Dec 2023 21:46:30 +0000 Subject: [PATCH 2/6] lint --- .../runtime/http/engine/crt/RequestUtil.kt | 1 - .../engine/crt/SdkStreamResponseHandler.kt | 31 ++++++++++--------- .../crt/SdkStreamResponseHandlerTest.kt | 2 +- .../runtime/http/engine/EngineAttributes.kt | 1 - 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index 254a603bb..4c86aa716 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -8,7 +8,6 @@ package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.CRT import aws.sdk.kotlin.crt.CrtRuntimeException import aws.sdk.kotlin.crt.http.HeadersBuilder -import aws.sdk.kotlin.crt.http.HttpRequestBodyStream import aws.sdk.kotlin.crt.http.HttpStream import aws.sdk.kotlin.crt.io.Protocol import aws.sdk.kotlin.crt.io.Uri diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt index 63d2a13a8..ddcb440f1 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt @@ -205,20 +205,23 @@ internal class SdkStreamResponseHandler( private fun HttpResponse.wrapBody(): HttpResponse { val wrappedBody = when (val originalBody = body) { - is HttpBody.Empty -> return this - is HttpBody.Bytes -> originalBody - .bytes() - .source() - .reportingTo(clientMetrics.bytesReceived) - .toHttpBody(originalBody.contentLength) - is HttpBody.SourceContent -> originalBody - .readFrom() - .reportingTo(clientMetrics.bytesReceived) - .toHttpBody(originalBody.contentLength) - is HttpBody.ChannelContent -> originalBody - .readFrom() - .reportingTo(clientMetrics.bytesReceived) - .toHttpBody(originalBody.contentLength) + is HttpBody.Empty -> return this // Don't need an object copy since we're not wrapping the body + is HttpBody.Bytes -> + originalBody + .bytes() + .source() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + is HttpBody.SourceContent -> + originalBody + .readFrom() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + is HttpBody.ChannelContent -> + originalBody + .readFrom() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) } return copy(body = wrappedBody) } diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt index 22a01ecfa..7bba18c0f 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt @@ -45,7 +45,7 @@ class SdkStreamResponseHandlerTest { } private val mockConn = MockHttpClientConnection() - + private val execContext = ExecutionContext() private val metrics = HttpClientMetrics("", TelemetryProvider.None) diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt index 6e03bc88f..52ae787f7 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt @@ -7,7 +7,6 @@ package aws.smithy.kotlin.runtime.http.engine import aws.smithy.kotlin.runtime.InternalApi import aws.smithy.kotlin.runtime.collections.AttributeKey -import aws.smithy.kotlin.runtime.time.Instant import kotlin.time.Duration /** From 0357d855a8f2c5a822662e545d10a1a988572428 Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Fri, 5 Jan 2024 22:02:09 +0000 Subject: [PATCH 3/6] bump to latest aws-crt-kotlin release --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index cc0edd931..4398d965d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -10,7 +10,7 @@ okio-version = "3.6.0" otel-version = "1.32.0" slf4j-version = "2.0.9" slf4j-v1x-version = "1.7.36" -crt-kotlin-version = "0.8.2" +crt-kotlin-version = "0.8.3" # codegen smithy-version = "1.42.0" From 24278a6e7e7121bfed8b2dd561e56d4e360f738d Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Mon, 8 Jan 2024 17:09:40 +0000 Subject: [PATCH 4/6] addressing PR feedback --- gradle/libs.versions.toml | 2 +- .../http/engine/crt/ConnectionManager.kt | 12 +++++----- .../crt/SdkStreamResponseHandlerTest.kt | 3 --- .../smithy/kotlin/runtime/time/InstantTest.kt | 24 +++++++++++++++++++ 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4baa30cd1..09dd7ab73 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,7 +12,7 @@ okio-version = "3.6.0" otel-version = "1.32.0" slf4j-version = "2.0.9" slf4j-v1x-version = "1.7.36" -crt-kotlin-version = "0.8.3" +crt-kotlin-version = "0.8.4-SNAPSHOT" # codegen smithy-version = "1.42.0" diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt index add72ece8..de6cd44f4 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt @@ -95,14 +95,14 @@ internal class ConnectionManager( throw httpEx } finally { metrics.queuedRequests = pending.decrementAndGet() - emitConnections() + emitMetrics() } } - private fun emitConnections() { - val idleConnections = leases.availablePermits.toLong() - metrics.idleConnections = idleConnections - metrics.acquiredConnections = config.maxConnections.toLong() - idleConnections + private fun emitMetrics() { + val acquiredConnections = connManagers.values.sumOf { it.managerMetrics.leasedConcurrency } + metrics.acquiredConnections = acquiredConnections + metrics.idleConnections = config.maxConnections.toLong() - acquiredConnections } private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock { @@ -139,7 +139,7 @@ internal class ConnectionManager( delegate.close() } finally { leases.release() - emitConnections() + emitMetrics() } } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt index 7bba18c0f..ad4a3ef53 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt @@ -3,13 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -@file:OptIn(ExperimentalApi::class) - package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.http.* import aws.sdk.kotlin.crt.io.byteArrayBuffer -import aws.smithy.kotlin.runtime.ExperimentalApi import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.HttpErrorCode import aws.smithy.kotlin.runtime.http.HttpException diff --git a/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt b/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt index cd75980dd..f22585624 100644 --- a/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt +++ b/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt @@ -13,6 +13,7 @@ import kotlin.time.Duration.Companion.hours import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.Duration.Companion.seconds +import kotlin.time.DurationUnit // tests for conversion from a parsed representation into an Instant instance @@ -197,6 +198,18 @@ class InstantTest { assertEquals(expected2, Instant.fromEpochMilliseconds(ts2)) } + @Test + fun testFromEpochNanoseconds() { + val ts1 = 1_234_567_890_000_000_000L + val expected1 = Instant.fromEpochSeconds(1_234_567_890L) + assertEquals(expected1, Instant.fromEpochNanoseconds(ts1)) + + val ts2 = 1_234_567_890_123_456_789L + val expected2 = Instant.fromEpochSeconds(1_234_567_890L, 123_456_789) + assertEquals(expected2, Instant.fromEpochNanoseconds(ts2)) + + } + // Select tests pulled from edge cases/tickets in the V2 Java SDK. // Always good to learn from others... class V2JavaSdkTests { @@ -244,6 +257,17 @@ class InstantTest { assertEquals(Instant.fromEpochSeconds(990, 0), start - offset) } + @Test + fun testMinusInstant() { + val start = Instant.fromEpochSeconds(1000, 1000) + val end = Instant.fromEpochSeconds(1500, 1200) + val duration = end - start + duration.toComponents { seconds, ns -> + assertEquals(500L, seconds) + assertEquals(200, ns) + } + } + @Test fun testRoundTripUtcOffset() { // sanity check we only ever emit UTC timestamps (e.g. round trip a response with UTC offset) From c36f9fe77e4f7bde5ceca3a88612a12a5934f39b Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Wed, 10 Jan 2024 22:57:29 +0000 Subject: [PATCH 5/6] addressing PR feedback --- gradle/libs.versions.toml | 2 +- .../http/engine/crt/ConnectionManager.kt | 33 +++++----- .../runtime/http/engine/crt/CrtHttpEngine.kt | 65 +++++++++++-------- .../http/engine/internal/HttpClientMetrics.kt | 14 ++++ 4 files changed, 67 insertions(+), 47 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 09dd7ab73..13b6e5e53 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,7 +12,7 @@ okio-version = "3.6.0" otel-version = "1.32.0" slf4j-version = "2.0.9" slf4j-v1x-version = "1.7.36" -crt-kotlin-version = "0.8.4-SNAPSHOT" +crt-kotlin-version = "0.8.4" # codegen smithy-version = "1.42.0" diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt index de6cd44f4..2017f03a6 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt @@ -18,7 +18,6 @@ import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.Closeable import aws.smithy.kotlin.runtime.net.TlsVersion import aws.smithy.kotlin.runtime.telemetry.metrics.measureSeconds -import kotlinx.atomicfu.atomic import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore @@ -33,7 +32,6 @@ internal class ConnectionManager( private val metrics: HttpClientMetrics, ) : Closeable { private val leases = Semaphore(config.maxConnections.toInt()) - private val pending = atomic(0L) private val crtTlsContext: TlsContext = TlsContextOptionsBuilder() .apply { @@ -66,22 +64,18 @@ internal class ConnectionManager( val manager = getManagerForUri(request.uri, proxyConfig) var leaseAcquired = false - metrics.queuedRequests = pending.incrementAndGet() - return try { - metrics.requestsQueuedDuration.measureSeconds { - // wait for an actual connection - val conn = withTimeout(config.connectionAcquireTimeout) { - // get a permit to acquire a connection (limits overall connections since managers are per/host) - leases.acquire() - leaseAcquired = true - metrics.connectionAcquireDuration.measureSeconds { - manager.acquireConnection() - } + // wait for an actual connection + val conn = withTimeout(config.connectionAcquireTimeout) { + // get a permit to acquire a connection (limits overall connections since managers are per/host) + leases.acquire() + leaseAcquired = true + metrics.connectionAcquireDuration.measureSeconds { + manager.acquireConnection() } - - LeasedConnection(conn) } + + LeasedConnection(conn) } catch (ex: Exception) { if (leaseAcquired) { leases.release() @@ -94,15 +88,18 @@ internal class ConnectionManager( throw httpEx } finally { - metrics.queuedRequests = pending.decrementAndGet() emitMetrics() } } private fun emitMetrics() { - val acquiredConnections = connManagers.values.sumOf { it.managerMetrics.leasedConcurrency } + val (acquiredConnections, idleConnections) = connManagers + .values + .map { it.managerMetrics } + .fold(0L to 0L) { (a, i), m -> a + m.leasedConcurrency to i + m.availableConcurrency } + metrics.acquiredConnections = acquiredConnections - metrics.idleConnections = config.maxConnections.toLong() - acquiredConnections + metrics.idleConnections = idleConnections } private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock { diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index e51c7139d..b278e4a93 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -15,10 +15,12 @@ import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers import aws.smithy.kotlin.runtime.operation.ExecutionContext import aws.smithy.kotlin.runtime.telemetry.logging.logger +import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds import aws.smithy.kotlin.runtime.time.Instant import kotlinx.coroutines.* import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withPermit +import kotlin.time.TimeSource internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024 internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024 @@ -60,41 +62,48 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht } private val connectionManager = ConnectionManager(config, metrics) - override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall = requestLimiter.withPermit { - val callContext = callContext() - val logger = callContext.logger() - - // LIFETIME: connection will be released back to the pool/manager when - // the response completes OR on exception (both handled by the completion handler registered on the stream - // handler) - val conn = connectionManager.acquire(request) - logger.trace { "Acquired connection ${conn.id}" } - - val respHandler = SdkStreamResponseHandler(conn, callContext, context, metrics) - callContext.job.invokeOnCompletion { - logger.trace { "completing handler; cause=$it" } - // ensures the stream is driven to completion regardless of what the downstream consumer does - respHandler.complete() - } + override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { + metrics.incrementQueuedRequests() + val enqueued = TimeSource.Monotonic.markNow() + return requestLimiter.withPermit { + metrics.requestsQueuedDuration.recordSeconds(enqueued.elapsedNow()) + metrics.decrementQueuedRequests() + + val callContext = callContext() + val logger = callContext.logger() + + // LIFETIME: connection will be released back to the pool/manager when + // the response completes OR on exception (both handled by the completion handler registered on the stream + // handler) + val conn = connectionManager.acquire(request) + logger.trace { "Acquired connection ${conn.id}" } + + val respHandler = SdkStreamResponseHandler(conn, callContext, context, metrics) + callContext.job.invokeOnCompletion { + logger.trace { "completing handler; cause=$it" } + // ensures the stream is driven to completion regardless of what the downstream consumer does + respHandler.complete() + } - val reqTime = Instant.now() - val engineRequest = request.toCrtRequest(callContext, metrics) + val reqTime = Instant.now() + val engineRequest = request.toCrtRequest(callContext, metrics) - val stream = mapCrtException { - conn.makeRequest(engineRequest, respHandler).also { stream -> - stream.activate() + val stream = mapCrtException { + conn.makeRequest(engineRequest, respHandler).also { stream -> + stream.activate() + } } - } - if (request.isChunked) { - withContext(SdkDispatchers.IO) { - stream.sendChunkedBody(request.body, metrics) + if (request.isChunked) { + withContext(SdkDispatchers.IO) { + stream.sendChunkedBody(request.body, metrics) + } } - } - val resp = respHandler.waitForResponse() + val resp = respHandler.waitForResponse() - return HttpCall(request, resp, reqTime, Instant.now(), callContext) + HttpCall(request, resp, reqTime, Instant.now(), callContext) + } } override fun shutdown() { diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt index 4871f4838..dfedf390a 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt @@ -190,6 +190,20 @@ public class HttpClientMetrics( _inFlightRequests.decrementAndGet() } + /** + * Atomically increase the number of queued requests by 1. + */ + public fun incrementQueuedRequests() { + _queuedRequests.incrementAndGet() + } + + /** + * Atomically decrease the number of queued requests by 1. + */ + public fun decrementQueuedRequests() { + _queuedRequests.decrementAndGet() + } + private fun recordRequestsState(measurement: LongAsyncMeasurement) { measurement.record(inFlightRequests, HttpClientMetricAttributes.InFlightRequest) measurement.record(queuedRequests, HttpClientMetricAttributes.QueuedRequest) From d27f858564aa812da22324039f19844e03f3197d Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Wed, 10 Jan 2024 23:01:41 +0000 Subject: [PATCH 6/6] lint --- .../common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt b/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt index f22585624..b7bac9094 100644 --- a/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt +++ b/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt @@ -13,7 +13,6 @@ import kotlin.time.Duration.Companion.hours import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.Duration.Companion.seconds -import kotlin.time.DurationUnit // tests for conversion from a parsed representation into an Instant instance @@ -207,7 +206,6 @@ class InstantTest { val ts2 = 1_234_567_890_123_456_789L val expected2 = Instant.fromEpochSeconds(1_234_567_890L, 123_456_789) assertEquals(expected2, Instant.fromEpochNanoseconds(ts2)) - } // Select tests pulled from edge cases/tickets in the V2 Java SDK.