Skip to content

Refactor Teams SSO middleware #6884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion libraries/Microsoft.Bot.Builder.Azure.Blobs/BlobsStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -293,7 +294,9 @@ internal BlobsStorage(BlobContainerClient containerClient, JsonSerializer jsonSe
catch (RequestFailedException ex)
when (ex.Status == (int)HttpStatusCode.PreconditionFailed)
{
throw new InvalidOperationException($"Etag conflict: {ex.Message}");
var items = await ReadAsync(new[] { keyValuePair.Key }, cancellationToken).ConfigureAwait(false);
var item = items.FirstOrDefault().Value as IStoreItem;
throw new ETagException(blobName, storeItem?.ETag, item?.ETag, ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ await _container.UpsertItemAsync(
}
}
catch (CosmosException ex)
when (ex.StatusCode == HttpStatusCode.PreconditionFailed)
{
var items = await ReadAsync(new[] { change.Key }, cancellationToken).ConfigureAwait(false);
var item = items.FirstOrDefault().Value as IStoreItem;
throw new ETagException(documentChange.Id, etag, item?.ETag, ex);
}
catch (CosmosException ex)
{
// This check could potentially be performed before even attempting to upsert the item
// so that a request wouldn't be made to Cosmos if it's expected to fail.
Expand Down
67 changes: 67 additions & 0 deletions libraries/Microsoft.Bot.Builder/ETagException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license.

using System;

namespace Microsoft.Bot.Builder
{
/// <summary>
/// An exception thrown when a 412 Precondition Failed error happens.
/// </summary>
public class ETagException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="ETagException"/> class.
/// </summary>
public ETagException()
{
}

/// <summary>
/// Initializes a new instance of the <see cref="ETagException"/> class with an exception message.
/// </summary>
/// <param name="message">The exception message.</param>
public ETagException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="ETagException"/> class with an exception message and inner exception.
/// </summary>
/// <param name="message">The exception message.</param>
/// <param name="innerException">The inner exception.</param>
public ETagException(string message, Exception innerException)
: base(message, innerException)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="ETagException"/> class with an item key, ETag, storage ETag, and inner exception.
/// </summary>
/// <param name="itemKey">The key of the storage item.</param>
/// <param name="itemETag">The ETag to set to the storage item.</param>
/// <param name="itemStorageETag">The ETag that's currently in the storage item.</param>
/// <param name="innerException">The inner exception.</param>
public ETagException(string itemKey, string itemETag, string itemStorageETag, Exception innerException = null)
: base(CreateMessage(itemKey, itemETag, itemStorageETag), innerException)
{
}

/// <inheritdoc/>
public override string Message
{
// Add a prefix to the message to avoid breaking existing code that looks for the message starting with "Etag conflict:"
get
{
var conflictMessage = "Etag conflict:";
return base.Message.StartsWith(conflictMessage) ? base.Message : $"{conflictMessage} {base.Message}";
}
}

private static string CreateMessage(string key, string etag, string storageETag)
{
return $"Unable to write the Item to the Storage due to a 412 Precondition Failed error.\nThis could happen when the Item was already processed by another machine or thread to avoid conflicts or data loss.\n\nKey: {key}\nETag to write: {etag}\nETag in storage: {storageETag}";
}
}
}
2 changes: 1 addition & 1 deletion libraries/Microsoft.Bot.Builder/MemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Task WriteAsync(IDictionary<string, object> changes, CancellationToken ca
&&
newStoreItem.ETag != oldStateETag)
{
throw new ArgumentException($"Etag conflict.\r\n\r\nOriginal: {newStoreItem.ETag}\r\nCurrent: {oldStateETag}");
throw new ETagException(change.Key, newStoreItem?.ETag, oldStateETag);
}

newState["eTag"] = (_eTag++).ToString(CultureInfo.InvariantCulture);
Expand Down
201 changes: 130 additions & 71 deletions libraries/Microsoft.Bot.Builder/Teams/TeamsSSOTokenExchangeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -14,28 +15,36 @@
namespace Microsoft.Bot.Builder.Teams
{
/// <summary>
/// If the activity name is signin/tokenExchange, this middleware will attempt to
/// exchange the token, and deduplicate the incoming call, ensuring only one
/// exchange request is processed.
/// This middleware is designed to intercept incoming 'signin/tokenExchange' activities,
/// perform the token exchange operation, and in case of duplicated requests, the bot will only process one of them.
/// </summary>
/// <remarks>
/// If a user is signed into multiple Teams clients, the Bot could receive a
/// "signin/tokenExchange" from each client. Each token exchange request for a
/// specific user login will have an identical Activity.Value.Id.
///
/// Only one of these token exchange requests should be processed by the bot.
/// The others return <see cref="System.Net.HttpStatusCode.PreconditionFailed"/>.
/// For a distributed bot in production, this requires a distributed storage
/// <para>
/// <b>Duplicated requests:</b> when a user is signed into multiple MS Teams clients, the Bot will receive a 'signin/tokenExchange' activity from each client,
/// causing the next conversation step to be executed more than once.<br/>
/// When this behavior happens, the middleware will detect any duplicated request,
/// executing <see cref="NextDelegate"/> inside <see cref="OnTurnAsync"/> method once, to continue the to next middleware.<br/>
/// <i>NOTE: when receiving this type of request, the 'signin/tokenExchange' activity will contain the same Value.Id.</i>
/// </para>
/// <para>
/// <b>Token exchange:</b> when a user is signed into multiple MS Teams clients, the token exchange will be done to each client,
/// ensuring that if the token could not be exchanged (which could be due to a consent requirement),
/// the bot will notify the sender with a <see cref="HttpStatusCode.PreconditionFailed"/> so they can respond accordingly.
/// </para>
/// <para>
/// For a distributed bot (multiple process instances, machines, cluster, etc.) in production, this requires a distributed storage
/// ensuring only one token exchange is processed. This middleware supports
/// CosmosDb storage found in Microsoft.Bot.Builder.Azure, or MemoryStorage for
/// local development. IStorage's ETag implementation for token exchange activity
/// deduplication.
/// </para>
/// </remarks>
public class TeamsSSOTokenExchangeMiddleware : IMiddleware
{
private readonly IStorage _storage;
private readonly string _oAuthConnectionName;

private readonly IStorage _storage;

/// <summary>
/// Initializes a new instance of the <see cref="TeamsSSOTokenExchangeMiddleware"/> class.
/// </summary>
Expand All @@ -61,72 +70,123 @@ public TeamsSSOTokenExchangeMiddleware(IStorage storage, string connectionName)
/// <inheritdoc/>
public async Task OnTurnAsync(ITurnContext turnContext, NextDelegate next, CancellationToken cancellationToken = default)
{
if (string.Equals(Channels.Msteams, turnContext.Activity.ChannelId, StringComparison.OrdinalIgnoreCase)
&& string.Equals(SignInConstants.TokenExchangeOperationName, turnContext.Activity.Name, StringComparison.OrdinalIgnoreCase))
if (await ProcessActivityAsync(turnContext, cancellationToken).ConfigureAwait(false))
{
// If the TokenExchange is NOT successful, the response will have already been sent by ExchangedTokenAsync
if (!await this.ExchangedTokenAsync(turnContext, cancellationToken).ConfigureAwait(false))
{
return;
}

// Only one token exchange should proceed from here. Deduplication is performed second because in the case
// of failure due to consent required, every caller needs to receive the
if (!await DeduplicatedTokenExchangeIdAsync(turnContext, cancellationToken).ConfigureAwait(false))
{
// If the token is not exchangeable, do not process this activity further.
return;
}
await next(cancellationToken).ConfigureAwait(false);
}

await next(cancellationToken).ConfigureAwait(false);
}

private async Task<bool> DeduplicatedTokenExchangeIdAsync(ITurnContext turnContext, CancellationToken cancellationToken)
/// <summary>
/// Evaluates if the incoming activity is a 'signin/tokenExchange' activity,
/// it processes the token exchange operation and in case of duplicated requests,
/// it will return true on the first one, whereas the rest will return false and send an invoke response to notify the bot.
/// </summary>
/// <param name="turnContext">The context object for this turn.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// <returns>False if the token couldn't be exchanged or if the method received a duplicated 'signin/tokenExchange' activity, otherwise True.</returns>
private async Task<bool> ProcessActivityAsync(ITurnContext turnContext, CancellationToken cancellationToken = default)
{
// Create a StoreItem with Etag of the unique 'signin/tokenExchange' request
var storeItem = new TokenStoreItem
if (!IsTokenExchangeActivity(turnContext))
{
ETag = (turnContext.Activity.Value as JObject).Value<string>("id")
};
return true;
}

var storeItems = new Dictionary<string, object> { { TokenStoreItem.GetStorageKey(turnContext), storeItem } };
try
var isDuplicated = await InitDeduplicateAsync(turnContext, cancellationToken);

// Exchange the token.
if (!await ExchangeTokenAsync(turnContext, cancellationToken))
{
// Writing the IStoreItem with ETag of unique id will succeed only once
await _storage.WriteAsync(storeItems, cancellationToken).ConfigureAwait(false);
return false;
}
catch (Exception ex)

// Memory storage throws a generic exception with a Message of 'Etag conflict. [other error info]'
// CosmosDbPartitionedStorage throws: ex.Message.Contains("pre-condition is not met")
when (ex.Message.StartsWith("Etag conflict", StringComparison.OrdinalIgnoreCase) || ex.Message.Contains("pre-condition is not met"))
// Deduplicate the request.
if (await isDuplicated())
{
// Do NOT proceed processing this message, some other thread or machine already has processed it.

// Send 200 invoke response.
await SendInvokeResponseAsync(turnContext, cancellationToken: cancellationToken).ConfigureAwait(false);
return false;
}

return true;
}

private async Task SendInvokeResponseAsync(ITurnContext turnContext, object body = null, HttpStatusCode httpStatusCode = HttpStatusCode.OK, CancellationToken cancellationToken = default)
/// <summary>
/// Evaluates if the incoming activity is from MSTeams channel and if it's a 'signin/tokenExchange' activity.
/// </summary>
/// <param name="turnContext">The context object for this turn.</param>
/// <returns>True if the activity is from MS Teams and it's a token exchange request.</returns>
private bool IsTokenExchangeActivity(ITurnContext turnContext)
{
return string.Equals(Channels.Msteams, turnContext.Activity.ChannelId, StringComparison.OrdinalIgnoreCase)
&& string.Equals(SignInConstants.TokenExchangeOperationName, turnContext.Activity.Name, StringComparison.OrdinalIgnoreCase);
}

private async Task<Func<Task<bool>>> InitDeduplicateAsync(ITurnContext turnContext, CancellationToken cancellationToken)
{
await turnContext.SendActivityAsync(
new Activity
var key = CreateKey(turnContext);
var items = await _storage.ReadAsync<TokenStoreItem>(new string[] { key }, cancellationToken).ConfigureAwait(false);
var item = items.FirstOrDefault().Value;

var changes = new Dictionary<string, object>
{
[key] = new TokenStoreItem { ETag = item?.ETag },
};

if (item == null)
{
// Create the item in the Storage for the first time to gather the ETag, to then use it later for concurrency control and avoid deduplication.
await _storage.WriteAsync(changes, cancellationToken).ConfigureAwait(false);
items = await _storage.ReadAsync<TokenStoreItem>(new string[] { key }, cancellationToken).ConfigureAwait(false);
item = items.FirstOrDefault().Value;
(changes[key] as TokenStoreItem).ETag = item?.ETag;
}

return async () =>
{
// Delay processing to capture a burst of incoming duplicated requests from the same user over a small period of time.
await Task.Delay(1000, cancellationToken).ConfigureAwait(false);

try
{
// Will be processed when there is an ETag assigned.
await _storage.WriteAsync(changes, cancellationToken).ConfigureAwait(false);
return false;
}
catch (ETagException)
{
Type = ActivityTypesEx.InvokeResponse,
Value = new InvokeResponse
{
Status = (int)httpStatusCode,
Body = body,
},
}, cancellationToken).ConfigureAwait(false);
// Notify the sender that the request is duplicated. Send 200 invoke response.
await SendInvokeResponseAsync(turnContext, cancellationToken: cancellationToken).ConfigureAwait(false);
return true;
}
};
}

private async Task<bool> ExchangedTokenAsync(ITurnContext turnContext, CancellationToken cancellationToken)
/// <summary>
/// Sends an invoke response to the caller.
/// </summary>
/// <param name="turnContext">The context object for this turn.</param>
/// <param name="body">The body response to send. Defaults to null.</param>
/// <param name="httpStatusCode">The HTTP status code to send. Defaults to <see cref="HttpStatusCode.OK"/>.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// <returns>A void task.</returns>
private async Task SendInvokeResponseAsync(ITurnContext turnContext, object body = null, HttpStatusCode httpStatusCode = HttpStatusCode.OK, CancellationToken cancellationToken = default)
{
var activity = new Activity
{
Type = ActivityTypesEx.InvokeResponse,
Value = new InvokeResponse { Status = (int)httpStatusCode, Body = body },
};
await turnContext.SendActivityAsync(activity, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// It performs the token exchange operation based on the token exchange request activity for a specific connection.
/// </summary>
/// <param name="turnContext">The context object for this turn.</param>
/// <param name="cancellationToken">A cancellation token that can be used by other objects
/// or threads to receive notice of cancellation.</param>
/// <returns>False if the token couldn't be exchanged, otherwise True.</returns>
private async Task<bool> ExchangeTokenAsync(ITurnContext turnContext, CancellationToken cancellationToken)
{
TokenResponse tokenExchangeResponse = null;
var tokenExchangeRequest = ((JObject)turnContext.Activity.Value)?.ToObject<TokenExchangeInvokeRequest>();
Expand Down Expand Up @@ -186,24 +246,23 @@ private async Task<bool> ExchangedTokenAsync(ITurnContext turnContext, Cancellat
return true;
}

/// <summary>
/// Creates a key based on the channel, conversation and user id.
/// </summary>
/// <param name="turnContext">The context object for this turn.</param>
/// <returns>The generated key for a specific user.</returns>
/// <exception cref="InvalidOperationException">If any of the ChannelId, Conversation.Id and From.Id are missing.</exception>
private string CreateKey(ITurnContext turnContext)
{
var channelId = turnContext.Activity.ChannelId ?? throw new InvalidOperationException("Invalid Activity, missing ChannelId property.");
var conversationId = turnContext.Activity.Conversation?.Id ?? throw new InvalidOperationException("Invalid Activity, missing Conversation.Id property.");
var userId = turnContext.Activity.From?.Id ?? throw new InvalidOperationException("Invalid Activity, missing From.Id property.");
return $"{channelId}/conversations/{conversationId}/users/{userId}/tokenExchange";
}

private class TokenStoreItem : IStoreItem
{
public string ETag { get; set; }

public static string GetStorageKey(ITurnContext turnContext)
{
var activity = turnContext.Activity;
var channelId = activity.ChannelId ?? throw new InvalidOperationException("invalid activity-missing channelId");
var conversationId = activity.Conversation?.Id ?? throw new InvalidOperationException("invalid activity-missing Conversation.Id");

var value = activity.Value as JObject;
if (value == null || !value.ContainsKey("id"))
{
throw new InvalidOperationException("Invalid signin/tokenExchange. Missing activity.Value.Id.");
}

return $"{channelId}/{conversationId}/{value.Value<string>("id")}";
}
}
}
}
Loading