From ed098cc8ce7407d07da875d08dcf042e35d0eb9e Mon Sep 17 00:00:00 2001 From: pajowu Date: Tue, 15 Apr 2025 15:08:21 +0200 Subject: [PATCH 01/12] Add option to align FindMyAccessories key generation Sometimes the key generation diverges for example if the accessory has no power. FindMy solves this be re-aligning the key generation if a btle connection is established. FindMy.app stores there alignment records in the `KeyAlignmentRecord` directory. This PR extends the FindMyAccessory class to read those records during `from_plist`-generation and re-sync the key generation by this --- examples/real_airtag.py | 23 +++++++++++------- findmy/accessory.py | 54 +++++++++++++++++++++++++++++++++++------ 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/examples/real_airtag.py b/examples/real_airtag.py index 5eeb858..9393145 100644 --- a/examples/real_airtag.py +++ b/examples/real_airtag.py @@ -4,6 +4,7 @@ from __future__ import annotations +import argparse import logging import sys from pathlib import Path @@ -19,10 +20,15 @@ logging.basicConfig(level=logging.INFO) -def main(plist_path: str) -> int: +def main(plist_path: Path, alignment_plist_path: Path | None) -> int: # Step 0: create an accessory key generator - with Path(plist_path).open("rb") as f: - airtag = FindMyAccessory.from_plist(f) + with plist_path.open("rb") as f: + f2 = alignment_plist_path.open("rb") if alignment_plist_path else None + + airtag = FindMyAccessory.from_plist(f, f2) + + if f2: + f2.close() # Step 1: log into an Apple account print("Logging into account") @@ -43,10 +49,9 @@ def main(plist_path: str) -> int: if __name__ == "__main__": - if len(sys.argv) < 2: - print(f"Usage: {sys.argv[0]} ", file=sys.stderr) - print(file=sys.stderr) - print("The plist file should be dumped from MacOS's FindMy app.", file=sys.stderr) - sys.exit(1) + parser = argparse.ArgumentParser() + parser.add_argument("plist_path", type=Path) + parser.add_argument("--alignment_plist_path", default=None, type=Path) 
+ args = parser.parse_args() - sys.exit(main(sys.argv[1])) + sys.exit(main(args.plist_path, args.alignment_plist_path)) diff --git a/findmy/accessory.py b/findmy/accessory.py index f06cd14..852c8e3 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -77,6 +77,8 @@ def __init__( # noqa: PLR0913 name: str | None = None, model: str | None = None, identifier: str | None = None, + alignment_date: datetime | None = None, + alignment_index: int | None = None, ) -> None: """ Initialize a FindMyAccessory. These values are usually obtained during pairing. @@ -98,6 +100,16 @@ def __init__( # noqa: PLR0913 self._name = name self._model = model self._identifier = identifier + self._alignment_date = ( + alignment_date if alignment_date is not None else paired_at + ) + self._alignment_index = alignment_index if alignment_index is not None else 0 + if self._alignment_date.tzinfo is None: + self._alignment_date = self._alignment_date.astimezone() + logging.warning( + "Alignment datetime is timezone-naive. 
Assuming system tz: %s.", + self._alignment_date.tzname(), + ) @property def paired_at(self) -> datetime: @@ -140,25 +152,29 @@ def keys_at(self, ind: int | datetime) -> set[KeyPair]: secondary_offset = 0 if isinstance(ind, datetime): - # number of 15-minute slots since pairing time - ind = ( + # number of 15-minute slots since alignment + slots_since_alignment = ( int( - (ind - self._paired_at).total_seconds() / (15 * 60), + (ind - self._alignment_date).total_seconds() / (15 * 60), ) + 1 ) + ind = self._alignment_index + slots_since_alignment + # number of slots until first 4 am - first_rollover = self._paired_at.astimezone().replace( + first_rollover = self._alignment_date.astimezone().replace( hour=4, minute=0, second=0, microsecond=0, ) - if first_rollover < self._paired_at: # we rolled backwards, so increment the day + if ( + first_rollover < self._alignment_date + ): # we rolled backwards, so increment the day first_rollover += timedelta(days=1) secondary_offset = ( int( - (first_rollover - self._paired_at).total_seconds() / (15 * 60), + (first_rollover - self._alignment_date).total_seconds() / (15 * 60), ) + 1 ) @@ -177,7 +193,9 @@ def keys_at(self, ind: int | datetime) -> set[KeyPair]: return possible_keys @classmethod - def from_plist(cls, plist: IO[bytes]) -> FindMyAccessory: + def from_plist( + cls, plist: IO[bytes], key_alignment_plist: IO[bytes] | None = None + ) -> FindMyAccessory: """Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" device_data = plistlib.load(plist) @@ -201,7 +219,27 @@ def from_plist(cls, plist: IO[bytes]) -> FindMyAccessory: model = device_data["model"] identifier = device_data["identifier"] - return cls(master_key, skn, sks, paired_at, None, model, identifier) + alignment_date = None + index = None + if key_alignment_plist: + alignment_data = plistlib.load(key_alignment_plist) + + alignment_date = alignment_data["lastIndexObservationDate"].replace( + tzinfo=timezone.utc, + ) + index = 
alignment_data["lastIndexObserved"] + + return cls( + master_key, + skn, + sks, + paired_at, + None, + model, + identifier, + alignment_date, + index, + ) class AccessoryKeyGenerator(KeyGenerator[KeyPair]): From 6c7a644897e6d66f07ca55186d368e3dc2eaf6c0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci-lite[bot]" <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com> Date: Tue, 15 Apr 2025 13:24:21 +0000 Subject: [PATCH 02/12] [pre-commit.ci lite] apply automatic fixes --- findmy/accessory.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/findmy/accessory.py b/findmy/accessory.py index 852c8e3..09130fb 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -100,9 +100,7 @@ def __init__( # noqa: PLR0913 self._name = name self._model = model self._identifier = identifier - self._alignment_date = ( - alignment_date if alignment_date is not None else paired_at - ) + self._alignment_date = alignment_date if alignment_date is not None else paired_at self._alignment_index = alignment_index if alignment_index is not None else 0 if self._alignment_date.tzinfo is None: self._alignment_date = self._alignment_date.astimezone() @@ -168,9 +166,7 @@ def keys_at(self, ind: int | datetime) -> set[KeyPair]: second=0, microsecond=0, ) - if ( - first_rollover < self._alignment_date - ): # we rolled backwards, so increment the day + if first_rollover < self._alignment_date: # we rolled backwards, so increment the day first_rollover += timedelta(days=1) secondary_offset = ( int( @@ -194,7 +190,9 @@ def keys_at(self, ind: int | datetime) -> set[KeyPair]: @classmethod def from_plist( - cls, plist: IO[bytes], key_alignment_plist: IO[bytes] | None = None + cls, + plist: IO[bytes], + key_alignment_plist: IO[bytes] | None = None, ) -> FindMyAccessory: """Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" device_data = plistlib.load(plist) From f771ad97d509e7093a8f59150112717209bdff84 Mon Sep 17 00:00:00 2001 From: "Mike A." 
Date: Thu, 7 Aug 2025 21:02:55 +0200 Subject: [PATCH 03/12] refactor: abstraction for plist reading --- examples/real_airtag.py | 8 +------- findmy/accessory.py | 25 +++++++++---------------- findmy/util/files.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/examples/real_airtag.py b/examples/real_airtag.py index 0b7b7e0..9aff41a 100644 --- a/examples/real_airtag.py +++ b/examples/real_airtag.py @@ -33,13 +33,7 @@ def main(plist_path: Path, alignment_plist_path: Path | None) -> int: # Step 0: create an accessory key generator - with plist_path.open("rb") as f: - f2 = alignment_plist_path.open("rb") if alignment_plist_path else None - - airtag = FindMyAccessory.from_plist(f, f2) - - if f2: - f2.close() + airtag = FindMyAccessory.from_plist(plist_path, alignment_plist_path) # Step 1: log into an Apple account print("Logging into account") diff --git a/findmy/accessory.py b/findmy/accessory.py index daf1443..40c1b25 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -7,22 +7,21 @@ from __future__ import annotations import logging -import plistlib from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone -from pathlib import Path -from typing import IO, TYPE_CHECKING, Literal, TypedDict, overload +from typing import TYPE_CHECKING, Literal, TypedDict, overload from typing_extensions import override from findmy.util.abc import Serializable -from findmy.util.files import read_data_json, save_and_return_json +from findmy.util.files import read_data_json, read_data_plist, save_and_return_json from .keys import KeyGenerator, KeyPair, KeyType from .util import crypto if TYPE_CHECKING: from collections.abc import Generator + from pathlib import Path logger = logging.getLogger(__name__) @@ -224,21 +223,13 @@ def keys_at(self, ind: int | datetime) -> set[KeyPair]: @classmethod def from_plist( cls, - plist: str | Path | dict | bytes | IO[bytes], - key_alignment_plist: IO[bytes] | None = 
None, + plist: str | Path | dict | bytes, + key_alignment_plist: str | Path | dict | bytes | None = None, *, name: str | None = None, ) -> FindMyAccessory: """Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" - if isinstance(plist, bytes): - # plist is a bytes object - device_data = plistlib.loads(plist) - elif isinstance(plist, (str, Path)): - device_data = plistlib.loads(Path(plist).read_bytes()) - elif isinstance(plist, IO): - device_data = plistlib.load(plist) - else: - device_data = plist + device_data = read_data_plist(plist) # PRIVATE master key. 28 (?) bytes. master_key = device_data["privateKey"]["key"]["data"][-28:] @@ -263,11 +254,13 @@ def from_plist( alignment_date = None index = None if key_alignment_plist: - alignment_data = plistlib.load(key_alignment_plist) + alignment_data = read_data_plist(key_alignment_plist) + # last observed date alignment_date = alignment_data["lastIndexObservationDate"].replace( tzinfo=timezone.utc, ) + # primary index value at last observed date index = alignment_data["lastIndexObserved"] return cls( diff --git a/findmy/util/files.py b/findmy/util/files.py index 1686bbf..a366f5a 100644 --- a/findmy/util/files.py +++ b/findmy/util/files.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import plistlib from collections.abc import Mapping from pathlib import Path from typing import TypeVar, cast @@ -32,3 +33,30 @@ def read_data_json(val: str | Path | _T) -> _T: val = cast("_T", json.loads(val.read_text())) return val + + +def save_and_return_plist(data: _T, dst: str | Path | None) -> _T: + """Save and return a Plist file.""" + if dst is None: + return data + + if isinstance(dst, str): + dst = Path(dst) + + dst.write_bytes(plistlib.dumps(data)) + + return data + + +def read_data_plist(val: str | Path | _T | bytes) -> _T: + """Read Plist data from a file if a path is passed, or return the argument itself.""" + if isinstance(val, str): + val = Path(val) + + if isinstance(val, Path): + val 
= val.read_bytes() + + if isinstance(val, bytes): + val = cast("_T", plistlib.loads(val)) + + return val From 32686a898d64d8fcc167c5fa41fec51a5c247f53 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Thu, 7 Aug 2025 21:21:21 +0200 Subject: [PATCH 04/12] fix: include alignment parameters in accessory json --- findmy/accessory.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/findmy/accessory.py b/findmy/accessory.py index 40c1b25..dd20902 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -37,6 +37,8 @@ class FindMyAccessoryMapping(TypedDict): name: str | None model: str | None identifier: str | None + alignment_date: str | None + alignment_index: int | None class RollingKeyPairSource(ABC): @@ -277,6 +279,10 @@ def from_plist( @override def to_json(self, path: str | Path | None = None, /) -> FindMyAccessoryMapping: + alignment_date = None + if self._alignment_date is not None: + alignment_date = self._alignment_date.isoformat() + res: FindMyAccessoryMapping = { "type": "accessory", "master_key": self._primary_gen.master_key.hex(), @@ -286,6 +292,8 @@ def to_json(self, path: str | Path | None = None, /) -> FindMyAccessoryMapping: "name": self.name, "model": self.model, "identifier": self.identifier, + "alignment_date": alignment_date, + "alignment_index": self._alignment_index, } return save_and_return_json(res, path) @@ -301,6 +309,10 @@ def from_json( assert val["type"] == "accessory" try: + alignment_date = val["alignment_date"] + if alignment_date is not None: + alignment_date = datetime.fromisoformat(alignment_date) + return cls( master_key=bytes.fromhex(val["master_key"]), skn=bytes.fromhex(val["skn"]), @@ -309,6 +321,8 @@ def from_json( name=val["name"], model=val["model"], identifier=val["identifier"], + alignment_date=alignment_date, + alignment_index=val["alignment_index"], ) except KeyError as e: msg = f"Failed to restore account data: {e}" From e992df7f8864b3805d60f8e6bad0c76b3f8444e7 Mon Sep 17 00:00:00 2001 From: "Mike A." 
Date: Fri, 8 Aug 2025 21:57:16 +0200 Subject: [PATCH 05/12] feat: dump alignment info when importing accessories --- findmy/__main__.py | 5 ++-- findmy/plist.py | 65 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/findmy/__main__.py b/findmy/__main__.py index acc48a6..af59873 100644 --- a/findmy/__main__.py +++ b/findmy/__main__.py @@ -8,7 +8,7 @@ from importlib.metadata import version from pathlib import Path -from .plist import get_key, list_accessories +from .plist import list_accessories def main() -> None: # noqa: D103 @@ -96,8 +96,7 @@ def get_path(d, acc) -> Path | None: # noqa: ANN001 d.mkdir(parents=True, exist_ok=True) return d / f"{acc.identifier}.json" - key = get_key() - accs = list_accessories(key=key) + accs = list_accessories() jsons = [acc.to_json(get_path(out_dir, acc)) for acc in accs] print(json.dumps(jsons, indent=4, ensure_ascii=False)) # noqa: T201 diff --git a/findmy/plist.py b/findmy/plist.py index 046243e..e849f4f 100644 --- a/findmy/plist.py +++ b/findmy/plist.py @@ -1,7 +1,16 @@ -"""Utils for decrypting the encypted .record files into .plist files.""" +""" +Utils for decrypting the encypted .record files into .plist files. + +Originally from: +Author: Shane B. +in https://github.com/parawanderer/OpenTagViewer/blob/08a59cab551721afb9dc9f829ad31dae8d5bd400/python/airtag_decryptor.py +which was based on: +Based on: https://gist.github.com/airy10/5205dc851fbd0715fcd7a5cdde25e7c8 +""" from __future__ import annotations +import logging import plistlib import subprocess from pathlib import Path @@ -11,24 +20,56 @@ from .accessory import FindMyAccessory -# Originally from: -# Author: Shane B. 
-# in https://github.com/parawanderer/OpenTagViewer/blob/08a59cab551721afb9dc9f829ad31dae8d5bd400/python/airtag_decryptor.py -# which was based on: -# Based on: https://gist.github.com/airy10/5205dc851fbd0715fcd7a5cdde25e7c8 +logger = logging.getLogger(__name__) + + +_DEFAULT_SEARCH_PATH = Path.home() / "Library" / "com.apple.icloud.searchpartyd" # consider switching to this library https://github.com/microsoft/keyper # once they publish a version of it that includes my MR with the changes to make it compatible # with keys that are non-utf-8 encoded (like the BeaconStore one) # if I contribute this, properly escape the label argument here... -def get_key() -> bytes: +def _get_beaconstore_key() -> bytes: """Get the decryption key for BeaconStore using the system password prompt window.""" # This thing will pop up 2 Password Input windows... key_in_hex = subprocess.getoutput("/usr/bin/security find-generic-password -l 'BeaconStore' -w") # noqa: S605 return bytes.fromhex(key_in_hex) +def _get_accessory_name( + accessory_id: str, + key: bytes, + *, + search_path: Path | None = None, +) -> str | None: + search_path = search_path or _DEFAULT_SEARCH_PATH + path = next((search_path / "BeaconNamingRecord" / accessory_id).glob(pattern="*.record"), None) + if path is None: + logger.warning( + "Accessory %s does not have a BeaconNamingRecord, defaulting to None", accessory_id + ) + return None + + naming_record_plist = decrypt_plist(path, key) + return naming_record_plist.get("name", None) + + +def _get_alignment_plist( + accessory_id: str, + key: bytes, + *, + search_path: Path | None = None, +) -> dict | None: + search_path = search_path or _DEFAULT_SEARCH_PATH + path = next((search_path / "KeyAlignmentRecords" / accessory_id).glob(pattern="*.record"), None) + if path is None: + logger.warning("Accessory %s does not have a KeyAlignmentRecord", accessory_id) + return None + + return decrypt_plist(path, key) + + def decrypt_plist(encrypted: str | Path | bytes | IO[bytes], key: 
bytes) -> dict: """ Decrypts the encrypted plist file at :meth:`encrypted` using the provided :meth:`key`. @@ -76,15 +117,15 @@ def list_accessories( search_path = Path.home() / "Library" / "com.apple.icloud.searchpartyd" search_path = Path(search_path) if key is None: - key = get_key() + key = _get_beaconstore_key() accesories = [] encrypted_plist_paths = search_path.glob("OwnedBeacons/*.record") for path in encrypted_plist_paths: plist = decrypt_plist(path, key) - naming_record_path = next((search_path / "BeaconNamingRecord" / path.stem).glob("*.record")) - naming_record_plist = decrypt_plist(naming_record_path, key) - name = naming_record_plist["name"] - accessory = FindMyAccessory.from_plist(plist, name=name) + name = _get_accessory_name(path.stem, key) + alignment_plist = _get_alignment_plist(path.stem, key) + + accessory = FindMyAccessory.from_plist(plist, alignment_plist, name=name) accesories.append(accessory) return accesories From fbaf57ed094fccb289fc8e05988de94ff7ae46b6 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Fri, 5 Sep 2025 00:36:37 +0200 Subject: [PATCH 06/12] feat!: implement key alignment algorithm for accessories BREAKING: due to fundamental issues with Apple's API, this commit also DEPRECATES the `fetch_[last_]reports` methods on Apple account instances. It has been replaced by a method named `fetch_location`, which only returns a single location report (the latest one) and does not support setting a date range. 
--- findmy/accessory.py | 158 ++++++++++++++--------- findmy/reports/account.py | 235 ++++++++-------------------------- findmy/reports/reports.py | 263 ++++++++++++++++++++++---------------- 3 files changed, 306 insertions(+), 350 deletions(-) diff --git a/findmy/accessory.py b/findmy/accessory.py index dd20902..29c54f2 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -23,6 +23,8 @@ from collections.abc import Generator from pathlib import Path + from findmy.reports.reports import LocationReport + logger = logging.getLogger(__name__) @@ -48,37 +50,38 @@ class RollingKeyPairSource(ABC): @abstractmethod def interval(self) -> timedelta: """KeyPair rollover interval.""" + raise NotImplementedError @abstractmethod - def keys_at(self, ind: int | datetime) -> set[KeyPair]: - """Generate potential key(s) occurring at a certain index or timestamp.""" + def get_min_index(self, dt: datetime) -> int: + """Get the minimum key index that the accessory could be broadcasting at a specific time.""" raise NotImplementedError - @overload - def keys_between(self, start: int, end: int) -> set[KeyPair]: - pass + @abstractmethod + def get_max_index(self, dt: datetime) -> int: + """Get the maximum key index that the accessory could be broadcasting at a specific time.""" + raise NotImplementedError - @overload - def keys_between(self, start: datetime, end: datetime) -> set[KeyPair]: - pass + @abstractmethod + def update_alignment(self, report: LocationReport, index: int) -> None: + """ + Update alignment of the accessory. - def keys_between(self, start: int | datetime, end: int | datetime) -> set[KeyPair]: - """Generate potential key(s) occurring between two indices or timestamps.""" - keys: set[KeyPair] = set() + Alignment can be updated based on a LocationReport that was observed at a specific index. 
+ """ + raise NotImplementedError - if isinstance(start, int) and isinstance(end, int): - while start < end: - keys.update(self.keys_at(start)) + @abstractmethod + def keys_at(self, ind: int) -> set[KeyPair]: + """Generate potential key(s) occurring at a certain index.""" + raise NotImplementedError - start += 1 - elif isinstance(start, datetime) and isinstance(end, datetime): - while start < end: - keys.update(self.keys_at(start)) + def keys_between(self, start: int, end: int) -> set[KeyPair]: + """Generate potential key(s) occurring between two indices.""" + keys: set[KeyPair] = set() - start += self.interval - else: - msg = "Invalid start/end type" - raise TypeError(msg) + for ind in range(start, end + 1): + keys.update(self.keys_at(ind)) return keys @@ -174,53 +177,82 @@ def interval(self) -> timedelta: return timedelta(minutes=15) @override - def keys_at(self, ind: int | datetime) -> set[KeyPair]: - """Get the potential primary and secondary keys active at a certain time or index.""" - if isinstance(ind, datetime) and ind < self._paired_at: - return set() - if isinstance(ind, int) and ind < 0: - return set() + def get_min_index(self, dt: datetime) -> int: + if dt.tzinfo is None: + end = dt.astimezone() + logger.warning( + "Datetime is timezone-naive. 
Assuming system tz: %s.", + end.tzname(), + ) - secondary_offset = 0 + if dt >= self._alignment_date: + # in the worst case, the accessory has not rolled over at all since alignment + return self._alignment_index - if isinstance(ind, datetime): - # number of 15-minute slots since alignment - slots_since_alignment = ( - int( - (ind - self._alignment_date).total_seconds() / (15 * 60), - ) - + 1 - ) - ind = self._alignment_index + slots_since_alignment - - # number of slots until first 4 am - first_rollover = self._alignment_date.astimezone().replace( - hour=4, - minute=0, - second=0, - microsecond=0, - ) - if first_rollover < self._alignment_date: # we rolled backwards, so increment the day - first_rollover += timedelta(days=1) - secondary_offset = ( - int( - (first_rollover - self._alignment_date).total_seconds() / (15 * 60), - ) - + 1 + # the accessory key will rollover AT MOST once every 15 minutes, so + # this is the minimum index for which we will need to generate keys. + # it's possible that rollover has progressed slower or not at all. + ind_before_alignment = (self._alignment_date - dt) // self.interval + return self._alignment_index - ind_before_alignment + + @override + def get_max_index(self, dt: datetime) -> int: + if dt.tzinfo is None: + end = dt.astimezone() + logger.warning( + "Datetime is timezone-naive. Assuming system tz: %s.", + end.tzname(), ) - possible_keys = set() - # primary key can always be determined - possible_keys.add(self._primary_gen[ind]) + if dt <= self._alignment_date: + # in the worst case, the accessory has not rolled over at all since `dt`, + # in which case it was at the alignment index. We can't go lower than that. + return self._alignment_index + + # the accessory key will rollover AT MOST once every 15 minutes, so + # this is the maximum index for which we will need to generate keys. + # it's possible that rollover has progressed slower or not at all. 
+ ind_since_alignment = (dt - self._alignment_date) // self.interval + return self._alignment_index + ind_since_alignment + + @override + def update_alignment(self, report: LocationReport, index: int) -> None: + if report.timestamp < self._alignment_date: + # we only care about the most recent report + return + logger.info("Updating alignment based on report observed at index %i", index) + + self._alignment_date = report.timestamp + self._alignment_index = index + + def _primary_key_at(self, ind: int) -> KeyPair: + """Get the primary key at a certain index.""" + return self._primary_gen[ind] + + def _secondary_keys_at(self, ind: int) -> tuple[KeyPair, KeyPair]: + """Get possible secondary keys at a certain primary index.""" # when the accessory has been rebooted, it will use the following secondary key - possible_keys.add(self._secondary_gen[ind // 96 + 1]) + key_1 = self._secondary_gen[ind // 96 + 1] - if ind > secondary_offset: - # after the first 4 am after pairing, we need to account for the first day - possible_keys.add(self._secondary_gen[(ind - secondary_offset) // 96 + 2]) + # in some cases, the secondary index may not be at primary_ind // 96 + 1, but at +2 instead. + # example: if we paired at 3:00 am, the first secondary key will be used until 4:00 am, + # at which point the second secondary key will be used. The primary index at 4:00 am is 4, + # but the 'second' secondary key is used. + # however, since we don't know the exact index rollover pattern, we just take a guess here + # and return both keys. for alignment, it's better to underestimate progression of the index + # than to overestimate it. 
+ key_2 = self._secondary_gen[ind // 96 + 2] - return possible_keys + return key_1, key_2 + + @override + def keys_at(self, ind: int) -> set[KeyPair]: + """Get the primary and secondary keys that might be active at a certain index.""" + if ind < 0: + return set() + + return {self._primary_key_at(ind), *self._secondary_keys_at(ind)} @classmethod def from_plist( @@ -377,6 +409,10 @@ def key_type(self) -> KeyType: return self._key_type def _get_sk(self, ind: int) -> bytes: + if ind < 0: + msg = "The key index must be non-negative" + raise ValueError(msg) + if ind < self._cur_sk_ind: # behind us; need to reset :( self._cur_sk = self._initial_sk self._cur_sk_ind = 0 diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 53f1ab1..05f9ed5 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -233,90 +233,45 @@ def td_2fa_submit(self, code: str) -> MaybeCoro[LoginState]: @overload @abstractmethod - def fetch_reports( + def fetch_location( self, keys: HasHashedPublicKey, - date_from: datetime, - date_to: datetime | None, - ) -> MaybeCoro[list[LocationReport]]: ... + ) -> MaybeCoro[LocationReport | None]: ... @overload @abstractmethod - def fetch_reports( + def fetch_location( self, keys: RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, - ) -> MaybeCoro[list[LocationReport]]: ... + ) -> MaybeCoro[LocationReport | None]: ... @overload @abstractmethod - def fetch_reports( + def fetch_location( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, - ) -> MaybeCoro[dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]]: ... + ) -> MaybeCoro[ + dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] | None + ]: ... 
@abstractmethod - def fetch_reports( + def fetch_location( self, keys: HasHashedPublicKey | Sequence[HasHashedPublicKey | RollingKeyPairSource] | RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, ) -> MaybeCoro[ - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + LocationReport + | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] + | None ]: """ - Fetch location reports for :class:`HasHashedPublicKey`s between `date_from` and `date_end`. + Fetch location for :class:`HasHashedPublicKey`s. Returns a dictionary mapping :class:`HasHashedPublicKey`s to their location reports. """ raise NotImplementedError - @overload - @abstractmethod - def fetch_last_reports( - self, - keys: HasHashedPublicKey, - hours: int = 7 * 24, - ) -> MaybeCoro[list[LocationReport]]: ... - - @overload - @abstractmethod - def fetch_last_reports( - self, - keys: RollingKeyPairSource, - hours: int = 7 * 24, - ) -> MaybeCoro[list[LocationReport]]: ... - - @overload - @abstractmethod - def fetch_last_reports( - self, - keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, - ) -> MaybeCoro[dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]]: ... - - @abstractmethod - def fetch_last_reports( - self, - keys: HasHashedPublicKey - | RollingKeyPairSource - | Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, - ) -> MaybeCoro[ - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] - ]: - """ - Fetch location reports for :class:`HasHashedPublicKey`s for the last `hours` hours. - - Utility method as an alternative to using :meth:`BaseAppleAccount.fetch_reports` directly. 
- """ - raise NotImplementedError - @abstractmethod def get_anisette_headers( self, @@ -617,17 +572,19 @@ async def td_2fa_submit(self, code: str) -> LoginState: @require_login_state(LoginState.LOGGED_IN) async def fetch_raw_reports( self, - start: datetime, - end: datetime, - devices: list[list[str]], - ) -> dict[str, Any]: + devices: list[tuple[list[str], list[str]]], + ) -> list[LocationReport]: """Make a request for location reports, returning raw data.""" + logger.debug("Fetching raw reports for %d device(s)", len(devices)) + + now = datetime.now(tz=timezone.utc) + start_ts = int((now - timedelta(days=7)).timestamp()) * 1000 + end_ts = int(now.timestamp()) * 1000 + auth = ( self._login_state_data["dsid"], self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"], ) - start_ts = int(start.timestamp() * 1000) - end_ts = int(end.timestamp() * 1000) data = { "clientContext": { "clientBundleIdentifier": "com.apple.icloud.searchpartyuseragent", @@ -640,8 +597,8 @@ async def fetch_raw_reports( "startDate": start_ts, "startDateSecondary": start_ts, "endDate": end_ts, - # passing all keys as primary seems to work fine - "primaryIds": device_keys, + "primaryIds": device_keys[0], + "secondaryIds": device_keys[1], } for device_keys in devices ], @@ -679,90 +636,51 @@ async def _do_request() -> HttpResponse: msg = f"Failed to fetch reports: {resp.get('statusCode')}" raise UnhandledProtocolError(msg) - return resp["acsnLocations"] - - @overload - async def fetch_reports( - self, - keys: HasHashedPublicKey, - date_from: datetime, - date_to: datetime | None, - ) -> list[LocationReport]: ... - - @overload - async def fetch_reports( - self, - keys: RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, - ) -> list[LocationReport]: ... 
+ # parse reports + reports: list[LocationReport] = [] + for key_reports in resp.get("acsnLocations", {}).get("locationPayload", []): + hashed_adv_key_bytes = base64.b64decode(key_reports["id"]) - @overload - async def fetch_reports( - self, - keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... + for report in key_reports.get("locationInfo", []): + payload = base64.b64decode(report) + loc_report = LocationReport(payload, hashed_adv_key_bytes) - @require_login_state(LoginState.LOGGED_IN) - @override - async def fetch_reports( - self, - keys: HasHashedPublicKey - | RollingKeyPairSource - | Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, - ) -> ( - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] - ): - """See :meth:`BaseAppleAccount.fetch_reports`.""" - date_to = date_to or datetime.now().astimezone() + reports.append(loc_report) - return await self._reports.fetch_reports( - date_from, - date_to, - keys, - ) + return reports @overload - async def fetch_last_reports( + async def fetch_location( self, keys: HasHashedPublicKey, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - async def fetch_last_reports( + async def fetch_location( self, keys: RollingKeyPairSource, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - async def fetch_last_reports( + async def fetch_location( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... + ) -> dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None]: ... 
@require_login_state(LoginState.LOGGED_IN) @override - async def fetch_last_reports( + async def fetch_location( self, keys: HasHashedPublicKey | RollingKeyPairSource | Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, ) -> ( - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + LocationReport + | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] + | None ): - """See :meth:`BaseAppleAccount.fetch_last_reports`.""" - end = datetime.now(tz=timezone.utc) - start = end - timedelta(hours=hours) - - return await self.fetch_reports(keys, start, end) + """See :meth:`BaseAppleAccount.fetch_reports`.""" + return await self._reports.fetch_location(keys) @require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA, LoginState.LOGGED_IN) async def _gsa_authenticate( @@ -1101,77 +1019,36 @@ def td_2fa_submit(self, code: str) -> LoginState: return self._evt_loop.run_until_complete(coro) @overload - def fetch_reports( + def fetch_location( self, keys: HasHashedPublicKey, - date_from: datetime, - date_to: datetime | None, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - def fetch_reports( + def fetch_location( self, keys: RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - def fetch_reports( + def fetch_location( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... + ) -> dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None]: ... 
@override - def fetch_reports( + def fetch_location( self, keys: HasHashedPublicKey | Sequence[HasHashedPublicKey | RollingKeyPairSource] | RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, - ) -> ( - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] - ): - """See :meth:`AsyncAppleAccount.fetch_reports`.""" - coro = self._asyncacc.fetch_reports(keys, date_from, date_to) - return self._evt_loop.run_until_complete(coro) - - @overload - def fetch_last_reports( - self, - keys: HasHashedPublicKey, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... - - @overload - def fetch_last_reports( - self, - keys: RollingKeyPairSource, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... - - @overload - def fetch_last_reports( - self, - keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... - - @override - def fetch_last_reports( - self, - keys: HasHashedPublicKey - | RollingKeyPairSource - | Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, ) -> ( - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + LocationReport + | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] + | None ): - """See :meth:`AsyncAppleAccount.fetch_last_reports`.""" - coro = self._asyncacc.fetch_last_reports(keys, hours) + """See :meth:`AsyncAppleAccount.fetch_location`.""" + coro = self._asyncacc.fetch_location(keys) return self._evt_loop.run_until_complete(coro) @override diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index a8062b8..a8d051d 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -8,7 +8,7 @@ import struct from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Literal, TypedDict, Union, cast, overload +from typing import 
TYPE_CHECKING, Literal, TypedDict, Union, overload from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec @@ -16,7 +16,7 @@ from typing_extensions import override from findmy.accessory import RollingKeyPairSource -from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping +from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping, KeyType from findmy.util.abc import Serializable from findmy.util.files import read_data_json, save_and_return_json @@ -337,144 +337,187 @@ def __init__(self, account: AsyncAppleAccount) -> None: self._account: AsyncAppleAccount = account @overload - async def fetch_reports( + async def fetch_location( self, - date_from: datetime, - date_to: datetime, device: HasHashedPublicKey, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - async def fetch_reports( + async def fetch_location( self, - date_from: datetime, - date_to: datetime, device: RollingKeyPairSource, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - async def fetch_reports( + async def fetch_location( self, - date_from: datetime, - date_to: datetime, device: Sequence[HasHashedPublicKey | RollingKeyPairSource], - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... + ) -> dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None]: ... - async def fetch_reports( # noqa: C901 + async def fetch_location( self, - date_from: datetime, - date_to: datetime, device: HasHashedPublicKey | RollingKeyPairSource | Sequence[HasHashedPublicKey | RollingKeyPairSource], ) -> ( - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + LocationReport + | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] + | None ): """ - Fetch location reports for a certain device. + Fetch location for a certain device or multipel devices. 
When `device` is a single :class:`HasHashedPublicKey`, this method will return - a list of location reports corresponding to that key. - When `device` is a :class:`RollingKeyPairSource`, it will return a list of - location reports corresponding to that source. + a location report corresponding to that key, or None if unavailable. + When `device` is a :class:`RollingKeyPairSource`, it will return a location + report corresponding to that source, or None if unavailable. When `device` is a sequence of :class:`HasHashedPublicKey`s or RollingKeyPairSource's, - it will return a dictionary with the provided object - as key, and a list of location reports as value. + it will return a dictionary with the provided objects + as keys, and a location report (or None) as value. """ - key_devs: dict[HasHashedPublicKey, HasHashedPublicKey | RollingKeyPairSource] = {} - key_batches: list[list[HasHashedPublicKey]] = [] if isinstance(device, HasHashedPublicKey): # single key - key_devs = {device: device} - key_batches.append([device]) - elif isinstance(device, RollingKeyPairSource): + key_reports = await self._fetch_key_reports([device]) + return key_reports.get(device, None) + + if isinstance(device, RollingKeyPairSource): # key generator - # add 12h margin to the generator - keys = device.keys_between( - date_from - timedelta(hours=12), - date_to + timedelta(hours=12), - ) - key_devs = dict.fromkeys(keys, device) - key_batches.append(list(keys)) - elif isinstance(device, list) and all( + return await self._fetch_accessory_report(device) + + if not isinstance(device, list) or not all( isinstance(x, HasHashedPublicKey | RollingKeyPairSource) for x in device ): - # multiple key generators - # add 12h margin to each generator - device = cast("list[HasHashedPublicKey | RollingKeyPairSource]", device) - for dev in device: - if isinstance(dev, HasHashedPublicKey): - key_devs[dev] = dev - key_batches.append([dev]) - elif isinstance(dev, RollingKeyPairSource): - keys = dev.keys_between( - 
date_from - timedelta(hours=12), - date_to + timedelta(hours=12), - ) - for key in keys: - key_devs[key] = dev - key_batches.append(list(keys)) - else: - msg = "Unknown device type: %s" - raise ValueError(msg, type(device)) - - # sequence of keys (fetch 256 max at a time) - key_reports: dict[HasHashedPublicKey, list[LocationReport]] = await self._fetch_reports( - date_from, - date_to, - key_batches, - ) + # unsupported type + msg = "Device must be a HasHashedPublicKey, RollingKeyPairSource, or list thereof." + raise ValueError(msg) + + # multiple key generators + # we can batch static keys in a single request, + # but key generators need to be queried separately + static_keys: list[HasHashedPublicKey] = [] + reports: dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] = {} + for dev in device: + if isinstance(dev, HasHashedPublicKey): + # save for later batch request + static_keys.append(dev) + elif isinstance(dev, RollingKeyPairSource): + # query immediately + reports[dev] = await self._fetch_accessory_report(dev) + + if static_keys: # batch request for static keys + key_reports = await self._fetch_key_reports(static_keys) + reports.update(dict(key_reports.items())) - # combine (key -> list[report]) and (key -> device) into (device -> list[report]) - device_reports = defaultdict(list) - for key, reports in key_reports.items(): - device_reports[key_devs[key]].extend(reports) - for dev in device_reports: - device_reports[dev] = sorted(device_reports[dev]) - - # result - if isinstance(device, (HasHashedPublicKey, RollingKeyPairSource)): - # single key or generator - return device_reports[device] - # multiple static keys or key generators - return device_reports - - async def _fetch_reports( + return reports + + async def _fetch_accessory_report( self, - date_from: datetime, - date_to: datetime, - device_keys: Sequence[Sequence[HasHashedPublicKey]], - ) -> dict[HasHashedPublicKey, list[LocationReport]]: - logger.debug("Fetching reports for %s 
device(s)", len(device_keys)) - - # lock requested time range to the past 7 days, +- 12 hours, then filter the response. - # this is due to an Apple backend bug where the time range is not respected. - # More info: https://github.com/biemster/FindMy/issues/7 + accessory: RollingKeyPairSource, + ) -> LocationReport | None: + logger.debug("Fetching location report for accessory") + now = datetime.now().astimezone() - start_date = now - timedelta(days=7, hours=12) - end_date = now + timedelta(hours=12) - ids = [[key.hashed_adv_key_b64 for key in keys] for keys in device_keys] - data = await self._account.fetch_raw_reports(start_date, end_date, ids) - - id_to_key: dict[bytes, HasHashedPublicKey] = { - key.hashed_adv_key_bytes: key for keys in device_keys for key in keys - } - reports: dict[HasHashedPublicKey, list[LocationReport]] = defaultdict(list) - for key_reports in data.get("locationPayload", []): - hashed_adv_key_bytes = base64.b64decode(key_reports["id"]) - key = id_to_key[hashed_adv_key_bytes] - - for report in key_reports.get("locationInfo", []): - payload = base64.b64decode(report) - loc_report = LocationReport(payload, hashed_adv_key_bytes) - - if loc_report.timestamp < date_from or loc_report.timestamp > date_to: - continue - - # pre-decrypt if possible - if isinstance(key, KeyPair): - loc_report.decrypt(key) + start_date = now - timedelta(days=7) + end_date = now + + # mappings + key_to_ind: dict[KeyPair, set[int]] = defaultdict(set) + id_to_key: dict[bytes, KeyPair] = {} + + # state variables + cur_keys_primary: set[str] = set() + cur_keys_secondary: set[str] = set() + cur_index = accessory.get_min_index(start_date) + ret: LocationReport | None = None + + async def _fetch() -> LocationReport | None: + """Fetch current keys and add them to final reports.""" + new_reports: list[LocationReport] = await self._account.fetch_raw_reports( + [(list(cur_keys_primary), (list(cur_keys_secondary)))] + ) + logger.info("Fetched %d new reports (index %i)", 
len(new_reports), cur_index) + + if new_reports: + report = sorted(new_reports)[-1] - reports[key].append(loc_report) + key = id_to_key[report.hashed_adv_key_bytes] + report.decrypt(key) + + # update alignment data on every report + # if a key maps to multiple indices, only feed it the maximum index, + # since apple only returns the latest reports per request. + # This makes the value more likely to be stable. + accessory.update_alignment(report, max(key_to_ind[key])) + else: + report = None + + cur_keys_primary.clear() + cur_keys_secondary.clear() + + return report + + while cur_index <= accessory.get_max_index(end_date): + key_batch = accessory.keys_at(cur_index) + + # split into primary and secondary keys + # (UNKNOWN keys are filed as primary) + new_keys_primary: set[str] = { + key.hashed_adv_key_b64 for key in key_batch if key.key_type == KeyType.PRIMARY + } + new_keys_secondary: set[str] = { + key.hashed_adv_key_b64 for key in key_batch if key.key_type != KeyType.PRIMARY + } + + # 290 seems to be the maximum number of keys that Apple accepts in a single request, + # so if adding the new keys would exceed that, fire a request first + if ( + len(cur_keys_primary | new_keys_primary) > 290 + or len(cur_keys_secondary | new_keys_secondary) > 290 + ): + report = await _fetch() + if ret is None or (report is not None and report.timestamp > ret.timestamp): + ret = report + + # build mappings before adding to current keys + for key in key_batch: + key_to_ind[key].add(cur_index) + id_to_key[key.hashed_adv_key_bytes] = key + cur_keys_primary |= new_keys_primary + cur_keys_secondary |= new_keys_secondary + + cur_index += 1 + + if cur_keys_primary or cur_keys_secondary: + # fetch remaining keys + report = await _fetch() + if ret is None or (report is not None and report.timestamp > ret.timestamp): + ret = report + + # filter duplicate reports (can happen since key batches may overlap) + return ret + + async def _fetch_key_reports( + self, + keys: 
Sequence[HasHashedPublicKey], + ) -> dict[HasHashedPublicKey, LocationReport | None]: + logger.debug("Fetching reports for %s key(s)", len(keys)) + + # fetch all as primary keys + ids = [([key.hashed_adv_key_b64], []) for key in keys] + encrypted_reports: list[LocationReport] = await self._account.fetch_raw_reports(ids) + + id_to_key: dict[bytes, HasHashedPublicKey] = {key.hashed_adv_key_bytes: key for key in keys} + reports: dict[HasHashedPublicKey, LocationReport | None] = dict.fromkeys(keys) + for report in encrypted_reports: + key = id_to_key[report.hashed_adv_key_bytes] + + cur_report = reports[key] + if cur_report is None or report.timestamp > cur_report.timestamp: + # more recent report, replace + reports[key] = report + + # pre-decrypt report if possible + if isinstance(key, KeyPair): + report.decrypt(key) return reports From 05ea2c9a747693c6d51a52faff7bf6a7bfc3b6c2 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Sat, 6 Sep 2025 21:29:00 +0200 Subject: [PATCH 07/12] feat: re-introduce location history --- findmy/reports/account.py | 123 +++++++++++++++++++++++++++++++++++--- findmy/reports/reports.py | 80 +++++++++++-------------- 2 files changed, 150 insertions(+), 53 deletions(-) diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 05f9ed5..528afea 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -231,6 +231,49 @@ def td_2fa_submit(self, code: str) -> MaybeCoro[LoginState]: """ raise NotImplementedError + @overload + @abstractmethod + def fetch_location_history( + self, + keys: HasHashedPublicKey, + ) -> MaybeCoro[list[LocationReport]]: ... + + @overload + @abstractmethod + def fetch_location_history( + self, + keys: RollingKeyPairSource, + ) -> MaybeCoro[list[LocationReport]]: ... + + @overload + @abstractmethod + def fetch_location_history( + self, + keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], + ) -> MaybeCoro[dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]]: ... 
+
+    @abstractmethod
+    def fetch_location_history(
+        self,
+        keys: HasHashedPublicKey
+        | Sequence[HasHashedPublicKey | RollingKeyPairSource]
+        | RollingKeyPairSource,
+    ) -> MaybeCoro[
+        list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
+    ]:
+        """
+        Fetch location history for :class:`HasHashedPublicKey`s and :class:`RollingKeyPairSource`s.
+
+        Note that location history for devices is provided on a best-effort
+        basis and may not be fully complete or stable. Multiple consecutive calls to this method
+        may result in different location reports, especially for reports further in the past.
+        However, each one of these reports is guaranteed to be in line with the data reported by
+        Apple, and the most recent report will always be included in the results.
+
+        Unless you really need to use this method, use :meth:`fetch_location` instead.
+        """
+        raise NotImplementedError
+
     @overload
     @abstractmethod
     def fetch_location(
         self,
@@ -649,6 +692,36 @@ async def _do_request() -> HttpResponse:
 
         return reports
 
+    @overload
+    async def fetch_location_history(
+        self,
+        keys: HasHashedPublicKey,
+    ) -> list[LocationReport]: ...
+
+    @overload
+    async def fetch_location_history(
+        self,
+        keys: RollingKeyPairSource,
+    ) -> list[LocationReport]: ...
+
+    @overload
+    async def fetch_location_history(
+        self,
+        keys: Sequence[HasHashedPublicKey | RollingKeyPairSource],
+    ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ...
+ + @override + async def fetch_location_history( + self, + keys: HasHashedPublicKey + | Sequence[HasHashedPublicKey | RollingKeyPairSource] + | RollingKeyPairSource, + ) -> ( + list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + ): + """See `BaseAppleAccount.fetch_location_history`.""" + return await self._reports.fetch_location_history(keys) + @overload async def fetch_location( self, @@ -679,8 +752,12 @@ async def fetch_location( | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] | None ): - """See :meth:`BaseAppleAccount.fetch_reports`.""" - return await self._reports.fetch_location(keys) + """See :meth:`BaseAppleAccount.fetch_location`.""" + hist = await self.fetch_location_history(keys) + if isinstance(hist, list): + return sorted(hist)[-1] if hist else None + + return {dev: sorted(reports)[-1] if reports else None for dev, reports in hist.items()} @require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA, LoginState.LOGGED_IN) async def _gsa_authenticate( @@ -1019,10 +1096,35 @@ def td_2fa_submit(self, code: str) -> LoginState: return self._evt_loop.run_until_complete(coro) @overload - def fetch_location( + def fetch_location_history( self, keys: HasHashedPublicKey, - ) -> LocationReport | None: ... + ) -> list[LocationReport]: ... + + @overload + def fetch_location_history( + self, + keys: RollingKeyPairSource, + ) -> list[LocationReport]: ... + + @overload + def fetch_location_history( + self, + keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], + ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... 
+ + @override + def fetch_location_history( + self, + keys: HasHashedPublicKey + | Sequence[HasHashedPublicKey | RollingKeyPairSource] + | RollingKeyPairSource, + ) -> ( + list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + ): + """See `BaseAppleAccount.fetch_location_history`.""" + coro = self._asyncacc.fetch_location_history(keys) + return self._evt_loop.run_until_complete(coro) @overload def fetch_location( @@ -1040,16 +1142,19 @@ def fetch_location( def fetch_location( self, keys: HasHashedPublicKey - | Sequence[HasHashedPublicKey | RollingKeyPairSource] - | RollingKeyPairSource, + | RollingKeyPairSource + | Sequence[HasHashedPublicKey | RollingKeyPairSource], ) -> ( LocationReport | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] | None ): - """See :meth:`AsyncAppleAccount.fetch_location`.""" - coro = self._asyncacc.fetch_location(keys) - return self._evt_loop.run_until_complete(coro) + """See :meth:`BaseAppleAccount.fetch_location`.""" + hist = self.fetch_location_history(keys) + if isinstance(hist, list): + return sorted(hist)[-1] if hist else None + + return {dev: sorted(reports)[-1] if reports else None for dev, reports in hist.items()} @override def get_anisette_headers( diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index a8d051d..7759c8d 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -3,6 +3,7 @@ from __future__ import annotations import base64 +import bisect import hashlib import logging import struct @@ -337,48 +338,50 @@ def __init__(self, account: AsyncAppleAccount) -> None: self._account: AsyncAppleAccount = account @overload - async def fetch_location( + async def fetch_location_history( self, device: HasHashedPublicKey, - ) -> LocationReport | None: ... + ) -> list[LocationReport]: ... @overload - async def fetch_location( + async def fetch_location_history( self, device: RollingKeyPairSource, - ) -> LocationReport | None: ... 
+ ) -> list[LocationReport]: ... @overload - async def fetch_location( + async def fetch_location_history( self, device: Sequence[HasHashedPublicKey | RollingKeyPairSource], - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None]: ... + ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... - async def fetch_location( + async def fetch_location_history( self, device: HasHashedPublicKey | RollingKeyPairSource | Sequence[HasHashedPublicKey | RollingKeyPairSource], ) -> ( - LocationReport - | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] - | None + list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] ): """ - Fetch location for a certain device or multipel devices. + Fetch location history for a certain device or multiple devices. When `device` is a single :class:`HasHashedPublicKey`, this method will return - a location report corresponding to that key, or None if unavailable. - When `device` is a :class:`RollingKeyPairSource`, it will return a location - report corresponding to that source, or None if unavailable. + a list of location reports corresponding to that key. + When `device` is a :class:`RollingKeyPairSource`, it will return a list of location + reports corresponding to that source. When `device` is a sequence of :class:`HasHashedPublicKey`s or RollingKeyPairSource's, it will return a dictionary with the provided objects - as keys, and a location report (or None) as value. + as keys, and a list of location reports as value. + + Note that the location history of :class:`RollingKeyPairSource` devices is not guaranteed + to be complete, and may be missing certain historical reports. The most recent report is + however guaranteed to be in line with what Apple reports. 
""" if isinstance(device, HasHashedPublicKey): # single key key_reports = await self._fetch_key_reports([device]) - return key_reports.get(device, None) + return key_reports.get(device, []) if isinstance(device, RollingKeyPairSource): # key generator @@ -395,7 +398,9 @@ async def fetch_location( # we can batch static keys in a single request, # but key generators need to be queried separately static_keys: list[HasHashedPublicKey] = [] - reports: dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] = {} + reports: dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] = { + dev: [] for dev in device + } for dev in device: if isinstance(dev, HasHashedPublicKey): # save for later batch request @@ -413,7 +418,7 @@ async def fetch_location( async def _fetch_accessory_report( self, accessory: RollingKeyPairSource, - ) -> LocationReport | None: + ) -> list[LocationReport]: logger.debug("Fetching location report for accessory") now = datetime.now().astimezone() @@ -428,18 +433,16 @@ async def _fetch_accessory_report( cur_keys_primary: set[str] = set() cur_keys_secondary: set[str] = set() cur_index = accessory.get_min_index(start_date) - ret: LocationReport | None = None + ret: set[LocationReport] = set() - async def _fetch() -> LocationReport | None: + async def _fetch() -> set[LocationReport]: """Fetch current keys and add them to final reports.""" new_reports: list[LocationReport] = await self._account.fetch_raw_reports( [(list(cur_keys_primary), (list(cur_keys_secondary)))] ) logger.info("Fetched %d new reports (index %i)", len(new_reports), cur_index) - if new_reports: - report = sorted(new_reports)[-1] - + for report in new_reports: key = id_to_key[report.hashed_adv_key_bytes] report.decrypt(key) @@ -448,13 +451,11 @@ async def _fetch() -> LocationReport | None: # since apple only returns the latest reports per request. # This makes the value more likely to be stable. 
accessory.update_alignment(report, max(key_to_ind[key])) - else: - report = None cur_keys_primary.clear() cur_keys_secondary.clear() - return report + return set(new_reports) while cur_index <= accessory.get_max_index(end_date): key_batch = accessory.keys_at(cur_index) @@ -474,9 +475,7 @@ async def _fetch() -> LocationReport | None: len(cur_keys_primary | new_keys_primary) > 290 or len(cur_keys_secondary | new_keys_secondary) > 290 ): - report = await _fetch() - if ret is None or (report is not None and report.timestamp > ret.timestamp): - ret = report + ret |= await _fetch() # build mappings before adding to current keys for key in key_batch: @@ -489,17 +488,14 @@ async def _fetch() -> LocationReport | None: if cur_keys_primary or cur_keys_secondary: # fetch remaining keys - report = await _fetch() - if ret is None or (report is not None and report.timestamp > ret.timestamp): - ret = report + ret |= await _fetch() - # filter duplicate reports (can happen since key batches may overlap) - return ret + return sorted(ret) async def _fetch_key_reports( self, keys: Sequence[HasHashedPublicKey], - ) -> dict[HasHashedPublicKey, LocationReport | None]: + ) -> dict[HasHashedPublicKey, list[LocationReport]]: logger.debug("Fetching reports for %s key(s)", len(keys)) # fetch all as primary keys @@ -507,17 +503,13 @@ async def _fetch_key_reports( encrypted_reports: list[LocationReport] = await self._account.fetch_raw_reports(ids) id_to_key: dict[bytes, HasHashedPublicKey] = {key.hashed_adv_key_bytes: key for key in keys} - reports: dict[HasHashedPublicKey, LocationReport | None] = dict.fromkeys(keys) + reports: dict[HasHashedPublicKey, list[LocationReport]] = {key: [] for key in keys} for report in encrypted_reports: key = id_to_key[report.hashed_adv_key_bytes] + bisect.insort(reports[key], report) - cur_report = reports[key] - if cur_report is None or report.timestamp > cur_report.timestamp: - # more recent report, replace - reports[key] = report - - # pre-decrypt report if 
possible - if isinstance(key, KeyPair): - report.decrypt(key) + # pre-decrypt report if possible + if isinstance(key, KeyPair): + report.decrypt(key) return reports From 48007752af0fd0dd9cfeeafaa288957ff69dc3e4 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Sat, 6 Sep 2025 21:40:08 +0200 Subject: [PATCH 08/12] fix: Make examples work --- examples/fetch_reports.py | 20 +++++++++++++------- examples/fetch_reports_async.py | 18 ++++++++++++------ examples/real_airtag.py | 20 +++++++++----------- findmy/reports/account.py | 6 ++++++ pyproject.toml | 1 + 5 files changed, 41 insertions(+), 24 deletions(-) diff --git a/examples/fetch_reports.py b/examples/fetch_reports.py index 21a6e2b..f1fdbb9 100644 --- a/examples/fetch_reports.py +++ b/examples/fetch_reports.py @@ -32,16 +32,22 @@ def fetch_reports(priv_key: str) -> int: # Step 1: construct a key object and get its location reports key = KeyPair.from_b64(priv_key) - reports = acc.fetch_last_reports(key) + location = acc.fetch_location(key) - # Step 2: print the reports! - for report in sorted(reports): - print(report) + # Step 2: print it! + print("Last known location:") + print(f" - {location}") - # We can save the report to a file if we want - report.to_json("last_report.json") + # Step 3 (optional): We can save the location report to a file if we want. + # BUT WATCH OUT! This file will contain the tag's private key! + if location is not None: + location.to_json("last_report.json") - # Step 3: Make sure to save account state when you're done! + # To load it later: + # loc = LocationReport.from_json("last_report.json") + + # Step 4: Make sure to save account state when you're done! + # Otherwise you have to log in again... 
acc.to_json(STORE_PATH) return 0 diff --git a/examples/fetch_reports_async.py b/examples/fetch_reports_async.py index 169faa9..7a7cd2e 100644 --- a/examples/fetch_reports_async.py +++ b/examples/fetch_reports_async.py @@ -34,18 +34,24 @@ async def fetch_reports(priv_key: str) -> int: # Step 1: construct a key object and get its location reports key = KeyPair.from_b64(priv_key) - reports = await acc.fetch_last_reports(key) + location = await acc.fetch_location(key) - # Step 2: print the reports! - for report in sorted(reports): - print(report) + # Step 2: print it! + print("Last known location:") + print(f" - {location}") - # We can save the report to a file if we want - report.to_json("last_report.json") + # Step 3 (optional): We can save the location report to a file if we want. + # BUT WATCH OUT! This file will contain the tag's private key! + if location is not None: + location.to_json("last_report.json") + + # To load it later: + # loc = LocationReport.from_json("last_report.json") finally: await acc.close() # Make sure to save account state when you're done! + # Otherwise you have to log in again... acc.to_json(STORE_PATH) return 0 diff --git a/examples/real_airtag.py b/examples/real_airtag.py index 9aff41a..310e98b 100644 --- a/examples/real_airtag.py +++ b/examples/real_airtag.py @@ -31,34 +31,32 @@ logging.basicConfig(level=logging.INFO) -def main(plist_path: Path, alignment_plist_path: Path | None) -> int: +def main(airtag_path: Path) -> int: # Step 0: create an accessory key generator - airtag = FindMyAccessory.from_plist(plist_path, alignment_plist_path) + airtag = FindMyAccessory.from_json(airtag_path) # Step 1: log into an Apple account print("Logging into account") acc = get_account_sync(STORE_PATH, ANISETTE_SERVER, ANISETTE_LIBS_PATH) # step 2: fetch reports! 
- print("Fetching reports") - reports = acc.fetch_last_reports(airtag) + print("Fetching location") + location = acc.fetch_location(airtag) # step 3: print 'em - print() - print("Location reports:") - for report in sorted(reports): - print(f" - {report}") + print("Last known location:") + print(f" - {location}") # step 4: save current account state to disk acc.to_json(STORE_PATH) + airtag.to_json(airtag_path) return 0 if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("plist_path", type=Path) - parser.add_argument("--alignment_plist_path", default=None, type=Path) + parser.add_argument("airtag_path", type=Path) args = parser.parse_args() - sys.exit(main(args.plist_path, args.alignment_plist_path)) + sys.exit(main(args.airtag_path)) diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 528afea..3dade65 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -1126,6 +1126,12 @@ def fetch_location_history( coro = self._asyncacc.fetch_location_history(keys) return self._evt_loop.run_until_complete(coro) + @overload + def fetch_location( + self, + keys: HasHashedPublicKey, + ) -> LocationReport | None: ... + @overload def fetch_location( self, diff --git a/pyproject.toml b/pyproject.toml index 155e01e..b72e3bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,7 @@ ignore = [ "S101", # use of "assert" "D", # documentation "INP001", # namespacing + "ERA001", # commented out code ] "scripts/*" = [ "T201", # use of "print" From 066fd0a7147db70769ad86221b5384336fbd3774 Mon Sep 17 00:00:00 2001 From: "Mike A." 
Date: Sat, 6 Sep 2025 21:56:19 +0200 Subject: [PATCH 09/12] fix: resolve typing issues in scanner --- findmy/accessory.py | 29 ++++++++++++++++++----------- findmy/reports/reports.py | 2 +- findmy/scanner/scanner.py | 34 +++++++++++++++++++++++++++------- 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/findmy/accessory.py b/findmy/accessory.py index 29c54f2..c2bfd73 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -23,8 +23,6 @@ from collections.abc import Generator from pathlib import Path - from findmy.reports.reports import LocationReport - logger = logging.getLogger(__name__) @@ -63,7 +61,7 @@ def get_max_index(self, dt: datetime) -> int: raise NotImplementedError @abstractmethod - def update_alignment(self, report: LocationReport, index: int) -> None: + def update_alignment(self, dt: datetime, index: int) -> None: """ Update alignment of the accessory. @@ -76,14 +74,23 @@ def keys_at(self, ind: int) -> set[KeyPair]: """Generate potential key(s) occurring at a certain index.""" raise NotImplementedError - def keys_between(self, start: int, end: int) -> set[KeyPair]: - """Generate potential key(s) occurring between two indices.""" - keys: set[KeyPair] = set() + def keys_between( + self, start: int | datetime, end: int | datetime + ) -> Generator[tuple[int, KeyPair], None, None]: + """Generate potential key(s) that could be occurring between two indices or datetimes.""" + if isinstance(start, datetime): + start = self.get_min_index(start) + if isinstance(end, datetime): + end = self.get_max_index(end) + yielded: set[KeyPair] = set() for ind in range(start, end + 1): - keys.update(self.keys_at(ind)) + for key in self.keys_at(ind): + if key in yielded: + continue - return keys + yielded.add(key) + yield ind, key class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]): @@ -216,14 +223,14 @@ def get_max_index(self, dt: datetime) -> int: return self._alignment_index + ind_since_alignment @override - def 
update_alignment(self, report: LocationReport, index: int) -> None: - if report.timestamp < self._alignment_date: + def update_alignment(self, dt: datetime, index: int) -> None: + if dt < self._alignment_date: # we only care about the most recent report return logger.info("Updating alignment based on report observed at index %i", index) - self._alignment_date = report.timestamp + self._alignment_date = dt self._alignment_index = index def _primary_key_at(self, ind: int) -> KeyPair: diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index 7759c8d..3c4e77d 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -450,7 +450,7 @@ async def _fetch() -> set[LocationReport]: # if a key maps to multiple indices, only feed it the maximum index, # since apple only returns the latest reports per request. # This makes the value more likely to be stable. - accessory.update_alignment(report, max(key_to_ind[key])) + accessory.update_alignment(report.timestamp, max(key_to_ind[key])) cur_keys_primary.clear() cur_keys_secondary.clear() diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index ad50589..1f9d7b6 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -6,7 +6,7 @@ import logging import time from abc import ABC, abstractmethod -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any from bleak import BleakScanner @@ -152,12 +152,22 @@ def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: if isinstance(other_device, HasPublicKey): return other_device.adv_key_bytes.startswith(self._first_adv_key_bytes) if isinstance(other_device, RollingKeyPairSource): - # 1 hour margin around the detected time + # 12 hour margin around the detected time potential_keys = other_device.keys_between( - self.detected_at - timedelta(hours=1), - self.detected_at + timedelta(hours=1), + self.detected_at - timedelta(hours=12), + 
self.detected_at + timedelta(hours=12), ) - return any(self.is_from(key) for key in potential_keys) + for ind, key in potential_keys: + if not self.is_from(key): + continue + + # update alignment of found key + now = datetime.now(tz=timezone.utc) + other_device.update_alignment(now, ind) + + return True + + return False msg = f"Cannot compare against {type(other_device)}" raise ValueError(msg) @@ -232,14 +242,24 @@ def adv_key_bytes(self) -> bytes: def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" if isinstance(other_device, HasPublicKey): - return self.adv_key_bytes == other_device.adv_key_bytes + return other_device.adv_key_bytes == self.adv_key_bytes if isinstance(other_device, RollingKeyPairSource): # 12 hour margin around the detected time potential_keys = other_device.keys_between( self.detected_at - timedelta(hours=12), self.detected_at + timedelta(hours=12), ) - return any(self.is_from(key) for key in potential_keys) + for ind, key in potential_keys: + if not self.is_from(key): + continue + + # update alignment of found key + now = datetime.now(tz=timezone.utc) + other_device.update_alignment(now, ind) + + return True + + return False msg = f"Cannot compare against {type(other_device)}" raise ValueError(msg) From 04918946afe41277b18affdedadae0d40812fc9f Mon Sep 17 00:00:00 2001 From: "Mike A." 
Date: Mon, 8 Sep 2025 00:43:30 +0200 Subject: [PATCH 10/12] feat: support airtags in scanner example --- examples/{device_scanner.py => scanner.py} | 36 ++++++++++++++++------ 1 file changed, 27 insertions(+), 9 deletions(-) rename examples/{device_scanner.py => scanner.py} (60%) diff --git a/examples/device_scanner.py b/examples/scanner.py similarity index 60% rename from examples/device_scanner.py rename to examples/scanner.py index 78d35bb..7728a6a 100644 --- a/examples/device_scanner.py +++ b/examples/scanner.py @@ -1,10 +1,12 @@ from __future__ import annotations +import argparse import asyncio import logging -import sys +from pathlib import Path from findmy import KeyPair +from findmy.accessory import FindMyAccessory from findmy.scanner import ( NearbyOfflineFindingDevice, OfflineFindingScanner, @@ -35,7 +37,7 @@ def _print_separated(device: SeparatedOfflineFindingDevice) -> None: print() -async def scan(check_key: KeyPair | None = None) -> None: +async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scanner = await OfflineFindingScanner.create() print("Scanning for FindMy-devices...") @@ -56,15 +58,31 @@ async def scan(check_key: KeyPair | None = None) -> None: if check_key and device.is_from(check_key): scan_device = device + print() if scan_device: - print("Key or accessory was found in scan results! :D") + print("Device was found in scan results! :D") elif check_key: - print("Selected key or accessory was not found in scan results... :c") + print("Device was not found in scan results... 
:c") + return scan_device is not None and check_key is not None -if __name__ == "__main__": - key = None - if len(sys.argv) >= 2: - key = KeyPair.from_b64(sys.argv[1]) - asyncio.run(scan(key)) +if __name__ == "__main__": + parser = argparse.ArgumentParser() + group = parser.add_mutually_exclusive_group() + group.add_argument("--private_key", type=str) + group.add_argument("--airtag_file", type=Path) + args = parser.parse_args() + + dev: KeyPair | FindMyAccessory | None = None + if args.private_key: + dev = KeyPair.from_b64(args.private_key) + elif args.airtag_file: + dev = FindMyAccessory.from_json(args.airtag_file) + + device_found = asyncio.run(scan(dev)) + + if device_found and isinstance(dev, FindMyAccessory): + print("Current scan results were used to align the accessory.") + print(f'Updated alignment will be saved to "{args.airtag_file}".') + dev.to_json(args.airtag_file) From ed94179b321df83c8407142578a6342859258c6d Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Mon, 8 Sep 2025 00:52:59 +0200 Subject: [PATCH 11/12] fix: resolve import error --- examples/real_airtag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/real_airtag.py b/examples/real_airtag.py index 0ef2ccb..310e98b 100644 --- a/examples/real_airtag.py +++ b/examples/real_airtag.py @@ -7,6 +7,7 @@ import argparse import logging import sys +from pathlib import Path from _login import get_account_sync From 4e3e30d0bb56b9ba8cd50dd83ae41c85bd4c9bf7 Mon Sep 17 00:00:00 2001 From: "Mike A." 
Date: Mon, 8 Sep 2025 00:53:15 +0200 Subject: [PATCH 12/12] feat: add plist -> json conversion example --- examples/plist_to_json.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 examples/plist_to_json.py diff --git a/examples/plist_to_json.py b/examples/plist_to_json.py new file mode 100644 index 0000000..6e332cc --- /dev/null +++ b/examples/plist_to_json.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from pathlib import Path + +from findmy import FindMyAccessory + + +def main(output: Path, accessory_plist: Path, alignment_plist: Path | None = None) -> int: + accessory = FindMyAccessory.from_plist(accessory_plist, alignment_plist) + accessory.to_json(output) + return 0 + + +if __name__ == "__main__": + import argparse + import sys + + parser = argparse.ArgumentParser() + parser.add_argument("accessory_plist", type=Path, help="Input accessory plist file") + parser.add_argument("output", type=Path, help="Output JSON file") + parser.add_argument( + "--alignment-plist", + type=Path, + help="Input alignment plist file (if available)", + default=None, + ) + args = parser.parse_args() + + sys.exit(main(args.output, args.accessory_plist, args.alignment_plist))