Coverage for odmpy/libby.py: 94.7%

319 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-14 08:51 +0000

1# Copyright (C) 2023 github.com/ping 

2# 

3# This file is part of odmpy. 

4# 

5# odmpy is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# 

10# odmpy is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with odmpy. If not, see <http://www.gnu.org/licenses/>. 

17# 

18import json 

19import logging 

20import re 

21import sys 

22from collections import OrderedDict 

23from datetime import datetime, timezone 

24from enum import Enum 

25from pathlib import Path 

26from typing import Optional, NamedTuple, Dict, List, Tuple 

27from typing import OrderedDict as OrderedDictType 

28from urllib import request 

29from urllib.parse import urljoin 

30 

31 

32if sys.version_info >= (3, 8): 

33 from typing import TypedDict 

34else: 

35 from typing_extensions import TypedDict 

36import requests 

37from requests.adapters import HTTPAdapter, Retry 

38 

39from .libby_errors import ClientConnectionError, ClientTimeoutError, ErrorHandler 

40 

41# 

42# Client for the Libby web API, and helper functions to make sense 

43# of the stuff returned 

44# 

45 

46 

47class ChapterMarker(NamedTuple): 

48 title: str 

49 part_name: str 

50 start_second: float 

51 end_second: float 

52 

53 

54# TypedDict to hold the metadata about an audiobook part 

55PartMeta = TypedDict( 

56 "PartMeta", 

57 { 

58 "chapters": List[ChapterMarker], 

59 "url": str, 

60 "audio-duration": float, 

61 "file-length": int, 

62 "spine-position": int, 

63 }, 

64) 

65 

66 

67class LibbyFormats(str, Enum): 

68 """ 

69 Format strings 

70 """ 

71 

72 AudioBookMP3 = "audiobook-mp3" 

73 AudioBookOverDrive = "audiobook-overdrive" # not used 

74 EBookEPubAdobe = "ebook-epub-adobe" 

75 EBookEPubOpen = "ebook-epub-open" 

76 EBookPDFAdobe = "ebook-pdf-adobe" 

77 EBookPDFOpen = "ebook-pdf-open" 

78 EBookKobo = "ebook-kobo" # not used 

79 EBookKindle = "ebook-kindle" # not used 

80 EBookOverdrive = "ebook-overdrive" 

81 MagazineOverDrive = "magazine-overdrive" 

82 

83 def __str__(self): 

84 return str(self.value) 

85 

86 

87class LibbyMediaTypes(str, Enum): 

88 """ 

89 Loan type strings 

90 """ 

91 

92 Audiobook = "audiobook" 

93 EBook = "ebook" 

94 Magazine = "magazine" 

95 

96 def __str__(self): 

97 return str(self.value) 

98 

99 

100FILE_PART_RE = re.compile( 

101 r"(?P<part_name>{[A-F0-9\-]{36}}[^#]+)(#(?P<second_stamp>\d+(\.\d+)?))?$" 

102) 

103USER_AGENT = ( 

104 "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1) AppleWebKit/605.1.15 (KHTML, like Gecko) " 

105 "Version/14.0.2 Safari/605.1.15" 

106) 

107EBOOK_DOWNLOADABLE_FORMATS = ( 

108 LibbyFormats.EBookEPubAdobe, 

109 LibbyFormats.EBookEPubOpen, 

110 LibbyFormats.EBookPDFAdobe, 

111 LibbyFormats.EBookPDFOpen, 

112) 

113DOWNLOADABLE_FORMATS = ( 

114 LibbyFormats.AudioBookMP3, 

115 LibbyFormats.EBookEPubAdobe, 

116 LibbyFormats.EBookEPubOpen, 

117 LibbyFormats.EBookPDFAdobe, 

118 LibbyFormats.EBookPDFOpen, 

119 LibbyFormats.MagazineOverDrive, 

120) 

121 

122 

123def parse_part_path(title: str, part_path: str) -> ChapterMarker: 

