Coverage for odmpy/libby.py: 94.7%
319 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-14 08:51 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-14 08:51 +0000
1# Copyright (C) 2023 github.com/ping
2#
3# This file is part of odmpy.
4#
5# odmpy is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# odmpy is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with odmpy. If not, see <http://www.gnu.org/licenses/>.
17#
18import json
19import logging
20import re
21import sys
22from collections import OrderedDict
23from datetime import datetime, timezone
24from enum import Enum
25from pathlib import Path
26from typing import Optional, NamedTuple, Dict, List, Tuple
27from typing import OrderedDict as OrderedDictType
28from urllib import request
29from urllib.parse import urljoin
32if sys.version_info >= (3, 8):
33 from typing import TypedDict
34else:
35 from typing_extensions import TypedDict
36import requests
37from requests.adapters import HTTPAdapter, Retry
39from .libby_errors import ClientConnectionError, ClientTimeoutError, ErrorHandler
41#
42# Client for the Libby web API, and helper functions to make sense
43# of the stuff returned
44#
class ChapterMarker(NamedTuple):
    """
    A single chapter mark inside an audiobook part.
    """

    # chapter title as listed in the table of contents
    title: str
    # name of the audio part (file) the chapter is located in
    part_name: str
    # offset, in seconds, at which the chapter begins
    start_second: float
    # offset, in seconds, at which the chapter ends
    end_second: float
# Metadata describing one audiobook part (file).
# The functional TypedDict syntax is required here because some of the
# keys contain hyphens and so cannot be written as class attributes.
PartMeta = TypedDict(
    "PartMeta",
    {
        # chapter marks located in this part
        "chapters": List[ChapterMarker],
        # download url of the part
        "url": str,
        "audio-duration": float,
        "file-length": int,
        "spine-position": int,
    },
)
class LibbyFormats(str, Enum):
    """
    Format strings
    """

    # audiobooks
    AudioBookMP3 = "audiobook-mp3"
    AudioBookOverDrive = "audiobook-overdrive"  # not used
    # ebooks
    EBookEPubAdobe = "ebook-epub-adobe"
    EBookEPubOpen = "ebook-epub-open"
    EBookPDFAdobe = "ebook-pdf-adobe"
    EBookPDFOpen = "ebook-pdf-open"
    EBookKobo = "ebook-kobo"  # not used
    EBookKindle = "ebook-kindle"  # not used
    EBookOverdrive = "ebook-overdrive"
    # magazines
    MagazineOverDrive = "magazine-overdrive"

    def __str__(self) -> str:
        """
        Render the member as its raw format string.

        :return:
        """
        return f"{self.value}"
class LibbyMediaTypes(str, Enum):
    """
    Loan type strings
    """

    Audiobook = "audiobook"
    EBook = "ebook"
    Magazine = "magazine"

    def __str__(self) -> str:
        """
        Render the member as its raw loan type string.

        :return:
        """
        return f"{self.value}"
# Matches a part path such as
# "{AAAAAAAA-BBBB-CCCC-9999-ABCDEF123456}Fmt425-Part03.mp3#3000":
# `part_name` is everything before the "#", and the optional
# `second_stamp` is the (possibly fractional) number after it.
FILE_PART_RE = re.compile(
    r"(?P<part_name>{[A-F0-9\-]{36}}[^#]+)(#(?P<second_stamp>\d+(\.\d+)?))?$"
)

# Browser user agent sent with API requests by default
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/14.0.2 Safari/605.1.15"
)
# Ebook formats that can be downloaded
EBOOK_DOWNLOADABLE_FORMATS = (
    LibbyFormats.EBookEPubAdobe,
    LibbyFormats.EBookEPubOpen,
    LibbyFormats.EBookPDFAdobe,
    LibbyFormats.EBookPDFOpen,
)

