diff --git a/.changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json b/.changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json new file mode 100644 index 000000000..4684338eb --- /dev/null +++ b/.changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json @@ -0,0 +1,8 @@ +{ + "id": "e7c3c7ab-749e-4371-8b25-42ea76aa870d", + "type": "feature", + "description": "Emit metrics from CRT HTTP engine", + "issues": [ + "https://github.com/awslabs/smithy-kotlin/issues/893" + ] +} \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 89bc5c453..13b6e5e53 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,7 +12,7 @@ okio-version = "3.6.0" otel-version = "1.32.0" slf4j-version = "2.0.9" slf4j-v1x-version = "1.7.36" -crt-kotlin-version = "0.8.2" +crt-kotlin-version = "0.8.4" # codegen smithy-version = "1.42.0" diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt index 03d94ee6f..2017f03a6 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/ConnectionManager.kt @@ -13,9 +13,11 @@ import aws.smithy.kotlin.runtime.crt.SdkDefaultIO import aws.smithy.kotlin.runtime.http.HttpErrorCode import aws.smithy.kotlin.runtime.http.HttpException import aws.smithy.kotlin.runtime.http.engine.ProxyConfig +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.Closeable import aws.smithy.kotlin.runtime.net.TlsVersion +import aws.smithy.kotlin.runtime.telemetry.metrics.measureSeconds import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore @@ -27,6 +29,7 @@ import aws.smithy.kotlin.runtime.net.TlsVersion as SdkTlsVersion internal class ConnectionManager( private val config: CrtHttpEngineConfig, + private val metrics: HttpClientMetrics, ) : Closeable { private val leases = Semaphore(config.maxConnections.toInt()) @@ -67,7 +70,9 @@ internal class ConnectionManager( // get a permit to acquire a connection (limits overall connections since managers are per/host) leases.acquire() leaseAcquired = true - manager.acquireConnection() + metrics.connectionAcquireDuration.measureSeconds { + manager.acquireConnection() + } } LeasedConnection(conn) @@ -82,8 +87,21 @@ internal class ConnectionManager( } throw httpEx + } finally { + emitMetrics() } } + + private fun emitMetrics() { + val (acquiredConnections, idleConnections) = connManagers + .values + .map { it.managerMetrics } + .fold(0L to 0L) { (a, i), m -> a + m.leasedConcurrency to i + m.availableConcurrency } + + metrics.acquiredConnections = acquiredConnections + metrics.idleConnections = idleConnections + } + private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock { connManagers.getOrPut(uri.authority) { val connOpts = options.apply { @@ -105,6 +123,7 @@ internal class ConnectionManager( HttpClientConnectionManager(connOpts) } } + override fun close() { connManagers.forEach { entry -> entry.value.close() } crtTlsContext.close() @@ -117,6 +136,7 @@ internal class ConnectionManager( delegate.close() } finally { leases.release() + emitMetrics() } } } diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt index 838b7e05f..b278e4a93 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/CrtHttpEngine.kt @@ -10,18 +10,23 @@ import aws.smithy.kotlin.runtime.http.config.EngineFactory import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase import aws.smithy.kotlin.runtime.http.engine.callContext +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers import aws.smithy.kotlin.runtime.operation.ExecutionContext import aws.smithy.kotlin.runtime.telemetry.logging.logger +import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds import aws.smithy.kotlin.runtime.time.Instant import kotlinx.coroutines.* import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withPermit +import kotlin.time.TimeSource internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024 internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024 +private const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.crt" + /** * [HttpClientEngine] based on the AWS Common Runtime HTTP client */ @@ -51,43 +56,54 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht // } private val requestLimiter = Semaphore(config.maxConcurrency.toInt()) - private val connectionManager = ConnectionManager(config) - - override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall = requestLimiter.withPermit { - val callContext = callContext() - val logger = callContext.logger() - - // LIFETIME: connection will be released back to the pool/manager when - // the response completes OR on exception (both handled by the completion handler registered on the stream - // handler) - val conn = connectionManager.acquire(request) - logger.trace { "Acquired connection ${conn.id}" } - - val respHandler = SdkStreamResponseHandler(conn, callContext) - callContext.job.invokeOnCompletion { - logger.trace { "completing handler; cause=$it" } - // ensures the stream is driven to completion regardless of what the downstream consumer does - respHandler.complete() - } + private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider).apply { + connectionsLimit = config.maxConnections.toLong() + requestConcurrencyLimit = config.maxConcurrency.toLong() + } + private val connectionManager = ConnectionManager(config, metrics) + + override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { + metrics.incrementQueuedRequests() + val enqueued = TimeSource.Monotonic.markNow() + return requestLimiter.withPermit { + metrics.requestsQueuedDuration.recordSeconds(enqueued.elapsedNow()) + metrics.decrementQueuedRequests() + + val callContext = callContext() + val logger = callContext.logger() + + // LIFETIME: connection will be released back to the pool/manager when + // the response completes OR on exception (both handled by the completion handler registered on the stream + // handler) + val conn = connectionManager.acquire(request) + logger.trace { "Acquired connection ${conn.id}" } + + val respHandler = SdkStreamResponseHandler(conn, callContext, context, metrics) + callContext.job.invokeOnCompletion { + logger.trace { "completing handler; cause=$it" } + // ensures the stream is driven to completion regardless of what the downstream consumer does + respHandler.complete() + } - val reqTime = Instant.now() - val engineRequest = request.toCrtRequest(callContext) + val reqTime = Instant.now() + val engineRequest = request.toCrtRequest(callContext, metrics) - val stream = mapCrtException { - conn.makeRequest(engineRequest, respHandler).also { stream -> - stream.activate() + val stream = mapCrtException { + conn.makeRequest(engineRequest, respHandler).also { stream -> + stream.activate() + } } - } - if (request.isChunked) { - withContext(SdkDispatchers.IO) { - stream.sendChunkedBody(request.body) + if (request.isChunked) { + withContext(SdkDispatchers.IO) { + stream.sendChunkedBody(request.body, metrics) + } } - } - val resp = respHandler.waitForResponse() + val resp = respHandler.waitForResponse() - return HttpCall(request, resp, reqTime, Instant.now(), callContext) + HttpCall(request, resp, reqTime, Instant.now(), callContext) + } } override fun shutdown() { diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt index e367bdea5..4c86aa716 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/RequestUtil.kt @@ -8,7 +8,6 @@ package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.CRT import aws.sdk.kotlin.crt.CrtRuntimeException import aws.sdk.kotlin.crt.http.HeadersBuilder -import aws.sdk.kotlin.crt.http.HttpRequestBodyStream import aws.sdk.kotlin.crt.http.HttpStream import aws.sdk.kotlin.crt.io.Protocol import aws.sdk.kotlin.crt.io.Uri @@ -18,10 +17,13 @@ import aws.smithy.kotlin.runtime.crt.SdkSourceBodyStream import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.HttpErrorCode import aws.smithy.kotlin.runtime.http.HttpException +import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.io.buffer import aws.smithy.kotlin.runtime.io.readToByteArray +import aws.smithy.kotlin.runtime.io.source import kotlinx.coroutines.job import kotlin.coroutines.CoroutineContext @@ -42,7 +44,10 @@ internal val HttpRequest.uri: Uri } } -internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest { +internal fun HttpRequest.toCrtRequest( + callContext: CoroutineContext, + metrics: HttpClientMetrics, +): aws.sdk.kotlin.crt.http.HttpRequest { val body = this.body check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" } val bodyStream = if (isChunked) { @@ -50,10 +55,16 @@ internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.ko } else { when (body) { is HttpBody.Empty -> null - is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes()) - is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext) + is HttpBody.Bytes -> { + val source = body.bytes().source().reportingTo(metrics.bytesSent) + SdkSourceBodyStream(source) + } + is HttpBody.ChannelContent -> { + val source = body.readFrom().reportingTo(metrics.bytesSent) + ReadChannelBodyStream(source, callContext) + } is HttpBody.SourceContent -> { - val source = body.readFrom() + val source = body.readFrom().reportingTo(metrics.bytesSent) callContext.job.invokeOnCompletion { source.close() } @@ -85,10 +96,10 @@ internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.Sourc * Send a chunked body using the CRT writeChunk bindings. * @param body an HTTP body that has a chunked content encoding. Must be [HttpBody.SourceContent] or [HttpBody.ChannelContent] */ -internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { +internal suspend fun HttpStream.sendChunkedBody(body: HttpBody, metrics: HttpClientMetrics) { when (body) { is HttpBody.SourceContent -> { - val source = body.readFrom() + val source = body.readFrom().reportingTo(metrics.bytesSent) val bufferedSource = source.buffer() while (!bufferedSource.exhausted()) { @@ -97,7 +108,7 @@ internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) { } } is HttpBody.ChannelContent -> { - val chan = body.readFrom() + val chan = body.readFrom().reportingTo(metrics.bytesSent) var buffer = SdkBuffer() val nextBuffer = SdkBuffer() var sentFirstChunk = false diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt index cd327364f..ddcb440f1 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandler.kt @@ -10,16 +10,27 @@ import aws.sdk.kotlin.crt.io.Buffer import aws.smithy.kotlin.runtime.http.* import aws.smithy.kotlin.runtime.http.HeadersBuilder import aws.smithy.kotlin.runtime.http.HttpException +import aws.smithy.kotlin.runtime.http.engine.EngineAttributes +import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.response.HttpResponse +import aws.smithy.kotlin.runtime.http.response.copy import aws.smithy.kotlin.runtime.io.SdkBuffer import aws.smithy.kotlin.runtime.io.SdkByteChannel import aws.smithy.kotlin.runtime.io.SdkByteReadChannel +import aws.smithy.kotlin.runtime.io.source +import aws.smithy.kotlin.runtime.operation.ExecutionContext import aws.smithy.kotlin.runtime.telemetry.logging.logger +import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds +import aws.smithy.kotlin.runtime.time.Instant +import aws.smithy.kotlin.runtime.time.fromEpochNanoseconds import aws.smithy.kotlin.runtime.util.derivedName import kotlinx.atomicfu.locks.reentrantLock import kotlinx.atomicfu.locks.withLock -import kotlinx.coroutines.* +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch import kotlin.coroutines.CoroutineContext /** @@ -30,6 +41,8 @@ import kotlin.coroutines.CoroutineContext internal class SdkStreamResponseHandler( private val conn: HttpClientConnection, private val callContext: CoroutineContext, + private val execContext: ExecutionContext, + private val clientMetrics: HttpClientMetrics, ) : HttpStreamResponseHandler { // TODO - need to cancel the stream when the body is closed from the caller side early. // There is no great way to do that currently without either (1) closing the connection or (2) throwing an @@ -57,6 +70,10 @@ internal class SdkStreamResponseHandler( private var streamCompleted = false + init { + clientMetrics.incrementInflightRequests() + } + /** * Called by the response read channel as data is consumed * @param size the number of bytes consumed @@ -184,7 +201,30 @@ internal class SdkStreamResponseHandler( } internal suspend fun waitForResponse(): HttpResponse = - responseReady.receive() + responseReady.receive().wrapBody() + + private fun HttpResponse.wrapBody(): HttpResponse { + val wrappedBody = when (val originalBody = body) { + is HttpBody.Empty -> return this // Don't need an object copy since we're not wrapping the body + is HttpBody.Bytes -> + originalBody + .bytes() + .source() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + is HttpBody.SourceContent -> + originalBody + .readFrom() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + is HttpBody.ChannelContent -> + originalBody + .readFrom() + .reportingTo(clientMetrics.bytesReceived) + .toHttpBody(originalBody.contentLength) + } + return copy(body = wrappedBody) + } /** * Invoked only after the consumer is finished with the response and it is safe to cleanup resources @@ -197,6 +237,8 @@ internal class SdkStreamResponseHandler( // and more data is pending arrival). It can also happen if the coroutine for this request is cancelled // before onResponseComplete fires. lock.withLock { + clientMetrics.decrementInflightRequests() + val forceClose = !streamCompleted if (forceClose) { @@ -210,4 +252,19 @@ internal class SdkStreamResponseHandler( conn.close() } } + + override fun onMetrics(stream: HttpStream, metrics: HttpStreamMetrics) { + val sendEnd = positiveInstantOrNull(metrics.sendEndTimestampNs) + val receiveStart = positiveInstantOrNull(metrics.receiveStartTimestampNs) + + if (sendEnd != null && receiveStart != null) { + val ttfb = receiveStart - sendEnd + if (ttfb.isPositive()) { + clientMetrics.timeToFirstByteDuration.recordSeconds(ttfb) + execContext[EngineAttributes.TimeToFirstByte] = ttfb + } + } + } } + +private fun positiveInstantOrNull(ns: Long): Instant? = if (ns > 0) Instant.fromEpochNanoseconds(ns) else null diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingByteReadChannel.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingByteReadChannel.kt new file mode 100644 index 000000000..af3958dfe --- /dev/null +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingByteReadChannel.kt @@ -0,0 +1,21 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package aws.smithy.kotlin.runtime.http.engine.crt.io + +import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.SdkByteReadChannel +import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter + +private class ReportingByteReadChannel( + val delegate: SdkByteReadChannel, + val metric: MonotonicCounter, +) : SdkByteReadChannel by delegate { + override suspend fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also { + if (it > 0) metric.add(it) + } +} + +internal fun SdkByteReadChannel.reportingTo(metric: MonotonicCounter): SdkByteReadChannel = + ReportingByteReadChannel(this, metric) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingSource.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingSource.kt new file mode 100644 index 000000000..4ebb614b6 --- /dev/null +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/src/aws/smithy/kotlin/runtime/http/engine/crt/io/ReportingSource.kt @@ -0,0 +1,17 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package aws.smithy.kotlin.runtime.http.engine.crt.io + +import aws.smithy.kotlin.runtime.io.SdkBuffer +import aws.smithy.kotlin.runtime.io.SdkSource +import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter + +private class ReportingSource(val delegate: SdkSource, val metric: MonotonicCounter) : SdkSource by delegate { + override fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also { + if (it > 0) metric.add(it) + } +} + +internal fun SdkSource.reportingTo(metric: MonotonicCounter): SdkSource = ReportingSource(this, metric) diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt index 88e88561e..7efaccc17 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/RequestConversionTest.kt @@ -8,16 +8,20 @@ package aws.smithy.kotlin.runtime.http.engine.crt import aws.smithy.kotlin.runtime.content.ByteStream import aws.smithy.kotlin.runtime.crt.ReadChannelBodyStream import aws.smithy.kotlin.runtime.http.* +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.request.HttpRequest import aws.smithy.kotlin.runtime.io.SdkByteReadChannel import aws.smithy.kotlin.runtime.io.SdkSource import aws.smithy.kotlin.runtime.io.source import aws.smithy.kotlin.runtime.net.url.Url +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlin.coroutines.EmptyCoroutineContext import kotlin.test.* +private val metrics = HttpClientMetrics("", TelemetryProvider.None) + class RequestConversionTest { private fun byteStreamFromContents(contents: String): ByteStream = object : ByteStream.ChannelStream() { @@ -48,7 +52,7 @@ class RequestConversionTest { body, ) - val crtRequest = request.toCrtRequest(EmptyCoroutineContext) + val crtRequest = request.toCrtRequest(EmptyCoroutineContext, metrics) assertEquals("POST", crtRequest.method) assertFalse(crtRequest.body is ReadChannelBodyStream) } @@ -65,7 +69,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertEquals("POST", crtRequest.method) val crtBody = crtRequest.body as ReadChannelBodyStream crtBody.cancel() @@ -83,7 +87,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertEquals("6", crtRequest.headers["Content-Length"]) val crtBody = crtRequest.body as ReadChannelBodyStream @@ -100,7 +104,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertEquals("0", crtRequest.headers["Content-Length"]) } @@ -119,7 +123,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertNotNull(request.body) assertNull(crtRequest.body) } @@ -139,7 +143,7 @@ class RequestConversionTest { ) val testContext = EmptyCoroutineContext + Job() - val crtRequest = request.toCrtRequest(testContext) + val crtRequest = request.toCrtRequest(testContext, metrics) assertNotNull(request.body) assertNull(crtRequest.body) } diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt index 568acb4b4..ad4a3ef53 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SdkStreamResponseHandlerTest.kt @@ -11,9 +11,12 @@ import aws.smithy.kotlin.runtime.http.HttpBody import aws.smithy.kotlin.runtime.http.HttpErrorCode import aws.smithy.kotlin.runtime.http.HttpException import aws.smithy.kotlin.runtime.http.HttpStatusCode +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.io.SdkSink import aws.smithy.kotlin.runtime.io.readAll import aws.smithy.kotlin.runtime.io.readToBuffer +import aws.smithy.kotlin.runtime.operation.ExecutionContext +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider import io.kotest.matchers.string.shouldContain import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -40,9 +43,12 @@ class SdkStreamResponseHandlerTest { private val mockConn = MockHttpClientConnection() + private val execContext = ExecutionContext() + private val metrics = HttpClientMetrics("", TelemetryProvider.None) + @Test fun testWaitSuccessResponse() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { val headers = listOf( @@ -67,7 +73,7 @@ class SdkStreamResponseHandlerTest { @Test fun testWaitNoHeaders() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { handler.onResponseComplete(stream, 0) @@ -79,7 +85,7 @@ class SdkStreamResponseHandlerTest { @Test fun testWaitFailedResponse() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { handler.onResponseComplete(stream, -1) @@ -93,7 +99,7 @@ class SdkStreamResponseHandlerTest { @Test fun testRespBodyCreated() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) launch { val headers = listOf( @@ -120,7 +126,7 @@ class SdkStreamResponseHandlerTest { @Test fun testRespBody() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) val data = "Fool of a Took! Throw yourself in next time and rid us of your stupidity!" launch { @@ -148,7 +154,7 @@ class SdkStreamResponseHandlerTest { @Test fun testStreamError() = runTest { - val handler = SdkStreamResponseHandler(mockConn, coroutineContext) + val handler = SdkStreamResponseHandler(mockConn, coroutineContext, execContext, metrics) val stream = MockHttpStream(200) val data = "foo bar" val socketClosedEc = 1051 diff --git a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt index 8c0f8a05d..dab925ea5 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-crt/jvm/test/aws/smithy/kotlin/runtime/http/engine/crt/SendChunkedBodyTest.kt @@ -6,13 +6,17 @@ package aws.smithy.kotlin.runtime.http.engine.crt import aws.sdk.kotlin.crt.http.HttpStream +import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics import aws.smithy.kotlin.runtime.http.toHttpBody import aws.smithy.kotlin.runtime.io.SdkByteReadChannel import aws.smithy.kotlin.runtime.io.readToByteArray import aws.smithy.kotlin.runtime.io.source +import aws.smithy.kotlin.runtime.telemetry.TelemetryProvider import kotlinx.coroutines.test.runTest import kotlin.test.* +private val metrics = HttpClientMetrics("", TelemetryProvider.None) + class SendChunkedBodyTest { private class MockHttpStream(override val responseStatusCode: Int) : HttpStream { var closed: Boolean = false @@ -33,7 +37,7 @@ class SendChunkedBodyTest { val source = chunkedBytes.source() - stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()), metrics) // source should be fully consumed with 1 chunk written assertEquals(0, source.readToByteArray().size) @@ -52,7 +56,7 @@ class SendChunkedBodyTest { val source = chunkedBytes.source() - stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(source.toHttpBody(chunkedBytes.size.toLong()), metrics) // source should be fully consumed assertEquals(0, source.readToByteArray().size) @@ -71,7 +75,7 @@ class SendChunkedBodyTest { val channel = SdkByteReadChannel(chunkedBytes) - stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()), metrics) // channel should be fully consumed with 1 chunk written assertEquals(0, channel.availableForRead) @@ -91,7 +95,7 @@ class SendChunkedBodyTest { val channel = SdkByteReadChannel(chunkedBytes) - stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong())) + stream.sendChunkedBody(channel.toHttpBody(chunkedBytes.size.toLong()), metrics) // source should be fully consumed assertEquals(0, channel.availableForRead) diff --git a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt index ea8c8773d..abe928b99 100644 --- a/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt +++ b/runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt @@ -40,7 +40,9 @@ public class OkHttpEngine( override val engineConstructor: (OkHttpEngineConfig.Builder.() -> Unit) -> OkHttpEngine = ::invoke } - private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider) + private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider).apply { + requestConcurrencyLimit = config.maxConcurrency.toLong() + } private val client = config.buildClient(metrics) override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall { diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt index b8eb7ea18..52ae787f7 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/EngineAttributes.kt @@ -15,8 +15,8 @@ import kotlin.time.Duration @InternalApi public object EngineAttributes { /** - * The time between sending the request completely and receiving the first byte of the response. This effectively - * measures the time spent waiting on a response. + * The interval between sending the request completely and receiving the first byte of the response. This + * effectively measures the time spent waiting on a response. */ public val TimeToFirstByte: AttributeKey = AttributeKey("aws.smithy.kotlin#TimeToFirstByte") } diff --git a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt index a193a81dc..dfedf390a 100644 --- a/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt +++ b/runtime/protocol/http-client/common/src/aws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics.kt @@ -176,6 +176,34 @@ public class HttpClientMetrics( _inFlightRequests.update { value } } + /** + * Atomically increase the number of inflight requests by 1. + */ + public fun incrementInflightRequests() { + _inFlightRequests.incrementAndGet() + } + + /** + * Atomically decrease the number of inflight requests by 1. + */ + public fun decrementInflightRequests() { + _inFlightRequests.decrementAndGet() + } + + /** + * Atomically increase the number of queued requests by 1. + */ + public fun incrementQueuedRequests() { + _queuedRequests.incrementAndGet() + } + + /** + * Atomically decrease the number of queued requests by 1. + */ + public fun decrementQueuedRequests() { + _queuedRequests.decrementAndGet() + } + private fun recordRequestsState(measurement: LongAsyncMeasurement) { measurement.record(inFlightRequests, HttpClientMetricAttributes.InFlightRequest) measurement.record(queuedRequests, HttpClientMetricAttributes.QueuedRequest) diff --git a/runtime/runtime-core/api/runtime-core.api b/runtime/runtime-core/api/runtime-core.api index 767ecd7f5..8c29b4224 100644 --- a/runtime/runtime-core/api/runtime-core.api +++ b/runtime/runtime-core/api/runtime-core.api @@ -1537,6 +1537,7 @@ public final class aws/smithy/kotlin/runtime/time/Instant : java/lang/Comparable public final fun getEpochSeconds ()J public final fun getNanosecondsOfSecond ()I public fun hashCode ()I + public final fun minus-5sfh64U (Laws/smithy/kotlin/runtime/time/Instant;)J public final fun minus-LRDsOJo (J)Laws/smithy/kotlin/runtime/time/Instant; public final fun plus-LRDsOJo (J)Laws/smithy/kotlin/runtime/time/Instant; public fun toString ()Ljava/lang/String; @@ -1555,6 +1556,7 @@ public final class aws/smithy/kotlin/runtime/time/Instant$Companion { public final class aws/smithy/kotlin/runtime/time/InstantKt { public static final fun fromEpochMilliseconds (Laws/smithy/kotlin/runtime/time/Instant$Companion;J)Laws/smithy/kotlin/runtime/time/Instant; + public static final fun fromEpochNanoseconds (Laws/smithy/kotlin/runtime/time/Instant$Companion;J)Laws/smithy/kotlin/runtime/time/Instant; public static final fun getEpochMilliseconds (Laws/smithy/kotlin/runtime/time/Instant;)J public static final fun toEpochDouble (Laws/smithy/kotlin/runtime/time/Instant;)D public static final fun until (Laws/smithy/kotlin/runtime/time/Instant;Laws/smithy/kotlin/runtime/time/Instant;)J diff --git a/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt b/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt index f9ffd6395..458f19c95 100644 --- a/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt +++ b/runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/time/Instant.kt @@ -44,6 +44,13 @@ public expect class Instant : Comparable { */ public operator fun minus(duration: Duration): Instant + /** + * Returns a duration representing the amount of time between this and [other]. If [other] is before this instant, + * the resulting duration will be negative. + * @param other The [Instant] marking the end of the duration + */ + public operator fun minus(other: Instant): Duration + public companion object { /** * Parse an ISO-8601 formatted string into an [Instant] @@ -102,4 +109,7 @@ public fun Instant.Companion.fromEpochMilliseconds(milliseconds: Long): Instant return fromEpochSeconds(secs, ns.toInt()) } +public fun Instant.Companion.fromEpochNanoseconds(ns: Long): Instant = + fromEpochSeconds(ns / NS_PER_SEC, ns.mod(NS_PER_SEC)) + public fun Instant.until(other: Instant): Duration = (other.epochMilliseconds - epochMilliseconds).milliseconds diff --git a/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt b/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt index cd75980dd..b7bac9094 100644 --- a/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt +++ b/runtime/runtime-core/common/test/aws/smithy/kotlin/runtime/time/InstantTest.kt @@ -197,6 +197,17 @@ class InstantTest { assertEquals(expected2, Instant.fromEpochMilliseconds(ts2)) } + @Test + fun testFromEpochNanoseconds() { + val ts1 = 1_234_567_890_000_000_000L + val expected1 = Instant.fromEpochSeconds(1_234_567_890L) + assertEquals(expected1, Instant.fromEpochNanoseconds(ts1)) + + val ts2 = 1_234_567_890_123_456_789L + val expected2 = Instant.fromEpochSeconds(1_234_567_890L, 123_456_789) + assertEquals(expected2, Instant.fromEpochNanoseconds(ts2)) + } + // Select tests pulled from edge cases/tickets in the V2 Java SDK. // Always good to learn from others... class V2JavaSdkTests { @@ -244,6 +255,17 @@ class InstantTest { assertEquals(Instant.fromEpochSeconds(990, 0), start - offset) } + @Test + fun testMinusInstant() { + val start = Instant.fromEpochSeconds(1000, 1000) + val end = Instant.fromEpochSeconds(1500, 1200) + val duration = end - start + duration.toComponents { seconds, ns -> + assertEquals(500L, seconds) + assertEquals(200, ns) + } + } + @Test fun testRoundTripUtcOffset() { // sanity check we only ever emit UTC timestamps (e.g. round trip a response with UTC offset) diff --git a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt index 54f32fc30..6af064f84 100644 --- a/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt +++ b/runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/time/InstantJVM.kt @@ -22,6 +22,7 @@ import java.time.format.SignStyle import java.time.temporal.ChronoField import java.time.temporal.ChronoUnit import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds import java.time.Instant as jtInstant public actual class Instant(internal val value: jtInstant) : Comparable { @@ -57,6 +58,18 @@ public actual class Instant(internal val value: jtInstant) : Comparable */ public actual operator fun minus(duration: Duration): Instant = plus(-duration) + private val ns: Long get() = value.epochSecond * NS_PER_SEC + value.nano + + /** + * Returns a duration representing the amount of time between this and [other]. If [other] is before this instant, + * the resulting duration will be negative. + * @param other The [Instant] marking the end of the duration + */ + public actual operator fun minus(other: Instant): Duration { + val delta = this.ns - other.ns + return delta.nanoseconds + } + /** * Encode the [Instant] as a string into the format specified by [TimestampFormat] */ diff --git a/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt b/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt index 671cae6e3..434559a91 100644 --- a/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt +++ b/runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/time/InstantNative.kt @@ -40,6 +40,15 @@ public actual class Instant : Comparable { TODO("Not yet implemented") } + /** + * Returns a duration representing the amount of time between this and [other]. If [other] is before this instant, + * the resulting duration will be negative. + * @param other The [Instant] marking the end of the duration + */ + public actual operator fun minus(other: Instant): Duration { + TODO("Not yet implemented") + } + public actual companion object { /** * Parse an ISO-8601 formatted string into an [Instant]