124 """ 

125 Extracts chapter marker info from the part path, 

126 e.g. {AAAAAAAA-BBBB-CCCC-9999-ABCDEF123456}Fmt425-Part03.mp3#3000 

127 yields `ChapterMarker(title, "{AAAAAAAA-BBBB-CCCC-9999-ABCDEF123456}Fmt425-Part03.mp3", 3000)` 

128 

129 :param title: 

130 :param part_path: 

131 :return: 

132 """ 

133 mobj = FILE_PART_RE.match(part_path) 

134 if not mobj: 

135 raise ValueError(f"Unexpected path format: {part_path}") 

136 return ChapterMarker( 

137 title=title, 

138 part_name=mobj.group("part_name"), 

139 start_second=float(mobj.group("second_stamp")) 

140 if mobj.group("second_stamp") 

141 else 0, 

142 end_second=0, 

143 ) 

144 

145 

146def parse_toc( 

147 base_url: str, toc: List[Dict], spine: List[Dict] 

148) -> OrderedDictType[str, PartMeta]: 

149 """ 

150 Parses `openbook["nav"]["toc"]` and `openbook["spine"]` to a format 

151 suitable for processing. 

152 

153 :param base_url: 

154 :param toc: 

155 :param spine: 

156 :return: 

157 """ 

158 entries: List[ChapterMarker] = [] 

159 for item in toc: 

160 entries.append(parse_part_path(item["title"], item["path"])) 

161 for content in item.get("contents", []): 

162 # we use the original `item["title"]` instead of `content["title"]` 

163 # so that we can de-dup these entries later 

164 entries.append(parse_part_path(item["title"], content["path"])) 

165 

166 # use an OrderedDict to ensure that we can consistently test this 

167 parsed_toc: OrderedDictType[str, PartMeta] = OrderedDict() 

168 

169 for entry in entries: 

170 if entry.part_name not in parsed_toc: 

171 parsed_toc[entry.part_name] = { 

172 "chapters": [], 

173 "url": "", 

174 "audio-duration": 0, 

175 "file-length": 0, 

176 "spine-position": 0, 

177 } 

178 if not parsed_toc[entry.part_name]["chapters"]: 

179 # first entry for the part_name 

180 parsed_toc[entry.part_name]["chapters"].append(entry) 

181 continue 

182 # de-dup entries because OD sometimes generates timestamped chapter titles marks 

183 # for the same chapter in the same part, e.g. "Chapter 2 (00:00)", "Chapter 2 (12:34)" 

184 if entry.title == parsed_toc[entry.part_name]["chapters"][-1].title: 

185 continue 

186 parsed_toc[entry.part_name]["chapters"].append(entry) 

187 

188 for s in spine: 

189 parsed_toc[s["-odread-original-path"]].update( 

190 { 

191 "url": urljoin(base_url, s["path"]), 

192 "audio-duration": s["audio-duration"], 

193 "file-length": s["-odread-file-bytes"], 

194 "spine-position": s["-odread-spine-position"], 

195 } 

196 ) 

197 for chapter_mark in parsed_toc.values(): # type: PartMeta 

198 chapters = chapter_mark["chapters"] 

199 updated_chapters = [] 

200 for i, chapter in enumerate(chapters): 

201 # update end_second mark 

202 updated_chapter = ChapterMarker( 

203 title=chapter.title, 

204 part_name=chapter.part_name, 

205 start_second=chapter.start_second, 

206 end_second=( 

207 chapters[i + 1].start_second 

208 if i < (len(chapters) - 1) 

209 else chapter_mark["audio-duration"] 

210 ), 

211 ) 

212 updated_chapters.append(updated_chapter) 

213 chapter_mark["chapters"] = updated_chapters 

214 

215 return parsed_toc 

216 

217 

218def merge_toc(toc: Dict) -> List[ChapterMarker]: 

219 """ 

220 Generates a list of ChapterMarker for the merged audiobook based on the parsed toc. 

221 

222 :param toc: parsed toc 

223 :return: 

224 """ 

225 chapters = OrderedDict() 

226 parts = list(toc.values()) 

227 for i, part in enumerate(parts): 

228 cumu_part_duration = sum([p["audio-duration"] for p in parts[:i]]) 

