diff --git a/examples/fetch_reports.py b/examples/fetch_reports.py index 21a6e2b..f1fdbb9 100644 --- a/examples/fetch_reports.py +++ b/examples/fetch_reports.py @@ -32,16 +32,22 @@ def fetch_reports(priv_key: str) -> int: # Step 1: construct a key object and get its location reports key = KeyPair.from_b64(priv_key) - reports = acc.fetch_last_reports(key) + location = acc.fetch_location(key) - # Step 2: print the reports! - for report in sorted(reports): - print(report) + # Step 2: print it! + print("Last known location:") + print(f" - {location}") - # We can save the report to a file if we want - report.to_json("last_report.json") + # Step 3 (optional): We can save the location report to a file if we want. + # BUT WATCH OUT! This file will contain the tag's private key! + if location is not None: + location.to_json("last_report.json") - # Step 3: Make sure to save account state when you're done! + # To load it later: + # loc = LocationReport.from_json("last_report.json") + + # Step 4: Make sure to save account state when you're done! + # Otherwise you have to log in again... acc.to_json(STORE_PATH) return 0 diff --git a/examples/fetch_reports_async.py b/examples/fetch_reports_async.py index 169faa9..7a7cd2e 100644 --- a/examples/fetch_reports_async.py +++ b/examples/fetch_reports_async.py @@ -34,18 +34,24 @@ async def fetch_reports(priv_key: str) -> int: # Step 1: construct a key object and get its location reports key = KeyPair.from_b64(priv_key) - reports = await acc.fetch_last_reports(key) + location = await acc.fetch_location(key) - # Step 2: print the reports! - for report in sorted(reports): - print(report) + # Step 2: print it! + print("Last known location:") + print(f" - {location}") - # We can save the report to a file if we want - report.to_json("last_report.json") + # Step 3 (optional): We can save the location report to a file if we want. + # BUT WATCH OUT! This file will contain the tag's private key! + if location is not None: + location.to_json("last_report.json") + + # To load it later: + # loc = LocationReport.from_json("last_report.json") finally: await acc.close() # Make sure to save account state when you're done! + # Otherwise you have to log in again... acc.to_json(STORE_PATH) return 0 diff --git a/examples/plist_to_json.py b/examples/plist_to_json.py new file mode 100644 index 0000000..6e332cc --- /dev/null +++ b/examples/plist_to_json.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from pathlib import Path + +from findmy import FindMyAccessory + + +def main(output: Path, accessory_plist: Path, alignment_plist: Path | None = None) -> int: + accessory = FindMyAccessory.from_plist(accessory_plist, alignment_plist) + accessory.to_json(output) + return 0 + + +if __name__ == "__main__": + import argparse + import sys + + parser = argparse.ArgumentParser() + parser.add_argument("accessory_plist", type=Path, help="Input accessory plist file") + parser.add_argument("output", type=Path, help="Output JSON file") + parser.add_argument( + "--alignment-plist", + type=Path, + help="Input alignment plist file (if available)", + default=None, + ) + args = parser.parse_args() + + sys.exit(main(args.output, args.accessory_plist, args.alignment_plist)) diff --git a/examples/real_airtag.py b/examples/real_airtag.py index 04448e2..310e98b 100644 --- a/examples/real_airtag.py +++ b/examples/real_airtag.py @@ -4,8 +4,10 @@ from __future__ import annotations +import argparse import logging import sys +from pathlib import Path from _login import get_account_sync @@ -29,35 +31,32 @@ logging.basicConfig(level=logging.INFO) -def main(plist_path: str) -> int: +def main(airtag_path: Path) -> int: # Step 0: create an accessory key generator - airtag = FindMyAccessory.from_plist(plist_path) + airtag = FindMyAccessory.from_json(airtag_path) # Step 1: log into an Apple account print("Logging into account") acc = get_account_sync(STORE_PATH, ANISETTE_SERVER, ANISETTE_LIBS_PATH) # step 2: fetch reports! - print("Fetching reports") - reports = acc.fetch_last_reports(airtag) + print("Fetching location") + location = acc.fetch_location(airtag) # step 3: print 'em - print() - print("Location reports:") - for report in sorted(reports): - print(f" - {report}") + print("Last known location:") + print(f" - {location}") # step 4: save current account state to disk acc.to_json(STORE_PATH) + airtag.to_json(airtag_path) return 0 if __name__ == "__main__": - if len(sys.argv) < 2: - print(f"Usage: {sys.argv[0]} ", file=sys.stderr) - print(file=sys.stderr) - print("The plist file should be dumped from MacOS's FindMy app.", file=sys.stderr) - sys.exit(1) + parser = argparse.ArgumentParser() + parser.add_argument("airtag_path", type=Path) + args = parser.parse_args() - sys.exit(main(sys.argv[1])) + sys.exit(main(args.airtag_path)) diff --git a/examples/device_scanner.py b/examples/scanner.py similarity index 60% rename from examples/device_scanner.py rename to examples/scanner.py index 78d35bb..7728a6a 100644 --- a/examples/device_scanner.py +++ b/examples/scanner.py @@ -1,10 +1,12 @@ from __future__ import annotations +import argparse import asyncio import logging -import sys +from pathlib import Path from findmy import KeyPair +from findmy.accessory import FindMyAccessory from findmy.scanner import ( NearbyOfflineFindingDevice, OfflineFindingScanner, @@ -35,7 +37,7 @@ def _print_separated(device: SeparatedOfflineFindingDevice) -> None: print() -async def scan(check_key: KeyPair | None = None) -> None: +async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scanner = await OfflineFindingScanner.create() print("Scanning for FindMy-devices...") @@ -56,15 +58,31 @@ async def scan(check_key: KeyPair | None = None) -> None: if check_key and device.is_from(check_key): scan_device = device + print() if scan_device: - print("Key or accessory was found in scan results! :D") + print("Device was found in scan results! :D") elif check_key: - print("Selected key or accessory was not found in scan results... :c") + print("Device was not found in scan results... :c") + return scan_device is not None and check_key is not None -if __name__ == "__main__": - key = None - if len(sys.argv) >= 2: - key = KeyPair.from_b64(sys.argv[1]) - asyncio.run(scan(key)) +if __name__ == "__main__": + parser = argparse.ArgumentParser() + group = parser.add_mutually_exclusive_group() + group.add_argument("--private_key", type=str) + group.add_argument("--airtag_file", type=Path) + args = parser.parse_args() + + dev: KeyPair | FindMyAccessory | None = None + if args.private_key: + dev = KeyPair.from_b64(args.private_key) + elif args.airtag_file: + dev = FindMyAccessory.from_json(args.airtag_file) + + device_found = asyncio.run(scan(dev)) + + if device_found and isinstance(dev, FindMyAccessory): + print("Current scan results were used to align the accessory.") + print(f'Updated alignment will be saved to "{args.airtag_file}".') + dev.to_json(args.airtag_file) diff --git a/findmy/__main__.py b/findmy/__main__.py index acc48a6..af59873 100644 --- a/findmy/__main__.py +++ b/findmy/__main__.py @@ -8,7 +8,7 @@ from importlib.metadata import version from pathlib import Path -from .plist import get_key, list_accessories +from .plist import list_accessories def main() -> None: # noqa: D103 @@ -96,8 +96,7 @@ def get_path(d, acc) -> Path | None: # noqa: ANN001 d.mkdir(parents=True, exist_ok=True) return d / f"{acc.identifier}.json" - key = get_key() - accs = list_accessories(key=key) + accs = list_accessories() jsons = [acc.to_json(get_path(out_dir, acc)) for acc in accs] print(json.dumps(jsons, indent=4, ensure_ascii=False)) # noqa: T201 diff --git a/findmy/accessory.py b/findmy/accessory.py index 18b0e58..c2bfd73 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -7,22 +7,21 @@ from __future__ import annotations import logging -import plistlib from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone -from pathlib import Path -from typing import IO, TYPE_CHECKING, Literal, TypedDict, overload +from typing import TYPE_CHECKING, Literal, TypedDict, overload from typing_extensions import override from findmy.util.abc import Serializable -from findmy.util.files import read_data_json, save_and_return_json +from findmy.util.files import read_data_json, read_data_plist, save_and_return_json from .keys import KeyGenerator, KeyPair, KeyType from .util import crypto if TYPE_CHECKING: from collections.abc import Generator + from pathlib import Path logger = logging.getLogger(__name__) @@ -38,6 +37,8 @@ class FindMyAccessoryMapping(TypedDict): name: str | None model: str | None identifier: str | None + alignment_date: str | None + alignment_index: int | None class RollingKeyPairSource(ABC): @@ -47,39 +48,49 @@ class RollingKeyPairSource(ABC): @abstractmethod def interval(self) -> timedelta: """KeyPair rollover interval.""" + raise NotImplementedError @abstractmethod - def keys_at(self, ind: int | datetime) -> set[KeyPair]: - """Generate potential key(s) occurring at a certain index or timestamp.""" + def get_min_index(self, dt: datetime) -> int: + """Get the minimum key index that the accessory could be broadcasting at a specific time.""" raise NotImplementedError - @overload - def keys_between(self, start: int, end: int) -> set[KeyPair]: - pass + @abstractmethod + def get_max_index(self, dt: datetime) -> int: + """Get the maximum key index that the accessory could be broadcasting at a specific time.""" + raise NotImplementedError - @overload - def keys_between(self, start: datetime, end: datetime) -> set[KeyPair]: - pass + @abstractmethod + def update_alignment(self, dt: datetime, index: int) -> None: + """ + Update alignment of the accessory. - def keys_between(self, start: int | datetime, end: int | datetime) -> set[KeyPair]: - """Generate potential key(s) occurring between two indices or timestamps.""" - keys: set[KeyPair] = set() + Alignment can be updated based on a LocationReport that was observed at a specific index. + """ + raise NotImplementedError - if isinstance(start, int) and isinstance(end, int): - while start < end: - keys.update(self.keys_at(start)) + @abstractmethod + def keys_at(self, ind: int) -> set[KeyPair]: + """Generate potential key(s) occurring at a certain index.""" + raise NotImplementedError - start += 1 - elif isinstance(start, datetime) and isinstance(end, datetime): - while start < end: - keys.update(self.keys_at(start)) + def keys_between( + self, start: int | datetime, end: int | datetime + ) -> Generator[tuple[int, KeyPair], None, None]: + """Generate potential key(s) that could be occurring between two indices or datetimes.""" + if isinstance(start, datetime): + start = self.get_min_index(start) + if isinstance(end, datetime): + end = self.get_max_index(end) - start += self.interval - else: - msg = "Invalid start/end type" - raise TypeError(msg) + yielded: set[KeyPair] = set() + for ind in range(start, end + 1): + for key in self.keys_at(ind): + if key in yielded: + continue - return keys + yielded.add(key) + yield ind, key class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]): @@ -95,6 +106,8 @@ def __init__( # noqa: PLR0913 name: str | None = None, model: str | None = None, identifier: str | None = None, + alignment_date: datetime | None = None, + alignment_index: int | None = None, ) -> None: """ Initialize a FindMyAccessory. These values are usually obtained during pairing. @@ -116,6 +129,14 @@ def __init__( # noqa: PLR0913 self._name = name self._model = model self._identifier = identifier + self._alignment_date = alignment_date if alignment_date is not None else paired_at + self._alignment_index = alignment_index if alignment_index is not None else 0 + if self._alignment_date.tzinfo is None: + self._alignment_date = self._alignment_date.astimezone() + logger.warning( + "Alignment datetime is timezone-naive. Assuming system tz: %s.", + self._alignment_date.tzname(), + ) @property def master_key(self) -> bytes: @@ -163,69 +184,93 @@ def interval(self) -> timedelta: return timedelta(minutes=15) @override - def keys_at(self, ind: int | datetime) -> set[KeyPair]: - """Get the potential primary and secondary keys active at a certain time or index.""" - if isinstance(ind, datetime) and ind < self._paired_at: - return set() - if isinstance(ind, int) and ind < 0: - return set() + def get_min_index(self, dt: datetime) -> int: + if dt.tzinfo is None: + end = dt.astimezone() + logger.warning( + "Datetime is timezone-naive. Assuming system tz: %s.", + end.tzname(), + ) - secondary_offset = 0 + if dt >= self._alignment_date: + # in the worst case, the accessory has not rolled over at all since alignment + return self._alignment_index - if isinstance(ind, datetime): - # number of 15-minute slots since pairing time - ind = ( - int( - (ind - self._paired_at).total_seconds() / (15 * 60), - ) - + 1 - ) - # number of slots until first 4 am - first_rollover = self._paired_at.astimezone().replace( - hour=4, - minute=0, - second=0, - microsecond=0, - ) - if first_rollover < self._paired_at: # we rolled backwards, so increment the day - first_rollover += timedelta(days=1) - secondary_offset = ( - int( - (first_rollover - self._paired_at).total_seconds() / (15 * 60), - ) - + 1 + # the accessory key will rollover AT MOST once every 15 minutes, so + # this is the minimum index for which we will need to generate keys. + # it's possible that rollover has progressed slower or not at all. + ind_before_alignment = (self._alignment_date - dt) // self.interval + return self._alignment_index - ind_before_alignment + + @override + def get_max_index(self, dt: datetime) -> int: + if dt.tzinfo is None: + end = dt.astimezone() + logger.warning( + "Datetime is timezone-naive. Assuming system tz: %s.", + end.tzname(), ) - possible_keys = set() - # primary key can always be determined - possible_keys.add(self._primary_gen[ind]) + if dt <= self._alignment_date: + # in the worst case, the accessory has not rolled over at all since `dt`, + # in which case it was at the alignment index. We can't go lower than that. + return self._alignment_index + + # the accessory key will rollover AT MOST once every 15 minutes, so + # this is the maximum index for which we will need to generate keys. + # it's possible that rollover has progressed slower or not at all. + ind_since_alignment = (dt - self._alignment_date) // self.interval + return self._alignment_index + ind_since_alignment + + @override + def update_alignment(self, dt: datetime, index: int) -> None: + if dt < self._alignment_date: + # we only care about the most recent report + return + + logger.info("Updating alignment based on report observed at index %i", index) + + self._alignment_date = dt + self._alignment_index = index + + def _primary_key_at(self, ind: int) -> KeyPair: + """Get the primary key at a certain index.""" + return self._primary_gen[ind] + def _secondary_keys_at(self, ind: int) -> tuple[KeyPair, KeyPair]: + """Get possible secondary keys at a certain primary index.""" # when the accessory has been rebooted, it will use the following secondary key - possible_keys.add(self._secondary_gen[ind // 96 + 1]) + key_1 = self._secondary_gen[ind // 96 + 1] + + # in some cases, the secondary index may not be at primary_ind // 96 + 1, but at +2 instead. + # example: if we paired at 3:00 am, the first secondary key will be used until 4:00 am, + # at which point the second secondary key will be used. The primary index at 4:00 am is 4, + # but the 'second' secondary key is used. + # however, since we don't know the exact index rollover pattern, we just take a guess here + # and return both keys. for alignment, it's better to underestimate progression of the index + # than to overestimate it. + key_2 = self._secondary_gen[ind // 96 + 2] + + return key_1, key_2 - if ind > secondary_offset: - # after the first 4 am after pairing, we need to account for the first day - possible_keys.add(self._secondary_gen[(ind - secondary_offset) // 96 + 2]) + @override + def keys_at(self, ind: int) -> set[KeyPair]: + """Get the primary and secondary keys that might be active at a certain index.""" + if ind < 0: + return set() - return possible_keys + return {self._primary_key_at(ind), *self._secondary_keys_at(ind)} @classmethod def from_plist( cls, - plist: str | Path | dict | bytes | IO[bytes], + plist: str | Path | dict | bytes, + key_alignment_plist: str | Path | dict | bytes | None = None, *, name: str | None = None, ) -> FindMyAccessory: """Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" - if isinstance(plist, bytes): - # plist is a bytes object - device_data = plistlib.loads(plist) - elif isinstance(plist, (str, Path)): - device_data = plistlib.loads(Path(plist).read_bytes()) - elif isinstance(plist, IO): - device_data = plistlib.load(plist) - else: - device_data = plist + device_data = read_data_plist(plist) # PRIVATE master key. 28 (?) bytes. master_key = device_data["privateKey"]["key"]["data"][-28:] @@ -247,6 +292,18 @@ def from_plist( model = device_data["model"] identifier = device_data["identifier"] + alignment_date = None + index = None + if key_alignment_plist: + alignment_data = read_data_plist(key_alignment_plist) + + # last observed date + alignment_date = alignment_data["lastIndexObservationDate"].replace( + tzinfo=timezone.utc, + ) + # primary index value at last observed date + index = alignment_data["lastIndexObserved"] + return cls( master_key=master_key, skn=skn, @@ -255,10 +312,16 @@ def from_plist( name=name, model=model, identifier=identifier, + alignment_date=alignment_date, + alignment_index=index, ) @override def to_json(self, path: str | Path | None = None, /) -> FindMyAccessoryMapping: + alignment_date = None + if self._alignment_date is not None: + alignment_date = self._alignment_date.isoformat() + res: FindMyAccessoryMapping = { "type": "accessory", "master_key": self._primary_gen.master_key.hex(), @@ -268,6 +331,8 @@ def to_json(self, path: str | Path | None = None, /) -> FindMyAccessoryMapping: "name": self.name, "model": self.model, "identifier": self.identifier, + "alignment_date": alignment_date, + "alignment_index": self._alignment_index, } return save_and_return_json(res, path) @@ -283,6 +348,10 @@ def from_json( assert val["type"] == "accessory" try: + alignment_date = val["alignment_date"] + if alignment_date is not None: + alignment_date = datetime.fromisoformat(alignment_date) + return cls( master_key=bytes.fromhex(val["master_key"]), skn=bytes.fromhex(val["skn"]), @@ -291,6 +360,8 @@ def from_json( name=val["name"], model=val["model"], identifier=val["identifier"], + alignment_date=alignment_date, + alignment_index=val["alignment_index"], ) except KeyError as e: msg = f"Failed to restore account data: {e}" @@ -345,6 +416,10 @@ def key_type(self) -> KeyType: return self._key_type def _get_sk(self, ind: int) -> bytes: + if ind < 0: + msg = "The key index must be non-negative" + raise ValueError(msg) + if ind < self._cur_sk_ind: # behind us; need to reset :( self._cur_sk = self._initial_sk self._cur_sk_ind = 0 diff --git a/findmy/plist.py b/findmy/plist.py index 046243e..e849f4f 100644 --- a/findmy/plist.py +++ b/findmy/plist.py @@ -1,7 +1,16 @@ -"""Utils for decrypting the encypted .record files into .plist files.""" +""" +Utils for decrypting the encypted .record files into .plist files. + +Originally from: +Author: Shane B. +in https://github.com/parawanderer/OpenTagViewer/blob/08a59cab551721afb9dc9f829ad31dae8d5bd400/python/airtag_decryptor.py +which was based on: +Based on: https://gist.github.com/airy10/5205dc851fbd0715fcd7a5cdde25e7c8 +""" from __future__ import annotations +import logging import plistlib import subprocess from pathlib import Path @@ -11,24 +20,56 @@ from .accessory import FindMyAccessory -# Originally from: -# Author: Shane B. -# in https://github.com/parawanderer/OpenTagViewer/blob/08a59cab551721afb9dc9f829ad31dae8d5bd400/python/airtag_decryptor.py -# which was based on: -# Based on: https://gist.github.com/airy10/5205dc851fbd0715fcd7a5cdde25e7c8 +logger = logging.getLogger(__name__) + + +_DEFAULT_SEARCH_PATH = Path.home() / "Library" / "com.apple.icloud.searchpartyd" # consider switching to this library https://github.com/microsoft/keyper # once they publish a version of it that includes my MR with the changes to make it compatible # with keys that are non-utf-8 encoded (like the BeaconStore one) # if I contribute this, properly escape the label argument here... -def get_key() -> bytes: +def _get_beaconstore_key() -> bytes: """Get the decryption key for BeaconStore using the system password prompt window.""" # This thing will pop up 2 Password Input windows... key_in_hex = subprocess.getoutput("/usr/bin/security find-generic-password -l 'BeaconStore' -w") # noqa: S605 return bytes.fromhex(key_in_hex) +def _get_accessory_name( + accessory_id: str, + key: bytes, + *, + search_path: Path | None = None, +) -> str | None: + search_path = search_path or _DEFAULT_SEARCH_PATH + path = next((search_path / "BeaconNamingRecord" / accessory_id).glob(pattern="*.record"), None) + if path is None: + logger.warning( + "Accessory %s does not have a BeaconNamingRecord, defaulting to None", accessory_id + ) + return None + + naming_record_plist = decrypt_plist(path, key) + return naming_record_plist.get("name", None) + + +def _get_alignment_plist( + accessory_id: str, + key: bytes, + *, + search_path: Path | None = None, +) -> dict | None: + search_path = search_path or _DEFAULT_SEARCH_PATH + path = next((search_path / "KeyAlignmentRecords" / accessory_id).glob(pattern="*.record"), None) + if path is None: + logger.warning("Accessory %s does not have a KeyAlignmentRecord", accessory_id) + return None + + return decrypt_plist(path, key) + + def decrypt_plist(encrypted: str | Path | bytes | IO[bytes], key: bytes) -> dict: """ Decrypts the encrypted plist file at :meth:`encrypted` using the provided :meth:`key`. @@ -76,15 +117,15 @@ def list_accessories( search_path = Path.home() / "Library" / "com.apple.icloud.searchpartyd" search_path = Path(search_path) if key is None: - key = get_key() + key = _get_beaconstore_key() accesories = [] encrypted_plist_paths = search_path.glob("OwnedBeacons/*.record") for path in encrypted_plist_paths: plist = decrypt_plist(path, key) - naming_record_path = next((search_path / "BeaconNamingRecord" / path.stem).glob("*.record")) - naming_record_plist = decrypt_plist(naming_record_path, key) - name = naming_record_plist["name"] - accessory = FindMyAccessory.from_plist(plist, name=name) + name = _get_accessory_name(path.stem, key) + alignment_plist = _get_alignment_plist(path.stem, key) + + accessory = FindMyAccessory.from_plist(plist, alignment_plist, name=name) accesories.append(accessory) return accesories diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 53f1ab1..3dade65 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -233,87 +233,85 @@ def td_2fa_submit(self, code: str) -> MaybeCoro[LoginState]: @overload @abstractmethod - def fetch_reports( + def fetch_location_history( self, keys: HasHashedPublicKey, - date_from: datetime, - date_to: datetime | None, ) -> MaybeCoro[list[LocationReport]]: ... @overload @abstractmethod - def fetch_reports( + def fetch_location_history( self, keys: RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, ) -> MaybeCoro[list[LocationReport]]: ... @overload @abstractmethod - def fetch_reports( + def fetch_location_history( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, ) -> MaybeCoro[dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]]: ... @abstractmethod - def fetch_reports( + def fetch_location_history( self, keys: HasHashedPublicKey | Sequence[HasHashedPublicKey | RollingKeyPairSource] | RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, ) -> MaybeCoro[ list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] ]: """ - Fetch location reports for :class:`HasHashedPublicKey`s between `date_from` and `date_end`. + Fetch location history for :class:`HasHashedPublicKey`s and :class:`RollingKeyPairSource`s. - Returns a dictionary mapping :class:`HasHashedPublicKey`s to their location reports. + Note that location history for devices is provided on a best-effort + basis and may not be fully complete or stable. Multiple consecutive calls to this method + may result in different location reports, especially for reports further in the past. + However, each one of these reports is guaranteed to be in line with the data reported by + Apple, and the most recent report will always be included in the results. + + Unless you really need to use this method, and use :meth:`fetch_location` instead. """ raise NotImplementedError @overload @abstractmethod - def fetch_last_reports( + def fetch_location( self, keys: HasHashedPublicKey, - hours: int = 7 * 24, - ) -> MaybeCoro[list[LocationReport]]: ... + ) -> MaybeCoro[LocationReport | None]: ... @overload @abstractmethod - def fetch_last_reports( + def fetch_location( self, keys: RollingKeyPairSource, - hours: int = 7 * 24, - ) -> MaybeCoro[list[LocationReport]]: ... + ) -> MaybeCoro[LocationReport | None]: ... @overload @abstractmethod - def fetch_last_reports( + def fetch_location( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, - ) -> MaybeCoro[dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]]: ... + ) -> MaybeCoro[ + dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] | None + ]: ... @abstractmethod - def fetch_last_reports( + def fetch_location( self, keys: HasHashedPublicKey - | RollingKeyPairSource - | Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, + | Sequence[HasHashedPublicKey | RollingKeyPairSource] + | RollingKeyPairSource, ) -> MaybeCoro[ - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + LocationReport + | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] + | None ]: """ - Fetch location reports for :class:`HasHashedPublicKey`s for the last `hours` hours. + Fetch location for :class:`HasHashedPublicKey`s. - Utility method as an alternative to using :meth:`BaseAppleAccount.fetch_reports` directly. + Returns a dictionary mapping :class:`HasHashedPublicKey`s to their location reports. """ raise NotImplementedError @@ -617,17 +615,19 @@ async def td_2fa_submit(self, code: str) -> LoginState: @require_login_state(LoginState.LOGGED_IN) async def fetch_raw_reports( self, - start: datetime, - end: datetime, - devices: list[list[str]], - ) -> dict[str, Any]: + devices: list[tuple[list[str], list[str]]], + ) -> list[LocationReport]: """Make a request for location reports, returning raw data.""" + logger.debug("Fetching raw reports for %d device(s)", len(devices)) + + now = datetime.now(tz=timezone.utc) + start_ts = int((now - timedelta(days=7)).timestamp()) * 1000 + end_ts = int(now.timestamp()) * 1000 + auth = ( self._login_state_data["dsid"], self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"], ) - start_ts = int(start.timestamp() * 1000) - end_ts = int(end.timestamp() * 1000) data = { "clientContext": { "clientBundleIdentifier": "com.apple.icloud.searchpartyuseragent", @@ -640,8 +640,8 @@ async def fetch_raw_reports( "startDate": start_ts, "startDateSecondary": start_ts, "endDate": end_ts, - # passing all keys as primary seems to work fine - "primaryIds": device_keys, + "primaryIds": device_keys[0], + "secondaryIds": device_keys[1], } for device_keys in devices ], @@ -679,90 +679,85 @@ async def _do_request() -> HttpResponse: msg = f"Failed to fetch reports: {resp.get('statusCode')}" raise UnhandledProtocolError(msg) - return resp["acsnLocations"] + # parse reports + reports: list[LocationReport] = [] + for key_reports in resp.get("acsnLocations", {}).get("locationPayload", []): + hashed_adv_key_bytes = base64.b64decode(key_reports["id"]) + + for report in key_reports.get("locationInfo", []): + payload = base64.b64decode(report) + loc_report = LocationReport(payload, hashed_adv_key_bytes) + + reports.append(loc_report) + + return reports @overload - async def fetch_reports( + async def fetch_location_history( self, keys: HasHashedPublicKey, - date_from: datetime, - date_to: datetime | None, ) -> list[LocationReport]: ... @overload - async def fetch_reports( + async def fetch_location_history( self, keys: RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, ) -> list[LocationReport]: ... @overload - async def fetch_reports( + async def fetch_location_history( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... - @require_login_state(LoginState.LOGGED_IN) @override - async def fetch_reports( + async def fetch_location_history( self, keys: HasHashedPublicKey - | RollingKeyPairSource - | Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, + | Sequence[HasHashedPublicKey | RollingKeyPairSource] + | RollingKeyPairSource, ) -> ( list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] ): - """See :meth:`BaseAppleAccount.fetch_reports`.""" - date_to = date_to or datetime.now().astimezone() - - return await self._reports.fetch_reports( - date_from, - date_to, - keys, - ) + """See `BaseAppleAccount.fetch_location_history`.""" + return await self._reports.fetch_location_history(keys) @overload - async def fetch_last_reports( + async def fetch_location( self, keys: HasHashedPublicKey, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - async def fetch_last_reports( + async def fetch_location( self, keys: RollingKeyPairSource, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - async def fetch_last_reports( + async def fetch_location( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... + ) -> dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None]: ... @require_login_state(LoginState.LOGGED_IN) @override - async def fetch_last_reports( + async def fetch_location( self, keys: HasHashedPublicKey | RollingKeyPairSource | Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, ) -> ( - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + LocationReport + | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] + | None ): - """See :meth:`BaseAppleAccount.fetch_last_reports`.""" - end = datetime.now(tz=timezone.utc) - start = end - timedelta(hours=hours) + """See :meth:`BaseAppleAccount.fetch_location`.""" + hist = await self.fetch_location_history(keys) + if isinstance(hist, list): + return sorted(hist)[-1] if hist else None - return await self.fetch_reports(keys, start, end) + return {dev: sorted(reports)[-1] if reports else None for dev, reports in hist.items()} @require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA, LoginState.LOGGED_IN) async def _gsa_authenticate( @@ -1101,78 +1096,71 @@ def td_2fa_submit(self, code: str) -> LoginState: return self._evt_loop.run_until_complete(coro) @overload - def fetch_reports( + def fetch_location_history( self, keys: HasHashedPublicKey, - date_from: datetime, - date_to: datetime | None, ) -> list[LocationReport]: ... @overload - def fetch_reports( + def fetch_location_history( self, keys: RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, ) -> list[LocationReport]: ... @overload - def fetch_reports( + def fetch_location_history( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - date_from: datetime, - date_to: datetime | None, ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... @override - def fetch_reports( + def fetch_location_history( self, keys: HasHashedPublicKey | Sequence[HasHashedPublicKey | RollingKeyPairSource] | RollingKeyPairSource, - date_from: datetime, - date_to: datetime | None, ) -> ( list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] ): - """See :meth:`AsyncAppleAccount.fetch_reports`.""" - coro = self._asyncacc.fetch_reports(keys, date_from, date_to) + """See `BaseAppleAccount.fetch_location_history`.""" + coro = self._asyncacc.fetch_location_history(keys) return self._evt_loop.run_until_complete(coro) @overload - def fetch_last_reports( + def fetch_location( self, keys: HasHashedPublicKey, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - def fetch_last_reports( + def fetch_location( self, keys: RollingKeyPairSource, - hours: int = 7 * 24, - ) -> list[LocationReport]: ... + ) -> LocationReport | None: ... @overload - def fetch_last_reports( + def fetch_location( self, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, - ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... + ) -> dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None]: ... @override - def fetch_last_reports( + def fetch_location( self, keys: HasHashedPublicKey | RollingKeyPairSource | Sequence[HasHashedPublicKey | RollingKeyPairSource], - hours: int = 7 * 24, ) -> ( - list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] + LocationReport + | dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None] + | None ): - """See :meth:`AsyncAppleAccount.fetch_last_reports`.""" - coro = self._asyncacc.fetch_last_reports(keys, hours) - return self._evt_loop.run_until_complete(coro) + """See :meth:`BaseAppleAccount.fetch_location`.""" + hist = self.fetch_location_history(keys) + if isinstance(hist, list): + return sorted(hist)[-1] if hist else None + + return {dev: sorted(reports)[-1] if reports else None for dev, reports in hist.items()} @override def get_anisette_headers( diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index a8062b8..3c4e77d 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -3,12 +3,13 @@ from __future__ import annotations import base64 +import bisect import hashlib import logging import struct from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Literal, TypedDict, Union, cast, overload +from typing import TYPE_CHECKING, Literal, TypedDict, Union, overload from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec @@ -16,7 +17,7 @@ from typing_extensions import override from findmy.accessory import RollingKeyPairSource -from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping +from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping, KeyType from findmy.util.abc import Serializable from findmy.util.files import read_data_json, save_and_return_json @@ -337,33 +338,25 @@ def __init__(self, account: AsyncAppleAccount) -> None: self._account: AsyncAppleAccount = account @overload - async def fetch_reports( + async def fetch_location_history( self, - date_from: datetime, - date_to: datetime, device: HasHashedPublicKey, ) -> list[LocationReport]: ... @overload - async def fetch_reports( + async def fetch_location_history( self, - date_from: datetime, - date_to: datetime, device: RollingKeyPairSource, ) -> list[LocationReport]: ... @overload - async def fetch_reports( + async def fetch_location_history( self, - date_from: datetime, - date_to: datetime, device: Sequence[HasHashedPublicKey | RollingKeyPairSource], ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... - async def fetch_reports( # noqa: C901 + async def fetch_location_history( self, - date_from: datetime, - date_to: datetime, device: HasHashedPublicKey | RollingKeyPairSource | Sequence[HasHashedPublicKey | RollingKeyPairSource], @@ -371,110 +364,152 @@ async def fetch_reports( # noqa: C901 list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] ): """ - Fetch location reports for a certain device. + Fetch location history for a certain device or multiple devices. When `device` is a single :class:`HasHashedPublicKey`, this method will return a list of location reports corresponding to that key. - When `device` is a :class:`RollingKeyPairSource`, it will return a list of - location reports corresponding to that source. + When `device` is a :class:`RollingKeyPairSource`, it will return a list of location + reports corresponding to that source. When `device` is a sequence of :class:`HasHashedPublicKey`s or RollingKeyPairSource's, - it will return a dictionary with the provided object - as key, and a list of location reports as value. + it will return a dictionary with the provided objects + as keys, and a list of location reports as value. + + Note that the location history of :class:`RollingKeyPairSource` devices is not guaranteed + to be complete, and may be missing certain historical reports. The most recent report is + however guaranteed to be in line with what Apple reports. """ - key_devs: dict[HasHashedPublicKey, HasHashedPublicKey | RollingKeyPairSource] = {} - key_batches: list[list[HasHashedPublicKey]] = [] if isinstance(device, HasHashedPublicKey): # single key - key_devs = {device: device} - key_batches.append([device]) - elif isinstance(device, RollingKeyPairSource): + key_reports = await self._fetch_key_reports([device]) + return key_reports.get(device, []) + + if isinstance(device, RollingKeyPairSource): # key generator - # add 12h margin to the generator - keys = device.keys_between( - date_from - timedelta(hours=12), - date_to + timedelta(hours=12), - ) - key_devs = dict.fromkeys(keys, device) - key_batches.append(list(keys)) - elif isinstance(device, list) and all( + return await self._fetch_accessory_report(device) + + if not isinstance(device, list) or not all( isinstance(x, HasHashedPublicKey | RollingKeyPairSource) for x in device ): - # multiple key generators - # add 12h margin to each generator - device = cast("list[HasHashedPublicKey | RollingKeyPairSource]", device) - for dev in device: - if isinstance(dev, HasHashedPublicKey): - key_devs[dev] = dev - key_batches.append([dev]) - elif isinstance(dev, RollingKeyPairSource): - keys = dev.keys_between( - date_from - timedelta(hours=12), - date_to + timedelta(hours=12), - ) - for key in keys: - key_devs[key] = dev - key_batches.append(list(keys)) - else: - msg = "Unknown device type: %s" - raise ValueError(msg, type(device)) - - # sequence of keys (fetch 256 max at a time) - key_reports: dict[HasHashedPublicKey, list[LocationReport]] = await self._fetch_reports( - date_from, - date_to, - key_batches, - ) + # unsupported type + msg = "Device must be a HasHashedPublicKey, RollingKeyPairSource, or list thereof." + raise ValueError(msg) + + # multiple key generators + # we can batch static keys in a single request, + # but key generators need to be queried separately + static_keys: list[HasHashedPublicKey] = [] + reports: dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]] = { + dev: [] for dev in device + } + for dev in device: + if isinstance(dev, HasHashedPublicKey): + # save for later batch request + static_keys.append(dev) + elif isinstance(dev, RollingKeyPairSource): + # query immediately + reports[dev] = await self._fetch_accessory_report(dev) + + if static_keys: # batch request for static keys + key_reports = await self._fetch_key_reports(static_keys) + reports.update(dict(key_reports.items())) - # combine (key -> list[report]) and (key -> device) into (device -> list[report]) - device_reports = defaultdict(list) - for key, reports in key_reports.items(): - device_reports[key_devs[key]].extend(reports) - for dev in device_reports: - device_reports[dev] = sorted(device_reports[dev]) - - # result - if isinstance(device, (HasHashedPublicKey, RollingKeyPairSource)): - # single key or generator - return device_reports[device] - # multiple static keys or key generators - return device_reports - - async def _fetch_reports( + return reports + + async def _fetch_accessory_report( self, - date_from: datetime, - date_to: datetime, - device_keys: Sequence[Sequence[HasHashedPublicKey]], - ) -> dict[HasHashedPublicKey, list[LocationReport]]: - logger.debug("Fetching reports for %s device(s)", len(device_keys)) + accessory: RollingKeyPairSource, + ) -> list[LocationReport]: + logger.debug("Fetching location report for accessory") - # lock requested time range to the past 7 days, +- 12 hours, then filter the response. - # this is due to an Apple backend bug where the time range is not respected. - # More info: https://github.com/biemster/FindMy/issues/7 now = datetime.now().astimezone() - start_date = now - timedelta(days=7, hours=12) - end_date = now + timedelta(hours=12) - ids = [[key.hashed_adv_key_b64 for key in keys] for keys in device_keys] - data = await self._account.fetch_raw_reports(start_date, end_date, ids) + start_date = now - timedelta(days=7) + end_date = now + + # mappings + key_to_ind: dict[KeyPair, set[int]] = defaultdict(set) + id_to_key: dict[bytes, KeyPair] = {} + + # state variables + cur_keys_primary: set[str] = set() + cur_keys_secondary: set[str] = set() + cur_index = accessory.get_min_index(start_date) + ret: set[LocationReport] = set() + + async def _fetch() -> set[LocationReport]: + """Fetch current keys and add them to final reports.""" + new_reports: list[LocationReport] = await self._account.fetch_raw_reports( + [(list(cur_keys_primary), (list(cur_keys_secondary)))] + ) + logger.info("Fetched %d new reports (index %i)", len(new_reports), cur_index) - id_to_key: dict[bytes, HasHashedPublicKey] = { - key.hashed_adv_key_bytes: key for keys in device_keys for key in keys - } - reports: dict[HasHashedPublicKey, list[LocationReport]] = defaultdict(list) - for key_reports in data.get("locationPayload", []): - hashed_adv_key_bytes = base64.b64decode(key_reports["id"]) - key = id_to_key[hashed_adv_key_bytes] + for report in new_reports: + key = id_to_key[report.hashed_adv_key_bytes] + report.decrypt(key) - for report in key_reports.get("locationInfo", []): - payload = base64.b64decode(report) - loc_report = LocationReport(payload, hashed_adv_key_bytes) + # update alignment data on every report + # if a key maps to multiple indices, only feed it the maximum index, + # since apple only returns the latest reports per request. + # This makes the value more likely to be stable. + accessory.update_alignment(report.timestamp, max(key_to_ind[key])) + + cur_keys_primary.clear() + cur_keys_secondary.clear() + + return set(new_reports) + + while cur_index <= accessory.get_max_index(end_date): + key_batch = accessory.keys_at(cur_index) + + # split into primary and secondary keys + # (UNKNOWN keys are filed as primary) + new_keys_primary: set[str] = { + key.hashed_adv_key_b64 for key in key_batch if key.key_type == KeyType.PRIMARY + } + new_keys_secondary: set[str] = { + key.hashed_adv_key_b64 for key in key_batch if key.key_type != KeyType.PRIMARY + } + + # 290 seems to be the maximum number of keys that Apple accepts in a single request, + # so if adding the new keys would exceed that, fire a request first + if ( + len(cur_keys_primary | new_keys_primary) > 290 + or len(cur_keys_secondary | new_keys_secondary) > 290 + ): + ret |= await _fetch() + + # build mappings before adding to current keys + for key in key_batch: + key_to_ind[key].add(cur_index) + id_to_key[key.hashed_adv_key_bytes] = key + cur_keys_primary |= new_keys_primary + cur_keys_secondary |= new_keys_secondary + + cur_index += 1 + + if cur_keys_primary or cur_keys_secondary: + # fetch remaining keys + ret |= await _fetch() + + return sorted(ret) + + async def _fetch_key_reports( + self, + keys: Sequence[HasHashedPublicKey], + ) -> dict[HasHashedPublicKey, list[LocationReport]]: + logger.debug("Fetching reports for %s key(s)", len(keys)) - if loc_report.timestamp < date_from or loc_report.timestamp > date_to: - continue + # fetch all as primary keys + ids = [([key.hashed_adv_key_b64], []) for key in keys] + encrypted_reports: list[LocationReport] = await self._account.fetch_raw_reports(ids) - # pre-decrypt if possible - if isinstance(key, KeyPair): - loc_report.decrypt(key) + id_to_key: dict[bytes, HasHashedPublicKey] = {key.hashed_adv_key_bytes: key for key in keys} + reports: dict[HasHashedPublicKey, list[LocationReport]] = {key: [] for key in keys} + for report in encrypted_reports: + key = id_to_key[report.hashed_adv_key_bytes] + bisect.insort(reports[key], report) - reports[key].append(loc_report) + # pre-decrypt report if possible + if isinstance(key, KeyPair): + report.decrypt(key) return reports diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index ad50589..1f9d7b6 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -6,7 +6,7 @@ import logging import time from abc import ABC, abstractmethod -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any from bleak import BleakScanner @@ -152,12 +152,22 @@ def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: if isinstance(other_device, HasPublicKey): return other_device.adv_key_bytes.startswith(self._first_adv_key_bytes) if isinstance(other_device, RollingKeyPairSource): - # 1 hour margin around the detected time + # 12 hour margin around the detected time potential_keys = other_device.keys_between( - self.detected_at - timedelta(hours=1), - self.detected_at + timedelta(hours=1), + self.detected_at - timedelta(hours=12), + self.detected_at + timedelta(hours=12), ) - return any(self.is_from(key) for key in potential_keys) + for ind, key in potential_keys: + if not self.is_from(key): + continue + + # update alignment of found key + now = datetime.now(tz=timezone.utc) + other_device.update_alignment(now, ind) + + return True + + return False msg = f"Cannot compare against {type(other_device)}" raise ValueError(msg) @@ -232,14 +242,24 @@ def adv_key_bytes(self) -> bytes: def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" if isinstance(other_device, HasPublicKey): - return self.adv_key_bytes == other_device.adv_key_bytes + return other_device.adv_key_bytes == self.adv_key_bytes if isinstance(other_device, RollingKeyPairSource): # 12 hour margin around the detected time potential_keys = other_device.keys_between( self.detected_at - timedelta(hours=12), self.detected_at + timedelta(hours=12), ) - return any(self.is_from(key) for key in potential_keys) + for ind, key in potential_keys: + if not self.is_from(key): + continue + + # update alignment of found key + now = datetime.now(tz=timezone.utc) + other_device.update_alignment(now, ind) + + return True + + return False msg = f"Cannot compare against {type(other_device)}" raise ValueError(msg) diff --git a/findmy/util/files.py b/findmy/util/files.py index 1686bbf..a366f5a 100644 --- a/findmy/util/files.py +++ b/findmy/util/files.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import plistlib from collections.abc import Mapping from pathlib import Path from typing import TypeVar, cast @@ -32,3 +33,30 @@ def read_data_json(val: str | Path | _T) -> _T: val = cast("_T", json.loads(val.read_text())) return val + + +def save_and_return_plist(data: _T, dst: str | Path | None) -> _T: + """Save and return a Plist file.""" + if dst is None: + return data + + if isinstance(dst, str): + dst = Path(dst) + + dst.write_bytes(plistlib.dumps(data)) + + return data + + +def read_data_plist(val: str | Path | _T | bytes) -> _T: + """Read Plist data from a file if a path is passed, or return the argument itself.""" + if isinstance(val, str): + val = Path(val) + + if isinstance(val, Path): + val = val.read_bytes() + + if isinstance(val, bytes): + val = cast("_T", plistlib.loads(val)) + + return val diff --git a/pyproject.toml b/pyproject.toml index 275e7aa..40353c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ ignore = [ "S101", # use of "assert" "D", # documentation "INP001", # namespacing + "ERA001", # commented out code ] "scripts/*" = [ "T201", # use of "print"