Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions test/dotnet/Integration/CustomType.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Generic;

namespace Microsoft.Azure.WebJobs.Extensions.Redis.Tests.Integration
{
public class CustomType
Expand All @@ -6,4 +8,9 @@ public class CustomType
public string Field { get; set; }
public string Random { get; set; }
}
public record CosmosDBListData
(
string id,
List<string> value
);
}
107 changes: 107 additions & 0 deletions test/dotnet/Integration/ListsCosmosIntegrationTestFunctions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using StackExchange.Redis;
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Extensions.Logging;
using Container = Microsoft.Azure.Cosmos.Container;
using System.Collections.Generic;



namespace Microsoft.Azure.WebJobs.Extensions.Redis.Tests.Integration
{
public static class ListsCosmosIntegrationTestFunctions
{
public const string redisConnectionString = "redisConnectionString";
public const string CosmosDBConnectionString = "CosmosDBConnectionString";
public const int pollingInterval = 100;
private static readonly IDatabase cache = ConnectionMultiplexer.Connect(Environment.GetEnvironmentVariable(redisConnectionString)).GetDatabase();

//CosmosDB database name and container name declared here
public const string CosmosDbDatabaseId = "CosmosDbDatabaseId";
public const string CosmosDbContainerId = "CosmosDbContainerId";

//Uses the key of the user's choice and should be changed accordingly
public const string key = "userListName";

public static async Task ToCacheAsync(FeedResponse<CosmosDBListData> response, CosmosDBListData item, string listEntry)
{
var fullEntry = response.Take(response.Count);

if (fullEntry == null) return;

foreach (CosmosDBListData inputValues in fullEntry)
{
RedisValue[] redisValues = Array.ConvertAll(inputValues.value.ToArray(), item => (RedisValue)item);
await cache.ListRightPushAsync(listEntry, redisValues);
}
}

[FunctionName("WriteAroundListTrigger")]
public static void WriteAroundListTrigger([CosmosDBTrigger(
databaseName: "%CosmosDbDatabaseId%",
containerName: "%CosmosDbContainerId%",
Connection = "CosmosDBConnectionString",
LeaseContainerName = "leases")]IReadOnlyList<CosmosDBListData> readOnlyList, ILogger log)
{
if (readOnlyList == null || readOnlyList.Count <= 0) return;

foreach (CosmosDBListData inputValues in readOnlyList)
{
if (inputValues.id == key)
{
RedisValue[] redisValues = Array.ConvertAll(inputValues.value.ToArray(), item => (RedisValue)item);
cache.ListRightPush(key, redisValues);
}
}
}

[FunctionName(nameof(ListTriggerWriteBehind))]
public static async Task ListTriggerWriteBehind(
[RedisListTrigger(redisConnectionString, key)] string listEntry, [CosmosDB(
Connection = "CosmosDBConnectionString" )]CosmosClient client,
ILogger logger)
{
Container db = client.GetDatabase(Environment.GetEnvironmentVariable(CosmosDbDatabaseId)).GetContainer(Environment.GetEnvironmentVariable(CosmosDbContainerId));

IOrderedQueryable<CosmosDBListData> query = db.GetItemLinqQueryable<CosmosDBListData>();
using FeedIterator<CosmosDBListData> results = query
.Where(p => p.id == key)
.ToFeedIterator();

FeedResponse<CosmosDBListData> response = await results.ReadNextAsync();
CosmosDBListData item = response.FirstOrDefault(defaultValue: null);

List<string> resultsHolder = item?.value ?? new List<string>();

resultsHolder.Add(listEntry);
CosmosDBListData newEntry = new CosmosDBListData(id: key, value: resultsHolder);
await db.UpsertItemAsync<CosmosDBListData>(newEntry);
}

[FunctionName(nameof(ListTriggerReadThroughFunc))]
public static async Task ListTriggerReadThroughFunc(
[RedisPubSubTrigger(redisConnectionString, "__keyevent@0__:keymiss")] string listEntry, [CosmosDB(
Connection = "CosmosDBConnectionString" )]CosmosClient client,
ILogger logger)
{
Container db = client.GetDatabase(Environment.GetEnvironmentVariable(CosmosDbDatabaseId)).GetContainer(Environment.GetEnvironmentVariable(CosmosDbContainerId));

IOrderedQueryable<CosmosDBListData> query = db.GetItemLinqQueryable<CosmosDBListData>();
using FeedIterator<CosmosDBListData> results = query
.Where(p => p.id == listEntry)
.ToFeedIterator();

FeedResponse<CosmosDBListData> response = await results.ReadNextAsync();
CosmosDBListData item = response.FirstOrDefault(defaultValue: null);

if (item == null) return;
else
{
await ToCacheAsync(response, item, listEntry);
}
}
}
}
189 changes: 189 additions & 0 deletions test/dotnet/Integration/ListsCosmosIntegrationTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
using StackExchange.Redis;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using Newtonsoft.Json;
using System.Threading.Tasks;
using Xunit;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Identity.Client;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using Azure;
using System.ComponentModel;
using Container = Microsoft.Azure.Cosmos.Container;
using System.Collections.Generic;

