Skip to content

feat: emit metrics from CRT HTTP engine #1017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "e7c3c7ab-749e-4371-8b25-42ea76aa870d",
"type": "feature",
"description": "Emit metrics from CRT HTTP engine",
"issues": [
"https://github.com/awslabs/smithy-kotlin/issues/893"
]
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ okio-version = "3.6.0"
otel-version = "1.32.0"
slf4j-version = "2.0.9"
slf4j-v1x-version = "1.7.36"
crt-kotlin-version = "0.8.2"
crt-kotlin-version = "0.8.4-SNAPSHOT"

# codegen
smithy-version = "1.42.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import aws.smithy.kotlin.runtime.crt.SdkDefaultIO
import aws.smithy.kotlin.runtime.http.HttpErrorCode
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.Closeable
import aws.smithy.kotlin.runtime.net.TlsVersion
import aws.smithy.kotlin.runtime.telemetry.metrics.measureSeconds
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
Expand All @@ -27,8 +30,10 @@ import aws.smithy.kotlin.runtime.net.TlsVersion as SdkTlsVersion

internal class ConnectionManager(
private val config: CrtHttpEngineConfig,
private val metrics: HttpClientMetrics,
) : Closeable {
private val leases = Semaphore(config.maxConnections.toInt())
private val pending = atomic(0L)

private val crtTlsContext: TlsContext = TlsContextOptionsBuilder()
.apply {
Expand Down Expand Up @@ -61,16 +66,22 @@ internal class ConnectionManager(
val manager = getManagerForUri(request.uri, proxyConfig)
var leaseAcquired = false

metrics.queuedRequests = pending.incrementAndGet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correctness: If we are trying to acquire a connection then it's not queued right? I think queued would be before we hit the requestLimiter.

metrics.queuedRequests = pending.incrementAndGet()
requestLimiter.withPermit {
    metrics.queuedRequests = pending.decrementAndGet()
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, if "queued" means "before we attempt to acquire a connection" then I'm guessing that the requestsQueuedDuration measurement below is also wrong. I'll move it too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to look at the definitions I gave them again and I think this would be inline with what it says

queued=waiting to be executed (e.g. waiting for thread to be available), in-flight=actively processing

If we are to the point of acquiring a connection we are "actively processing" the request. I can see where the definition could be interpreted differently though as in "I have all the resources needed at this point to execute the request" but I think establishing a connection (or waiting on one to be available) is part of overall request processing. Curious if others disagree.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still doesn't look correct, I don't think queuedRequests is calculated in connection manager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think I was confusing the semaphore inside ConnectionManager with the one inside CrtHttpEngine. Switching.


return try {
// wait for an actual connection
val conn = withTimeout(config.connectionAcquireTimeout) {
// get a permit to acquire a connection (limits overall connections since managers are per/host)
leases.acquire()
leaseAcquired = true
manager.acquireConnection()
}
metrics.requestsQueuedDuration.measureSeconds {
// wait for an actual connection
val conn = withTimeout(config.connectionAcquireTimeout) {
// get a permit to acquire a connection (limits overall connections since managers are per/host)
leases.acquire()
leaseAcquired = true
metrics.connectionAcquireDuration.measureSeconds {
manager.acquireConnection()
}
}

LeasedConnection(conn)
LeasedConnection(conn)
}
} catch (ex: Exception) {
if (leaseAcquired) {
leases.release()
Expand All @@ -82,8 +93,18 @@ internal class ConnectionManager(
}

throw httpEx
} finally {
metrics.queuedRequests = pending.decrementAndGet()
emitMetrics()
}
}

private fun emitMetrics() {
val acquiredConnections = connManagers.values.sumOf { it.managerMetrics.leasedConcurrency }
metrics.acquiredConnections = acquiredConnections
metrics.idleConnections = config.maxConnections.toLong() - acquiredConnections
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: This still isn't right, idle connections are connections established with the server not currently in use. If I'm not mistaken this is just availableConcurrency which would require the same summing across managers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, sum of availableConcurrency sounds doable, although it will exceed max connections under some circumstances.

}

private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock {
connManagers.getOrPut(uri.authority) {
val connOpts = options.apply {
Expand All @@ -105,6 +126,7 @@ internal class ConnectionManager(
HttpClientConnectionManager(connOpts)
}
}

override fun close() {
connManagers.forEach { entry -> entry.value.close() }
crtTlsContext.close()
Expand All @@ -117,6 +139,7 @@ internal class ConnectionManager(
delegate.close()
} finally {
leases.release()
emitMetrics()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import aws.smithy.kotlin.runtime.http.config.EngineFactory
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase
import aws.smithy.kotlin.runtime.http.engine.callContext
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import aws.smithy.kotlin.runtime.operation.ExecutionContext
Expand All @@ -22,6 +23,8 @@ import kotlinx.coroutines.sync.withPermit
internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024
internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024

private const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.crt"

/**
* [HttpClientEngine] based on the AWS Common Runtime HTTP client
*/
Expand Down Expand Up @@ -51,7 +54,11 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
// }

private val requestLimiter = Semaphore(config.maxConcurrency.toInt())
private val connectionManager = ConnectionManager(config)
private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider).apply {
connectionsLimit = config.maxConnections.toLong()
requestConcurrencyLimit = config.maxConcurrency.toLong()
}
private val connectionManager = ConnectionManager(config, metrics)

override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall = requestLimiter.withPermit {
val callContext = callContext()
Expand All @@ -63,15 +70,15 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
val conn = connectionManager.acquire(request)
logger.trace { "Acquired connection ${conn.id}" }

val respHandler = SdkStreamResponseHandler(conn, callContext)
val respHandler = SdkStreamResponseHandler(conn, callContext, context, metrics)
callContext.job.invokeOnCompletion {
logger.trace { "completing handler; cause=$it" }
// ensures the stream is driven to completion regardless of what the downstream consumer does
respHandler.complete()
}

val reqTime = Instant.now()
val engineRequest = request.toCrtRequest(callContext)
val engineRequest = request.toCrtRequest(callContext, metrics)

val stream = mapCrtException {
conn.makeRequest(engineRequest, respHandler).also { stream ->
Expand All @@ -81,7 +88,7 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht

if (request.isChunked) {
withContext(SdkDispatchers.IO) {
stream.sendChunkedBody(request.body)
stream.sendChunkedBody(request.body, metrics)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package aws.smithy.kotlin.runtime.http.engine.crt
import aws.sdk.kotlin.crt.CRT
import aws.sdk.kotlin.crt.CrtRuntimeException
import aws.sdk.kotlin.crt.http.HeadersBuilder
import aws.sdk.kotlin.crt.http.HttpRequestBodyStream
import aws.sdk.kotlin.crt.http.HttpStream
import aws.sdk.kotlin.crt.io.Protocol
import aws.sdk.kotlin.crt.io.Uri
Expand All @@ -18,10 +17,13 @@ import aws.smithy.kotlin.runtime.crt.SdkSourceBodyStream
import aws.smithy.kotlin.runtime.http.HttpBody
import aws.smithy.kotlin.runtime.http.HttpErrorCode
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.buffer
import aws.smithy.kotlin.runtime.io.readToByteArray
import aws.smithy.kotlin.runtime.io.source
import kotlinx.coroutines.job
import kotlin.coroutines.CoroutineContext

Expand All @@ -42,18 +44,27 @@ internal val HttpRequest.uri: Uri
}
}

internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest {
internal fun HttpRequest.toCrtRequest(
callContext: CoroutineContext,
metrics: HttpClientMetrics,
): aws.sdk.kotlin.crt.http.HttpRequest {
val body = this.body
check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" }
val bodyStream = if (isChunked) {
null
} else {
when (body) {
is HttpBody.Empty -> null
is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes())
is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext)
is HttpBody.Bytes -> {
val source = body.bytes().source().reportingTo(metrics.bytesSent)
SdkSourceBodyStream(source)
}
is HttpBody.ChannelContent -> {
val source = body.readFrom().reportingTo(metrics.bytesSent)
ReadChannelBodyStream(source, callContext)
}
is HttpBody.SourceContent -> {
val source = body.readFrom()
val source = body.readFrom().reportingTo(metrics.bytesSent)
callContext.job.invokeOnCompletion {
source.close()
}
Expand Down Expand Up @@ -85,10 +96,10 @@ internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.Sourc
* Send a chunked body using the CRT writeChunk bindings.
* @param body an HTTP body that has a chunked content encoding. Must be [HttpBody.SourceContent] or [HttpBody.ChannelContent]
*/
internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
internal suspend fun HttpStream.sendChunkedBody(body: HttpBody, metrics: HttpClientMetrics) {
when (body) {
is HttpBody.SourceContent -> {
val source = body.readFrom()
val source = body.readFrom().reportingTo(metrics.bytesSent)
val bufferedSource = source.buffer()

while (!bufferedSource.exhausted()) {
Expand All @@ -97,7 +108,7 @@ internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
}
}
is HttpBody.ChannelContent -> {
val chan = body.readFrom()
val chan = body.readFrom().reportingTo(metrics.bytesSent)
var buffer = SdkBuffer()
val nextBuffer = SdkBuffer()
var sentFirstChunk = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@ import aws.sdk.kotlin.crt.io.Buffer
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.HeadersBuilder
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.EngineAttributes
import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.response.HttpResponse
import aws.smithy.kotlin.runtime.http.response.copy
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteChannel
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.io.source
import aws.smithy.kotlin.runtime.operation.ExecutionContext
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds
import aws.smithy.kotlin.runtime.time.Instant
import aws.smithy.kotlin.runtime.time.fromEpochNanoseconds
import aws.smithy.kotlin.runtime.util.derivedName
import kotlinx.atomicfu.locks.reentrantLock
import kotlinx.atomicfu.locks.withLock
import kotlinx.coroutines.*
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -30,6 +41,8 @@ import kotlin.coroutines.CoroutineContext
internal class SdkStreamResponseHandler(
private val conn: HttpClientConnection,
private val callContext: CoroutineContext,
private val execContext: ExecutionContext,
private val clientMetrics: HttpClientMetrics,
) : HttpStreamResponseHandler {
// TODO - need to cancel the stream when the body is closed from the caller side early.
// There is no great way to do that currently without either (1) closing the connection or (2) throwing an
Expand Down Expand Up @@ -57,6 +70,10 @@ internal class SdkStreamResponseHandler(

private var streamCompleted = false

init {
clientMetrics.incrementInflightRequests()
}

/**
* Called by the response read channel as data is consumed
* @param size the number of bytes consumed
Expand Down Expand Up @@ -184,7 +201,30 @@ internal class SdkStreamResponseHandler(
}

internal suspend fun waitForResponse(): HttpResponse =
responseReady.receive()
responseReady.receive().wrapBody()

private fun HttpResponse.wrapBody(): HttpResponse {
val wrappedBody = when (val originalBody = body) {
is HttpBody.Empty -> return this // Don't need an object copy since we're not wrapping the body
is HttpBody.Bytes ->
originalBody
.bytes()
.source()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
is HttpBody.SourceContent ->
originalBody
.readFrom()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
is HttpBody.ChannelContent ->
originalBody
.readFrom()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
}
return copy(body = wrappedBody)
}

/**
* Invoked only after the consumer is finished with the response and it is safe to cleanup resources
Expand All @@ -197,6 +237,8 @@ internal class SdkStreamResponseHandler(
// and more data is pending arrival). It can also happen if the coroutine for this request is cancelled
// before onResponseComplete fires.
lock.withLock {
clientMetrics.decrementInflightRequests()

val forceClose = !streamCompleted

if (forceClose) {
Expand All @@ -210,4 +252,19 @@ internal class SdkStreamResponseHandler(
conn.close()
}
}

override fun onMetrics(stream: HttpStream, metrics: HttpStreamMetrics) {
val sendEnd = positiveInstantOrNull(metrics.sendEndTimestampNs)
val receiveStart = positiveInstantOrNull(metrics.receiveStartTimestampNs)

if (sendEnd != null && receiveStart != null) {
val ttfb = receiveStart - sendEnd
if (ttfb.isPositive()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: have you seen any instances where this is negative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly but event streams may begin receiving traffic before sending has concluded since the communication is bidirectional and duplexed. It seemed prudent in that situation to not report TTFB.

clientMetrics.timeToFirstByteDuration.recordSeconds(ttfb)
execContext[EngineAttributes.TimeToFirstByte] = ttfb
}
}
}
}

private fun positiveInstantOrNull(ns: Long): Instant? = if (ns > 0) Instant.fromEpochNanoseconds(ns) else null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment: These new body types are missing tests

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.crt.io

import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter

private class ReportingByteReadChannel(
val delegate: SdkByteReadChannel,
val metric: MonotonicCounter,
) : SdkByteReadChannel by delegate {
override suspend fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also {
if (it > 0) metric.add(it)
}
}

internal fun SdkByteReadChannel.reportingTo(metric: MonotonicCounter): SdkByteReadChannel =
ReportingByteReadChannel(this, metric)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.crt.io

import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkSource
import aws.smithy.kotlin.runtime.telemetry.metrics.MonotonicCounter

private class ReportingSource(val delegate: SdkSource, val metric: MonotonicCounter) : SdkSource by delegate {
override fun read(sink: SdkBuffer, limit: Long): Long = delegate.read(sink, limit).also {
if (it > 0) metric.add(it)
}
}

internal fun SdkSource.reportingTo(metric: MonotonicCounter): SdkSource = ReportingSource(this, metric)
Loading