|
9 | 9 | from celery import shared_task, chain, chord, group
|
10 | 10 | from celery.utils.log import get_task_logger
|
11 | 11 | from django.db import transaction
|
| 12 | +from django.db.utils import IntegrityError |
| 13 | + |
12 | 14 |
|
13 | 15 | from core.utils import log_execution
|
14 | 16 | from core.settings import DJANGO_DB_BULK_CREATE_BATCH_SIZE
|
@@ -353,133 +355,141 @@ def parse_info_subtask(self, prev_result, id, file_name, *args, **kwargs):
|
353 | 355 | # add task
|
354 | 356 | task = ValidationTask.objects.create(request=request, type=ValidationTask.Type.PARSE_INFO)
|
355 | 357 |
|
356 |
| - task.mark_as_initiated() |
357 |
| - # check for header policy |
358 |
| - check_script = os.path.join(os.path.dirname(__file__), "checks", "header_policy", "validate_header.py") |
359 |
| - |
360 |
| - try: |
361 |
| - logger.debug(f'before header validation task, path {file_path}, script {check_script} ') |
362 |
| - proc = subprocess.run( |
363 |
| - [sys.executable, check_script, file_path], |
364 |
| - stdout=subprocess.PIPE, |
365 |
| - stderr=subprocess.PIPE, |
366 |
| - text=True, |
367 |
| - timeout=TASK_TIMEOUT_LIMIT # Add timeout to prevent infinite hangs |
368 |
| - ) |
| 358 | + prev_result_succeeded = prev_result is not None and prev_result['is_valid'] is True |
| 359 | + if prev_result_succeeded: |
| 360 | + task.mark_as_initiated() |
| 361 | + # check for header policy |
| 362 | + check_script = os.path.join(os.path.dirname(__file__), "checks", "header_policy", "validate_header.py") |
369 | 363 |
|
370 |
| - except subprocess.TimeoutExpired as err: |
371 |
| - task.mark_as_failed(err) |
372 |
| - raise |
373 |
| - except Exception as err: |
374 |
| - task.mark_as_failed(err) |
375 |
| - raise |
376 |
| - |
377 |
| - if (proc.returncode is not None and proc.returncode != 0) or (len(proc.stderr) > 0): |
378 |
| - error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" |
379 |
| - task.mark_as_failed(error_message) |
380 |
| - raise RuntimeError(error_message) |
381 |
| - |
382 |
| - header_validation = {} |
383 |
| - stdout_lines = proc.stdout.splitlines() |
384 |
| - for line in stdout_lines: |
385 | 364 | try:
|
386 |
| - header_validation = json.loads(line) |
387 |
| - except json.JSONDecodeError: |
388 |
| - continue |
389 |
| - |
390 |
| - logger.debug(f'header validation output : {header_validation}') |
391 |
| - |
392 |
| - with transaction.atomic(): |
393 |
| - # create or retrieve Model info |
394 |
| - model = get_or_create_ifc_model(id) |
395 |
| - |
396 |
| - # update Model info |
397 |
| - agg_status = task.determine_aggregate_status() |
398 |
| - model.status_prereq = agg_status |
399 |
| - |
400 |
| - # size |
401 |
| - model.size = os.path.getsize(file_path) |
402 |
| - logger.debug(f'Detected size = {model.size} bytes') |
403 |
| - |
404 |
| - # schema |
405 |
| - model.schema = header_validation.get('schema_identifier') |
406 |
| - |
407 |
| - logger.debug(f'The schema identifier = {header_validation.get("schema")}') |
408 |
| - |
409 |
| - # time_stamp |
410 |
| - if ifc_file_time_stamp := header_validation.get('time_stamp', False): |
411 |
| - try: |
412 |
| - logger.debug(f'Timestamp within file = {ifc_file_time_stamp}') |
413 |
| - date = datetime.datetime.strptime(ifc_file_time_stamp, "%Y-%m-%dT%H:%M:%S") |
414 |
| - date_with_tz = datetime.datetime( |
415 |
| - date.year, |
416 |
| - date.month, |
417 |
| - date.day, |
418 |
| - date.hour, |
419 |
| - date.minute, |
420 |
| - date.second, |
421 |
| - tzinfo=datetime.timezone.utc) |
422 |
| - model.date = date_with_tz |
423 |
| - except ValueError: |
| 365 | + logger.debug(f'before header validation task, path {file_path}, script {check_script} ') |
| 366 | + proc = subprocess.run( |
| 367 | + [sys.executable, check_script, file_path], |
| 368 | + stdout=subprocess.PIPE, |
| 369 | + stderr=subprocess.PIPE, |
| 370 | + text=True, |
| 371 | + timeout=TASK_TIMEOUT_LIMIT # Add timeout to prevent infinite hangs |
| 372 | + ) |
| 373 | + |
| 374 | + |
| 375 | + if (proc.returncode is not None and proc.returncode != 0) or (len(proc.stderr) > 0): |
| 376 | + error_message = f"Running {' '.join(proc.args)} failed with exit code {proc.returncode}\n{proc.stdout}\n{proc.stderr}" |
| 377 | + task.mark_as_failed(error_message) |
| 378 | + raise RuntimeError(error_message) |
| 379 | + |
| 380 | + header_validation = {} |
| 381 | + stdout_lines = proc.stdout.splitlines() |
| 382 | + for line in stdout_lines: |
424 | 383 | try:
|
425 |
| - model.date = datetime.datetime.fromisoformat(ifc_file_time_stamp) |
426 |
| - except ValueError: |
427 |
| - pass |
| 384 | + header_validation = json.loads(line) |
| 385 | + except json.JSONDecodeError: |
| 386 | + continue |
| 387 | + |
| 388 | + logger.debug(f'header validation output : {header_validation}') |
| 389 | + |
| 390 | + with transaction.atomic(): |
| 391 | + # create or retrieve Model info |
| 392 | + model = get_or_create_ifc_model(id) |
| 393 | + |
| 394 | + # update Model info |
| 395 | + agg_status = task.determine_aggregate_status() |
| 396 | + model.status_prereq = agg_status |
428 | 397 |
|
429 |
| - # mvd |
430 |
| - model.mvd = header_validation.get('mvd') |
431 |
| - |
432 |
| - app = header_validation.get('application_name') |
433 |
| - |
434 |
| - version = header_validation.get('version') |
435 |
| - name = None if any(value in (None, "Not defined") for value in (app, version)) else app + ' ' + version |
436 |
| - company_name = header_validation.get('company_name') |
437 |
| - logger.debug(f'Detected Authoring Tool in file = {name}') |
438 |
| - |
439 |
| - validation_errors = header_validation.get('validation_errors', []) |
440 |
| - invalid_marker_fields = ['originating_system', 'version', 'company_name', 'application_name'] |
441 |
| - |
442 |
| - if any(field in validation_errors for field in invalid_marker_fields): |
443 |
| - model.status_header = Model.Status.INVALID |
444 |
| - else: |
445 |
| - # parsing was successful and model can be considered for scorecards |
446 |
| - model.status_header = Model.Status.VALID |
447 |
| - authoring_tool = AuthoringTool.find_by_full_name(full_name=name) |
448 |
| - if (isinstance(authoring_tool, AuthoringTool)): |
| 398 | + # size |
| 399 | + model.size = os.path.getsize(file_path) |
| 400 | + logger.debug(f'Detected size = {model.size} bytes') |
449 | 401 |
|
450 |
| - if authoring_tool.company is None: |
451 |
| - company, _ = Company.objects.get_or_create(name=company_name) |
452 |
| - authoring_tool.company = company |
453 |
| - authoring_tool.save() |
454 |
| - logger.debug(f'Updated existing Authoring Tool with company: {company.name}') |
455 |
| - |
456 |
| - model.produced_by = authoring_tool |
457 |
| - logger.debug(f'Retrieved existing Authoring Tool from DB = {model.produced_by.full_name}') |
458 |
| - |
459 |
| - elif authoring_tool is None: |
460 |
| - company, _ = Company.objects.get_or_create(name=company_name) |
461 |
| - authoring_tool, _ = AuthoringTool.objects.get_or_create( |
462 |
| - company=company, |
463 |
| - name=app, |
464 |
| - version=version |
465 |
| - ) |
466 |
| - model.produced_by = authoring_tool |
467 |
| - logger.debug(f'Authoring app not found, ApplicationFullName = {app}, Version = {version} - created new instance') |
468 |
| - else: |
469 |
| - model.produced_by = None |
470 |
| - logger.warning(f'Retrieved multiple Authoring Tool from DB: {authoring_tool} - could not assign any') |
| 402 | + # schema |
| 403 | + model.schema = header_validation.get('schema_identifier') |
| 404 | + |
| 405 | + logger.debug(f'The schema identifier = {header_validation.get("schema")}') |
| 406 | + # time_stamp |
| 407 | + if ifc_file_time_stamp := header_validation.get('time_stamp', False): |
| 408 | + try: |
| 409 | + logger.debug(f'Timestamp within file = {ifc_file_time_stamp}') |
| 410 | + date = datetime.datetime.strptime(ifc_file_time_stamp, "%Y-%m-%dT%H:%M:%S") |
| 411 | + date_with_tz = datetime.datetime( |
| 412 | + date.year, |
| 413 | + date.month, |
| 414 | + date.day, |
| 415 | + date.hour, |
| 416 | + date.minute, |
| 417 | + date.second, |
| 418 | + tzinfo=datetime.timezone.utc) |
| 419 | + model.date = date_with_tz |
| 420 | + except ValueError: |
| 421 | + try: |
| 422 | + model.date = datetime.datetime.fromisoformat(ifc_file_time_stamp) |
| 423 | + except ValueError: |
| 424 | + pass |
471 | 425 |
|
472 |
| - # update header validation |
473 |
| - model.header_validation = header_validation |
474 |
| - model.save(update_fields=['status_header', 'header_validation']) |
475 |
| - model.save() |
476 |
| - |
477 |
| - |
478 |
| - # update Task info and return |
479 |
| - is_valid = agg_status != Model.Status.INVALID |
480 |
| - reason = f'agg_status = {Model.Status(agg_status).label}\nraw_output = {header_validation}' |
481 |
| - task.mark_as_completed(reason) |
482 |
| - return {'is_valid': is_valid, 'reason': reason} |
| 426 | + # mvd |
| 427 | + model.mvd = header_validation.get('mvd') |
| 428 | + |
| 429 | + app = header_validation.get('application_name') |
| 430 | + |
| 431 | + version = header_validation.get('version') |
| 432 | + name = None if any(value in (None, "Not defined") for value in (app, version)) else app + ' ' + version |
| 433 | + company_name = header_validation.get('company_name') |
| 434 | + logger.debug(f'Detected Authoring Tool in file = {name}') |
| 435 | + |
| 436 | + validation_errors = header_validation.get('validation_errors', []) |
| 437 | + invalid_marker_fields = ['originating_system', 'version', 'company_name', 'application_name'] |
| 438 | + if any(field in validation_errors for field in invalid_marker_fields): |
| 439 | + model.status_header = Model.Status.INVALID |
| 440 | + else: |
| 441 | + # parsing was successful and model can be considered for scorecards |
| 442 | + model.status_header = Model.Status.VALID |
| 443 | + authoring_tool = AuthoringTool.find_by_full_name(full_name=name) |
| 444 | + if (isinstance(authoring_tool, AuthoringTool)): |
| 445 | + |
| 446 | + if authoring_tool.company is None: |
| 447 | + company, _ = Company.objects.get_or_create(name=company_name) |
| 448 | + authoring_tool.company = company |
| 449 | + authoring_tool.save() |
| 450 | + logger.debug(f'Updated existing Authoring Tool with company: {company.name}') |
| 451 | + |
| 452 | + model.produced_by = authoring_tool |
| 453 | + logger.debug(f'Retrieved existing Authoring Tool from DB = {model.produced_by.full_name}') |
| 454 | + |
| 455 | + elif authoring_tool is None: |
| 456 | + company, _ = Company.objects.get_or_create(name=company_name) |
| 457 | + authoring_tool, _ = AuthoringTool.objects.get_or_create( |
| 458 | + company=company, |
| 459 | + name=app, |
| 460 | + version=version |
| 461 | + ) |
| 462 | + model.produced_by = authoring_tool |
| 463 | + logger.debug(f'Authoring app not found, ApplicationFullName = {app}, Version = {version} - created new instance') |
| 464 | + else: |
| 465 | + model.produced_by = None |
| 466 | + logger.warning(f'Retrieved multiple Authoring Tool from DB: {authoring_tool} - could not assign any') |
| 467 | + |
| 468 | + # update header validation |
| 469 | + model.header_validation = header_validation |
| 470 | + model.save(update_fields=['status_header', 'header_validation']) |
| 471 | + model.save() |
| 472 | + |
| 473 | + |
| 474 | + # update Task info and return |
| 475 | + is_valid = agg_status != Model.Status.INVALID |
| 476 | + reason = f'agg_status = {Model.Status(agg_status).label}\nraw_output = {header_validation}' |
| 477 | + task.mark_as_completed(reason) |
| 478 | + return {'is_valid': is_valid, 'reason': reason} |
| 479 | + |
| 480 | + except subprocess.TimeoutExpired as err: |
| 481 | + task.mark_as_failed(err) |
| 482 | + raise |
| 483 | + except IntegrityError as err: |
| 484 | + task.mark_as_failed(err) |
| 485 | + raise |
| 486 | + except Exception as err: |
| 487 | + task.mark_as_failed(err) |
| 488 | + raise |
| 489 | + else: |
| 490 | + reason = f'Skipped as prev_result = {prev_result}.' |
| 491 | + task.mark_as_skipped(reason) |
| 492 | + return {'is_valid': None, 'reason': reason} |
483 | 493 |
|
484 | 494 |
|
485 | 495 | @shared_task(bind=True)
|
|
0 commit comments