229 for marker in part["chapters"]: 

230 if marker.title not in chapters: 

231 chapters[marker.title] = { 

232 "start": cumu_part_duration + marker.start_second, 

233 "end": 0, 

234 } 

235 chapters[marker.title]["end"] = cumu_part_duration + marker.end_second 

236 

237 return [ 

238 ChapterMarker( 

239 title=title, 

240 part_name="", 

241 start_second=marker["start"], 

242 end_second=marker["end"], 

243 ) 

244 for title, marker in list(chapters.items()) 

245 ] 

246 

247 

248class LibbyClient(object): 

249 # Original reverse engineering of the libby endpoints is thanks to https://github.com/lullius/pylibby 

250 def __init__( 

251 self, 

252 settings_folder: Optional[str] = None, 

253 identity_token: Optional[str] = None, 

254 max_retries: int = 0, 

255 timeout: int = 10, 

256 logger: Optional[logging.Logger] = None, 

257 **kwargs, 

258 ) -> None: 

259 if not logger: 

260 logger = logging.getLogger(__name__) 

261 self.logger = logger 

262 self.settings_folder = Path(settings_folder) if settings_folder else None 

263 if self.settings_folder and not self.settings_folder.exists(): 

264 self.settings_folder.mkdir(parents=True, exist_ok=True) 

265 

266 self.timeout = timeout 

267 self.identity_token = identity_token 

268 self.identity = {} 

269 self.identity_settings_file = ( 

270 self.settings_folder.joinpath("libby.json") 

271 if self.settings_folder 

272 else None 

273 ) 

274 if self.identity_settings_file and self.identity_settings_file.exists(): 

275 with self.identity_settings_file.open("r", encoding="utf-8") as f: 

276 self.identity = json.load(f) 

277 

278 # migrate old sync code storage key 

279 if self.identity_settings_file and self.identity.get("__odmpy_sync_code"): 

280 if not self.identity.get("__libby_sync_code"): 

281 self.identity["__libby_sync_code"] = self.identity["__odmpy_sync_code"] 

282 del self.identity["__odmpy_sync_code"] 

283 with self.identity_settings_file.open("w", encoding="utf-8") as f: 

284 json.dump(self.identity, f) 

285 

286 self.max_retries = max_retries 

287 libby_session = requests.Session() 

288 adapter = HTTPAdapter(max_retries=Retry(total=max_retries, backoff_factor=0.1)) 

289 for prefix in ("http://", "https://"): 

290 libby_session.mount(prefix, adapter) 

291 self.libby_session = libby_session 

292 self.user_agent = kwargs.pop("user_agent", USER_AGENT) 

293 self.api_base = "https://sentry-read.svc.overdrive.com/" 

294 

295 @staticmethod 

296 def is_valid_sync_code(code: str) -> bool: 

297 return code.isdigit() and len(code) == 8 

298 

299 def default_headers(self) -> Dict: 

300 """ 

301 Default HTTP headers. 

302 

303 :return: 

304 """ 

305 return { 

306 "User-Agent": self.user_agent, 

307 "Accept": "application/json", 

308 "Cache-Control": "no-cache", 

309 "Pragma": "no-cache", 

310 } 

311 

312 def make_request( 

313 self, 

314 endpoint: str, 

315 params: Optional[Dict] = None, 

316 data: Optional[Dict] = None, 

317 json_data: Optional[Dict] = None, 

318 headers: Optional[Dict] = None, 

319 method: Optional[str] = None, 

320 authenticated: bool = True, 

321 session: Optional[requests.sessions.Session] = None, 

322 return_res: bool = False, 

323 allow_redirects: bool = True, 

324 ): 

325 endpoint_url = urljoin(self.api_base, endpoint) 

326 if not method: 

327 # try to set an HTTP method 

328 if data is not None: 

329 method = "POST" 

330 else: 

331 method = "GET" 

332 if headers is None: 

333 headers = self.default_headers() 

334 if authenticated and self.get_token(): 

335 headers["Authorization"] = f"Bearer {self.get_token()}" 

336 