namespace Microsoft.Azure.WebJobs.Extensions.Redis.Tests.Integration
{
[Collection("RedisTriggerTests")]
public class ListsCosmosIntegrationTests
{
//Replace with desired key name
string key = "userListName";

//Replace DatabaseName and ContainerName with user's info
public const string CosmosDbDatabaseID = "CosmosDbDatabaseID";
public const string CosmosDbContainerID = "CosmosDbContainerID";

//Replace with number of values associated to the key in cosmos (for ReadThrough and CosmosToRedis Tests)
int iterations = 2;

[Fact]
public async void ListsTrigger_WriteBehind()
{
string functionName = nameof(ListsCosmosIntegrationTestFunctions.ListTriggerWriteBehind);
RedisValue[] valuesArray = new RedisValue[] { "a", "b" };

ConcurrentDictionary<string, int> counts = new ConcurrentDictionary<string, int>();
counts.TryAdd($"Executed '{functionName}' (Succeeded", valuesArray.Length);

using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, ListsCosmosIntegrationTestFunctions.redisConnectionString)))
{
await multiplexer.GetDatabase().KeyDeleteAsync(functionName);

using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7071))
{
functionsProcess.OutputDataReceived += IntegrationTestHelpers.CounterHandlerCreator(counts);

await multiplexer.GetDatabase().ListLeftPushAsync(key, valuesArray);

await Task.Delay(TimeSpan.FromSeconds(5));

await multiplexer.CloseAsync();
functionsProcess.Kill();
};
var incorrect = counts.Where(pair => pair.Value != 0);
Assert.False(incorrect.Any(), JsonConvert.SerializeObject(incorrect));
}
}

[Fact]
public async void ListsTrigger_WriteBehindHeavyLoading()
{
string functionName = nameof(ListsCosmosIntegrationTestFunctions.ListTriggerWriteBehind);
RedisValue[] valuesArray = new RedisValue[2000];
for (int i = 0; i < valuesArray.Length; i++)
{
valuesArray[i] = i;
}

ConcurrentDictionary<string, int> counts = new ConcurrentDictionary<string, int>();
counts.TryAdd($"Executed '{functionName}' (Succeeded", valuesArray.Length);

using (ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, ListsCosmosIntegrationTestFunctions.redisConnectionString)))
{
await multiplexer.GetDatabase().KeyDeleteAsync(functionName);

using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7071))
{
functionsProcess.OutputDataReceived += IntegrationTestHelpers.CounterHandlerCreator(counts);

await multiplexer.GetDatabase().ListLeftPushAsync(key, valuesArray);

await Task.Delay(TimeSpan.FromSeconds(2566));

await multiplexer.CloseAsync();
functionsProcess.Kill();
};
var incorrect = counts.Where(pair => pair.Value != 0);
Assert.False(incorrect.Any(), JsonConvert.SerializeObject(incorrect));
}
}

[Fact]
public async void ListsTrigger_InCosmos()
{
CosmosClientBuilder clientBuilder = new CosmosClientBuilder(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, ListsCosmosIntegrationTestFunctions.CosmosDBConnectionString));
CosmosClient cosmosClient = clientBuilder.Build();

Container db = cosmosClient.GetDatabase(CosmosDbDatabaseID).GetContainer(CosmosDbContainerID);
var query = db.GetItemLinqQueryable<CosmosDBListData>();
using FeedIterator<CosmosDBListData> results = query
.Where(p => p.id == key)
.ToFeedIterator();

var response = await results.ReadNextAsync();
var item = response.FirstOrDefault(defaultValue: null);

await Task.Delay(TimeSpan.FromSeconds(5));

Assert.Equal(item.id, key);
}

[Fact]
public async void ListsTrigger_CosmosToRedis()
{
ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, ListsCosmosIntegrationTestFunctions.redisConnectionString));
bool exists = true;

CosmosClientBuilder clientBuilder = new CosmosClientBuilder(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, ListsCosmosIntegrationTestFunctions.CosmosDBConnectionString));
CosmosClient cosmosClient = clientBuilder.Build();

Container db = cosmosClient.GetDatabase(CosmosDbDatabaseID).GetContainer(CosmosDbContainerID);

var query = db.GetItemLinqQueryable<CosmosDBListData>();
using FeedIterator<CosmosDBListData> results = query
.Where(p => p.id == key)
.ToFeedIterator();

FeedResponse<CosmosDBListData> response = await results.ReadNextAsync();
CosmosDBListData item = response.FirstOrDefault(defaultValue: null);

var fullEntry = response.Take(response.Count);

if (fullEntry == null) return;

foreach (CosmosDBListData inputValues in fullEntry)
{
RedisValue[] redisValues = Array.ConvertAll(inputValues.value.ToArray(), item => (RedisValue)item);
await multiplexer.GetDatabase().ListRightPushAsync(key, redisValues);

}

await Task.Delay(TimeSpan.FromSeconds(10));

exists = await multiplexer.GetDatabase().KeyExistsAsync(key);

Assert.True(exists);
}




[Fact]
public async void ListsTrigger_ReadThrough()
{
ConnectionMultiplexer multiplexer = ConnectionMultiplexer.Connect(RedisUtilities.ResolveConnectionString(IntegrationTestHelpers.localsettings, ListsCosmosIntegrationTestFunctions.redisConnectionString));
string functionName = nameof(ListsCosmosIntegrationTestFunctions.ListTriggerReadThroughFunc);
bool exists = true;

multiplexer.GetDatabase().KeyDelete(key);

using (Process functionsProcess = IntegrationTestHelpers.StartFunction(functionName, 7071))
{
for (int i = 0; i < iterations; i++)
{
multiplexer.GetDatabase().ListRange(key, 0, -1);
await Task.Delay(TimeSpan.FromSeconds(4));
if (i != iterations - 1)
{
multiplexer.GetDatabase().KeyDelete(key);
}
}

exists = multiplexer.GetDatabase().KeyExists(key);

multiplexer.Close();
functionsProcess.Kill();

Assert.True(exists);
}

}

}
}