Requires aiohttp and tqdm:

- pip3 install aiohttp aiodns tqdm
+ python3 -m pip install aiohttp aiodns tqdm aiohttp-client-cache aiosqlite

Usage:

import os
import time
from collections import defaultdict
+ from datetime import datetime, timedelta
from functools import partial
from pprint import pformat

+ from dateutil.parser import parse as parse_date
+
+ from aiohttp_client_cache import CachedSession, SQLiteBackend
import aiohttp
import tqdm
+ import tqdm.asyncio
import yaml

HERE = os.path.dirname(__file__)

# delete builds used for CI, as well
CI_STRINGS = ["binderhub-ci-repos-", "binderhub-2dci-2drepos-"]

+ # don't delete images that *don't* appear to be r2d builds
+ # (image repository could have other images in it!)
+ R2D_STRINGS = ["r2d-"]
+
+ TODAY = datetime.now()
+ FIVE_YEARS_AGO = TODAY - timedelta(days=5 * 365)
+ TOMORROW = TODAY + timedelta(days=1)
+

class RequestFailed(Exception):
    """Nicely formatted error for failed requests"""
@@ -49,10 +62,12 @@ def __str__(self):
        )


- async def raise_for_status(r, action=""):
+ async def raise_for_status(r, action="", allowed_errors=None):
    """raise an informative error on failed requests"""
    if r.status < 400:
        return
+     if allowed_errors and r.status in allowed_errors:
+         return
    if r.headers.get("Content-Type") == "application/json":
        # try to parse json error messages
        content = await r.json()
@@ -68,19 +83,49 @@ async def raise_for_status(r, action=""):
    raise RequestFailed(r.status, r.request_info.method, r.request_info.url, content)


- async def list_images(session, project):
+ def list_images(session, image_prefix):
+     if image_prefix.count("/") == 1:
+         # docker hub, can't use catalog endpoint
+         docker_hub_user = image_prefix.split("/", 1)[0]
+         return list_images_docker_hub(session, docker_hub_user)
+     elif image_prefix.count("/") == 2:
+         registry_host = image_prefix.split("/", 1)[0]
+         registry_url = f"https://{registry_host}"
+         return list_images_catalog(session, registry_url)
+
+
+ async def list_images_docker_hub(session, docker_hub_user):
    """List the images for a project"""
-     url = "https://gcr.io/v2/_catalog"
+     url = f"https://hub.docker.com/v2/repositories/{docker_hub_user}/?page_size=100"
    while url:
        async with session.get(url) as r:
            await raise_for_status(r, "listing images")
            resp = await r.json()
-         for image in resp["repositories"]:
-             if image.startswith(project + "/"):
-                 yield image
+         for image in resp["results"]:
+             # filter-out not our images??
+             yield f"{image['user']}/{image['name']}"
        url = resp.get("next")


+ async def list_images_catalog(session, registry_host):
+     """List the images for a project"""
+     url = f"{registry_host}/v2/_catalog"
+     while url:
+         async with session.get(url) as r:
+             await raise_for_status(r, "listing images")
+             resp = await r.json()
+         for image in resp["repositories"]:
+             # filter-out not our images??
+             yield image
+         if "next" in resp:
+             url = resp["next"]
+         elif "next" in r.links:
+             url = r.links["next"]["url"]
+         else:
+             url = None
+
+
+
async def get_manifest(session, image):
    """List the tags for an image

@@ -113,22 +158,34 @@ async def delete_image(session, image, digest, tags, dry_run=False):
    # delete tags first (required)
    for tag in tags:
        async with fetch(f"{manifests}/{tag}") as r:
-             await raise_for_status(r, f"{verb} tag {image}@{tag}")
+             # allow 404 because previous delete may have been cached
+             await raise_for_status(r, f"{verb} tag {image}@{tag}", allowed_errors=[404])

    # this is the actual deletion
    async with fetch(f"{manifests}/{digest}") as r:
-         await raise_for_status(r, f"{verb} image {image}@{digest}")
+         # allow 404 because previous delete may have been cached
+         await raise_for_status(
+             r, f"{verb} image {image}@{digest}", allowed_errors=[404]
+         )


- async def main(release="staging", project=None, concurrency=20, dry_run=True):
+ async def main(
+     release="staging", project=None, concurrency=20, delete_before=None, dry_run=True
+ ):
    if dry_run:
        print("THIS IS A DRY RUN. NO IMAGES WILL BE DELETED.")
        to_be = "to be "
    else:
        to_be = ""

+     if delete_before:
+         # docker uses millisecond integer timestamps
+         delete_before_ms = int(delete_before.timestamp()) * 1e3
+     else:
+         delete_before_ms = float("inf")
+
    if not project:
-         project = f"binder-{release}"
+         project = "binderhub-288415"
    with open(os.path.join(HERE, os.pardir, "config", release + ".yaml")) as f:
        config = yaml.safe_load(f)

@@ -140,6 +197,7 @@ async def main(release="staging", project=None, concurrency=20, dry_run=True):
        config = yaml.safe_load(f)

    password = config["binderhub"]["registry"]["password"]
+     username = config["binderhub"]["registry"].get("username", "_json_key")

    start = time.perf_counter()
    semaphores = defaultdict(lambda: asyncio.BoundedSemaphore(concurrency))
@@ -162,49 +220,127 @@ async def bounded(f, *args, **kwargs):
        async with semaphores[f]:
            return await f(*args, **kwargs)

-     async with aiohttp.ClientSession(
-         auth=aiohttp.BasicAuth("_json_key", password),
+     # TODO: basic auth is only sufficient for gcr
+     # need to request a token for non-gcr endpoints (ovh, turing on docker hub)
+     # e.g.
+     auth_kwargs = {}
+     print(prefix)
+     if prefix.startswith("gcr.io"):
+         auth_kwargs["auth"] = aiohttp.BasicAuth(username, password)
+     else:
+         # get bearer token
+         if prefix.count("/") == 2:
+             # ovh
+             registry_host = prefix.split("/", 1)[0]
+             token_url = f"https://{registry_host}/service/token?service=harbor-registry&scope=registry:catalog:*"
+         else:
+             # turing
+             raise NotImplementedError("Can't get docker hub creds yet")
+
+         async with aiohttp.ClientSession(
+             auth=aiohttp.BasicAuth(username, password)
+         ) as session:
+             response = await session.get(token_url)
+             token_info = await response.json()
+         auth_kwargs["headers"] = {"Authorization": f"Bearer {token_info['token']}"}
+
+     async with CachedSession(
        connector=aiohttp.TCPConnector(limit=2 * concurrency),
+         cache=SQLiteBackend(expire_after=24 * 3600),
+         **auth_kwargs,
    ) as session:

        print("Fetching images")
        tag_futures = []
        matches = 0
-         total_images = 0
-         async for image in list_images(session, project):
-             total_images += 1
+         repos_to_keep = 0
+         repos_to_delete = 0
+
+         def should_delete_repository(image):
+             """Whether we should delete the whole repository"""
            if f"gcr.io/{image}".startswith(prefix) and not any(
                ci_string in image for ci_string in CI_STRINGS
            ):
-                 matches += 1
-                 continue
-             # don't call ensure_future here
-             # because we don't want to kick off everything before
-             tag_futures.append(
-                 asyncio.ensure_future(bounded(get_manifest, session, image))
-             )
-         if not matches:
+                 return False
+
+         def should_fetch_repository(image):
+             if not any(substring in image for substring in R2D_STRINGS):
+                 # ignore non-r2d builds
+                 return False
+             if delete_before or should_delete_repository(image):
+                 # if delete_before, we are deleting old builds of images we are keeping,
+                 # otherwise, only delete builds that don't match our image prefix
+                 return True
+             else:
+                 return False
+
+         async for image in tqdm.asyncio.tqdm(
+             list_images(session, prefix),
+             unit_scale=True,
+             desc="listing images",
+         ):
+             if should_fetch_repository(image):
+                 if should_delete_repository(image):
+                     repos_to_delete += 1
+                 else:
+                     repos_to_keep += 1
+                 tag_futures.append(
+                     asyncio.ensure_future(bounded(get_manifest, session, image))
+                 )
+             else:
+                 repos_to_keep += 1
+
+         if not repos_to_keep:
            raise RuntimeError(
                f"No images matching prefix {prefix}. Would delete all images!"
            )
-         print(f"Not deleting {matches} images starting with {prefix}")
+         print(f"Not deleting {repos_to_keep} images starting with {prefix}")
        if not tag_futures:
            print("Nothing to delete")
            return
        print(f"{len(tag_futures)} images to delete (not counting tags)")

        delete_futures = set()
+         done = set()
        print("Fetching tags")
        delete_progress = tqdm.tqdm(
-             total=len(tag_futures),
+             total=repos_to_delete,
            position=2,
            unit_scale=True,
            desc=f"builds {to_be}deleted",
        )
        delete_byte_progress = tqdm.tqdm(
-             total=0, position=3, unit="B", unit_scale=True, desc=f"bytes {to_be}deleted"
+             total=0,
+             position=3,
+             unit="B",
+             unit_scale=True,
+             desc=f"bytes {to_be}deleted",
        )

+         def should_delete_tag(image, info):
+             if should_delete_repository(image):
+                 return True
+             if not delete_before:
+                 # no date cutoff
+                 return False
+
+             # check cutoff
+             image_ms = int(info["timeCreatedMs"])
+             image_datetime = datetime.fromtimestamp(image_ms / 1e3)
+             # sanity check timestamps
+             if image_datetime < FIVE_YEARS_AGO or image_datetime > TOMORROW:
+                 raise RuntimeError(
+                     f"Not deleting image with weird date: {image}, {info}, {image_datetime}"
+                 )
+             if delete_before_ms > image_ms:
+                 # if dry_run:
+                 #     print(
+                 #         f"\nWould delete {image}:{','.join(info['tag'])} {image_datetime.isoformat()}"
+                 #     )
+                 return True
+             else:
+                 return False
+

        try:
            for f in tqdm.tqdm(
                asyncio.as_completed(tag_futures),
@@ -214,9 +350,15 @@ async def bounded(f, *args, **kwargs):
            ):
                manifest = await f
                image = manifest["name"]
-                 if len(manifest["manifest"]) > 1:
+                 delete_whole_repo = should_delete_repository(image)
+                 if delete_whole_repo and len(manifest["manifest"]) > 1:
                    delete_progress.total += len(manifest["manifest"]) - 1
                for digest, info in manifest["manifest"].items():
+                     if not should_delete_tag(image, info):
+                         continue
+                     if not delete_whole_repo:
+                         # not counted yet
+                         delete_progress.total += 1
                    nbytes = int(info["imageSizeBytes"])
                    delete_byte_progress.total += nbytes
                    f = asyncio.ensure_future(
@@ -238,7 +380,8 @@ async def bounded(f, *args, **kwargs):
                            nbytes,
                        )
                    )
-                 done, delete_futures = await asyncio.wait(delete_futures, timeout=0)
+                 if delete_futures:
+                     done, delete_futures = await asyncio.wait(delete_futures, timeout=0)
                if done:
                    # collect possible errors
                    await asyncio.gather(*done)
@@ -273,6 +416,12 @@ async def bounded(f, *args, **kwargs):
        default="",
        help="The gcloud project to use; only needed if not of the form `binder-{release}`.",
    )
+     parser.add_argument(
+         "--delete-before",
+         type=lambda s: s and parse_date(s),
+         default="",
+         help="Delete any images older than this date. If unspecified, do not use date cutoff.",
+     )
    parser.add_argument(
        "-j",
        "--concurrency",
@@ -288,5 +437,11 @@ async def bounded(f, *args, **kwargs):
    )
    opts = parser.parse_args()
    asyncio.get_event_loop().run_until_complete(
-         main(opts.release, opts.project, opts.concurrency, dry_run=opts.dry_run)
+         main(
+             opts.release,
+             opts.project,
+             opts.concurrency,
+             delete_before=opts.delete_before,
+             dry_run=opts.dry_run,
+         )
    )
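
For reference, a minimal sketch of how the new --delete-before value flows from argparse into the millisecond cutoff that should_delete_tag compares against timeCreatedMs. It is not part of the script above; it assumes python-dateutil is installed and uses made-up dates purely for illustration.

from datetime import datetime

from dateutil.parser import parse as parse_date

# argparse applies `type=lambda s: s and parse_date(s)`, so the default ""
# stays falsy (no date cutoff) and any other string becomes a datetime
to_cutoff = lambda s: s and parse_date(s)
assert to_cutoff("") == ""

delete_before = to_cutoff("2020-09-01")  # hypothetical cutoff date
# docker reports image creation times as millisecond timestamps,
# so main() converts the cutoff the same way
delete_before_ms = int(delete_before.timestamp()) * 1e3

# an image built before the cutoff would be eligible for deletion
time_created_ms = datetime(2019, 1, 1).timestamp() * 1e3  # hypothetical timeCreatedMs
print(delete_before_ms > time_created_ms)  # True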