337 req = requests.Request( 

338 method, 

339 endpoint_url, 

340 headers=headers, 

341 params=params, 

342 data=data, 

343 json=json_data, 

344 ) 

345 if not session: 

346 # default session 

347 session = self.libby_session 

348 

349 try: 

350 res = session.send( 

351 session.prepare_request(req), 

352 timeout=self.timeout, 

353 allow_redirects=allow_redirects, 

354 ) 

355 self.logger.debug("body: %s", res.text) 

356 

357 res.raise_for_status() 

358 if return_res: 

359 return res 

360 return res.json() 

361 except requests.ConnectionError as conn_err: 

362 raise ClientConnectionError(str(conn_err)) from conn_err 

363 except requests.Timeout as timeout_err: 

364 raise ClientTimeoutError(str(timeout_err)) from timeout_err 

365 except requests.HTTPError as http_err: 

366 ErrorHandler.process(http_err) 

367 

368 def save_settings(self, updates: Dict) -> None: 

369 """ 

370 Persist identity settings. 

371 

372 :param updates: 

373 :return: 

374 """ 

375 self.identity.update(updates) 

376 if not self.identity_settings_file: 

377 raise ValueError( 

378 "Unable to save settings because settings_folder is not defined" 

379 ) 

380 with open(self.identity_settings_file, "w", encoding="utf-8") as f: 

381 json.dump(self.identity, f) 

382 

383 def clear_settings(self) -> None: 

384 """ 

385 Wipe previously saved settings. 

386 

387 :return: 

388 """ 

389 if self.identity_settings_file and self.identity_settings_file.exists(): 

390 self.identity_settings_file.unlink() 

391 self.identity = {} 

392 

393 def has_chip(self) -> bool: 

394 """ 

395 Check if client has identity token chip. 

396 

397 :return: 

398 """ 

399 return bool(self.identity.get("identity")) 

400 

401 def has_sync_code(self) -> bool: 

402 """ 

403 Check if client has linked account. 

404 

405 :return: 

406 """ 

407 return bool( 

408 self.identity.get("__libby_sync_code") 

409 or self.identity.get("__odmpy_sync_code") # for backwards compat 

410 ) 

411 

412 def get_chip(self, auto_save: bool = True, authenticated: bool = False) -> Dict: 

413 """ 

414 Get an identity chip (contains auth token). 

415 

416 :param auto_save: 

417 :param authenticated: 

418 :return: 

419 """ 

420 res: Dict = self.make_request( 

421 "chip", 

422 params={"client": "dewey"}, 

423 method="POST", 

424 authenticated=authenticated, 

425 ) 

426 if auto_save: 

427 # persist to settings 

428 self.save_settings(res) 

429 return res 

430 

431 def get_token(self) -> Optional[str]: 

432 return self.identity_token or self.identity.get("identity") 

433 

434 def clone_by_code(self, code: str, auto_save: bool = True) -> Dict: 

435 """ 

436 Link account to identy token retrieved in `get_chip()`. 

437 

438 :param code: 

439 :param auto_save: 

440 :return: 

441 """ 

442 if not self.is_valid_sync_code(code): 

443 raise ValueError(f"Invalid code: {code}") 

444 

445 res: Dict = self.make_request("chip/clone/code", data={"code": code}) 

446 if auto_save: 

447 # persist to settings 

448 self.save_settings({"__libby_sync_code": code}) 

449 return res 

450 

451 def sync(self) -> Dict: 

452 """ 

453 Get the user account state, which includes loans, holds, etc. 

454 

455 :return: 

456 """ 

457 res: Dict = self.make_request("chip/sync") 

458 return res 

459 

460 def auth_form(self, website_id) -> Dict: 

461 """ 

462 Get the parameters required to link a card. 

463 

464 :param website_id: Can be gotten from the OverDrive api or "cards" in the sync response 

465 :return: 

466 """ 

467 res: Dict = self.make_request(f"auth/forms/{website_id}", authenticated=False) 

468 return res 

469 

470 def link_card( 

471 self, website_id: str, username: str, password: str, ils: str = "default" 

472 ) -> Dict: 

