Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions supervisor/addons/addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ async def _check_ingress_port(self):
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def install(self) -> None:
async def install(self, *, progress_job_id: str | None = None) -> None:
"""Install and setup this addon."""
if not self.addon_store:
raise AddonsError("Missing from store, cannot install!")
Expand All @@ -792,7 +792,10 @@ def setup_data():
# Install image
try:
await self.instance.install(
self.latest_version, self.addon_store.image, arch=self.arch
self.latest_version,
self.addon_store.image,
arch=self.arch,
progress_job_id=progress_job_id,
)
except DockerError as err:
await self.sys_addons.data.uninstall(self)
Expand Down Expand Up @@ -876,7 +879,9 @@ def cleanup_config_and_audio():
on_condition=AddonsJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(self) -> asyncio.Task | None:
async def update(
self, *, progress_job_id: str | None = None
) -> asyncio.Task | None:
"""Update this addon to latest version.

Returns a Task that completes when addon has state 'started' (see start)
Expand All @@ -890,7 +895,12 @@ async def update(self) -> asyncio.Task | None:
store = self.addon_store.clone()

try:
await self.instance.update(store.version, store.image, arch=self.arch)
await self.instance.update(
store.version,
store.image,
arch=self.arch,
progress_job_id=progress_job_id,
)
except DockerError as err:
raise AddonsError() from err

Expand Down
24 changes: 20 additions & 4 deletions supervisor/addons/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

from attr import evolve

from supervisor.jobs.const import JobConcurrency

from ..const import AddonBoot, AddonStartup, AddonState
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
Expand All @@ -22,6 +20,7 @@
HassioError,
HomeAssistantAPIError,
)
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..store.addon import AddonStore
Expand Down Expand Up @@ -203,7 +202,13 @@ async def install(
if validation_complete:
validation_complete.set()

await Addon(self.coresys, slug).install()
await Addon(self.coresys, slug).install(
progress_job_id=self.sys_jobs.current.uuid
)

# Make sure our job finishes at 100% so users aren't confused
if self.sys_jobs.current.progress != 100:
self.sys_jobs.current.progress = 100

_LOGGER.info("Add-on '%s' successfully installed", slug)

Expand Down Expand Up @@ -272,7 +277,18 @@ async def update(
addons=[addon.slug],
)

return await addon.update()
# Assume for now the docker image pull is 100% of this task. But from a user
# perspective that isn't true. Other steps we could consider allocating a fixed
# amount of progress for to improve accuracy include: partial backup, image
# cleanup, apparmor update, and addon restart
task = await addon.update(progress_job_id=self.sys_jobs.current.uuid)

# Make sure our job finishes at 100% so users aren't confused
if self.sys_jobs.current.progress != 100:
self.sys_jobs.current.progress = 100

_LOGGER.info("Add-on '%s' successfully updated", slug)
return task

@Job(
name="addon_manager_rebuild",
Expand Down
8 changes: 7 additions & 1 deletion supervisor/docker/addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ async def update(
image: str | None = None,
latest: bool = False,
arch: CpuArch | None = None,
*,
progress_job_id: str | None = None,
) -> None:
"""Update a docker image."""
image = image or self.image
Expand All @@ -643,6 +645,7 @@ async def update(
latest=latest,
arch=arch,
need_build=self.addon.latest_need_build,
progress_job_id=progress_job_id,
)

@Job(
Expand All @@ -658,12 +661,15 @@ async def install(
arch: CpuArch | None = None,
*,
need_build: bool | None = None,
progress_job_id: str | None = None,
) -> None:
"""Pull Docker image or build it."""
if need_build is None and self.addon.need_build or need_build:
await self._build(version, image)
else:
await super().install(version, image, latest, arch)
await super().install(
version, image, latest, arch, progress_job_id=progress_job_id
)

async def _build(self, version: AwesomeVersion, image: str | None = None) -> None:
"""Build a Docker container."""
Expand Down
80 changes: 71 additions & 9 deletions supervisor/docker/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,12 @@ async def _docker_login(self, image: str) -> None:

await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials)

def _process_pull_image_log(self, job_id: str, reference: PullLogEntry) -> None:
def _process_pull_image_log(
self, reference_id: str, progress_job_id: str, reference: PullLogEntry
) -> None:
"""Process events fired from a docker while pulling an image, filtered to a given job id."""
if (
reference.job_id != job_id
reference.job_id != reference_id
or not reference.id
or not reference.status
or not (stage := PullImageLayerStage.from_status(reference.status))
Expand All @@ -237,21 +239,22 @@ def _process_pull_image_log(self, job_id: str, reference: PullLogEntry) -> None:
name="Pulling container image layer",
initial_stage=stage.status,
reference=reference.id,
parent_id=job_id,
parent_id=progress_job_id,
internal=True,
)
job.done = False
return

# Find our sub job to update details of
for j in self.sys_jobs.jobs:
if j.parent_id == job_id and j.reference == reference.id:
if j.parent_id == progress_job_id and j.reference == reference.id:
job = j
break

# This likely only occurs if the logs came in out of sync and we got progress before the Pulling FS Layer one
if not job:
raise DockerLogOutOfOrder(
f"Received pull image log with status {reference.status} for image id {reference.id} and parent job {job_id} but could not find a matching job, skipping",
f"Received pull image log with status {reference.status} for image id {reference.id} and parent job {progress_job_id} but could not find a matching job, skipping",
_LOGGER.debug,
)

Expand Down Expand Up @@ -325,17 +328,65 @@ def _process_pull_image_log(self, job_id: str, reference: PullLogEntry) -> None:
else job.extra,
)

# Once we have received a progress update for every child job, start to set status of the main one
install_job = self.sys_jobs.get_job(progress_job_id)
layer_jobs = [
job
for job in self.sys_jobs.jobs
if job.parent_id == install_job.uuid
and job.name == "Pulling container image layer"
]

# First set the total bytes to be downloaded/extracted on the main job
if not install_job.extra:
total = 0
for job in layer_jobs:
if not job.extra:
return
total += job.extra["total"]
install_job.extra = {"total": total}
else:
total = install_job.extra["total"]

# Then determine total progress based on progress of each sub-job, factoring in size of each compared to total
progress = 0.0
stage = PullImageLayerStage.PULL_COMPLETE
for job in layer_jobs:
if not job.extra:
return
progress += job.progress * (job.extra["total"] / total)
job_stage = PullImageLayerStage.from_status(cast(str, job.stage))

if job_stage < PullImageLayerStage.EXTRACTING:
stage = PullImageLayerStage.DOWNLOADING
elif (
stage == PullImageLayerStage.PULL_COMPLETE
and job_stage < PullImageLayerStage.PULL_COMPLETE
):
stage = PullImageLayerStage.EXTRACTING

# Ensure progress is 100 at this point to prevent float drift
if stage == PullImageLayerStage.PULL_COMPLETE:
progress = 100

# To reduce noise, limit updates to when result has changed by an entire percent or when stage changed
if stage != install_job.stage or progress >= install_job.progress + 1:
install_job.update(stage=stage.status, progress=progress)

@Job(
name="docker_interface_install",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
internal=True,
)
async def install(
self,
version: AwesomeVersion,
image: str | None = None,
latest: bool = False,
arch: CpuArch | None = None,
*,
progress_job_id: str | None = None,
) -> None:
"""Pull docker image."""
image = image or self.image
Expand All @@ -351,11 +402,15 @@ async def install(
# Try login if we have defined credentials
await self._docker_login(image)

job_id = self.sys_jobs.current.uuid
reference_id = self.sys_jobs.current.uuid
if not progress_job_id:
progress_job_id = reference_id

async def process_pull_image_log(reference: PullLogEntry) -> None:
try:
self._process_pull_image_log(job_id, reference)
self._process_pull_image_log(
reference_id, progress_job_id, reference
)
except DockerLogOutOfOrder as err:
# Send all these to sentry. Missing a few progress updates
# shouldn't matter to users but matters to us
Expand Down Expand Up @@ -629,7 +684,12 @@ async def check_image(
concurrency=JobConcurrency.GROUP_REJECT,
)
async def update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
self,
version: AwesomeVersion,
image: str | None = None,
latest: bool = False,
*,
progress_job_id: str | None = None,
) -> None:
"""Update a Docker image."""
image = image or self.image
Expand All @@ -639,7 +699,9 @@ async def update(
)

# Update docker image
await self.install(version, image=image, latest=latest)
await self.install(
version, image=image, latest=latest, progress_job_id=progress_job_id
)

# Stop container & cleanup
with suppress(DockerError):
Expand Down
13 changes: 12 additions & 1 deletion supervisor/homeassistant/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,18 @@ async def update(
)

# process an update
# Assume for now the docker image pull is 100% of this task. But from a user
# perspective that isn't true. Other steps we could consider allocating a fixed
# amount of progress for to improve accuracy include: partial backup, image
# cleanup, and Home Assistant restart
async def _update(to_version: AwesomeVersion) -> None:
"""Run Home Assistant update."""
_LOGGER.info("Updating Home Assistant to version %s", to_version)
try:
await self.instance.update(
to_version, image=self.sys_updater.image_homeassistant
to_version,
image=self.sys_updater.image_homeassistant,
progress_job_id=self.sys_jobs.current.uuid,
)
except DockerError as err:
raise HomeAssistantUpdateError(
Expand All @@ -285,6 +291,11 @@ async def _update(to_version: AwesomeVersion) -> None:
with suppress(DockerError):
await self.instance.cleanup(old_image=old_image)

# If the user never left the update screen they may actually see the progress bar again depending
# on how frontend works. Just in case (and as good practice) set job to 100% at successful end
if self.sys_jobs.current.progress != 100:
self.sys_jobs.current.progress = 100

# Update Home Assistant
with suppress(HomeAssistantError):
await _update(to_version)
Expand Down
21 changes: 13 additions & 8 deletions supervisor/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,15 @@ def update(
self.stage = stage
if extra != DEFAULT:
self.extra = extra
if done is not None:
self.done = done

self.on_change = on_change
# Just triggers the normal on change
self.reference = self.reference

if done is not None:
# Done has a special event, use it to trigger on change if included
self.done = done
else:
# Just set something to trigger the normal on change
self.reference = self.reference


class JobManager(FileConfiguration, CoreSysAttributes):
Expand Down Expand Up @@ -229,12 +232,14 @@ def _notify_on_job_change(
self, job: SupervisorJob, attribute: Attribute, value: Any
) -> None:
"""Notify Home Assistant of a change to a job and bus on job start/end."""
# Send out job data as a dictionary to prevent concurrency issue with shared job object
# Plus then we can fold in the newly updated value
if attribute.name == "errors":
value = [err.as_dict() for err in value]
job_data = job.as_dict() | {attribute.name: value}

self.sys_homeassistant.websocket.supervisor_event(
WSEvent.JOB, job.as_dict() | {attribute.name: value}
)
if not job.internal:
self.sys_homeassistant.websocket.supervisor_event(WSEvent.JOB, job_data)

if attribute.name == "done":
if value is False:
Expand All @@ -255,7 +260,7 @@ def new_job(
name,
reference=reference,
stage=initial_stage,
on_change=None if internal else self._notify_on_job_change,
on_change=self._notify_on_job_change,
internal=internal,
**({} if parent_id == DEFAULT else {"parent_id": parent_id}), # type: ignore
)
Expand Down
11 changes: 10 additions & 1 deletion supervisor/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def write_profile() -> Path:
if temp_dir:
await self.sys_run_in_executor(temp_dir.cleanup)

@Job(name="supervisor_update")
async def update(self, version: AwesomeVersion | None = None) -> None:
"""Update Supervisor version."""
version = version or self.latest_version or self.version
Expand All @@ -221,8 +222,14 @@ async def update(self, version: AwesomeVersion | None = None) -> None:

# Update container
_LOGGER.info("Update Supervisor to version %s", version)

# Assume for now the docker image pull is 100% of this task. But from a user
# perspective that isn't true. Should consider allocating a fixed amount of
# progress to the app armor update and restart to improve accuracy
try:
await self.instance.install(version, image=image)
await self.instance.install(
version, image=image, progress_job_id=self.sys_jobs.current.uuid
)
await self.instance.update_start_tag(image, version)
except DockerError as err:
self.sys_resolution.create_issue(
Expand All @@ -237,6 +244,8 @@ async def update(self, version: AwesomeVersion | None = None) -> None:
self.sys_config.image = image
await self.sys_config.save_data()

# Normally we'd set the progress bar to 100% at the end. But once Supervisor stops
# we it's gone so for this one we'll just leave it wherever it was after image pull
self.sys_create_task(self.sys_core.stop())

@Job(
Expand Down
Loading
Loading