Skip to content

Commit 77c2c1f

Browse files
algolia-bot and Fluf22 committed
feat(csharp): add transformation helpers for object indexing with a transformation (generated)
algolia/api-clients-automation#5452 Co-authored-by: algolia-bot <[email protected]> Co-authored-by: Thomas Raffray <[email protected]>
1 parent 3bc477e commit 77c2c1f

File tree

5 files changed

+670
-47
lines changed

5 files changed

+670
-47
lines changed

algoliasearch/Clients/IngestionClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace Algolia.Search.Clients;
1818
/// <summary>
1919
/// Represents a collection of functions to interact with the API endpoints
2020
/// </summary>
21-
public interface IIngestionClient
21+
public partial interface IIngestionClient
2222
{
2323
/// <summary>
2424
/// Creates a new authentication resource.

algoliasearch/Clients/SearchClient.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Net.Http;
77
using System.Threading;
88
using System.Threading.Tasks;
9+
using Algolia.Search.Clients;
910
using Algolia.Search.Http;
1011
using Algolia.Search.Models.Search;
1112
using Algolia.Search.Transport;
@@ -2568,6 +2569,7 @@ public partial class SearchClient : ISearchClient
25682569
{
25692570
internal HttpTransport _transport;
25702571
private readonly ILogger<SearchClient> _logger;
2572+
private IIngestionClient _ingestionTransporter;
25712573

25722574
/// <summary>
25732575
/// Create a new Search client for the given appID and apiKey.
@@ -2640,6 +2642,43 @@ public void SetClientApiKey(string apiKey)
26402642
_transport._algoliaConfig.SetClientApiKey(apiKey);
26412643
}
26422644

2645+
/// <summary>
/// Configures the region of the transformation pipeline by creating the ingestion client
/// used by the *WithTransformation methods. Call this before using any of those helpers.
/// The ingestion client reuses the search client's credentials, default headers, timeouts,
/// compression setting and custom hosts.
/// </summary>
/// <param name="region">The region ("us" or "eu")</param>
/// <param name="factory">Logger factory forwarded to the ingestion client (optional)</param>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="region"/> is null or blank, or when the search client
/// configuration has a null or blank AppId or ApiKey.
/// </exception>
public void SetTransformationRegion(string region, ILoggerFactory factory = null)
{
  if (string.IsNullOrWhiteSpace(region))
  {
    throw new ArgumentException("`region` must be provided when leveraging the transformation pipeline");
  }

  var searchConfig = _transport._algoliaConfig;

  var hasCredentials =
    !string.IsNullOrWhiteSpace(searchConfig.AppId) && !string.IsNullOrWhiteSpace(searchConfig.ApiKey);
  if (!hasCredentials)
  {
    throw new ArgumentException("AppId and ApiKey are required for transformation pipeline");
  }

  // Mirror the search client's settings so both clients behave consistently.
  var ingestionConfig = new IngestionConfig(searchConfig.AppId, searchConfig.ApiKey, region)
  {
    DefaultHeaders = searchConfig.DefaultHeaders,
    ConnectTimeout = searchConfig.ConnectTimeout,
    ReadTimeout = searchConfig.ReadTimeout,
    WriteTimeout = searchConfig.WriteTimeout,
    Compression = searchConfig.Compression,
    CustomHosts = searchConfig.CustomHosts,
  };

  _ingestionTransporter = new IngestionClient(ingestionConfig, factory);
}
2681+
26432682
/// <inheritdoc />
26442683
public async Task<AddApiKeyResponse> AddApiKeyAsync(
26452684
ApiKey apiKey,
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text.Json;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Algolia.Search.Exceptions;
8+
using Algolia.Search.Http;
9+
using Algolia.Search.Models.Ingestion;
10+
using Algolia.Search.Serializer;
11+
using Algolia.Search.Utils;
12+
13+
namespace Algolia.Search.Clients;
14+
15+
public partial interface IIngestionClient
{
  /// <summary>
  /// Pushes <paramref name="objects"/> to <paramref name="indexName"/> in successive chunks of
  /// <paramref name="batchSize"/> records through the `push` endpoint, optionally waiting for
  /// the resulting events to become available.
  /// </summary>
  /// <param name="indexName">The `indexName` to push `objects` to.</param>
  /// <param name="objects">The `objects` to send.</param>
  /// <param name="action">The `action` to perform on the given `objects`.</param>
  /// <param name="waitForTasks">Whether to wait for the pushed events before returning.</param>
  /// <param name="batchSize">The maximum number of `objects` per push call. Defaults to 1000.</param>
  /// <param name="referenceIndexName">Optional index name forwarded to the push call as `referenceIndexName`.</param>
  /// <param name="options">Add extra http header or query parameters to Algolia.</param>
  /// <param name="cancellationToken">Cancellation token to cancel the request</param>
  /// <returns>One <see cref="WatchResponse"/> per push call, in call order.</returns>
  Task<List<WatchResponse>> ChunkedPushAsync(
    string indexName,
    IEnumerable<object> objects,
    Algolia.Search.Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  );

  /// <summary>
  /// Blocking counterpart of <see cref="ChunkedPushAsync"/>; takes the same arguments and
  /// returns the same list of responses.
  /// </summary>
  /// <param name="indexName">The `indexName` to push `objects` to.</param>
  /// <param name="objects">The `objects` to send.</param>
  /// <param name="action">The `action` to perform on the given `objects`.</param>
  /// <param name="waitForTasks">Whether to wait for the pushed events before returning.</param>
  /// <param name="batchSize">The maximum number of `objects` per push call. Defaults to 1000.</param>
  /// <param name="referenceIndexName">Optional index name forwarded to the push call as `referenceIndexName`.</param>
  /// <param name="options">Add extra http header or query parameters to Algolia.</param>
  /// <param name="cancellationToken">Cancellation token to cancel the request</param>
  /// <returns>One <see cref="WatchResponse"/> per push call, in call order.</returns>
  List<WatchResponse> ChunkedPush(
    string indexName,
    IEnumerable<object> objects,
    Algolia.Search.Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  );
}
46+
47+
public partial class IngestionClient : IIngestionClient
{
  /// <summary>
  /// Helper: Chunks the given `objects` list in subsets of `batchSize` elements max in order to make it fit
  /// in `push` requests by leveraging the Transformation pipeline setup in the Push connector
  /// (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
  /// </summary>
  /// <param name="indexName">The `indexName` to push `objects` to.</param>
  /// <param name="objects">The array of `objects` to store in the given Algolia `indexName`.</param>
  /// <param name="action">The `action` to perform on the given array of `objects`.</param>
  /// <param name="waitForTasks">Whether or not we should wait until every push task has been processed. This operation may slow the total execution time of this method but is more reliable. When enabled, pending events are awaited every `max(batchSize / 10, 1)` push calls and after the last chunk.</param>
  /// <param name="batchSize">The size of the chunk of `objects`. The number of push calls will be equal to `ceil(length(objects) / batchSize)`. Must be greater than 0. Defaults to 1000.</param>
  /// <param name="referenceIndexName">This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).</param>
  /// <param name="options">Add extra http header or query parameters to Algolia.</param>
  /// <param name="cancellationToken">Cancellation token to cancel the request</param>
  /// <returns>List of WatchResponse objects from the push operations, one per push call</returns>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="objects"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="batchSize"/> is not greater than 0.</exception>
  /// <exception cref="AlgoliaException">Thrown when a push response has no event ID, or when an event cannot be retrieved within the retry budget.</exception>
  public async Task<List<WatchResponse>> ChunkedPushAsync(
    string indexName,
    IEnumerable<object> objects,
    Algolia.Search.Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  )
  {
    if (objects == null)
    {
      throw new ArgumentNullException(nameof(objects));
    }

    // A non-positive batch size would never advance through the list and loop forever.
    if (batchSize <= 0)
    {
      throw new ArgumentOutOfRangeException(
        nameof(batchSize),
        batchSize,
        "`batchSize` must be greater than 0"
      );
    }

    var objectsList = objects.ToList();
    var total = objectsList.Count;
    var responses = new List<WatchResponse>();
    var waitBatchSize = Math.Max(batchSize / 10, 1);

    // Index of the first response whose event has not been awaited yet.
    var offset = 0;
    var start = 0;

    while (start < total)
    {
      // Clamping the chunk size keeps `start` within [0, total] and avoids integer overflow.
      var chunkSize = Math.Min(batchSize, total - start);
      var records = new List<PushTaskRecords>(chunkSize);

      foreach (var obj in objectsList.GetRange(start, chunkSize))
      {
        // Round-trip through JSON to convert an arbitrary object into a push record.
        var jsonString = JsonSerializer.Serialize(obj, JsonConfig.Options);
        records.Add(JsonSerializer.Deserialize<PushTaskRecords>(jsonString, JsonConfig.Options));
      }

      start += chunkSize;
      var isLastChunk = start >= total;

      var response = await PushAsync(
          indexName,
          new PushTaskPayload(action, records),
          watch: null,
          referenceIndexName: referenceIndexName,
          options: options,
          cancellationToken: cancellationToken
        )
        .ConfigureAwait(false);

      responses.Add(response);

      if (waitForTasks && (isLastChunk || responses.Count % waitBatchSize == 0))
      {
        for (var j = offset; j < responses.Count; j++)
        {
          await WaitForPushedEventAsync(responses[j], cancellationToken).ConfigureAwait(false);
        }

        offset = responses.Count;
      }
    }

    return responses;
  }

  /// <summary>
  /// Polls the event referenced by a push response until it can be retrieved.
  /// A 404 from the API is treated as "not available yet" and triggers another attempt.
  /// </summary>
  /// <param name="response">The push response holding the run and event identifiers.</param>
  /// <param name="cancellationToken">Cancellation token to cancel the polling</param>
  /// <exception cref="AlgoliaException">Thrown when the response has no event ID.</exception>
  private async Task WaitForPushedEventAsync(WatchResponse response, CancellationToken cancellationToken)
  {
    if (string.IsNullOrEmpty(response.EventID))
    {
      throw new AlgoliaException(
        "Received unexpected response from the push endpoint, eventID must not be null or empty"
      );
    }

    await RetryHelper
      .RetryUntil(
        async () =>
        {
          try
          {
            return await GetEventAsync(
                response.RunID,
                response.EventID,
                cancellationToken: cancellationToken
              )
              .ConfigureAwait(false);
          }
          catch (AlgoliaApiException ex) when (ex.HttpErrorCode == 404)
          {
            // Event not found yet: a null result makes the validator fail so the call is retried.
            return (Algolia.Search.Models.Ingestion.Event)null;
          }
        },
        eventResponse => eventResponse != null,
        maxRetries: RetryHelper.DefaultMaxRetries,
        ct: cancellationToken
      )
      .ConfigureAwait(false);
  }

  /// <summary>
  /// Synchronous version of ChunkedPushAsync
  /// </summary>
  /// <param name="indexName">The `indexName` to push `objects` to.</param>
  /// <param name="objects">The array of `objects` to store in the given Algolia `indexName`.</param>
  /// <param name="action">The `action` to perform on the given array of `objects`.</param>
  /// <param name="waitForTasks">Whether or not we should wait until every push task has been processed.</param>
  /// <param name="batchSize">The size of the chunk of `objects`. Must be greater than 0. Defaults to 1000.</param>
  /// <param name="referenceIndexName">Optional index name forwarded to the push call as `referenceIndexName`.</param>
  /// <param name="options">Add extra http header or query parameters to Algolia.</param>
  /// <param name="cancellationToken">Cancellation token to cancel the request</param>
  /// <returns>List of WatchResponse objects from the push operations, one per push call</returns>
  public List<WatchResponse> ChunkedPush(
    string indexName,
    IEnumerable<object> objects,
    Algolia.Search.Models.Ingestion.Action action,
    bool waitForTasks = false,
    int batchSize = 1000,
    string referenceIndexName = null,
    RequestOptions options = null,
    CancellationToken cancellationToken = default
  ) =>
    AsyncHelper.RunSync(() =>
      ChunkedPushAsync(
        indexName,
        objects,
        action,
        waitForTasks,
        batchSize,
        referenceIndexName,
        options,
        cancellationToken
      )
    );
}

algoliasearch/Utils/RetryHelper.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Algolia.Search.Exceptions;
5+
6+
namespace Algolia.Search.Utils;
7+
8+
/// <summary>
/// A helper class to retry operations
/// </summary>
public static class RetryHelper
{
  /// <summary>
  /// The default maximum number of retries
  /// </summary>
  public const int DefaultMaxRetries = 50;

  /// <summary>
  /// Retry the given function until the validation function returns true or the maximum number of retries is reached
  /// </summary>
  /// <typeparam name="T">The type of the function's return value</typeparam>
  /// <param name="func">The function to retry</param>
  /// <param name="validate">The validation function</param>
  /// <param name="maxRetries">The maximum number of times <paramref name="func"/> is invoked</param>
  /// <param name="timeout">A function that takes the retry count (starting at 0) and returns the timeout in milliseconds before the next retry. Defaults to 200ms per retry count, capped at 5000ms.</param>
  /// <param name="ct">A cancellation token to cancel the wait between two attempts</param>
  /// <returns>The result of the function if the validation function returns true</returns>
  /// <exception cref="ArgumentNullException">Thrown if <paramref name="func"/> or <paramref name="validate"/> is null</exception>
  /// <exception cref="AlgoliaException">Thrown if the maximum number of retries is reached</exception>
  public static async Task<T> RetryUntil<T>(
    Func<Task<T>> func,
    Func<T, bool> validate,
    int maxRetries = DefaultMaxRetries,
    Func<int, int> timeout = null,
    CancellationToken ct = default
  )
  {
    if (func == null)
    {
      throw new ArgumentNullException(nameof(func));
    }

    if (validate == null)
    {
      throw new ArgumentNullException(nameof(validate));
    }

    timeout ??= NextDelay;

    var retryCount = 0;
    while (retryCount < maxRetries)
    {
      var resp = await func().ConfigureAwait(false);
      if (validate(resp))
      {
        return resp;
      }

      retryCount++;

      // Only wait when another attempt follows; sleeping before throwing just delays the failure.
      if (retryCount < maxRetries)
      {
        await Task.Delay(timeout(retryCount - 1), ct).ConfigureAwait(false);
      }
    }

    // retryCount is the number of attempts actually made, so it never exceeds maxRetries.
    throw new AlgoliaException(
      $"The maximum number of retries exceeded. ({retryCount}/{maxRetries})"
    );
  }

  /// <summary>
  /// Default delay: 200ms per retry already performed, capped at 5000ms.
  /// </summary>
  /// <param name="retryCount">The number of retries already performed</param>
  /// <returns>The delay in milliseconds before the next retry</returns>
  private static int NextDelay(int retryCount)
  {
    return Math.Min(retryCount * 200, 5000);
  }
}

0 commit comments

Comments
 (0)