473 """ 

474 Used to add or verify an existing card. Not currently used by odmpy, not tested. 

475 Returns a similar response as sync but with just the one card linked. 

476 

477 :param website_id: Can be gotten from the OverDrive api or "cards" in the sync response 

478 :param username: 

479 :param password: Or pin. 

480 :param ils: Can be gotten from auth_form() or "cards" dict in the sync response 

481 :return: 

482 """ 

483 data = {"ils": ils, "username": username, "password": password} 

484 res: Dict = self.make_request( 

485 f"auth/link/{website_id}", json_data=data, method="POST" 

486 ) 

487 return res 

488 

489 def update_card_name(self, card_id: str, card_name: str) -> Dict: 

490 """ 

491 Update a card's name 

492 

493 :param card_id: 

494 :param card_name: 

495 :return: 

496 """ 

497 res: Dict = self.make_request( 

498 f"card/{card_id}", params={"card_name": card_name}, method="PUT" 

499 ) 

500 return res 

501 

502 def is_logged_in(self) -> bool: 

503 """ 

504 Check if successfully logged in. 

505 

506 :return: 

507 """ 

508 synced_state = self.sync() 

509 return synced_state.get("result", "") == "synchronized" and bool( 

510 synced_state.get("cards") 

511 ) 

512 

513 @staticmethod 

514 def is_downloadable_audiobook_loan(book: Dict) -> bool: 

515 """ 

516 Verify if book is a downloadable audiobook. 

517 

518 :param book: 

519 :return: 

520 """ 

521 return bool( 

522 [f for f in book.get("formats", []) if f["id"] == LibbyFormats.AudioBookMP3] 

523 ) 

524 

525 @staticmethod 

526 def is_downloadable_ebook_loan(book: Dict) -> bool: 

527 """ 

528 Verify if book is a downloadable ebook. 

529 

530 :param book: 

531 :return: 

532 """ 

533 return bool( 

534 [ 

535 f 

536 for f in book.get("formats", []) 

537 if f["id"] in EBOOK_DOWNLOADABLE_FORMATS 

538 ] 

539 ) 

540 

541 @staticmethod 

542 def is_downloadable_magazine_loan(book: Dict) -> bool: 

543 """ 

544 Verify if loan is a downloadable magazine. 

545 

546 :param book: 

547 :return: 

548 """ 

549 return bool( 

550 [ 

551 f 

552 for f in book.get("formats", []) 

553 if f["id"] == LibbyFormats.MagazineOverDrive 

554 ] 

555 ) 

556 

557 @staticmethod 

558 def has_format(loan: Dict, format_id: str) -> bool: 

559 return bool( 

560 next(iter([f["id"] for f in loan["formats"] if f["id"] == format_id]), None) 

561 ) 

562 

563 @staticmethod 

564 def get_loan_format(loan: Dict, prefer_open_format: bool = True) -> str: 

565 locked_in_format = next( 

566 iter([f["id"] for f in loan["formats"] if f.get("isLockedIn")]), None 

567 ) 

568 if locked_in_format: 

569 if locked_in_format in DOWNLOADABLE_FORMATS: 

570 return locked_in_format 

571 raise ValueError( 

572 f'Loan is locked to a non-downloadable format "{locked_in_format}"' 

573 ) 

574 

575 if not locked_in_format: 

576 # the order of these checks will determine the output format 

577 # the "open" version of the format (example open epub, open pdf) should 

578 # be prioritised 

579 if LibbyClient.is_downloadable_audiobook_loan( 

580 loan 

581 ) and LibbyClient.has_format(loan, LibbyFormats.AudioBookMP3): 

582 return LibbyFormats.AudioBookMP3 

583 elif ( 

584 LibbyClient.is_open_ebook_loan(loan) 

585 and LibbyClient.has_format(loan, LibbyFormats.EBookEPubOpen) 

586 and prefer_open_format 

587 ): 

588 return LibbyFormats.EBookEPubOpen 

589 elif LibbyClient.is_downloadable_magazine_loan( 

590 loan 

591 ) and LibbyClient.has_format(loan, LibbyFormats.MagazineOverDrive): 

