@@ -23,14 +23,15 @@ use databend_common_ast::ast::ScheduleOptions;
23
23
use databend_common_meta_api:: kv_pb_api:: KVPbApi ;
24
24
use databend_common_meta_api:: kv_pb_api:: UpsertPB ;
25
25
use databend_common_meta_api:: txn_cond_eq_seq;
26
+ use databend_common_meta_api:: txn_op_del;
26
27
use databend_common_meta_api:: util:: txn_put_pb;
27
28
use databend_common_meta_api:: SchemaApi ;
28
29
use databend_common_meta_app:: principal:: task;
29
30
use databend_common_meta_app:: principal:: task:: TaskMessage ;
30
- use databend_common_meta_app:: principal:: task:: TaskStateValue ;
31
+ use databend_common_meta_app:: principal:: task:: TaskSucceededStateValue ;
31
32
use databend_common_meta_app:: principal:: task_dependent_ident:: TaskDependentIdent ;
32
33
use databend_common_meta_app:: principal:: task_message_ident:: TaskMessageIdent ;
33
- use databend_common_meta_app:: principal:: task_state_ident:: TaskStateIdent ;
34
+ use databend_common_meta_app:: principal:: task_state_ident:: TaskSucceededStateIdent ;
34
35
use databend_common_meta_app:: principal:: ScheduleType ;
35
36
use databend_common_meta_app:: principal:: Status ;
36
37
use databend_common_meta_app:: principal:: Task ;
@@ -40,16 +41,16 @@ use databend_common_meta_app::tenant::Tenant;
40
41
use databend_common_meta_kvapi:: kvapi;
41
42
use databend_common_meta_kvapi:: kvapi:: DirName ;
42
43
use databend_common_meta_kvapi:: kvapi:: Key ;
44
+ use databend_common_meta_types:: txn_condition:: Target ;
45
+ use databend_common_meta_types:: ConditionResult ;
43
46
use databend_common_meta_types:: MatchSeq ;
44
47
use databend_common_meta_types:: MetaError ;
45
48
use databend_common_meta_types:: TxnCondition ;
46
49
use databend_common_meta_types:: TxnOp ;
47
50
use databend_common_meta_types:: TxnRequest ;
48
51
use databend_common_meta_types:: With ;
49
- use databend_common_proto_conv:: FromToProto ;
50
52
use futures:: StreamExt ;
51
53
use futures:: TryStreamExt ;
52
- use prost:: Message ;
53
54
use seq_marked:: SeqValue ;
54
55
55
56
use crate :: task:: errors:: TaskApiError ;
@@ -407,7 +408,7 @@ impl TaskMgr {
407
408
) ) ;
408
409
let mut stream = self
409
410
. kv_api
410
- . list_pb_keys ( & DirName :: new ( TaskStateIdent :: new (
411
+ . list_pb_keys ( & DirName :: new ( TaskSucceededStateIdent :: new (
411
412
& self . tenant ,
412
413
task_name,
413
414
"" ,
@@ -456,10 +457,6 @@ impl TaskMgr {
456
457
task_name : & str ,
457
458
) -> Result < Result < Vec < String > , TaskError > , TaskApiError > {
458
459
let task_before_ident = TaskDependentIdent :: new_before ( & self . tenant , task_name) ;
459
- let succeeded_value = TaskStateValue { is_succeeded : true } ;
460
- let not_succeeded_value = TaskStateValue {
461
- is_succeeded : false ,
462
- } ;
463
460
464
461
let Some ( task_before_dependent) = self . kv_api . get_pb ( & task_before_ident) . await ? else {
465
462
return Ok ( Ok ( Vec :: new ( ) ) ) ;
@@ -478,25 +475,24 @@ impl TaskMgr {
478
475
} ;
479
476
let target_after = & target_after_ident. name ( ) . source ;
480
477
let this_task_to_target_state =
481
- TaskStateIdent :: new ( & self . tenant , task_name, target_after) ;
478
+ TaskSucceededStateIdent :: new ( & self . tenant , task_name, target_after) ;
482
479
let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] ) . with_else ( vec ! [ txn_put_pb(
483
480
& this_task_to_target_state,
484
- & succeeded_value ,
481
+ & TaskSucceededStateValue ,
485
482
) ?] ) ;
486
483
487
484
for before_target_after in target_after_dependent. 0 . iter ( ) {
488
485
let task_ident =
489
- TaskStateIdent :: new ( & self . tenant , before_target_after, target_after) ;
486
+ TaskSucceededStateIdent :: new ( & self . tenant , before_target_after, target_after) ;
490
487
// Only care about the predecessors of this task's successor tasks, excluding this task itself.
491
488
if before_target_after != task_name {
492
- request. condition . push ( TxnCondition :: eq_value (
493
- task_ident. to_string_key ( ) ,
494
- succeeded_value. to_pb ( ) ?. encode_to_vec ( ) ,
495
- ) ) ;
489
+ request. condition . push ( TxnCondition {
490
+ key : task_ident. to_string_key ( ) ,
491
+ expected : ConditionResult :: Gt as i32 ,
492
+ target : Some ( Target :: Seq ( 0 ) ) ,
493
+ } ) ;
496
494
}
497
- request
498
- . if_then
499
- . push ( txn_put_pb ( & task_ident, & not_succeeded_value) ?) ;
495
+ request. if_then . push ( txn_op_del ( & task_ident) ) ;
500
496
}
501
497
let reply = self . kv_api . transaction ( request) . await ?;
502
498
0 commit comments