# All formats that can be downloaded: audiobook, then ebooks, then magazine
DOWNLOADABLE_FORMATS = (
    (LibbyFormats.AudioBookMP3,)
    + EBOOK_DOWNLOADABLE_FORMATS
    + (LibbyFormats.MagazineOverDrive,)
)
def parse_part_path(title: str, part_path: str) -> ChapterMarker:
    """
    Builds a chapter marker from a toc part path,
    e.g. {AAAAAAAA-BBBB-CCCC-9999-ABCDEF123456}Fmt425-Part03.mp3#3000
    yields `ChapterMarker(title, "{AAAAAAAA-BBBB-CCCC-9999-ABCDEF123456}Fmt425-Part03.mp3", 3000.0, 0)`

    The end second is not known at this point and is always set to 0.

    :param title: chapter title
    :param part_path: part file name, optionally followed by "#<seconds>"
    :return:
    :raises ValueError: if the path does not match the expected format
    """
    matched = FILE_PART_RE.match(part_path)
    if matched is None:
        raise ValueError(f"Unexpected path format: {part_path}")

    # a path without a "#<seconds>" suffix starts at the beginning of the part
    stamp = matched.group("second_stamp")
    begins_at = float(stamp) if stamp else 0
    return ChapterMarker(
        title=title,
        part_name=matched.group("part_name"),
        start_second=begins_at,
        end_second=0,
    )
def parse_toc(
    base_url: str, toc: List[Dict], spine: List[Dict]
) -> OrderedDictType[str, PartMeta]:
    """
    Converts `openbook["nav"]["toc"]` and `openbook["spine"]` into a mapping
    of part name to part metadata (chapters, url, duration, etc).

    :param base_url: base url that the spine paths are joined to
    :param toc: toc entries, each with a "title", a "path" and optional "contents"
    :param spine: spine entries describing each part file
    :return:
    """
    # Flatten the toc. Nested "contents" are given the parent title
    # (instead of their own) so that they can be de-duped below.
    markers: List[ChapterMarker] = []
    for item in toc:
        item_title = item["title"]
        markers.append(parse_part_path(item_title, item["path"]))
        markers.extend(
            parse_part_path(item_title, sub_item["path"])
            for sub_item in item.get("contents", [])
        )

    # use an OrderedDict to ensure that we can consistently test this
    parts: OrderedDictType[str, PartMeta] = OrderedDict()
    for marker in markers:
        part_chapters = parts.setdefault(
            marker.part_name,
            {
                "chapters": [],
                "url": "",
                "audio-duration": 0,
                "file-length": 0,
                "spine-position": 0,
            },
        )["chapters"]
        # de-dup entries because OD sometimes generates timestamped chapter titles marks
        # for the same chapter in the same part, e.g. "Chapter 2 (00:00)", "Chapter 2 (12:34)"
        if part_chapters and part_chapters[-1].title == marker.title:
            continue
        part_chapters.append(marker)

    # fill in the file details of each part from the spine
    for spine_item in spine:
        part = parts[spine_item["-odread-original-path"]]
        part.update(
            {
                "url": urljoin(base_url, spine_item["path"]),
                "audio-duration": spine_item["audio-duration"],
                "file-length": spine_item["-odread-file-bytes"],
                "spine-position": spine_item["-odread-spine-position"],
            }
        )

    # a chapter ends where the next chapter in the same part starts,
    # and the last chapter of a part ends with the part itself
    for part in parts.values():
        marks = part["chapters"]
        end_seconds = [m.start_second for m in marks[1:]]
        end_seconds.append(part["audio-duration"])
        part["chapters"] = [
            mark._replace(end_second=end_second)
            for mark, end_second in zip(marks, end_seconds)
        ]

    return parts
def merge_toc(toc: Dict) -> List[ChapterMarker]:
    """
    Generates a list of ChapterMarker for the merged audiobook based on the parsed toc.

    Chapter offsets are shifted by the total duration of the parts that come
    before the part the chapter is in. Markers that share a title are folded
    into one chapter: the start of the first marker is kept and the end is
    taken from the last marker seen with that title.

    :param toc: parsed toc
    :return: chapter markers (with an empty `part_name`) in order of first appearance
    """
    spans = OrderedDict()
    # Running total of the durations of all the parts before the current one.
    # This replaces re-summing every preceding part for each part, and
    # performs the same additions in the same order.
    offset = 0
    preceding_part = None
    for part in toc.values():
        if preceding_part is not None:
            offset += preceding_part["audio-duration"]
        for marker in part["chapters"]:
            span = spans.setdefault(
                marker.title,
                {"start": offset + marker.start_second, "end": 0},
            )
            span["end"] = offset + marker.end_second
        preceding_part = part

    return [
        ChapterMarker(
            title=title,
            part_name="",
            start_second=span["start"],
            end_second=span["end"],
        )
        for title, span in spans.items()
    ]