592 return LibbyFormats.MagazineOverDrive 

593 elif LibbyClient.is_downloadable_ebook_loan( 

594 loan 

595 ) and LibbyClient.has_format(loan, LibbyFormats.EBookEPubAdobe): 

596 return LibbyFormats.EBookEPubAdobe 

597 elif ( 

598 LibbyClient.is_downloadable_ebook_loan(loan) 

599 and LibbyClient.has_format(loan, LibbyFormats.EBookPDFOpen) 

600 and prefer_open_format 

601 ): 

602 return LibbyFormats.EBookPDFOpen 

603 elif LibbyClient.is_downloadable_ebook_loan( 

604 loan 

605 ) and LibbyClient.has_format(loan, LibbyFormats.EBookPDFAdobe): 

606 return LibbyFormats.EBookPDFAdobe 

607 

608 raise ValueError("Unable to find a downloadable format") 

609 

610 @staticmethod 

611 def is_open_ebook_loan(book: Dict) -> bool: 

612 """ 

613 Verify if book is an open epub. 

614 

615 :param book: 

616 :return: 

617 """ 

618 return bool( 

619 [ 

620 f 

621 for f in book.get("formats", []) 

622 if f["id"] == LibbyFormats.EBookEPubOpen 

623 ] 

624 ) 

625 

626 @staticmethod 

627 def parse_datetime(value: str) -> datetime: # type: ignore[return] 

628 """ 

629 Parses a datetime string from the API into a datetime. 

630 

631 :param value: 

632 :return: 

633 """ 

634 formats = ( 

635 "%Y-%m-%dT%H:%M:%SZ", 

636 "%Y-%m-%dT%H:%M:%S.%fZ", 

637 "%Y-%m-%dT%H:%M:%S%z", 

638 "%Y-%m-%dT%H:%M:%S.%f%z", 

639 "%m/%d/%Y", # publishDateText 

640 ) 

641 for fmt in formats: 

642 try: 

643 dt = datetime.strptime(value, fmt) 

644 if not dt.tzinfo: 

645 dt = dt.replace(tzinfo=timezone.utc) 

646 return dt 

647 except ValueError: 

648 pass 

649 

650 raise ValueError(f"time data '{value}' does not match known formats {formats}") 

651 

652 @staticmethod 

653 def is_renewable(loan: Dict) -> bool: 

654 """ 

655 Check if loan can be renewed. 

656 

657 :param loan: 

658 :return: 

659 """ 

660 if not loan.get("renewableOn"): 

661 raise ValueError("Unable to get renewable date") 

662 # Example: 2023-02-23T07:33:55Z 

663 return LibbyClient.parse_datetime(loan["renewableOn"]) <= datetime.now( 

664 tz=timezone.utc 

665 ) 

666 

667 def get_loans(self) -> List[Dict]: 

668 """ 

669 Get loans 

670 

671 :return: 

672 """ 

673 return self.sync().get("loans", []) 

674 

675 def get_holds(self) -> List[Dict]: 

676 """ 

677 Get holds 

678 

679 :return: 

680 """ 

681 return self.sync().get("holds", []) 

682 

683 def get_downloadable_audiobook_loans(self) -> List[Dict]: 

684 """ 

685 Get downloadable audiobook loans. 

686 

687 :return: 

688 """ 

689 return [ 

690 book 

691 for book in self.sync().get("loans", []) 

692 if self.is_downloadable_audiobook_loan(book) 

693 ] 

694 

695 def fulfill(self, loan_id: str, card_id: str, format_id: str) -> Dict: 

696 """ 

697 Get the fulfillment details for a loan. 

698 

699 :param loan_id: 

700 :param card_id: 

701 :param format_id: 

702 :return: 

703 """ 

704 if format_id not in DOWNLOADABLE_FORMATS: 

705 raise ValueError(f"Invalid format_id: {format_id}") 

706 res: Dict = self.make_request( 

707 f"card/{card_id}/loan/{loan_id}/fulfill/{format_id}", 

708 return_res=True, 

709 ) 

710 return res 

