@@ -31,24 +31,23 @@ def update_dtable_asset_sizes(dtable_uuid_sizes, db_session):
31
31
step = 1000
32
32
updated_at = datetime .utcnow ()
33
33
for i in range (0 , len (dtable_uuid_sizes ), step ):
34
- updates = ', ' .join (["('%s', %s, '%s')" % tuple (dtable_uuid_size + [updated_at ]) for dtable_uuid_size in dtable_uuid_sizes [i : i + step ]])
34
+ updates = ', ' .join (["('%s', %s, '%s')" % (
35
+ uuid_str_to_32_chars (dtable_uuid_size [0 ]), dtable_uuid_size [1 ], updated_at
36
+ ) for dtable_uuid_size in dtable_uuid_sizes [i : i + step ]])
35
37
sql = '''
36
38
INSERT INTO dtable_asset_stats(dtable_uuid, size, updated_at) VALUES %s
37
39
ON DUPLICATE KEY UPDATE size=VALUES(size), updated_at=VALUES(updated_at)
38
40
''' % updates
39
- try :
40
- db_session .execute (sql )
41
- db_session .commit ()
42
- except Exception as e :
43
- logger .error ('update dtable asset assets error: %s' , e )
41
+ db_session .execute (sql )
42
+ db_session .commit ()
44
43
45
44
46
45
class DTableAssetStatsWorker (Thread ):
47
46
def __init__ (self , config ):
48
47
Thread .__init__ (self )
49
48
self ._finished = Event ()
50
49
self ._db_session_class = init_db_session_class (config )
51
- self .interval = 5 * 60 # listen to seafile event for 5 mins and then calc dtable asset storage
50
+ self .interval = 5 * 60 # listen to seafile event for some time and then calc dtable asset storage
52
51
self .last_stats_time = time .time ()
53
52
self ._redis_client = RedisClient (config )
54
53
@@ -70,6 +69,8 @@ def run(self):
70
69
content = msg .get ('content' )
71
70
if not isinstance (content , str ) or '\t ' not in content :
72
71
continue
72
+ if not content .startswith ('repo-update' ):
73
+ continue
73
74
ctime = msg .get ('ctime' )
74
75
if not isinstance (ctime , int ) or ctime < time .time () - 30 * 60 : # ignore messages half hour ago
75
76
continue
@@ -81,15 +82,16 @@ def run(self):
81
82
repo_id_ctime_dict [repo_id ] = ctime
82
83
83
84
def stats_dtable_asset_storage (self , repo_id_ctime_dict ):
85
+ logger .info ('Starting stats repo dtable asset storage...' )
84
86
dtable_uuid_sizes = []
85
87
for repo_id , ctime in repo_id_ctime_dict .items ():
86
88
logger .debug ('start stats repo: %s ctime: %s' , repo_id , ctime )
87
89
try :
88
90
repo = seafile_api .get_repo (repo_id )
89
91
if not repo :
90
92
continue
91
- asset_dir_id = seafile_api .get_dir_id_by_path (repo_id , '/asset' )
92
- if not asset_dir_id :
93
+ asset_dirent = seafile_api .get_dirent_by_path (repo_id , '/asset' )
94
+ if not asset_dirent or asset_dirent . mtime < ctime :
93
95
continue
94
96
dirents = seafile_api .list_dir_by_path (repo_id , '/asset' , offset = - 1 , limit = - 1 )
95
97
for dirent in dirents :
@@ -98,7 +100,7 @@ def stats_dtable_asset_storage(self, repo_id_ctime_dict):
98
100
if not is_valid_uuid (dirent .obj_name ):
99
101
continue
100
102
logger .debug ('start stats repo: %s dirent: %s' , repo_id , dirent .obj_name )
101
- if dirent .mtime > ctime - 5 :
103
+ if dirent .mtime >= ctime :
102
104
dtable_uuid = dirent .obj_name
103
105
size = seafile_api .get_file_count_info_by_path (repo_id , f'/asset/{ dtable_uuid } ' ).size
104
106
logger .debug ('start stats repo: %s dirent: %s size: %s' , repo_id , dirent .obj_name , size )
@@ -111,47 +113,9 @@ def stats_dtable_asset_storage(self, repo_id_ctime_dict):
111
113
logger .debug ('totally need to update dtable: %s' , len (dtable_uuid_sizes ))
112
114
db_session = self ._db_session_class ()
113
115
try :
114
- update_dtable_asset_sizes (dtable_uuid_sizes )
116
+ update_dtable_asset_sizes (dtable_uuid_sizes , db_session )
115
117
except Exception as e :
116
118
logger .exception (e )
117
119
logger .error ('update dtable asset sizes error: %s' , e )
118
120
finally :
119
121
db_session .close ()
120
-
121
- def listen_redis_and_update (self ):
122
- logger .info ('Starting handle table rows count...' )
123
- subscriber = self ._redis_client .get_subscriber ('stat-asset' )
124
- while not self ._finished .is_set ():
125
- try :
126
- message = subscriber .get_message ()
127
- if message is not None :
128
- dtable_uuid_repo_ids = json .loads (message ['data' ])
129
- session = self ._db_session_class ()
130
- try :
131
- self .stats_dtable_uuids (dtable_uuid_repo_ids , session )
132
- except Exception as e :
133
- logger .error ('Handle table rows count: %s' % e )
134
- finally :
135
- session .close ()
136
- else :
137
- time .sleep (0.5 )
138
- except Exception as e :
139
- logger .error ('Failed get message from redis: %s' % e )
140
- subscriber = self ._redis_client .get_subscriber ('count-rows' )
141
-
142
- def stats_dtable_uuids (self , dtable_uuid_repo_ids , db_session ):
143
- dtable_uuid_sizes = []
144
- for dtable_uuid , repo_id in dtable_uuid_repo_ids :
145
- try :
146
- asset_path = f'/asset/{ uuid_str_to_36_chars (dtable_uuid )} '
147
- asset_dir_id = seafile_api .get_dir_id_by_path (repo_id , asset_path )
148
- if not asset_dir_id :
149
- dtable_uuid_sizes .append ([uuid_str_to_32_chars (dtable_uuid ), 0 ])
150
- size = seafile_api .get_file_count_info_by_path (repo_id , asset_path ).size
151
- dtable_uuid_sizes .append ([uuid_str_to_32_chars (dtable_uuid ), size ])
152
- logger .debug ('redis repo: %s dtable_uuid: %s size: %s' , repo_id , dtable_uuid , size )
153
- except Exception as e :
154
- logger .exception (e )
155
- logger .error ('check repo: %s dtable: %s asset size error: %s' , repo_id , dtable_uuid , e )
156
- logger .debug ('redis totally need to update dtable: %s' , len (dtable_uuid_sizes ))
157
- update_dtable_asset_sizes (dtable_uuid_sizes , db_session )
0 commit comments