@@ -16,6 +16,7 @@ limitations under the License.
16
16
#include < algorithm>
17
17
#include < chrono>
18
18
#include < cstdlib>
19
+ #include < exception>
19
20
#include < iomanip>
20
21
#include < map>
21
22
#include < memory>
@@ -49,9 +50,10 @@ AIBrixBlobStorage::AIBrixBlobStorage(
49
50
global_gc_enabled_(global_gc_enabled),
50
51
global_gc_interval_s_(std::chrono::seconds(global_gc_interval_s)),
51
52
global_ttl_s_(std::chrono::seconds(global_ttl_s)),
52
- ghost_fifo_(capacity_),
53
- small_fifo_(capacity_ * kSmallFifoCapacityRatio ),
54
- main_fifo_(capacity_ - capacity_ * kSmallFifoCapacityRatio ,
53
+ ghost_fifo_(capacity_ / chunk_size),
54
+ small_fifo_(capacity_ / chunk_size * kSmallFifoCapacityRatio ),
55
+ main_fifo_(capacity_ / chunk_size -
56
+ capacity_ / chunk_size * kSmallFifoCapacityRatio ,
55
57
kMinEviction ) {
56
58
kv_cache_ns_ = std::regex_replace (kv_cache_ns_, std::regex (" /" ), " _" );
57
59
kv_cache_ns_ = std::regex_replace (kv_cache_ns_ + " _" , std::regex (" _+" ), " _" );
@@ -221,32 +223,34 @@ Status AIBrixBlobStorage::GetTokenChunkHashes(
221
223
return Status::OK ();
222
224
}
223
225
224
- #define DEFINE_TASK_FN (FN, OP, CB ) \
225
- auto FN = [this , &prefix, &tokens, &kv_tensors, cb = CB]( \
226
- size_t i, \
227
- std::shared_ptr<KVCacheChunkBuilder> builder) -> Status { \
228
- auto chunk_size = this ->chunk_size_ ; \
229
- if (builder == nullptr ) { \
230
- return Status::OK (); \
231
- } \
232
- \
233
- std::vector<int > my_prefix (prefix.begin (), prefix.end ()); \
234
- if (i > 0 ) { \
235
- my_prefix.insert (my_prefix.end (), tokens.begin (), \
236
- tokens.begin () + i * chunk_size); \
237
- } \
238
- std::vector<int > my_tokens (tokens.begin () + i * chunk_size, \
239
- tokens.begin () + (i + 1 ) * chunk_size); \
240
- \
241
- std::vector<std::vector<std::pair<LLMKV, LLMKV>>> my_kv_tensors ( \
242
- kv_tensors.begin () + i * chunk_size, \
243
- kv_tensors.begin () + (i + 1 ) * chunk_size); \
244
- \
245
- auto status = builder->OP (my_prefix, my_tokens, my_kv_tensors); \
246
- if (status.ok ()) { \
247
- cb (i, my_kv_tensors); \
248
- } \
249
- return status; \
226
+ #define DEFINE_TASK_FN (FN, OP, CB ) \
227
+ auto FN = [this , &prefix, &tokens, &kv_tensors, cb = CB]( \
228
+ size_t i, \
229
+ std::shared_ptr<KVCacheChunkBuilder> builder) -> Status { \
230
+ auto chunk_size = this ->chunk_size_ ; \
231
+ if (builder == nullptr ) { \
232
+ return Status::OK (); \
233
+ } \
234
+ \
235
+ std::vector<int > my_prefix (prefix.begin (), prefix.end ()); \
236
+ if (i > 0 ) { \
237
+ my_prefix.insert (my_prefix.end (), tokens.begin (), \
238
+ tokens.begin () + i * chunk_size); \
239
+ } \
240
+ std::vector<int > my_tokens (tokens.begin () + i * chunk_size, \
241
+ tokens.begin () + (i + 1 ) * chunk_size); \
242
+ \
243
+ std::vector<std::vector<std::pair<LLMKV, LLMKV>>> my_kv_tensors ( \
244
+ kv_tensors.begin () + i * chunk_size, \
245
+ kv_tensors.begin () + (i + 1 ) * chunk_size); \
246
+ \
247
+ try { \
248
+ auto status = builder->OP (my_prefix, my_tokens, my_kv_tensors); \
249
+ if (status.ok ()) { \
250
+ cb (i, my_kv_tensors); \
251
+ } \
252
+ return status; \
253
+ } catch (const std::exception & e) { return Status::IOError (e.what ()); } \
250
254
}
251
255
252
256
#define WAIT_TASK_RESULTS (TIDS, COUNTER, FIRST_ERROR, OBJ_NAMES ) \
@@ -864,12 +868,16 @@ Status AIBrixBlobStorage::GlobalGCFunc() {
864
868
return ; \
865
869
} \
866
870
LOG (INFO) << #NAME " started" ; \
867
- Status status = self->NAME ##Func (); \
868
- if (!status.ok ()) { \
869
- LOG (ERROR) << #NAME " failed: " << status.ToString (); \
870
- /* Not a fatal error and wait for next time */ \
871
- } else { \
872
- LOG (INFO) << #NAME " completed" ; \
871
+ try { \
872
+ Status status = self->NAME ##Func (); \
873
+ if (!status.ok ()) { \
874
+ LOG (ERROR) << #NAME " failed: " << status.ToString (); \
875
+ /* Not a fatal error and wait for next time */ \
876
+ } else { \
877
+ LOG (INFO) << #NAME " completed" ; \
878
+ } \
879
+ } catch (const std::exception & e) { \
880
+ LOG (ERROR) << #NAME " failed: " << e.what (); \
873
881
} \
874
882
last_time = std::chrono::duration_cast<std::chrono::seconds>( \
875
883
std::chrono::system_clock::now ().time_since_epoch ()) \
0 commit comments