711 

712 @staticmethod 

713 def _urlretrieve( 

714 endpoint: str, headers: Optional[Dict] = None, timeout: int = 15 

715 ) -> bytes: 

716 """ 

717 Workaround for downloading an open (non-drm) epub or pdf. 

718 

719 The fulfillment url 403s when using requests but 

720 works in curl, request.urlretrieve, etc. 

721 

722 GET API fulfill endpoint -> 302 https://fulfill.contentreserve.com (fulfillment url) 

723 GET https://fulfill.contentreserve.com -> 302 https://openepub-gk.cdn.overdrive.com 

724 GET https://openepub-gk.cdn.overdrive.com 403 

725 

726 Fresh session doesn't work either, headers doesn't seem to 

727 matter. 

728 

729 .. code-block:: python 

730 sess = requests.Session() 

731 sess.headers.update({"User-Agent": USER_AGENT}) 

732 res = sess.get(res_redirect.headers["Location"], timeout=self.timeout) 

733 res.raise_for_status() 

734 return res.content 

735 

736 :param endpoint: fulfillment url 

737 :param headers: 

738 :param timeout: 

739 :return: 

740 """ 

741 if not headers: 

742 headers = {} 

743 

744 opener = request.build_opener() 

745 req = request.Request(endpoint, headers=headers) 

746 res = opener.open(req, timeout=timeout) 

747 return res.read() 

748 

749 def fulfill_loan_file(self, loan_id: str, card_id: str, format_id: str) -> bytes: 

750 """ 

751 Returns the loan file contents directly for MP3 audiobooks (.odm) 

752 and DRM epub (.acsm) loans. 

753 For open epub/pdf loans, the actual epub/pdf contents are returned. 

754 

755 :param loan_id: 

756 :param card_id: 

757 :param format_id: 

758 :return: 

759 """ 

760 if format_id not in DOWNLOADABLE_FORMATS: 

761 raise ValueError(f"Unsupported format_id: {format_id}") 

762 

763 headers = self.default_headers() 

764 headers["Accept"] = "*/*" 

765 

766 if format_id in (LibbyFormats.EBookEPubOpen, LibbyFormats.EBookPDFOpen): 

767 res_redirect: requests.Response = self.make_request( 

768 f"card/{card_id}/loan/{loan_id}/fulfill/{format_id}", 

769 headers=headers, 

770 return_res=True, 

771 allow_redirects=False, 

772 ) 

773 return self._urlretrieve( 

774 res_redirect.headers["Location"], headers=headers, timeout=self.timeout 

775 ) 

776 

777 res: requests.Response = self.make_request( 

778 f"card/{card_id}/loan/{loan_id}/fulfill/{format_id}", 

779 headers=headers, 

780 return_res=True, 

781 ) 

782 return res.content 

783 

784 def open_loan(self, loan_type: str, card_id: str, title_id: str) -> Dict: 

785 """ 

786 Gets the meta urls needed to fulfill a loan. 

787 

788 :param loan_type: 

789 :param card_id: 

790 :param title_id: 

791 :return: 

792 """ 

793 res: Dict = self.make_request( 

794 f"open/{loan_type}/card/{card_id}/title/{title_id}" 

795 ) 

796 return res 

797 

798 def prepare_loan(self, loan: Dict) -> Tuple[str, Dict]: 

799 """ 

800 Pre-requisite step for processing a loan. 

801 

802 :param loan: 

803 :return: 

804 """ 

805 loan_type = "book" 

806 if loan["type"]["id"] == LibbyMediaTypes.Audiobook: 

807 loan_type = "audiobook" 

808 if loan["type"]["id"] == LibbyMediaTypes.Magazine: 

809 loan_type = "magazine" 

810 card_id = loan["cardId"] 

811 title_id = loan["id"] 

812 meta = self.open_loan(loan_type, card_id, title_id) 

813 download_base: str = meta["urls"]["web"] 

814 

815 # Sets a needed cookie 

816 web_url = download_base + "?" + meta["message"] 

