Coverage for odmpy/processing/shared.py: 90.7%

332 statements  

coverage.py v7.3.1, created at 2023-09-14 08:51 +0000

# Copyright (C) 2023 github.com/ping
#
# This file is part of odmpy.
#
# odmpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# odmpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with odmpy. If not, see <http://www.gnu.org/licenses/>.
#

import argparse
import logging
import subprocess
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional, Dict, List, Tuple
from urllib.parse import urlparse

import eyed3  # type: ignore[import]
import requests
from eyed3.utils import art  # type: ignore[import]
from iso639 import Lang  # type: ignore[import]
from requests.adapters import HTTPAdapter, Retry
from termcolor import colored

from ..constants import PERFORMER_FID, LANGUAGE_FID
from ..errors import OdmpyRuntimeError
from ..libby import USER_AGENT, LibbyFormats, LibbyClient
from ..utils import slugify, sanitize_path, is_windows


#
# Shared functions across processing for different loan types
#


def init_session(max_retries: int = 0) -> requests.Session:
    session = requests.Session()
    custom_adapter = HTTPAdapter(
        max_retries=Retry(total=max_retries, backoff_factor=0.1)
    )
    for prefix in ("http://", "https://"):
        session.mount(prefix, custom_adapter)
    return session

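
# A minimal usage sketch for init_session(): the URL, timeout and retry count
# below are placeholders, not values used elsewhere in odmpy.
def _example_init_session_usage() -> None:
    session = init_session(max_retries=3)  # retry transient failures up to 3 times
    res = session.get("https://example.org/cover.jpg", timeout=10)
    res.raise_for_status()
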

def generate_names(
    title: str,
    series: str,
    series_reading_order: str,
    authors: List[str],
    edition: str,
    title_id: str,
    args: argparse.Namespace,
    logger: logging.Logger,
) -> Tuple[Path, Path]:
    """
    Creates the download folder if necessary and generates the merged book names

    :param title:
    :param authors:
    :param edition:
    :param title_id:
    :param series:
    :param series_reading_order:
    :param args:
    :param logger:
    :return:
    """
    book_folder_name = args.book_folder_format % {
        "Title": sanitize_path(title, exclude_chars=args.remove_from_paths),
        "Author": sanitize_path(
            ", ".join(authors), exclude_chars=args.remove_from_paths
        ),
        "Series": sanitize_path(series or "", exclude_chars=args.remove_from_paths),
        "Edition": sanitize_path(edition, exclude_chars=args.remove_from_paths),
        "ID": sanitize_path(title_id, exclude_chars=args.remove_from_paths),
        "ReadingOrder": sanitize_path(
            series_reading_order, exclude_chars=args.remove_from_paths
        ),
    }
    # unlike book_folder_name, we sanitize the entire book file format
    # because it is expected to be a single name and `os.sep` will be
    # stripped
    book_file_format = sanitize_path(
        args.book_file_format
        % {
            "Title": title,
            "Author": ", ".join(authors),
            "Series": series or "",
            "Edition": edition,
            "ID": title_id,
            "ReadingOrder": series_reading_order,
        },
        exclude_chars=args.remove_from_paths,
    )
    # declare book folder/file names here together, so that we can catch problems from too long names
    book_folder = Path(args.download_dir, book_folder_name)
    if args.no_book_folder:
        book_folder = Path(args.download_dir)

    # for merged mp3
    book_filename = book_folder.joinpath(f"{book_file_format}.mp3")

    try:
        if not book_folder.exists():
            book_folder.mkdir(parents=True, exist_ok=True)
    except OSError as os_err:
        # ref http://www.ioplex.com/~miallen/errcmpp.html
        # for Windows: OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
        if (is_windows() and os_err.errno != 22) or (
            os_err.errno not in (36, 63) and not is_windows()
        ):
            raise

        # Ref OSError: [Errno 36] File name too long https://github.com/ping/odmpy/issues/5
        # create book folder with just the title and first author
        book_folder_name = args.book_folder_format % {
            "Title": sanitize_path(title, exclude_chars=args.remove_from_paths),
            "Author": sanitize_path(authors[0], exclude_chars=args.remove_from_paths)
            if authors
            else "",
            "Series": sanitize_path(series or "", exclude_chars=args.remove_from_paths),
            "Edition": sanitize_path(edition, exclude_chars=args.remove_from_paths),
            "ID": sanitize_path(title_id, exclude_chars=args.remove_from_paths),
            "ReadingOrder": sanitize_path(
                series_reading_order, exclude_chars=args.remove_from_paths
            ),
        }
        book_folder = Path(args.download_dir, book_folder_name)
        if args.no_book_folder:
            book_folder = Path(args.download_dir)

        logger.warning(
            f'Book folder name is too long. Files will be saved in "{book_folder}" instead.'
        )
        if not book_folder.exists():
            book_folder.mkdir(parents=True, exist_ok=True)

        # also create book name with just one author
        book_file_format = sanitize_path(
            args.book_file_format
            % {
                "Title": title,
                "Author": authors[0] if authors else "",
                "Series": series or "",
                "Edition": edition,
                "ID": title_id,
                "ReadingOrder": series_reading_order,
            },
            exclude_chars=args.remove_from_paths,
        )
        book_filename = book_folder.joinpath(f"{book_file_format}.mp3")
    return book_folder, book_filename

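
# A minimal sketch of calling generate_names(). The Namespace fields mirror the
# attributes read above (download_dir, book_folder_format, book_file_format,
# remove_from_paths, no_book_folder); the title/author values are placeholders.
def _example_generate_names_usage() -> Tuple[Path, Path]:
    args = argparse.Namespace(
        download_dir="downloads",
        book_folder_format="%(Author)s - %(Title)s",
        book_file_format="%(Title)s",
        remove_from_paths="",
        no_book_folder=False,
    )
    return generate_names(
        title="An Example Title",
        series="",
        series_reading_order="",
        authors=["Jane Doe"],
        edition="",
        title_id="9999999",
        args=args,
        logger=logging.getLogger(__name__),
    )
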

def write_tags(
    audiofile: eyed3.core.AudioFile,
    title: str,
    sub_title: Optional[str],
    authors: List[str],
    narrators: Optional[List[str]],
    publisher: str,
    description: str,
    cover_bytes: Optional[bytes],
    genres: Optional[List[str]],
    languages: Optional[List[str]],
    published_date: Optional[str],
    series: Optional[str],
    part_number: int,
    total_parts: int,
    overdrive_id: str,
    isbn: Optional[str] = None,
    overwrite_title: bool = False,
    always_overwrite: bool = False,
    delimiter: str = ";",
) -> None:
    """
    Write out ID3 tags to the audiofile

    :param audiofile:
    :param title:
    :param sub_title:
    :param authors:
    :param narrators:
    :param publisher:
    :param description:
    :param cover_bytes:
    :param genres:
    :param languages:
    :param published_date:
    :param series:
    :param part_number:
    :param total_parts:
    :param overdrive_id:
    :param isbn:
    :param overwrite_title:
    :param always_overwrite:
    :param delimiter:
    :return:
    """
    if not delimiter:
        delimiter = ";"

    if not audiofile.tag:
        audiofile.initTag()
    if always_overwrite or overwrite_title or not audiofile.tag.title:
        audiofile.tag.title = str(title)
    if sub_title and (
        always_overwrite
        or not audiofile.tag.getTextFrame(eyed3.id3.frames.SUBTITLE_FID)
    ):
        audiofile.tag.setTextFrame(eyed3.id3.frames.SUBTITLE_FID, sub_title)
    if always_overwrite or not audiofile.tag.album:
        audiofile.tag.album = str(title)
    if authors and (always_overwrite or not audiofile.tag.artist):
        audiofile.tag.artist = delimiter.join([str(a) for a in authors])
    if authors and (always_overwrite or not audiofile.tag.album_artist):
        audiofile.tag.album_artist = delimiter.join([str(a) for a in authors])
    if part_number and (always_overwrite or not audiofile.tag.track_num):
        audiofile.tag.track_num = (part_number, total_parts)
    if narrators and (
        always_overwrite or not audiofile.tag.getTextFrame(PERFORMER_FID)
    ):
        audiofile.tag.setTextFrame(
            PERFORMER_FID, delimiter.join([str(n) for n in narrators])
        )
    if publisher and (always_overwrite or not audiofile.tag.publisher):
        audiofile.tag.publisher = str(publisher)
    if description and (
        always_overwrite or eyed3.id3.frames.COMMENT_FID not in audiofile.tag.frame_set
    ):
        audiofile.tag.comments.set(str(description), description="Description")
    if genres and (always_overwrite or not audiofile.tag.genre):
        audiofile.tag.genre = delimiter.join(genres)
    if languages and (always_overwrite or not audiofile.tag.getTextFrame(LANGUAGE_FID)):
        try:
            tag_langs = [Lang(lang).pt2b for lang in languages]
        except:  # noqa: E722, pylint: disable=bare-except
            tag_langs = languages
        audiofile.tag.setTextFrame(LANGUAGE_FID, delimiter.join(tag_langs))
    if published_date and (always_overwrite or not audiofile.tag.release_date):
        audiofile.tag.release_date = published_date
    if cover_bytes:
        audiofile.tag.images.set(
            art.TO_ID3_ART_TYPES[art.FRONT_COVER][0],
            cover_bytes,
            "image/jpeg",
            description="Cover",
        )
    if series:
        audiofile.tag.user_text_frames.set(series, "Series")
    # Output some OD identifiers in the mp3
    if overdrive_id:
        audiofile.tag.user_text_frames.set(
            overdrive_id,
            "OverDrive Media ID" if overdrive_id.isdigit() else "OverDrive Reserve ID",
        )
    if isbn:
        audiofile.tag.user_text_frames.set(isbn, "ISBN")

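
# A minimal sketch of tagging one part with write_tags(). The file name and
# metadata values are placeholders; the tag is saved explicitly afterwards
# because write_tags() itself only sets frames in memory.
def _example_write_tags_usage() -> None:
    audiofile = eyed3.load("part-01.mp3")  # placeholder file name
    write_tags(
        audiofile=audiofile,
        title="An Example Title",
        sub_title=None,
        authors=["Jane Doe"],
        narrators=["John Roe"],
        publisher="Example Press",
        description="A short description.",
        cover_bytes=None,
        genres=["Fiction"],
        languages=["en"],
        published_date="2023-01-01",
        series=None,
        part_number=1,
        total_parts=10,
        overdrive_id="9999999",
    )
    audiofile.tag.save()
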

def get_best_cover_url(loan: Dict) -> Optional[str]:
    """
    Extracts the highest resolution cover image for the loan

    :param loan:
    :return:
    """
    covers: List[Dict] = sorted(
        list(loan.get("covers", {}).values()),
        key=lambda c: c.get("width", 0),
        reverse=True,
    )
    cover_highest_res: Optional[Dict] = next(iter(covers), None)
    return cover_highest_res["href"] if cover_highest_res else None

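
# A minimal sketch of get_best_cover_url() with a hand-built loan dict; the
# cover keys and URLs are placeholders shaped like the "covers" mapping read above.
def _example_get_best_cover_url_usage() -> Optional[str]:
    loan = {
        "covers": {
            "cover150Wide": {"href": "https://example.org/cover-150.jpg", "width": 150},
            "cover510Wide": {"href": "https://example.org/cover-510.jpg", "width": 510},
        }
    }
    # returns the 510px href, the entry with the largest width
    return get_best_cover_url(loan)
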

def generate_cover(
    book_folder: Path,
    cover_url: Optional[str],
    session: requests.Session,
    timeout: int,
    logger: logging.Logger,
    force_square: bool = True,
) -> Tuple[Path, Optional[bytes]]:
    """
    Get the book cover

    :param book_folder:
    :param cover_url:
    :param session:
    :param timeout:
    :param logger:
    :param force_square:
    :return:
    """
    cover_filename = book_folder.joinpath("cover.jpg")
    if not cover_filename.exists() and cover_url:
        try:
            if force_square:
                square_cover_url_params = {
                    "type": "auto",
                    "width": str(510),
                    "height": str(510),
                    "force": "true",
                    "quality": str(80),
                    "url": urlparse(cover_url).path,
                }
                # credit: https://github.com/lullius/pylibby/pull/18
                # this endpoint produces a resized version of the cover
                cover_res = session.get(
                    "https://ic.od-cdn.com/resize",
                    params=square_cover_url_params,
                    headers={"User-Agent": USER_AGENT},
                    timeout=timeout,
                )
            else:
                cover_res = session.get(
                    cover_url, headers={"User-Agent": USER_AGENT}, timeout=timeout
                )
            cover_res.raise_for_status()
            with cover_filename.open("wb") as outfile:
                outfile.write(cover_res.content)
        except requests.exceptions.HTTPError as he:
            if not force_square:
                logger.warning(
                    "Error downloading cover: %s",
                    colored(str(he), "red", attrs=["bold"]),
                )
            else:
                logger.warning(
                    "Error downloading square cover: %s",
                    colored(str(he), "red", attrs=["bold"]),
                )
                # fallback to original cover url
                try:
                    cover_res = session.get(
                        cover_url,
                        headers={"User-Agent": USER_AGENT},
                        timeout=timeout,
                    )
                    cover_res.raise_for_status()
                    with cover_filename.open("wb") as outfile:
                        outfile.write(cover_res.content)
                except requests.exceptions.HTTPError as he2:
                    logger.warning(
                        "Error downloading cover: %s",
                        colored(str(he2), "red", attrs=["bold"]),
                    )

    cover_bytes: Optional[bytes] = None
    if cover_filename.exists():
        with cover_filename.open("rb") as f:
            cover_bytes = f.read()

    return cover_filename, cover_bytes


def merge_into_mp3(
    book_filename: Path,
    file_tracks: List[Dict],
    audio_bitrate: int,
    ffmpeg_loglevel: str,
    hide_progress: bool,
    logger: logging.Logger,
) -> None:
    """
    Merge the files into a single mp3

    :param book_filename: mp3 file name
    :param file_tracks:
    :param audio_bitrate:
    :param ffmpeg_loglevel:
    :param hide_progress:
    :param logger:
    :return:
    """

    # We can't directly generate a m4b here even if specified because eyed3 doesn't support m4b/mp4
    temp_book_filename = book_filename.with_suffix(".part")
    cmd = [
        "ffmpeg",
        "-y",
        "-nostdin",
        "-hide_banner",
        "-loglevel",
        ffmpeg_loglevel,
    ]
    if not hide_progress:
        cmd.append("-stats")
    cmd.extend(
        [
            "-i",
            f"concat:{'|'.join([str(ft['file']) for ft in file_tracks])}",
            "-acodec",
            "copy",
            "-vcodec",
            "copy",
            "-b:a",
            f"{audio_bitrate}k"
            if audio_bitrate
            else "64k",  # explicitly set audio bitrate
            "-f",
            "mp3",
            str(temp_book_filename),
        ]
    )
    exit_code = subprocess.call(cmd)
    if exit_code:
        logger.error(f"ffmpeg exited with the code: {exit_code!s}")
        logger.error(f"Command: {' '.join(cmd)!s}")
        raise OdmpyRuntimeError("ffmpeg exited with a non-zero code")

    temp_book_filename.replace(book_filename)

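
# A minimal sketch of merging two downloaded parts with merge_into_mp3().
# The part and output file names are placeholders, and ffmpeg must be on PATH.
def _example_merge_into_mp3_usage() -> None:
    file_tracks = [
        {"file": Path("part-01.mp3")},
        {"file": Path("part-02.mp3")},
    ]
    merge_into_mp3(
        book_filename=Path("An Example Title.mp3"),
        file_tracks=file_tracks,
        audio_bitrate=64,
        ffmpeg_loglevel="info",
        hide_progress=True,
        logger=logging.getLogger(__name__),
    )
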

def convert_to_m4b(
    book_filename: Path,
    book_m4b_filename: Path,
    cover_filename: Path,
    merge_codec: str,
    audio_bitrate: int,
    ffmpeg_loglevel: str,
    hide_progress: bool,
    logger: logging.Logger,
) -> None:
    """
    Converts the merged mp3 into a m4b

    :param book_filename: mp3 file name
    :param book_m4b_filename:
    :param cover_filename:
    :param merge_codec:
    :param audio_bitrate:
    :param ffmpeg_loglevel:
    :param hide_progress:
    :param logger:
    :return:
    """
    temp_book_m4b_filename = book_m4b_filename.with_suffix(".part")
    cmd = [
        "ffmpeg",
        "-y",
        "-nostdin",
        "-hide_banner",
        "-loglevel",
        ffmpeg_loglevel,
    ]
    if not hide_progress:
        cmd.append("-stats")
    cmd.extend(
        [
            "-i",
            str(book_filename),
        ]
    )
    if cover_filename.exists():
        cmd.extend(["-i", str(cover_filename)])

    cmd.extend(
        [
            "-map",
            "0:a",
            "-c:a",
            merge_codec,
            "-b:a",
            f"{audio_bitrate}k"
            if audio_bitrate
            else "64k",  # explicitly set audio bitrate
        ]
    )
    if cover_filename.exists():
        cmd.extend(
            [
                "-map",
                "1:v",
                "-c:v",
                "copy",
                "-disposition:v:0",
                "attached_pic",
            ]
        )

    cmd.extend(["-f", "mp4", str(temp_book_m4b_filename)])
    exit_code = subprocess.call(cmd)
    if exit_code:
        logger.error(f"ffmpeg exited with the code: {exit_code!s}")
        logger.error(f"Command: {' '.join(cmd)!s}")
        raise OdmpyRuntimeError("ffmpeg exited with a non-zero code")

    temp_book_m4b_filename.rename(book_m4b_filename)
    logger.info('Merged files into "%s"', colored(str(book_m4b_filename), "magenta"))
    try:
        book_filename.unlink()
    except Exception as e:  # pylint: disable=broad-except
        logger.warning(f'Error deleting "{book_filename}": {str(e)}')


def remux_mp3(
    part_tmp_filename: Path,
    part_filename: Path,
    ffmpeg_loglevel: str,
    logger: logging.Logger,
) -> None:
    """
    Try to remux file to remove mp3 lame tag errors

    :param part_tmp_filename:
    :param part_filename:
    :param ffmpeg_loglevel:
    :param logger:
    :return:
    """
    cmd = [
        "ffmpeg",
        "-y",
        "-nostdin",
        "-hide_banner",
        "-loglevel",
        ffmpeg_loglevel,
        "-i",
        str(part_tmp_filename),
        "-c:a",
        "copy",
        "-c:v",
        "copy",
        str(part_filename),
    ]
    try:
        exit_code = subprocess.call(cmd)
        if exit_code:
            logger.warning(f"ffmpeg exited with the code: {exit_code!s}")
            logger.warning(f"Command: {' '.join(cmd)!s}")
            part_tmp_filename.rename(part_filename)
        else:
            part_tmp_filename.unlink()
    except Exception as ffmpeg_ex:  # pylint: disable=broad-except
        logger.warning(f"Error executing ffmpeg: {str(ffmpeg_ex)}")
        part_tmp_filename.rename(part_filename)


def extract_authors_from_openbook(openbook: Dict) -> List[str]:
    """
    Extract list of author names from openbook

    :param openbook:
    :return:
    """
    creators = openbook.get("creator", [])
    return (
        [c["name"] for c in creators if c.get("role", "") == "author"]
        or [c["name"] for c in creators if c.get("role", "") == "editor"]
        or [c["name"] for c in creators]
    )

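
# A minimal sketch of extract_authors_from_openbook() with a hand-built
# openbook dict; the creator entries are placeholders.
def _example_extract_authors_usage() -> List[str]:
    openbook = {
        "creator": [
            {"name": "Jane Doe", "role": "author"},
            {"name": "John Roe", "role": "narrator"},
        ]
    }
    # returns ["Jane Doe"] because an "author" entry is present
    return extract_authors_from_openbook(openbook)
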

def extract_asin(formats: List[Dict]) -> str:
    """
    Extract Amazon's ASIN from media_info["formats"]

    :param formats:
    :return:
    """
    for media_format in [
        f
        for f in formats
        if [i for i in f.get("identifiers", []) if i["type"] == "ASIN"]
    ]:
        asin = next(
            iter(
                [
                    identifier["value"]
                    for identifier in media_format.get("identifiers", [])
                    if identifier["type"] == "ASIN"
                ]
            ),
            "",
        )
        if asin:
            return asin
    return ""


def extract_isbn(formats: List[Dict], format_types: List[str]) -> str:
    """
    Extract ISBN from media_info["formats"]

    :param formats:
    :param format_types:
    :return:
    """
    # a format can contain 2 different "ISBN"s.. one type "ISBN", and another "LibraryISBN"
    # in format["identifiers"]
    # format["isbn"] reflects the "LibraryISBN" value

    isbn = next(
        iter([f["isbn"] for f in formats if f["id"] in format_types and f.get("isbn")]),
        "",
    )
    if isbn:
        return isbn

    for isbn_type in ("LibraryISBN", "ISBN"):
        for media_format in [
            f
            for f in formats
            if f["id"] in format_types
            and [i for i in f.get("identifiers", []) if i["type"] == isbn_type]
        ]:
            isbn = next(
                iter(
                    [
                        identifier["value"]
                        for identifier in media_format.get("identifiers", [])
                        if identifier["type"] == isbn_type
                    ]
                ),
                "",
            )
            if isbn:
                return isbn

    return ""

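
# A minimal sketch of extract_isbn() with a hand-built formats list; the format
# id "audiobook-mp3" mirrors the value used by create_opf() below, and the
# ISBN values are placeholders.
def _example_extract_isbn_usage() -> str:
    formats = [
        {
            "id": "audiobook-mp3",
            "identifiers": [
                {"type": "ISBN", "value": "9780000000001"},
                {"type": "LibraryISBN", "value": "9780000000002"},
            ],
        }
    ]
    # there is no top-level "isbn" key, so the "LibraryISBN" identifier wins
    return extract_isbn(formats, format_types=["audiobook-mp3"])
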

def build_opf_package(
    media_info: Dict, version: str = "2.0", loan_format: str = LibbyFormats.AudioBookMP3
) -> ET.Element:
    """
    Build the package element from media_info.

    :param media_info:
    :param version:
    :param loan_format:
    :return:
    """

    # References:
    # Version 2: https://idpf.org/epub/20/spec/OPF_2.0_final_spec.html#Section2.0
    # Version 3: https://www.w3.org/TR/epub-33/#sec-package-doc
    direct_epub_formats = [LibbyFormats.EBookOverdrive, LibbyFormats.MagazineOverDrive]
    ET.register_namespace("opf", "http://www.idpf.org/2007/opf")
    ET.register_namespace("dc", "http://purl.org/dc/elements/1.1/")
    package = ET.Element(
        "package",
        attrib={
            "version": version,
            "xmlns": "http://www.idpf.org/2007/opf",
            "unique-identifier": "publication-id",
        },
    )
    metadata = ET.SubElement(
        package,
        "metadata",
        attrib={
            "xmlns:dc": "http://purl.org/dc/elements/1.1/",
            "xmlns:opf": "http://www.idpf.org/2007/opf",
        },
    )
    title = ET.SubElement(metadata, "dc:title")
    title.text = media_info["title"]
    if loan_format == LibbyFormats.MagazineOverDrive and media_info.get("edition"):
        # for magazines, put the edition into the title to ensure some uniqueness
        title.text = f'{media_info["title"]} - {media_info["edition"]}'

    if version == "3.0":
        title.set("id", "main-title")
        meta_main_title = ET.SubElement(
            metadata,
            "meta",
            attrib={"refines": "#main-title", "property": "title-type"},
        )
        meta_main_title.text = "main"

    if (
        version == "2.0"
        and loan_format not in direct_epub_formats
        and media_info.get("subtitle")
    ):
        ET.SubElement(metadata, "dc:subtitle").text = media_info["subtitle"]
    if version == "3.0" and media_info.get("subtitle"):
        sub_title = ET.SubElement(metadata, "dc:title")
        sub_title.text = media_info["subtitle"]
        sub_title.set("id", "sub-title")
        meta_sub_title = ET.SubElement(
            metadata, "meta", attrib={"refines": "#sub-title", "property": "title-type"}
        )
        meta_sub_title.text = "subtitle"

    if version == "3.0" and media_info.get("edition"):
        sub_title = ET.SubElement(metadata, "dc:title")
        sub_title.text = media_info["edition"]
        sub_title.set("id", "edition")
        media_edition = ET.SubElement(
            metadata, "meta", attrib={"refines": "#edition", "property": "title-type"}
        )
        media_edition.text = "edition"

    ET.SubElement(metadata, "dc:language").text = media_info["languages"][0]["id"]
    identifier = ET.SubElement(metadata, "dc:identifier")
    identifier.set("id", "publication-id")

    isbn = extract_isbn(media_info["formats"], format_types=[loan_format])
    if isbn:
        identifier.text = isbn
        if version == "2.0":
            identifier.set("opf:scheme", "ISBN")
        if version == "3.0":
            if len(isbn) in (10, 13):
                meta_isbn = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={
                        "refines": "#publication-id",
                        "property": "identifier-type",
                        "scheme": "onix:codelist5",
                    },
                )
                # https://ns.editeur.org/onix/en/5
                meta_isbn.text = "15" if len(isbn) == 13 else "02"
    else:
        identifier.text = media_info["id"]
        if version == "2.0":
            identifier.set("opf:scheme", "overdrive")
        if version == "3.0":
            identifier.text = media_info["id"]

    asin = extract_asin(media_info["formats"])
    if asin:
        asin_tag = ET.SubElement(metadata, "dc:identifier")
        asin_tag.text = asin
        asin_tag.set("id", "asin")
        if version == "2.0":
            asin_tag.set("opf:scheme", "ASIN")
        if version == "3.0":
            asin_tag_meta = ET.SubElement(
                metadata,
                "meta",
                attrib={
                    "refines": "#asin",
                    "property": "identifier-type",
                },
            )
            asin_tag_meta.text = "ASIN"

    # add overdrive id and reserveId
    overdrive_id = ET.SubElement(metadata, "dc:identifier")
    overdrive_id.text = media_info["id"]
    overdrive_id.set("id", "overdrive-id")
    overdrive_reserve_id = ET.SubElement(metadata, "dc:identifier")
    overdrive_reserve_id.text = media_info["reserveId"]
    overdrive_reserve_id.set("id", "overdrive-reserve-id")
    if version == "2.0":
        overdrive_id.set("opf:scheme", "OverDriveId")
        overdrive_reserve_id.set("opf:scheme", "OverDriveReserveId")
    if version == "3.0":
        overdrive_id_meta = ET.SubElement(
            metadata,
            "meta",
            attrib={
                "refines": "#overdrive-id",
                "property": "identifier-type",
            },
        )
        overdrive_id_meta.text = "overdrive-id"

        overdrive_reserve_id_meta = ET.SubElement(
            metadata,
            "meta",
            attrib={
                "refines": "#overdrive-reserve-id",
                "property": "identifier-type",
            },
        )
        overdrive_reserve_id_meta.text = "overdrive-reserve-id"

    # for magazines, no creators are provided, so we'll patch in the publisher
    if media_info.get("publisher", {}).get("name") and not media_info["creators"]:
        media_info["creators"] = [
            {
                "name": media_info["publisher"]["name"],
                "id": media_info["publisher"]["id"],
                "role": "Publisher",
            }
        ]

    # Roles https://idpf.org/epub/20/spec/OPF_2.0_final_spec.html#Section2.2.6
    for media_role, opf_role in (
        ("Author", "aut"),
        ("Narrator", "nrt"),
        ("Editor", "edt"),
        ("Translator", "trl"),
        ("Illustrator", "ill"),
        ("Photographer", "pht"),
        ("Artist", "art"),
        ("Collaborator", "clb"),
        ("Other", "oth"),
        ("Publisher", "pbl"),
    ):
        creators = [
            c for c in media_info["creators"] if c.get("role", "") == media_role
        ]
        for c in creators:
            creator = ET.SubElement(metadata, "dc:creator")
            creator.text = c["name"]
            if version == "2.0":
                creator.set("opf:role", opf_role)
                if c.get("sortName"):
                    creator.set("opf:file-as", c["sortName"])
            if version == "3.0":
                creator.set("id", f'creator_{c["id"]}')
                if c.get("sortName"):
                    meta_file_as = ET.SubElement(
                        metadata,
                        "meta",
                        attrib={
                            "refines": f'#creator_{c["id"]}',
                            "property": "file-as",
                        },
                    )
                    meta_file_as.text = c["sortName"]
                meta_role = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={
                        "refines": f'#creator_{c["id"]}',
                        "property": "role",
                        "scheme": "marc:relators",
                    },
                )
                meta_role.text = opf_role

    if media_info.get("publisher", {}).get("name"):
        ET.SubElement(metadata, "dc:publisher").text = media_info["publisher"]["name"]
    if media_info.get("description"):
        ET.SubElement(metadata, "dc:description").text = media_info["description"]
    for s in media_info.get("subject", []):
        ET.SubElement(metadata, "dc:subject").text = s["name"]

    if version == "2.0" and loan_format not in direct_epub_formats:
        for k in media_info.get("keywords", []):
            ET.SubElement(metadata, "dc:tag").text = k
    if version == "3.0" and media_info.get("bisac"):
        for i, bisac in enumerate(media_info["bisac"], start=1):
            subject = ET.SubElement(metadata, "dc:subject")
            subject.text = bisac["description"]
            subject.set("id", f"subject_{i}")
            meta_subject_authority = ET.SubElement(
                metadata,
                "meta",
                attrib={"refines": f"#subject_{i}", "property": "authority"},
            )
            meta_subject_authority.text = "BISAC"
            meta_subject_term = ET.SubElement(
                metadata,
                "meta",
                attrib={"refines": f"#subject_{i}", "property": "term"},
            )
            meta_subject_term.text = bisac["code"]

    publish_date = media_info.get("publishDate") or media_info.get(
        "estimatedReleaseDate"
    )
    if publish_date:
        pub_date = ET.SubElement(metadata, "dc:date")
        if version == "2.0":
            pub_date.set("opf:event", "publication")
        pub_date.text = publish_date
        if version == "3.0":
            meta_pubdate = ET.SubElement(metadata, "meta")
            meta_pubdate.set("property", "dcterms:modified")
            meta_pubdate.text = publish_date

    if (
        media_info.get("detailedSeries")
        or media_info.get("series")
        or loan_format == LibbyFormats.MagazineOverDrive
    ):
        series_info = media_info.get("detailedSeries", {})
        series_name = (
            series_info.get("seriesName")
            or media_info.get("series")
            or (
                media_info["title"]
                if loan_format == LibbyFormats.MagazineOverDrive
                else None
            )
        )
        if series_name:
            ET.SubElement(
                metadata,
                "meta",
                attrib={"name": "calibre:series", "content": series_name},
            )
            if version == "3.0":
                meta_series = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={"id": "series-name", "property": "belongs-to-collection"},
                )
                meta_series.text = series_name
                meta_series_type = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={"refines": "#series-name", "property": "collection-type"},
                )
                meta_series_type.text = "series"

        reading_order = series_info.get("readingOrder", "")
        if (
            (not reading_order)
            and loan_format == LibbyFormats.MagazineOverDrive
            and media_info.get("estimatedReleaseDate")
        ):
            est_release_date = LibbyClient.parse_datetime(
                media_info["estimatedReleaseDate"]
            )
            reading_order = f"{est_release_date:%y%j}"  # use release date to construct a pseudo reading order

        if reading_order:
            ET.SubElement(
                metadata,
                "meta",
                attrib={
                    "name": "calibre:series_index",
                    "content": reading_order,
                },
            )
            if version == "3.0":
                meta_series_pos = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={"refines": "#series-name", "property": "group-position"},
                )
                meta_series_pos.text = reading_order

    return package

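
# A minimal sketch of build_opf_package() with a hand-built media_info dict
# containing only the keys the function requires; all values are placeholders.
def _example_build_opf_package_usage() -> ET.Element:
    media_info = {
        "title": "An Example Title",
        "languages": [{"id": "en"}],
        "formats": [],
        "id": "9999999",
        "reserveId": "00000000-0000-0000-0000-000000000000",
        "creators": [{"name": "Jane Doe", "role": "Author", "id": 1}],
    }
    return build_opf_package(media_info, version="2.0")
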

def create_opf(
    media_info: Dict,
    cover_filename: Optional[Path],
    file_tracks: List[Dict],
    opf_file_path: Path,
    logger: logging.Logger,
) -> None:
    """
    Creates and saves an OPF metadata file for the downloaded book.

    :param media_info:
    :param cover_filename:
    :param file_tracks:
    :param opf_file_path:
    :param logger:
    :return:
    """
    package = build_opf_package(media_info, version="2.0", loan_format="audiobook-mp3")
    manifest = ET.SubElement(package, "manifest")
    if cover_filename:
        ET.SubElement(
            manifest,
            "item",
            attrib={
                "id": "cover",
                "href": cover_filename.name,
                "media-type": "image/jpeg",
            },
        )
    spine = ET.SubElement(package, "spine")
    for f in file_tracks:
        file_id = slugify(f["file"].stem)
        ET.SubElement(
            manifest,
            "item",
            attrib={
                "id": file_id,
                "href": f["file"].name,
                "media-type": "audio/mpeg",
            },
        )
        ET.SubElement(spine, "itemref", attrib={"idref": file_id})

    tree = ET.ElementTree(package)
    tree.write(opf_file_path, xml_declaration=True, encoding="utf-8")
    logger.info('Saved "%s"', colored(str(opf_file_path), "magenta"))