class LibbyClient(object):
    """
    Client for the Libby web API.

    Original reverse engineering of the libby endpoints is thanks to
    https://github.com/lullius/pylibby
    """

    def __init__(
        self,
        settings_folder: Optional[str] = None,
        identity_token: Optional[str] = None,
        max_retries: int = 0,
        timeout: int = 10,
        logger: Optional[logging.Logger] = None,
        **kwargs,
    ) -> None:
        """
        Set up the client, loading previously saved identity settings if any.

        :param settings_folder: Folder where "libby.json" is stored. Created if missing.
        :param identity_token: Token that takes precedence over the saved identity
        :param max_retries: Total retries for the HTTP adapter
        :param timeout: Timeout (seconds) for requests
        :param logger: Defaults to this module's logger
        :param kwargs: Supports "user_agent" to override the default user agent
        """
        self.logger = logger or logging.getLogger(__name__)
        self.settings_folder = Path(settings_folder) if settings_folder else None
        if self.settings_folder and not self.settings_folder.exists():
            self.settings_folder.mkdir(parents=True, exist_ok=True)

        self.timeout = timeout
        self.identity_token = identity_token
        self.identity = {}
        self.identity_settings_file = (
            self.settings_folder.joinpath("libby.json")
            if self.settings_folder
            else None
        )
        if self.identity_settings_file and self.identity_settings_file.exists():
            with self.identity_settings_file.open("r", encoding="utf-8") as f:
                self.identity = json.load(f)

        # migrate old sync code storage key
        if self.identity_settings_file and self.identity.get("__odmpy_sync_code"):
            if not self.identity.get("__libby_sync_code"):
                self.identity["__libby_sync_code"] = self.identity["__odmpy_sync_code"]
            del self.identity["__odmpy_sync_code"]
            with self.identity_settings_file.open("w", encoding="utf-8") as f:
                json.dump(self.identity, f)

        self.max_retries = max_retries
        libby_session = requests.Session()
        adapter = HTTPAdapter(max_retries=Retry(total=max_retries, backoff_factor=0.1))
        for prefix in ("http://", "https://"):
            libby_session.mount(prefix, adapter)
        self.libby_session = libby_session
        self.user_agent = kwargs.pop("user_agent", USER_AGENT)
        self.api_base = "https://sentry-read.svc.overdrive.com/"

    @staticmethod
    def is_valid_sync_code(code: str) -> bool:
        """
        Check if the code is made up of exactly 8 ASCII digits.

        :param code:
        :return:
        """
        # str.isdigit() alone also accepts non-ASCII digits (e.g. fullwidth
        # or superscript digits), so restrict the check to ASCII
        return len(code) == 8 and code.isascii() and code.isdigit()

    def default_headers(self) -> Dict:
        """
        Default HTTP headers.

        :return: a new dict on every call
        """
        return {
            "User-Agent": self.user_agent,
            "Accept": "application/json",
            "Cache-Control": "no-cache",
            "Pragma": "no-cache",
        }

    def make_request(
        self,
        endpoint: str,
        params: Optional[Dict] = None,
        data: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        method: Optional[str] = None,
        authenticated: bool = True,
        session: "Optional[requests.sessions.Session]" = None,
        return_res: bool = False,
        allow_redirects: bool = True,
    ):
        """
        Sends an API request.

        NOTE: when `headers` is supplied and the request is authenticated,
        the Authorization header is added to the caller's dict in place.

        :param endpoint: Path relative to the API base, or an absolute url
        :param params: Query string parameters
        :param data: Form body
        :param json_data: JSON body
        :param headers: Defaults to `default_headers()`
        :param method: Defaults to POST if there is a body, otherwise GET
        :param authenticated: Send the bearer token if one is available
        :param session: Defaults to the client session
        :param return_res: Return the response object instead of the decoded JSON
        :param allow_redirects:
        :return: The decoded JSON body, or the response if `return_res` is set
        :raises ClientConnectionError: on connection errors
        :raises ClientTimeoutError: on timeouts
        """
        endpoint_url = urljoin(self.api_base, endpoint)
        if not method:
            # try to set an HTTP method: a request with a body
            # (form or json) is a POST
            has_body = data is not None or json_data is not None
            method = "POST" if has_body else "GET"
        if headers is None:
            headers = self.default_headers()
        if authenticated:
            token = self.get_token()
            if token:
                headers["Authorization"] = f"Bearer {token}"

        req = requests.Request(
            method,
            endpoint_url,
            headers=headers,
            params=params,
            data=data,
            json=json_data,
        )
        if not session:
            # default session
            session = self.libby_session

        try:
            res = session.send(
                session.prepare_request(req),
                timeout=self.timeout,
                allow_redirects=allow_redirects,
            )
            # skip decoding the body to text when it will not be logged
            if self.logger.isEnabledFor(logging.DEBUG):
                self.logger.debug("body: %s", res.text)

            res.raise_for_status()
            if return_res:
                return res
            return res.json()
        except requests.ConnectionError as conn_err:
            raise ClientConnectionError(str(conn_err)) from conn_err
        except requests.Timeout as timeout_err:
            raise ClientTimeoutError(str(timeout_err)) from timeout_err
        except requests.HTTPError as http_err:
            # NOTE(review): presumably this raises a client error for the
            # response; if it returns instead, this method returns None
            ErrorHandler.process(http_err)

    def save_settings(self, updates: Dict) -> None:
        """
        Persist identity settings.

        The in-memory identity is updated even when the settings
        cannot be written to file.

        :param updates:
        :return:
        :raises ValueError: if no settings folder was defined
        """
        self.identity.update(updates)
        if not self.identity_settings_file:
            raise ValueError(
                "Unable to save settings because settings_folder is not defined"
            )
        with self.identity_settings_file.open("w", encoding="utf-8") as f:
            json.dump(self.identity, f)

    def clear_settings(self) -> None:
        """
        Wipe previously saved settings.

        :return:
        """
        if self.identity_settings_file and self.identity_settings_file.exists():
            self.identity_settings_file.unlink()
        self.identity = {}

    def has_chip(self) -> bool:
        """
        Check if client has identity token chip.

        :return:
        """
        return bool(self.identity.get("identity"))

    def has_sync_code(self) -> bool:
        """
        Check if client has linked account.

        :return:
        """
        return bool(
            self.identity.get("__libby_sync_code")
            or self.identity.get("__odmpy_sync_code")  # for backwards compat
        )

    def get_chip(self, auto_save: bool = True, authenticated: bool = False) -> Dict:
        """
        Get an identity chip (contains auth token).

        :param auto_save: Persist the response to settings
        :param authenticated:
        :return:
        """
        res: Dict = self.make_request(
            "chip",
            params={"client": "dewey"},
            method="POST",
            authenticated=authenticated,
        )
        if auto_save:
            # persist to settings
            self.save_settings(res)
        return res

    def get_token(self) -> Optional[str]:
        """
        Get the token used to authenticate requests. The token passed
        to the constructor takes precedence over the saved identity.

        :return:
        """
        return self.identity_token or self.identity.get("identity")

    def clone_by_code(self, code: str, auto_save: bool = True) -> Dict:
        """
        Link account to identity token retrieved in `get_chip()`.

        :param code:
        :param auto_save: Persist the sync code to settings
        :return:
        :raises ValueError: if the code is not a valid sync code
        """
        if not self.is_valid_sync_code(code):
            raise ValueError(f"Invalid code: {code}")

        res: Dict = self.make_request("chip/clone/code", data={"code": code})
        if auto_save:
            # persist to settings
            self.save_settings({"__libby_sync_code": code})
        return res

    def sync(self) -> Dict:
        """
        Get the user account state, which includes loans, holds, etc.

        :return:
        """
        res: Dict = self.make_request("chip/sync")
        return res

    def auth_form(self, website_id) -> Dict:
        """
        Get the parameters required to link a card.

        :param website_id: Can be gotten from the OverDrive api or "cards" in the sync response
        :return:
        """
        res: Dict = self.make_request(f"auth/forms/{website_id}", authenticated=False)
        return res

    def link_card(
        self, website_id: str, username: str, password: str, ils: str = "default"
    ) -> Dict:
        """
        Used to add or verify an existing card. Not currently used by odmpy, not tested.
        Returns a similar response as sync but with just the one card linked.

        :param website_id: Can be gotten from the OverDrive api or "cards" in the sync response
        :param username:
        :param password: Or pin.
        :param ils: Can be gotten from auth_form() or "cards" dict in the sync response
        :return:
        """
        data = {"ils": ils, "username": username, "password": password}
        res: Dict = self.make_request(
            f"auth/link/{website_id}", json_data=data, method="POST"
        )
        return res

    def update_card_name(self, card_id: str, card_name: str) -> Dict:
        """
        Update a card's name

        :param card_id:
        :param card_name:
        :return:
        """
        res: Dict = self.make_request(
            f"card/{card_id}", params={"card_name": card_name}, method="PUT"
        )
        return res

    def is_logged_in(self) -> bool:
        """
        Check if successfully logged in, i.e. the account state is
        synchronized and has at least one card.

        :return:
        """
        synced_state = self.sync()
        return synced_state.get("result", "") == "synchronized" and bool(
            synced_state.get("cards")
        )

    @staticmethod
    def is_downloadable_audiobook_loan(book: Dict) -> bool:
        """
        Verify if book is a downloadable audiobook.

        :param book:
        :return:
        """
        return any(
            f["id"] == LibbyFormats.AudioBookMP3 for f in book.get("formats", [])
        )

    @staticmethod
    def is_downloadable_ebook_loan(book: Dict) -> bool:
        """
        Verify if book is a downloadable ebook.

        :param book:
        :return:
        """
        return any(
            f["id"] in EBOOK_DOWNLOADABLE_FORMATS for f in book.get("formats", [])
        )

    @staticmethod
    def is_downloadable_magazine_loan(book: Dict) -> bool:
        """
        Verify if loan is a downloadable magazine.

        :param book:
        :return:
        """
        return any(
            f["id"] == LibbyFormats.MagazineOverDrive
            for f in book.get("formats", [])
        )

    @staticmethod
    def has_format(loan: Dict, format_id: str) -> bool:
        """
        Check if the loan is available in the format.

        :param loan:
        :param format_id:
        :return: False if the loan has no formats at all
        """
        return any(f["id"] == format_id for f in loan.get("formats", []))

    @staticmethod
    def get_loan_format(loan: Dict, prefer_open_format: bool = True) -> str:
        """
        Get the format that the loan should be downloaded in.

        :param loan:
        :param prefer_open_format: Allow the "open" epub/pdf formats to be selected
        :return:
        :raises ValueError: if the loan is locked to a non-downloadable format,
            or has no downloadable format
        """
        locked_in_format = next(
            (f["id"] for f in loan["formats"] if f.get("isLockedIn")), None
        )
        if locked_in_format:
            if locked_in_format in DOWNLOADABLE_FORMATS:
                return locked_in_format
            raise ValueError(
                f'Loan is locked to a non-downloadable format "{locked_in_format}"'
            )

        # the order of these checks will determine the output format
        # the "open" version of the format (example open epub, open pdf) should
        # be prioritised
        candidates = (
            (LibbyFormats.AudioBookMP3, True),
            (LibbyFormats.EBookEPubOpen, prefer_open_format),
            (LibbyFormats.MagazineOverDrive, True),
            (LibbyFormats.EBookEPubAdobe, True),
            (LibbyFormats.EBookPDFOpen, prefer_open_format),
            (LibbyFormats.EBookPDFAdobe, True),
        )
        for format_id, allowed in candidates:
            if allowed and LibbyClient.has_format(loan, format_id):
                return format_id

        raise ValueError("Unable to find a downloadable format")

    @staticmethod
    def is_open_ebook_loan(book: Dict) -> bool:
        """
        Verify if book is an open epub.

        :param book:
        :return:
        """
        return any(
            f["id"] == LibbyFormats.EBookEPubOpen for f in book.get("formats", [])
        )

    @staticmethod
    def parse_datetime(value: str) -> datetime:
        """
        Parses a datetime string from the API into a datetime.
        Values without a timezone are taken to be in UTC.

        :param value:
        :return:
        :raises ValueError: if the value matches none of the known formats
        """
        formats = (
            "%Y-%m-%dT%H:%M:%SZ",
            "%Y-%m-%dT%H:%M:%S.%fZ",
            "%Y-%m-%dT%H:%M:%S%z",
            "%Y-%m-%dT%H:%M:%S.%f%z",
            "%m/%d/%Y",  # publishDateText
        )
        for fmt in formats:
            try:
                dt = datetime.strptime(value, fmt)
            except ValueError:
                # not this format, try the next one
                continue
            if not dt.tzinfo:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt

        raise ValueError(f"time data '{value}' does not match known formats {formats}")

    @staticmethod
    def is_renewable(loan: Dict) -> bool:
        """
        Check if loan can be renewed, i.e. its renewable date has been reached.

        :param loan:
        :return:
        :raises ValueError: if the loan has no renewable date
        """
        if not loan.get("renewableOn"):
            raise ValueError("Unable to get renewable date")
        # Example: 2023-02-23T07:33:55Z
        renewable_on = LibbyClient.parse_datetime(loan["renewableOn"])
        return renewable_on <= datetime.now(tz=timezone.utc)

    def get_loans(self) -> List[Dict]:
        """
        Get loans

        :return:
        """
        return self.sync().get("loans", [])

    def get_holds(self) -> List[Dict]:
        """
        Get holds

        :return:
        """
        return self.sync().get("holds", [])

    def get_downloadable_audiobook_loans(self) -> List[Dict]:
        """
        Get downloadable audiobook loans.

        :return:
        """
        return [
            book
            for book in self.sync().get("loans", [])
            if self.is_downloadable_audiobook_loan(book)
        ]

    def fulfill(self, loan_id: str, card_id: str, format_id: str) -> Dict:
        """
        Get the fulfillment details for a loan.

        NOTE(review): the request is made with `return_res=True`, so what is
        returned is the response object and not a dict as annotated.

        :param loan_id:
        :param card_id:
        :param format_id:
        :return:
        :raises ValueError: if the format is not downloadable
        """
        if format_id not in DOWNLOADABLE_FORMATS:
            raise ValueError(f"Invalid format_id: {format_id}")
        res: Dict = self.make_request(
            f"card/{card_id}/loan/{loan_id}/fulfill/{format_id}",
            return_res=True,
        )
        return res

    @staticmethod
    def _urlretrieve(
        endpoint: str, headers: Optional[Dict] = None, timeout: int = 15
    ) -> bytes:
        """
        Workaround for downloading an open (non-drm) epub or pdf.

        The fulfillment url 403s when using requests but
        works in curl, request.urlretrieve, etc.

        GET API fulfill endpoint -> 302 https://fulfill.contentreserve.com (fulfillment url)
        GET https://fulfill.contentreserve.com -> 302 https://openepub-gk.cdn.overdrive.com
        GET https://openepub-gk.cdn.overdrive.com 403

        Fresh session doesn't work either, headers doesn't seem to
        matter.

        .. code-block:: python

            sess = requests.Session()
            sess.headers.update({"User-Agent": USER_AGENT})
            res = sess.get(res_redirect.headers["Location"], timeout=self.timeout)
            res.raise_for_status()
            return res.content

        :param endpoint: fulfillment url
        :param headers:
        :param timeout:
        :return: the downloaded contents
        """
        opener = request.build_opener()
        req = request.Request(endpoint, headers=headers or {})
        # close the response once the contents have been read
        with opener.open(req, timeout=timeout) as res:
            return res.read()

    def fulfill_loan_file(self, loan_id: str, card_id: str, format_id: str) -> bytes:
        """
        Returns the loan file contents directly for MP3 audiobooks (.odm)
        and DRM epub (.acsm) loans.
        For open epub/pdf loans, the actual epub/pdf contents are returned.

        :param loan_id:
        :param card_id:
        :param format_id:
        :return:
        :raises ValueError: if the format is not downloadable
        """
        if format_id not in DOWNLOADABLE_FORMATS:
            raise ValueError(f"Unsupported format_id: {format_id}")

        headers = self.default_headers()
        headers["Accept"] = "*/*"
        fulfill_endpoint = f"card/{card_id}/loan/{loan_id}/fulfill/{format_id}"

        if format_id in (LibbyFormats.EBookEPubOpen, LibbyFormats.EBookPDFOpen):
            res_redirect: requests.Response = self.make_request(
                fulfill_endpoint,
                headers=headers,
                return_res=True,
                allow_redirects=False,
            )
            # NOTE(review): make_request() adds the Authorization header to
            # `headers` in place when a token is available, so the token is
            # also sent along to the redirect location below
            return self._urlretrieve(
                res_redirect.headers["Location"], headers=headers, timeout=self.timeout
            )

        res: requests.Response = self.make_request(
            fulfill_endpoint,
            headers=headers,
            return_res=True,
        )
        return res.content

    def open_loan(self, loan_type: str, card_id: str, title_id: str) -> Dict:
        """
        Gets the meta urls needed to fulfill a loan.

        :param loan_type:
        :param card_id:
        :param title_id:
        :return:
        """
        res: Dict = self.make_request(
            f"open/{loan_type}/card/{card_id}/title/{title_id}"
        )
        return res

    def prepare_loan(self, loan: Dict) -> Tuple[str, Dict]:
        """
        Pre-requisite step for processing a loan.

        :param loan:
        :return: The download base url, and the meta from opening the loan
        """
        # anything that is not an audiobook or a magazine is opened as a book
        media_type = loan["type"]["id"]
        if media_type == LibbyMediaTypes.Magazine:
            loan_type = "magazine"
        elif media_type == LibbyMediaTypes.Audiobook:
            loan_type = "audiobook"
        else:
            loan_type = "book"
        card_id = loan["cardId"]
        title_id = loan["id"]
        meta = self.open_loan(loan_type, card_id, title_id)
        download_base: str = meta["urls"]["web"]

        # Sets a needed cookie
        web_url = download_base + "?" + meta["message"]
        self.make_request(
            web_url,
            headers={"Accept": "*/*"},
            method="HEAD",
            authenticated=False,
            return_res=True,
        )
        return download_base, meta

    def process_audiobook(
        self, loan: Dict
    ) -> "Tuple[Dict, OrderedDictType[str, PartMeta]]":
        """
        Returns the data needed to download an audiobook.

        :param loan:
        :return: The openbook, and the toc parsed from it
        """
        download_base, meta = self.prepare_loan(loan)
        # contains nav/toc and spine
        openbook = self.make_request(meta["urls"]["openbook"])
        toc = parse_toc(download_base, openbook["nav"]["toc"], openbook["spine"])
        return openbook, toc

    def process_ebook(self, loan: Dict) -> Tuple[str, Dict, List[Dict]]:
        """
        Returns the data needed to download an ebook directly.

        :param loan:
        :return: The download base url, the openbook, and the rosters
        """
        download_base, meta = self.prepare_loan(loan)
        # contains nav/toc and spine, manifest
        openbook = self.make_request(meta["urls"]["openbook"])
        rosters: List[Dict] = self.make_request(meta["urls"]["rosters"])
        return download_base, openbook, rosters

    def return_title(self, title_id: str, card_id: str) -> None:
        """
        Return a title.

        :param title_id:
        :param card_id:
        :return:
        """
        self.make_request(
            f"card/{card_id}/loan/{title_id}", method="DELETE", return_res=True
        )

    def return_loan(self, loan: Dict) -> None:
        """
        Return a loan.

        :param loan:
        :return:
        """
        self.return_title(loan["id"], loan["cardId"])

    @staticmethod
    def _loan_request_data(title_format: str, days: int) -> Dict:
        """
        Builds the request body shared by the borrow and renew requests.

        :param title_format: Type ID
        :param days:
        :return:
        """
        return {
            "period": days,
            "units": "days",
            "lucky_day": None,
            "title_format": title_format,
        }

    def borrow_title(
        self, title_id: str, title_format: str, card_id: str, days: int = 21
    ) -> Dict:
        """
        Borrow a title.

        :param title_id:
        :param title_format: Type ID
        :param card_id:
        :param days: Length of the loan
        :return:
        """
        res: Dict = self.make_request(
            f"card/{card_id}/loan/{title_id}",
            json_data=self._loan_request_data(title_format, days),
            method="POST",
        )
        return res

    def borrow_hold(self, hold: Dict) -> Dict:
        """
        Borrow a hold.

        :param hold:
        :return:
        """
        return self.borrow_title(hold["id"], hold["type"]["id"], hold["cardId"])

    def renew_title(
        self, title_id: str, title_format: str, card_id: str, days: int = 21
    ) -> Dict:
        """
        Renew a title.

        :param title_id:
        :param title_format: Type ID
        :param card_id:
        :param days: Length of the renewed loan
        :return:
        """
        res: Dict = self.make_request(
            f"card/{card_id}/loan/{title_id}",
            json_data=self._loan_request_data(title_format, days),
            method="PUT",
        )
        return res

    def renew_loan(self, loan: Dict) -> Dict:
        """
        Renew a loan.

        :param loan:
        :return:
        """
        return self.renew_title(loan["id"], loan["type"]["id"], loan["cardId"])

    def create_hold(self, title_id: str, card_id: str) -> Dict:
        """
        Create a hold on the title.

        :param title_id:
        :param card_id:
        :return:
        """
        res: Dict = self.make_request(
            f"card/{card_id}/hold/{title_id}",
            json_data={"days_to_suspend": 0, "email_address": ""},
            method="POST",
        )
        return res