817 _ = self.make_request( 

818 web_url, 

819 headers={"Accept": "*/*"}, 

820 method="HEAD", 

821 authenticated=False, 

822 return_res=True, 

823 ) 

824 return download_base, meta 

825 

826 def process_audiobook( 

827 self, loan: Dict 

828 ) -> Tuple[Dict, OrderedDictType[str, PartMeta]]: 

829 """ 

830 Returns the data needed to download an audiobook. 

831 

832 :param loan: 

833 :return: 

834 """ 

835 download_base, meta = self.prepare_loan(loan) 

836 # contains nav/toc and spine 

837 openbook = self.make_request(meta["urls"]["openbook"]) 

838 toc = parse_toc(download_base, openbook["nav"]["toc"], openbook["spine"]) 

839 return openbook, toc 

840 

841 def process_ebook(self, loan: Dict) -> Tuple[str, Dict, List[Dict]]: 

842 """ 

843 Returns the data needed to download an ebook directly. 

844 

845 :param loan: 

846 :return: 

847 """ 

848 download_base, meta = self.prepare_loan(loan) 

849 # contains nav/toc and spine, manifest 

850 openbook = self.make_request(meta["urls"]["openbook"]) 

851 rosters: List[Dict] = self.make_request(meta["urls"]["rosters"]) 

852 return download_base, openbook, rosters 

853 

854 def return_title(self, title_id: str, card_id: str) -> None: 

855 """ 

856 Return a title. 

857 

858 :param title_id: 

859 :param card_id: 

860 :return: 

861 """ 

862 self.make_request( 

863 f"card/{card_id}/loan/{title_id}", method="DELETE", return_res=True 

864 ) 

865 

866 def return_loan(self, loan: Dict) -> None: 

867 """ 

868 Return a loan. 

869 

870 :param loan: 

871 :return: 

872 """ 

873 self.return_title(loan["id"], loan["cardId"]) 

874 

875 def borrow_title( 

876 self, title_id: str, title_format: str, card_id: str, days: int = 21 

877 ) -> Dict: 

878 """ 

879 Return a title. 

880 

881 :param title_id: 

882 :param title_format: Type ID 

883 :param card_id: 

884 :param days: 

885 :return: 

886 """ 

887 data = { 

888 "period": days, 

889 "units": "days", 

890 "lucky_day": None, 

891 "title_format": title_format, 

892 } 

893 

894 res: Dict = self.make_request( 

895 f"card/{card_id}/loan/{title_id}", json_data=data, method="POST" 

896 ) 

897 return res 

898 

899 def borrow_hold(self, hold: Dict) -> Dict: 

900 """ 

901 Borrow a hold. 

902 

903 :param hold: 

904 :return: 

905 """ 

906 return self.borrow_title(hold["id"], hold["type"]["id"], hold["cardId"]) 

907 

908 def renew_title( 

909 self, title_id: str, title_format: str, card_id: str, days: int = 21 

910 ) -> Dict: 

911 """ 

912 Renew a title. 

913 

914 :param title_id: 

915 :param title_format: Type ID 

916 :param card_id: 

917 :param days: 

918 :return: 

919 """ 

920 data = { 

921 "period": days, 

922 "units": "days", 

923 "lucky_day": None, 

924 "title_format": title_format, 

925 } 

926 

927 res: Dict = self.make_request( 

928 f"card/{card_id}/loan/{title_id}", json_data=data, method="PUT" 

929 ) 

930 return res 

931 

932 def renew_loan(self, loan: Dict) -> Dict: 

933 """ 

934 Renew a loan. 

935 

936 :param loan: 

937 :return: 

938 """ 

939 return self.renew_title(loan["id"], loan["type"]["id"], loan["cardId"]) 

940 

941 def create_hold(self, title_id: str, card_id: str) -> Dict: 

942 """ 

943 Create a hold on the title. 

944 

945 :param title_id: 

946 :param card_id: 

947 :return: 

948 """ 

949 res: Dict = self.make_request( 

950 f"card/{card_id}/hold/{title_id}", 

951 json_data={"days_to_suspend": 0, "email_address": ""}, 

952 method="POST", 

953 ) 

954 return res