Coverage for odmpy/processing/shared.py: 90.7% (332 statements)
# Copyright (C) 2023 github.com/ping
#
# This file is part of odmpy.
#
# odmpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# odmpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with odmpy. If not, see <http://www.gnu.org/licenses/>.
#

import argparse
import logging
import subprocess
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional, Dict, List, Tuple
from urllib.parse import urlparse

import eyed3  # type: ignore[import]
import requests
from eyed3.utils import art  # type: ignore[import]
from iso639 import Lang  # type: ignore[import]
from requests.adapters import HTTPAdapter, Retry
from termcolor import colored

from ..constants import PERFORMER_FID, LANGUAGE_FID
from ..errors import OdmpyRuntimeError
from ..libby import USER_AGENT, LibbyFormats, LibbyClient
from ..utils import slugify, sanitize_path, is_windows

#
# Shared functions across processing for different loan types
#


def init_session(max_retries: int = 0) -> requests.Session:
    session = requests.Session()
    custom_adapter = HTTPAdapter(
        max_retries=Retry(total=max_retries, backoff_factor=0.1)
    )
    for prefix in ("http://", "https://"):
        session.mount(prefix, custom_adapter)
    return session
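
# Illustrative usage (not part of the module): the session retries transient
# failures up to `max_retries` times for both http:// and https:// URLs.
#
#   session = init_session(max_retries=3)
#   res = session.get("https://example.org/cover.jpg", timeout=10)
#   res.raise_for_status()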


def generate_names(
    title: str,
    series: str,
    series_reading_order: str,
    authors: List[str],
    edition: str,
    title_id: str,
    args: argparse.Namespace,
    logger: logging.Logger,
) -> Tuple[Path, Path]:
    """
    Creates the download folder if necessary and generates the merged book names

    :param title:
    :param authors:
    :param edition:
    :param title_id:
    :param series:
    :param series_reading_order:
    :param args:
    :param logger:
    :return:
    """
    book_folder_name = args.book_folder_format % {
        "Title": sanitize_path(title, exclude_chars=args.remove_from_paths),
        "Author": sanitize_path(
            ", ".join(authors), exclude_chars=args.remove_from_paths
        ),
        "Series": sanitize_path(series or "", exclude_chars=args.remove_from_paths),
        "Edition": sanitize_path(edition, exclude_chars=args.remove_from_paths),
        "ID": sanitize_path(title_id, exclude_chars=args.remove_from_paths),
        "ReadingOrder": sanitize_path(
            series_reading_order, exclude_chars=args.remove_from_paths
        ),
    }
    # unlike book_folder_name, we sanitize the entire book file format
    # because it is expected to be a single name and `os.sep` will be
    # stripped
    book_file_format = sanitize_path(
        args.book_file_format
        % {
            "Title": title,
            "Author": ", ".join(authors),
            "Series": series or "",
            "Edition": edition,
            "ID": title_id,
            "ReadingOrder": series_reading_order,
        },
        exclude_chars=args.remove_from_paths,
    )
    # declare book folder/file names here together, so that we can catch problems from too long names
    book_folder = Path(args.download_dir, book_folder_name)
    if args.no_book_folder:
        book_folder = Path(args.download_dir)

    # for merged mp3
    book_filename = book_folder.joinpath(f"{book_file_format}.mp3")

    try:
        if not book_folder.exists():
            book_folder.mkdir(parents=True, exist_ok=True)
    except OSError as os_err:
        # ref http://www.ioplex.com/~miallen/errcmpp.html
        # for Windows: OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
        if (is_windows() and os_err.errno != 22) or (
            os_err.errno not in (36, 63) and not is_windows()
        ):
            raise

        # Ref OSError: [Errno 36] File name too long https://github.com/ping/odmpy/issues/5
        # create book folder with just the title and first author
        book_folder_name = args.book_folder_format % {
            "Title": sanitize_path(title, exclude_chars=args.remove_from_paths),
            "Author": sanitize_path(authors[0], exclude_chars=args.remove_from_paths)
            if authors
            else "",
            "Series": sanitize_path(series or "", exclude_chars=args.remove_from_paths),
            "ID": sanitize_path(title_id, exclude_chars=args.remove_from_paths),
            "ReadingOrder": sanitize_path(
                series_reading_order, exclude_chars=args.remove_from_paths
            ),
        }
        book_folder = Path(args.download_dir, book_folder_name)
        if args.no_book_folder:
            book_folder = Path(args.download_dir)

        logger.warning(
            f'Book folder name is too long. Files will be saved in "{book_folder}" instead.'
        )
        if not book_folder.exists():
            book_folder.mkdir(parents=True, exist_ok=True)

        # also create book name with just one author
        book_file_format = sanitize_path(
            args.book_file_format
            % {
                "Title": title,
                "Author": authors[0] if authors else "",
                "Series": series or "",
                "Edition": edition,
                "ID": title_id,
                "ReadingOrder": series_reading_order,
            },
            exclude_chars=args.remove_from_paths,
        )
        book_filename = book_folder.joinpath(f"{book_file_format}.mp3")
    return book_folder, book_filename
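
# Illustrative result (hypothetical argparse values): with
# book_folder_format "%(Title)s - %(Author)s", book_file_format "%(Title)s"
# and download_dir "downloads", the returned paths would be roughly
#
#   book_folder   -> downloads/Book Title - Author One, Author Two
#   book_filename -> downloads/Book Title - Author One, Author Two/Book Title.mp3
#
# If the folder name is too long for the filesystem, the except branch above
# retries with only the first author.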


def write_tags(
    audiofile: eyed3.core.AudioFile,
    title: str,
    sub_title: Optional[str],
    authors: List[str],
    narrators: Optional[List[str]],
    publisher: str,
    description: str,
    cover_bytes: Optional[bytes],
    genres: Optional[List[str]],
    languages: Optional[List[str]],
    published_date: Optional[str],
    series: Optional[str],
    part_number: int,
    total_parts: int,
    overdrive_id: str,
    isbn: Optional[str] = None,
    overwrite_title: bool = False,
    always_overwrite: bool = False,
    delimiter: str = ";",
) -> None:
    """
    Write out ID3 tags to the audiofile

    :param audiofile:
    :param title:
    :param sub_title:
    :param authors:
    :param narrators:
    :param publisher:
    :param description:
    :param cover_bytes:
    :param genres:
    :param languages:
    :param published_date:
    :param series:
    :param part_number:
    :param total_parts:
    :param overdrive_id:
    :param isbn:
    :param overwrite_title:
    :param always_overwrite:
    :param delimiter:
    :return:
    """
    if not delimiter:
        delimiter = ";"

    if not audiofile.tag:
        audiofile.initTag()
    if always_overwrite or overwrite_title or not audiofile.tag.title:
        audiofile.tag.title = str(title)
    if sub_title and (
        always_overwrite
        or not audiofile.tag.getTextFrame(eyed3.id3.frames.SUBTITLE_FID)
    ):
        audiofile.tag.setTextFrame(eyed3.id3.frames.SUBTITLE_FID, sub_title)
    if always_overwrite or not audiofile.tag.album:
        audiofile.tag.album = str(title)
    if authors and (always_overwrite or not audiofile.tag.artist):
        audiofile.tag.artist = delimiter.join([str(a) for a in authors])
    if authors and (always_overwrite or not audiofile.tag.album_artist):
        audiofile.tag.album_artist = delimiter.join([str(a) for a in authors])
    if part_number and (always_overwrite or not audiofile.tag.track_num):
        audiofile.tag.track_num = (part_number, total_parts)
    if narrators and (
        always_overwrite or not audiofile.tag.getTextFrame(PERFORMER_FID)
    ):
        audiofile.tag.setTextFrame(
            PERFORMER_FID, delimiter.join([str(n) for n in narrators])
        )
    if publisher and (always_overwrite or not audiofile.tag.publisher):
        audiofile.tag.publisher = str(publisher)
    if description and (
        always_overwrite or eyed3.id3.frames.COMMENT_FID not in audiofile.tag.frame_set
    ):
        audiofile.tag.comments.set(str(description), description="Description")
    if genres and (always_overwrite or not audiofile.tag.genre):
        audiofile.tag.genre = delimiter.join(genres)
    if languages and (always_overwrite or not audiofile.tag.getTextFrame(LANGUAGE_FID)):
        try:
            tag_langs = [Lang(lang).pt2b for lang in languages]
        except:  # noqa: E722, pylint: disable=bare-except
            tag_langs = languages
        audiofile.tag.setTextFrame(LANGUAGE_FID, delimiter.join(tag_langs))
    if published_date and (always_overwrite or not audiofile.tag.release_date):
        audiofile.tag.release_date = published_date
    if cover_bytes:
        audiofile.tag.images.set(
            art.TO_ID3_ART_TYPES[art.FRONT_COVER][0],
            cover_bytes,
            "image/jpeg",
            description="Cover",
        )
    if series:
        audiofile.tag.user_text_frames.set(series, "Series")
    # Output some OD identifiers in the mp3
    if overdrive_id:
        audiofile.tag.user_text_frames.set(
            overdrive_id,
            "OverDrive Media ID" if overdrive_id.isdigit() else "OverDrive Reserve ID",
        )
    if isbn:
        audiofile.tag.user_text_frames.set(isbn, "ISBN")
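
# Minimal usage sketch (hypothetical file and metadata; callers typically
# persist the tag afterwards with audiofile.tag.save()):
#
#   audiofile = eyed3.load("part-01.mp3")
#   write_tags(
#       audiofile,
#       title="Book Title",
#       sub_title=None,
#       authors=["Author One"],
#       narrators=["Narrator One"],
#       publisher="Publisher",
#       description="Synopsis",
#       cover_bytes=None,
#       genres=["Fiction"],
#       languages=["en"],
#       published_date="2023-01-01",
#       series=None,
#       part_number=1,
#       total_parts=10,
#       overdrive_id="example-id",
#   )
#   audiofile.tag.save()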


def get_best_cover_url(loan: Dict) -> Optional[str]:
    """
    Extracts the highest resolution cover image for the loan

    :param loan:
    :return:
    """
    covers: List[Dict] = sorted(
        list(loan.get("covers", {}).values()),
        key=lambda c: c.get("width", 0),
        reverse=True,
    )
    cover_highest_res: Optional[Dict] = next(iter(covers), None)
    return cover_highest_res["href"] if cover_highest_res else None
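
# Illustrative input (hypothetical values): the loan's "covers" mapping is keyed
# by size label, and the href of the widest entry wins.
#
#   loan = {
#       "covers": {
#           "cover150Wide": {"href": "https://example.org/150.jpg", "width": 150},
#           "cover510Wide": {"href": "https://example.org/510.jpg", "width": 510},
#       }
#   }
#   get_best_cover_url(loan)  # -> "https://example.org/510.jpg"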


def generate_cover(
    book_folder: Path,
    cover_url: Optional[str],
    session: requests.Session,
    timeout: int,
    logger: logging.Logger,
    force_square: bool = True,
) -> Tuple[Path, Optional[bytes]]:
    """
    Get the book cover

    :param book_folder:
    :param cover_url:
    :param session:
    :param timeout:
    :param logger:
    :param force_square:
    :return:
    """
    cover_filename = book_folder.joinpath("cover.jpg")
    if not cover_filename.exists() and cover_url:
        try:
            if force_square:
                square_cover_url_params = {
                    "type": "auto",
                    "width": str(510),
                    "height": str(510),
                    "force": "true",
                    "quality": str(80),
                    "url": urlparse(cover_url).path,
                }
                # credit: https://github.com/lullius/pylibby/pull/18
                # this endpoint produces a resized version of the cover
                cover_res = session.get(
                    "https://ic.od-cdn.com/resize",
                    params=square_cover_url_params,
                    headers={"User-Agent": USER_AGENT},
                    timeout=timeout,
                )
            else:
                cover_res = session.get(
                    cover_url, headers={"User-Agent": USER_AGENT}, timeout=timeout
                )
            cover_res.raise_for_status()
            with cover_filename.open("wb") as outfile:
                outfile.write(cover_res.content)
        except requests.exceptions.HTTPError as he:
            if not force_square:
                logger.warning(
                    "Error downloading cover: %s",
                    colored(str(he), "red", attrs=["bold"]),
                )
            else:
                logger.warning(
                    "Error downloading square cover: %s",
                    colored(str(he), "red", attrs=["bold"]),
                )
                # fallback to original cover url
                try:
                    cover_res = session.get(
                        cover_url,
                        headers={"User-Agent": USER_AGENT},
                        timeout=timeout,
                    )
                    cover_res.raise_for_status()
                    with cover_filename.open("wb") as outfile:
                        outfile.write(cover_res.content)
                except requests.exceptions.HTTPError as he2:
                    logger.warning(
                        "Error downloading cover: %s",
                        colored(str(he2), "red", attrs=["bold"]),
                    )

    cover_bytes: Optional[bytes] = None
    if cover_filename.exists():
        with cover_filename.open("rb") as f:
            cover_bytes = f.read()

    return cover_filename, cover_bytes
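
# Illustrative call (hypothetical paths/URL, assumes a configured logger):
# saves cover.jpg into the book folder via the od-cdn resize endpoint and
# falls back to the original cover URL if the resize request fails.
#
#   session = init_session(max_retries=1)
#   cover_file, cover_bytes = generate_cover(
#       Path("downloads/Book Title"),
#       "https://example.org/cover.jpg",
#       session,
#       timeout=30,
#       logger=logger,
#   )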


def merge_into_mp3(
    book_filename: Path,
    file_tracks: List[Dict],
    audio_bitrate: int,
    ffmpeg_loglevel: str,
    hide_progress: bool,
    logger: logging.Logger,
) -> None:
    """
    Merge the files into a single mp3

    :param book_filename: mp3 file name
    :param file_tracks:
    :param audio_bitrate:
    :param ffmpeg_loglevel:
    :param hide_progress:
    :param logger:
    :return:
    """

    # We can't directly generate a m4b here even if specified because eyed3 doesn't support m4b/mp4
    temp_book_filename = book_filename.with_suffix(".part")
    cmd = [
        "ffmpeg",
        "-y",
        "-nostdin",
        "-hide_banner",
        "-loglevel",
        ffmpeg_loglevel,
    ]
    if not hide_progress:
        cmd.append("-stats")
    cmd.extend(
        [
            "-i",
            f"concat:{'|'.join([str(ft['file']) for ft in file_tracks])}",
            "-acodec",
            "copy",
            "-vcodec",
            "copy",
            "-b:a",
            f"{audio_bitrate}k"
            if audio_bitrate
            else "64k",  # explicitly set audio bitrate
            "-f",
            "mp3",
            str(temp_book_filename),
        ]
    )
    exit_code = subprocess.call(cmd)
    if exit_code:
        logger.error(f"ffmpeg exited with the code: {exit_code!s}")
        logger.error(f"Command: {' '.join(cmd)!s}")
        raise OdmpyRuntimeError("ffmpeg exited with a non-zero code")

    temp_book_filename.replace(book_filename)
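
# For two parts, the assembled command looks roughly like (illustrative):
#
#   ffmpeg -y -nostdin -hide_banner -loglevel info -stats \
#       -i "concat:part-01.mp3|part-02.mp3" \
#       -acodec copy -vcodec copy -b:a 64k -f mp3 "Book Title.part"
#
# The .part file only replaces the final mp3 if ffmpeg exits cleanly.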


def convert_to_m4b(
    book_filename: Path,
    book_m4b_filename: Path,
    cover_filename: Path,
    merge_codec: str,
    audio_bitrate: int,
    ffmpeg_loglevel: str,
    hide_progress: bool,
    logger: logging.Logger,
) -> None:
    """
    Converts the merged mp3 into a m4b

    :param book_filename: mp3 file name
    :param book_m4b_filename:
    :param cover_filename:
    :param merge_codec:
    :param audio_bitrate:
    :param ffmpeg_loglevel:
    :param hide_progress:
    :param logger:
    :return:
    """
    temp_book_m4b_filename = book_m4b_filename.with_suffix(".part")
    cmd = [
        "ffmpeg",
        "-y",
        "-nostdin",
        "-hide_banner",
        "-loglevel",
        ffmpeg_loglevel,
    ]
    if not hide_progress:
        cmd.append("-stats")
    cmd.extend(
        [
            "-i",
            str(book_filename),
        ]
    )
    if cover_filename.exists():
        cmd.extend(["-i", str(cover_filename)])

    cmd.extend(
        [
            "-map",
            "0:a",
            "-c:a",
            merge_codec,
            "-b:a",
            f"{audio_bitrate}k"
            if audio_bitrate
            else "64k",  # explicitly set audio bitrate
        ]
    )
    if cover_filename.exists():
        cmd.extend(
            [
                "-map",
                "1:v",
                "-c:v",
                "copy",
                "-disposition:v:0",
                "attached_pic",
            ]
        )

    cmd.extend(["-f", "mp4", str(temp_book_m4b_filename)])
    exit_code = subprocess.call(cmd)
    if exit_code:
        logger.error(f"ffmpeg exited with the code: {exit_code!s}")
        logger.error(f"Command: {' '.join(cmd)!s}")
        raise OdmpyRuntimeError("ffmpeg exited with a non-zero code")

    temp_book_m4b_filename.rename(book_m4b_filename)
    logger.info('Merged files into "%s"', colored(str(book_m4b_filename), "magenta"))
    try:
        book_filename.unlink()
    except Exception as e:  # pylint: disable=broad-except
        logger.warning(f'Error deleting "{book_filename}": {str(e)}')
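
# With a cover present and merge_codec "aac" (illustrative values), the
# assembled command resembles:
#
#   ffmpeg -y -nostdin -hide_banner -loglevel info -stats \
#       -i "Book Title.mp3" -i cover.jpg \
#       -map 0:a -c:a aac -b:a 64k \
#       -map 1:v -c:v copy -disposition:v:0 attached_pic \
#       -f mp4 "Book Title.part"
#
# The source mp3 is deleted only after the m4b has been renamed into place.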


def remux_mp3(
    part_tmp_filename: Path,
    part_filename: Path,
    ffmpeg_loglevel: str,
    logger: logging.Logger,
) -> None:
    """
    Try to remux file to remove mp3 lame tag errors

    :param part_tmp_filename:
    :param part_filename:
    :param ffmpeg_loglevel:
    :param logger:
    :return:
    """
    cmd = [
        "ffmpeg",
        "-y",
        "-nostdin",
        "-hide_banner",
        "-loglevel",
        ffmpeg_loglevel,
        "-i",
        str(part_tmp_filename),
        "-c:a",
        "copy",
        "-c:v",
        "copy",
        str(part_filename),
    ]
    try:
        exit_code = subprocess.call(cmd)
        if exit_code:
            logger.warning(f"ffmpeg exited with the code: {exit_code!s}")
            logger.warning(f"Command: {' '.join(cmd)!s}")
            part_tmp_filename.rename(part_filename)
        else:
            part_tmp_filename.unlink()
    except Exception as ffmpeg_ex:  # pylint: disable=broad-except
        logger.warning(f"Error executing ffmpeg: {str(ffmpeg_ex)}")
        part_tmp_filename.rename(part_filename)


def extract_authors_from_openbook(openbook: Dict) -> List[str]:
    """
    Extract list of author names from openbook

    :param openbook:
    :return:
    """
    creators = openbook.get("creator", [])
    return (
        [c["name"] for c in creators if c.get("role", "") == "author"]
        or [c["name"] for c in creators if c.get("role", "") == "editor"]
        or [c["name"] for c in creators]
    )
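
# Illustrative openbook payload (hypothetical values): authors are preferred,
# then editors, then every listed creator.
#
#   openbook = {
#       "creator": [
#           {"name": "Author One", "role": "author"},
#           {"name": "Narrator One", "role": "narrator"},
#       ]
#   }
#   extract_authors_from_openbook(openbook)  # -> ["Author One"]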


def extract_asin(formats: List[Dict]) -> str:
    """
    Extract Amazon's ASIN from media_info["formats"]

    :param formats:
    :return:
    """
    for media_format in [
        f
        for f in formats
        if [i for i in f.get("identifiers", []) if i["type"] == "ASIN"]
    ]:
        asin = next(
            iter(
                [
                    identifier["value"]
                    for identifier in media_format.get("identifiers", [])
                    if identifier["type"] == "ASIN"
                ]
            ),
            "",
        )
        if asin:
            return asin
    return ""


def extract_isbn(formats: List[Dict], format_types: List[str]) -> str:
    """
    Extract ISBN from media_info["formats"]

    :param formats:
    :param format_types:
    :return:
    """
    # a format can contain 2 different "ISBN"s.. one type "ISBN", and another "LibraryISBN"
    # in format["identifiers"]
    # format["isbn"] reflects the "LibraryISBN" value
    isbn = next(
        iter([f["isbn"] for f in formats if f["id"] in format_types and f.get("isbn")]),
        "",
    )
    if isbn:
        return isbn

    for isbn_type in ("LibraryISBN", "ISBN"):
        for media_format in [
            f
            for f in formats
            if f["id"] in format_types
            and [i for i in f.get("identifiers", []) if i["type"] == isbn_type]
        ]:
            isbn = next(
                iter(
                    [
                        identifier["value"]
                        for identifier in media_format.get("identifiers", [])
                        if identifier["type"] == isbn_type
                    ]
                ),
                "",
            )
            if isbn:
                return isbn

    return ""


def build_opf_package(
    media_info: Dict, version: str = "2.0", loan_format: str = LibbyFormats.AudioBookMP3
) -> ET.Element:
    """
    Build the package element from media_info.

    :param media_info:
    :param version:
    :param loan_format:
    :return:
    """

    # References:
    # Version 2: https://idpf.org/epub/20/spec/OPF_2.0_final_spec.html#Section2.0
    # Version 3: https://www.w3.org/TR/epub-33/#sec-package-doc
    direct_epub_formats = [LibbyFormats.EBookOverdrive, LibbyFormats.MagazineOverDrive]
    ET.register_namespace("opf", "http://www.idpf.org/2007/opf")
    ET.register_namespace("dc", "http://purl.org/dc/elements/1.1/")
    package = ET.Element(
        "package",
        attrib={
            "version": version,
            "xmlns": "http://www.idpf.org/2007/opf",
            "unique-identifier": "publication-id",
        },
    )
    metadata = ET.SubElement(
        package,
        "metadata",
        attrib={
            "xmlns:dc": "http://purl.org/dc/elements/1.1/",
            "xmlns:opf": "http://www.idpf.org/2007/opf",
        },
    )
    title = ET.SubElement(metadata, "dc:title")
    title.text = media_info["title"]
    if loan_format == LibbyFormats.MagazineOverDrive and media_info.get("edition"):
        # for magazines, put the edition into the title to ensure some uniqueness
        title.text = f'{media_info["title"]} - {media_info["edition"]}'

    if version == "3.0":
        title.set("id", "main-title")
        meta_main_title = ET.SubElement(
            metadata,
            "meta",
            attrib={"refines": "#main-title", "property": "title-type"},
        )
        meta_main_title.text = "main"

    if (
        version == "2.0"
        and loan_format not in direct_epub_formats
        and media_info.get("subtitle")
    ):
        ET.SubElement(metadata, "dc:subtitle").text = media_info["subtitle"]
    if version == "3.0" and media_info.get("subtitle"):
        sub_title = ET.SubElement(metadata, "dc:title")
        sub_title.text = media_info["subtitle"]
        sub_title.set("id", "sub-title")
        meta_sub_title = ET.SubElement(
            metadata, "meta", attrib={"refines": "#sub-title", "property": "title-type"}
        )
        meta_sub_title.text = "subtitle"

    if version == "3.0" and media_info.get("edition"):
        sub_title = ET.SubElement(metadata, "dc:title")
        sub_title.text = media_info["edition"]
        sub_title.set("id", "edition")
        media_edition = ET.SubElement(
            metadata, "meta", attrib={"refines": "#edition", "property": "title-type"}
        )
        media_edition.text = "edition"

    ET.SubElement(metadata, "dc:language").text = media_info["languages"][0]["id"]
    identifier = ET.SubElement(metadata, "dc:identifier")
    identifier.set("id", "publication-id")

    isbn = extract_isbn(media_info["formats"], format_types=[loan_format])
    if isbn:
        identifier.text = isbn
        if version == "2.0":
            identifier.set("opf:scheme", "ISBN")
        if version == "3.0":
            if len(isbn) in (10, 13):
                meta_isbn = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={
                        "refines": "#publication-id",
                        "property": "identifier-type",
                        "scheme": "onix:codelist5",
                    },
                )
                # https://ns.editeur.org/onix/en/5
                meta_isbn.text = "15" if len(isbn) == 13 else "02"
    else:
        identifier.text = media_info["id"]
        if version == "2.0":
            identifier.set("opf:scheme", "overdrive")
        if version == "3.0":
            identifier.text = media_info["id"]

    asin = extract_asin(media_info["formats"])
    if asin:
        asin_tag = ET.SubElement(metadata, "dc:identifier")
        asin_tag.text = asin
        asin_tag.set("id", "asin")
        if version == "2.0":
            asin_tag.set("opf:scheme", "ASIN")
        if version == "3.0":
            asin_tag_meta = ET.SubElement(
                metadata,
                "meta",
                attrib={
                    "refines": "#asin",
                    "property": "identifier-type",
                },
            )
            asin_tag_meta.text = "ASIN"

    # add overdrive id and reserveId
    overdrive_id = ET.SubElement(metadata, "dc:identifier")
    overdrive_id.text = media_info["id"]
    overdrive_id.set("id", "overdrive-id")
    overdrive_reserve_id = ET.SubElement(metadata, "dc:identifier")
    overdrive_reserve_id.text = media_info["reserveId"]
    overdrive_reserve_id.set("id", "overdrive-reserve-id")
    if version == "2.0":
        overdrive_id.set("opf:scheme", "OverDriveId")
        overdrive_reserve_id.set("opf:scheme", "OverDriveReserveId")
    if version == "3.0":
        overdrive_id_meta = ET.SubElement(
            metadata,
            "meta",
            attrib={
                "refines": "#overdrive-id",
                "property": "identifier-type",
            },
        )
        overdrive_id_meta.text = "overdrive-id"

        overdrive_reserve_id_meta = ET.SubElement(
            metadata,
            "meta",
            attrib={
                "refines": "#overdrive-reserve-id",
                "property": "identifier-type",
            },
        )
        overdrive_reserve_id_meta.text = "overdrive-reserve-id"

    # for magazines, no creators are provided, so we'll patch in the publisher
    if media_info.get("publisher", {}).get("name") and not media_info["creators"]:
        media_info["creators"] = [
            {
                "name": media_info["publisher"]["name"],
                "id": media_info["publisher"]["id"],
                "role": "Publisher",
            }
        ]

    # Roles https://idpf.org/epub/20/spec/OPF_2.0_final_spec.html#Section2.2.6
    for media_role, opf_role in (
        ("Author", "aut"),
        ("Narrator", "nrt"),
        ("Editor", "edt"),
        ("Translator", "trl"),
        ("Illustrator", "ill"),
        ("Photographer", "pht"),
        ("Artist", "art"),
        ("Collaborator", "clb"),
        ("Other", "oth"),
        ("Publisher", "pbl"),
    ):
        creators = [
            c for c in media_info["creators"] if c.get("role", "") == media_role
        ]
        for c in creators:
            creator = ET.SubElement(metadata, "dc:creator")
            creator.text = c["name"]
            if version == "2.0":
                creator.set("opf:role", opf_role)
                if c.get("sortName"):
                    creator.set("opf:file-as", c["sortName"])
            if version == "3.0":
                creator.set("id", f'creator_{c["id"]}')
                if c.get("sortName"):
                    meta_file_as = ET.SubElement(
                        metadata,
                        "meta",
                        attrib={
                            "refines": f'#creator_{c["id"]}',
                            "property": "file-as",
                        },
                    )
                    meta_file_as.text = c["sortName"]
                meta_role = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={
                        "refines": f'#creator_{c["id"]}',
                        "property": "role",
                        "scheme": "marc:relators",
                    },
                )
                meta_role.text = opf_role

    if media_info.get("publisher", {}).get("name"):
        ET.SubElement(metadata, "dc:publisher").text = media_info["publisher"]["name"]
    if media_info.get("description"):
        ET.SubElement(metadata, "dc:description").text = media_info["description"]
    for s in media_info.get("subject", []):
        ET.SubElement(metadata, "dc:subject").text = s["name"]

    if version == "2.0" and loan_format not in direct_epub_formats:
        for k in media_info.get("keywords", []):
            ET.SubElement(metadata, "dc:tag").text = k
    if version == "3.0" and media_info.get("bisac"):
        for i, bisac in enumerate(media_info["bisac"], start=1):
            subject = ET.SubElement(metadata, "dc:subject")
            subject.text = bisac["description"]
            subject.set("id", f"subject_{i}")
            meta_subject_authority = ET.SubElement(
                metadata,
                "meta",
                attrib={"refines": f"#subject_{i}", "property": "authority"},
            )
            meta_subject_authority.text = "BISAC"
            meta_subject_term = ET.SubElement(
                metadata,
                "meta",
                attrib={"refines": f"#subject_{i}", "property": "term"},
            )
            meta_subject_term.text = bisac["code"]

    publish_date = media_info.get("publishDate") or media_info.get(
        "estimatedReleaseDate"
    )
    if publish_date:
        pub_date = ET.SubElement(metadata, "dc:date")
        if version == "2.0":
            pub_date.set("opf:event", "publication")
        pub_date.text = publish_date
        if version == "3.0":
            meta_pubdate = ET.SubElement(metadata, "meta")
            meta_pubdate.set("property", "dcterms:modified")
            meta_pubdate.text = publish_date

    if (
        media_info.get("detailedSeries")
        or media_info.get("series")
        or loan_format == LibbyFormats.MagazineOverDrive
    ):
        series_info = media_info.get("detailedSeries", {})
        series_name = (
            series_info.get("seriesName")
            or media_info.get("series")
            or (
                media_info["title"]
                if loan_format == LibbyFormats.MagazineOverDrive
                else None
            )
        )
        if series_name:
            ET.SubElement(
                metadata,
                "meta",
                attrib={"name": "calibre:series", "content": series_name},
            )
            if version == "3.0":
                meta_series = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={"id": "series-name", "property": "belongs-to-collection"},
                )
                meta_series.text = series_name
                meta_series_type = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={"refines": "#series-name", "property": "collection-type"},
                )
                meta_series_type.text = "series"

        reading_order = series_info.get("readingOrder", "")
        if (
            (not reading_order)
            and loan_format == LibbyFormats.MagazineOverDrive
            and media_info.get("estimatedReleaseDate")
        ):
            est_release_date = LibbyClient.parse_datetime(
                media_info["estimatedReleaseDate"]
            )
            reading_order = f"{est_release_date:%y%j}"  # use release date to construct a pseudo reading order

        if reading_order:
            ET.SubElement(
                metadata,
                "meta",
                attrib={
                    "name": "calibre:series_index",
                    "content": reading_order,
                },
            )
            if version == "3.0":
                meta_series_pos = ET.SubElement(
                    metadata,
                    "meta",
                    attrib={"refines": "#series-name", "property": "group-position"},
                )
                meta_series_pos.text = reading_order

    return package


def create_opf(
    media_info: Dict,
    cover_filename: Optional[Path],
    file_tracks: List[Dict],
    opf_file_path: Path,
    logger: logging.Logger,
) -> None:
    """
    Creates an OPF metadata file for the title.

    :param media_info:
    :param cover_filename:
    :param file_tracks:
    :param opf_file_path:
    :param logger:
    :return:
    """
    package = build_opf_package(media_info, version="2.0", loan_format="audiobook-mp3")
    manifest = ET.SubElement(package, "manifest")
    if cover_filename:
        ET.SubElement(
            manifest,
            "item",
            attrib={
                "id": "cover",
                "href": cover_filename.name,
                "media-type": "image/jpeg",
            },
        )
    spine = ET.SubElement(package, "spine")
    for f in file_tracks:
        file_id = slugify(f["file"].stem)
        ET.SubElement(
            manifest,
            "item",
            attrib={
                "id": file_id,
                "href": f["file"].name,
                "media-type": "audio/mpeg",
            },
        )
        ET.SubElement(spine, "itemref", attrib={"idref": file_id})

    tree = ET.ElementTree(package)
    tree.write(opf_file_path, xml_declaration=True, encoding="utf-8")
    logger.info('Saved "%s"', colored(str(opf_file_path), "magenta"))
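
# Illustrative call (hypothetical paths, assumes a configured logger): writes an
# OPF file, including calibre:series metadata when available, alongside the
# downloaded tracks.
#
#   create_opf(
#       media_info,
#       Path("downloads/Book Title/cover.jpg"),
#       [{"file": Path("downloads/Book Title/part-01.mp3")}],
#       Path("downloads/Book Title/Book Title.opf"),
#       logger,
#   )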