7
7
8
8
Requires aiohttp, aiodns, tqdm, aiohttp-client-cache, and aiosqlite:
9
9
10
- pip3 install aiohttp aiodns tqdm
10
+ python3 -m pip install aiohttp aiodns tqdm aiohttp-client-cache aiosqlite
11
11
12
12
Usage:
13
13
17
17
18
18
import asyncio
19
19
from collections import defaultdict
20
+ from datetime import datetime , timedelta
20
21
from functools import partial
21
22
import os
22
23
from pprint import pformat
23
24
import time
24
25
26
+ from dateutil .parser import parse as parse_date
27
+
28
+ from aiohttp_client_cache import CachedSession , SQLiteBackend
25
29
import aiohttp
26
30
import tqdm
31
+ import tqdm .asyncio
27
32
import yaml
28
33
29
34
# directory containing this script; config files are located relative to it
HERE = os.path.dirname(__file__)

# delete builds used for CI, as well
CI_STRINGS = ["binderhub-ci-repos-", "binderhub-2dci-2drepos-"]

# don't delete images that *don't* appear to be r2d builds
# (image repository could have other images in it!)
R2D_STRINGS = ["r2d-"]

# naive local-time reference points, evaluated once at import;
# FIVE_YEARS_AGO and TOMORROW bound the sanity check on image creation dates
TODAY = datetime.now()
FIVE_YEARS_AGO = TODAY - timedelta(days=5 * 365)
TOMORROW = TODAY + timedelta(days=1)
46
+
34
47
35
48
class RequestFailed (Exception ):
36
49
"""Nicely formatted error for failed requests"""
@@ -49,10 +62,12 @@ def __str__(self):
49
62
)
50
63
51
64
52
- async def raise_for_status (r , action = "" ):
65
+ async def raise_for_status (r , action = "" , allowed_errors = None ):
53
66
"""raise an informative error on failed requests"""
54
67
if r .status < 400 :
55
68
return
69
+ if allowed_errors and r .status in allowed_errors :
70
+ return
56
71
if r .headers .get ("Content-Type" ) == "application/json" :
57
72
# try to parse json error messages
58
73
content = await r .json ()
@@ -68,21 +83,49 @@ async def raise_for_status(r, action=""):
68
83
raise RequestFailed (r .status , r .request_info .method , r .request_info .url , content )
69
84
70
85
71
- async def list_images (session , project ):
86
def list_images(session, image_prefix):
    """Pick the image-listing strategy that matches ``image_prefix``.

    The number of ``/`` separators in the prefix selects the API:

    - one slash (``user/prefix``): handled as Docker Hub, which cannot use
      the catalog endpoint; the part before the slash is passed to
      ``list_images_docker_hub`` as the user.
    - two slashes (``host/project/prefix``): the part before the first
      slash is taken as the registry host and ``https://<host>`` is passed
      to ``list_images_catalog``.

    Returns the async generator created by the chosen lister; nothing is
    awaited here, so the caller iterates it with ``async for``.

    Raises:
        ValueError: if the prefix has any other number of slashes. Without
            this, the function would return ``None`` and the caller's
            ``async for`` would fail with an unrelated ``TypeError``.
    """
    slash_count = image_prefix.count("/")
    if slash_count == 1:
        # docker hub, can't use catalog endpoint
        docker_hub_user = image_prefix.split("/", 1)[0]
        return list_images_docker_hub(session, docker_hub_user)
    if slash_count == 2:
        registry_host = image_prefix.split("/", 1)[0]
        registry_url = f"https://{registry_host}"
        return list_images_catalog(session, registry_url)
    raise ValueError(
        f"Cannot tell which registry to list for image prefix {image_prefix!r}: "
        "expected 'user/prefix' (Docker Hub) or 'host/project/prefix'"
    )
95
+
96
+
97
async def list_images_docker_hub(session, docker_hub_user):
    """Yield ``user/name`` for each repository Docker Hub lists for a user.

    Starts at the hub.docker.com v2 repositories endpoint for
    ``docker_hub_user`` (100 entries per page) and keeps following the
    ``next`` URL of each response until it is missing or empty.
    Failed requests are reported through ``raise_for_status``.
    """
    page_url = (
        f"https://hub.docker.com/v2/repositories/{docker_hub_user}/?page_size=100"
    )
    while page_url:
        async with session.get(page_url) as response:
            await raise_for_status(response, "listing images")
            page = await response.json()
        # filter-out not our images??
        for repo in page["results"]:
            yield "{}/{}".format(repo["user"], repo["name"])
        page_url = page.get("next")
84
108
85
109
110
async def list_images_catalog(session, registry_host):
    """Yield every repository name from a registry's ``/v2/_catalog``.

    ``registry_host`` is used verbatim as the base of the catalog URL, so it
    must already carry the scheme (``list_images`` passes ``https://<host>``).

    Pagination: a ``next`` key in the JSON body takes precedence; otherwise
    the ``next`` entry of the response's links is followed; listing stops
    when neither is present. Failed requests are reported through
    ``raise_for_status``.
    """
    catalog_url = f"{registry_host}/v2/_catalog"
    while catalog_url:
        async with session.get(catalog_url) as response:
            await raise_for_status(response, "listing images")
            page = await response.json()
        # filter-out not our images??
        for repository in page["repositories"]:
            yield repository
        if "next" in page:
            catalog_url = page["next"]
            continue
        # only consult the response links when the body has no "next" key
        pagination = response.links
        catalog_url = pagination["next"]["url"] if "next" in pagination else None
126
+
127
+
128
+
86
129
async def get_manifest (session , image ):
87
130
"""List the tags for an image
88
131
@@ -115,22 +158,32 @@ async def delete_image(session, image, digest, tags, dry_run=False):
115
158
# delete tags first (required)
116
159
for tag in tags :
117
160
async with fetch (f"{ manifests } /{ tag } " ) as r :
118
- await raise_for_status (r , f"{ verb } tag { image } @{ tag } " )
161
+ # allow 404 because previous delete may have been cached
162
+ await raise_for_status (r , f"{ verb } tag { image } @{ tag } " , allowed_errors = [404 ])
119
163
120
164
# this is the actual deletion
121
165
async with fetch (f"{ manifests } /{ digest } " ) as r :
122
- await raise_for_status (r , f"{ verb } image { image } @{ digest } " )
166
+ # allow 404 because previous delete may have been cached
167
+ await raise_for_status (r , f"{ verb } image { image } @{ digest } " , allowed_errors = [404 ])
123
168
124
169
125
- async def main (release = "staging" , project = None , concurrency = 20 , dry_run = True ):
170
+ async def main (
171
+ release = "staging" , project = None , concurrency = 20 , delete_before = None , dry_run = True
172
+ ):
126
173
if dry_run :
127
174
print ("THIS IS A DRY RUN. NO IMAGES WILL BE DELETED." )
128
175
to_be = "to be "
129
176
else :
130
177
to_be = ""
131
178
179
+ if delete_before :
180
+ # docker uses millisecond integer timestamps
181
+ delete_before_ms = int (delete_before .timestamp ()) * 1e3
182
+ else :
183
+ delete_before_ms = float ("inf" )
184
+
132
185
if not project :
133
- project = f"binder- { release } "
186
+ project = "binderhub-288415 "
134
187
with open (os .path .join (HERE , os .pardir , "config" , release + ".yaml" )) as f :
135
188
config = yaml .safe_load (f )
136
189
@@ -142,6 +195,7 @@ async def main(release="staging", project=None, concurrency=20, dry_run=True):
142
195
config = yaml .safe_load (f )
143
196
144
197
password = config ["binderhub" ]["registry" ]["password" ]
198
+ username = config ["binderhub" ]["registry" ].get ("username" , "_json_key" )
145
199
146
200
start = time .perf_counter ()
147
201
semaphores = defaultdict (lambda : asyncio .BoundedSemaphore (concurrency ))
@@ -164,49 +218,129 @@ async def bounded(f, *args, **kwargs):
164
218
async with semaphores [f ]:
165
219
return await f (* args , ** kwargs )
166
220
167
- async with aiohttp .ClientSession (
168
- auth = aiohttp .BasicAuth ("_json_key" , password ),
221
+ # TODO: basic auth is only sufficient for gcr
222
+ # need to request a token for non-gcr endpoints (ovh, turing on docker hub)
223
+ # e.g.
224
+ auth_kwargs = {}
225
+ print (prefix )
226
+ if prefix .startswith ("gcr.io" ):
227
+ auth_kwargs ["auth" ] = aiohttp .BasicAuth (username , password )
228
+ else :
229
+ # get bearer token
230
+ if prefix .count ("/" ) == 2 :
231
+ # ovh
232
+ registry_host = prefix .split ("/" , 1 )[0 ]
233
+ token_url = f"https://{ registry_host } /service/token?service=harbor-registry&scope=registry:catalog:*"
234
+ else :
235
+ # turing
236
+ raise NotImplementedError ("Can't get docker hub creds yet" )
237
+
238
+ async with aiohttp .ClientSession (
239
+ auth = aiohttp .BasicAuth (username , password )
240
+ ) as session :
241
+ response = await session .get (token_url )
242
+ token_info = await response .json ()
243
+ auth_kwargs ["headers" ] = {
244
+ "Authorization" : f"Bearer { token_info ['token' ]} "
245
+ }
246
+
247
+ async with CachedSession (
169
248
connector = aiohttp .TCPConnector (limit = 2 * concurrency ),
249
+ cache = SQLiteBackend (expire_after = 24 * 3600 ),
250
+ ** auth_kwargs ,
170
251
) as session :
171
252
172
- print (f "Fetching images" )
253
+ print ("Fetching images" )
173
254
tag_futures = []
174
255
matches = 0
175
- total_images = 0
176
- async for image in list_images (session , project ):
177
- total_images += 1
256
+ repos_to_keep = 0
257
+ repos_to_delete = 0
258
+
259
def should_delete_repository(image):
    """Whether we should delete the whole repository

    A repository is kept (``False``) when its registry-qualified name starts
    with the current image ``prefix`` and it contains none of
    ``CI_STRINGS``. Every other repository is deleted wholesale (``True``).

    Always returns a real bool. Without the explicit ``True`` the "delete"
    case would fall off the end and return ``None``, so no repository
    would ever be selected for whole-repository deletion.
    """
    if prefix.count("/") == 2:
        # host/project/prefix: listed names carry no host, so qualify them
        # with the prefix's own host. For gcr.io prefixes this is the same
        # "gcr.io/<image>" comparison as before.
        host = prefix.split("/", 1)[0]
        qualified_image = f"{host}/{image}"
    else:
        # user/prefix (Docker Hub): names are listed as "user/name" already
        qualified_image = image

    is_current_build = qualified_image.startswith(prefix)
    is_ci_build = any(ci_string in image for ci_string in CI_STRINGS)
    # keep only current, non-CI builds; everything else goes
    return not (is_current_build and not is_ci_build)
265
+
266
def should_fetch_repository(image):
    """Whether the manifest of ``image`` needs to be fetched at all.

    Repositories containing none of ``R2D_STRINGS`` are never fetched.
    For the rest, a fetch is needed when a ``delete_before`` cutoff is set
    (old builds of kept images get pruned) or when the whole repository is
    slated for deletion.
    """
    looks_like_r2d = any(marker in image for marker in R2D_STRINGS)
    if not looks_like_r2d:
        # ignore non-r2d builds
        return False
    # if delete_before, we are deleting old builds of images we are keeping,
    # otherwise, only delete builds that don't match our image prefix
    return bool(delete_before or should_delete_repository(image))
276
+
277
+ async for image in tqdm .asyncio .tqdm (
278
+ list_images (session , prefix ),
279
+ unit_scale = True ,
280
+ desc = "listing images" ,
281
+ ):
282
+ if should_fetch_repository (image ):
283
+ if should_delete_repository (image ):
284
+ repos_to_delete += 1
285
+ else :
286
+ repos_to_keep += 1
287
+ tag_futures .append (
288
+ asyncio .ensure_future (bounded (get_manifest , session , image ))
289
+ )
290
+ else :
291
+ repos_to_keep += 1
292
+
293
+ if not repos_to_keep :
189
294
raise RuntimeError (
190
295
f"No images matching prefix { prefix } . Would delete all images!"
191
296
)
192
- print (f"Not deleting { matches } images starting with { prefix } " )
297
+ print (f"Not deleting { repos_to_keep } images starting with { prefix } " )
193
298
if not tag_futures :
194
299
print ("Nothing to delete" )
195
300
return
196
301
print (f"{ len (tag_futures )} images to delete (not counting tags)" )
197
302
198
303
delete_futures = set ()
304
+ done = set ()
199
305
print ("Fetching tags" )
200
306
delete_progress = tqdm .tqdm (
201
- total = len ( tag_futures ) ,
307
+ total = repos_to_delete ,
202
308
position = 2 ,
203
309
unit_scale = True ,
204
310
desc = f"builds { to_be } deleted" ,
205
311
)
206
312
delete_byte_progress = tqdm .tqdm (
207
- total = 0 , position = 3 , unit = "B" , unit_scale = True , desc = f"bytes { to_be } deleted"
313
+ total = 0 ,
314
+ position = 3 ,
315
+ unit = "B" ,
316
+ unit_scale = True ,
317
+ desc = f"bytes { to_be } deleted" ,
208
318
)
209
319
320
def should_delete_tag(image, info):
    """Decide whether one build (manifest entry) of ``image`` gets deleted.

    ``info`` is the manifest entry for the build; ``timeCreatedMs`` and
    ``tag`` are read from it.

    Returns ``True`` for every build of a repository that is being deleted
    wholesale. Otherwise builds are only deleted when a ``delete_before``
    cutoff is set and the build was created before it. In a dry run, each
    build selected by the cutoff is printed.

    Raises:
        RuntimeError: if the creation time falls outside the window from
            ``FIVE_YEARS_AGO`` to ``TOMORROW``, rather than deleting on an
            implausible timestamp.
    """
    if should_delete_repository(image):
        return True
    if not delete_before:
        # no date cutoff
        return False

    # check cutoff
    created_ms = int(info["timeCreatedMs"])
    created = datetime.fromtimestamp(created_ms / 1e3)
    # sanity check timestamps
    plausible = FIVE_YEARS_AGO <= created <= TOMORROW
    if not plausible:
        raise RuntimeError(
            f"Not deleting image with weird date: {image}, {info}, {created}"
        )

    is_stale = created_ms < delete_before_ms
    if is_stale and dry_run:
        print(
            f"\nWould delete {image}:{','.join(info['tag'])} {created.isoformat()}"
        )
    return is_stale
343
+
210
344
try :
211
345
for f in tqdm .tqdm (
212
346
asyncio .as_completed (tag_futures ),
@@ -216,9 +350,15 @@ async def bounded(f, *args, **kwargs):
216
350
):
217
351
manifest = await f
218
352
image = manifest ["name" ]
219
- if len (manifest ["manifest" ]) > 1 :
353
+ delete_whole_repo = should_delete_repository (image )
354
+ if delete_whole_repo and len (manifest ["manifest" ]) > 1 :
220
355
delete_progress .total += len (manifest ["manifest" ]) - 1
221
356
for digest , info in manifest ["manifest" ].items ():
357
+ if not should_delete_tag (image , info ):
358
+ continue
359
+ if not delete_whole_repo :
360
+ # not counted yet
361
+ delete_progress .total += 1
222
362
nbytes = int (info ["imageSizeBytes" ])
223
363
delete_byte_progress .total += nbytes
224
364
f = asyncio .ensure_future (
@@ -240,7 +380,8 @@ async def bounded(f, *args, **kwargs):
240
380
nbytes ,
241
381
)
242
382
)
243
- done , delete_futures = await asyncio .wait (delete_futures , timeout = 0 )
383
+ if delete_futures :
384
+ done , delete_futures = await asyncio .wait (delete_futures , timeout = 0 )
244
385
if done :
245
386
# collect possible errors
246
387
await asyncio .gather (* done )
@@ -275,6 +416,12 @@ async def bounded(f, *args, **kwargs):
275
416
default = "" ,
276
417
help = "The gcloud project to use; only needed if not of the form `binder-{release}`." ,
277
418
)
419
+ parser .add_argument (
420
+ "--delete-before" ,
421
+ type = lambda s : s and parse_date (s ),
422
+ default = "" ,
423
+ help = "Delete any images older than this date. If unspecified, do not use date cutoff." ,
424
+ )
278
425
parser .add_argument (
279
426
"-j" ,
280
427
"--concurrency" ,
@@ -290,5 +437,11 @@ async def bounded(f, *args, **kwargs):
290
437
)
291
438
opts = parser .parse_args ()
292
439
asyncio .get_event_loop ().run_until_complete (
293
- main (opts .release , opts .project , opts .concurrency , dry_run = opts .dry_run )
440
+ main (
441
+ opts .release ,
442
+ opts .project ,
443
+ opts .concurrency ,
444
+ delete_before = opts .delete_before ,
445
+ dry_run = opts .dry_run ,
446
+ )
294
447
)
0 commit comments