Coverage for odmpy/processing/audiobook.py: 84.6%
169 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-14 08:51 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-14 08:51 +0000
1# Copyright (C) 2023 github.com/ping
2#
3# This file is part of odmpy.
4#
5# odmpy is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# odmpy is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with odmpy. If not, see <http://www.gnu.org/licenses/>.
17#
19import argparse
20import datetime
21import json
22import logging
23import shutil
24from typing import Optional, Any, Dict, List
25from typing import OrderedDict as OrderedDictType
27import eyed3 # type: ignore[import]
28import requests
29from eyed3.id3 import ID3_DEFAULT_VERSION, ID3_V2_3, ID3_V2_4 # type: ignore[import]
30from requests.exceptions import HTTPError, ConnectionError
31from termcolor import colored
32from tqdm import tqdm
34from .shared import (
35 generate_names,
36 write_tags,
37 generate_cover,
38 remux_mp3,
39 merge_into_mp3,
40 convert_to_m4b,
41 create_opf,
42 get_best_cover_url,
43 extract_isbn,
44)
45from ..errors import OdmpyRuntimeError
46from ..libby import USER_AGENT, merge_toc, PartMeta, LibbyFormats
47from ..overdrive import OverDriveClient
48from ..utils import slugify, plural_or_singular_noun as ps
51#
52# Main processing logic for libby direct audiobook loans
53#
56def process_audiobook_loan(
57 loan: Dict,
58 openbook: Dict,
59 parsed_toc: OrderedDictType[str, PartMeta],
60 session: requests.Session,
61 args: argparse.Namespace,
62 logger: logging.Logger,
63) -> None:
64 """
65 Download the audiobook loan directly via Libby without the use of
66 an odm file
68 :param loan:
69 :param openbook:
70 :param parsed_toc:
71 :param session: From `LibbyClient.libby_session` because it contains a needed auth cookie
72 :param args:
73 :param logger:
74 :return:
75 """
77 ffmpeg_loglevel = "info" if logger.level == logging.DEBUG else "fatal"
78 id3v2_version = ID3_DEFAULT_VERSION
79 if args.id3v2_version == 3:
80 id3v2_version = ID3_V2_3
81 if args.id3v2_version == 4:
82 id3v2_version = ID3_V2_4
84 title = loan["title"]
85 overdrive_media_id = loan["id"]
86 sub_title = loan.get("subtitle", None)
87 cover_url = get_best_cover_url(loan)
88 authors = [
89 c["name"] for c in openbook.get("creator", []) if c.get("role", "") == "author"
90 ]
91 if not authors:
92 authors = [
93 c["name"]
94 for c in openbook.get("creator", [])
95 if c.get("role", "") == "editor"
96 ]
97 if not authors:
98 authors = [c["name"] for c in openbook.get("creator", [])]
99 narrators = [
100 c["name"]
101 for c in openbook.get("creator", [])
102 if c.get("role", "") == "narrator"
103 ]
104 languages: Optional[List[str]] = (
105 [str(openbook.get("language"))] if openbook.get("language") else []
106 )
107 subjects = [subj["name"] for subj in loan.get("subjects", []) if subj.get("name")]
108 publish_date = loan.get("publishDate", None)
109 publisher = loan.get("publisherAccount", {}).get("name", "") or ""
110 series = loan.get("series", "")
111 series_reading_order = loan.get("detailedSeries", {}).get("readingOrder", "")
112 description = (
113 openbook.get("description", {}).get("full", "")
114 or openbook.get("description", {}).get("short")
115 or ""
116 )
117 debug_meta: Dict[str, Any] = {
118 "meta": {
119 "title": title,
120 "coverUrl": cover_url,
121 "authors": authors,
122 "publisher": publisher,
123 "description": description,
124 }
125 }
127 download_parts: List[PartMeta] = list(parsed_toc.values()) # noqa
128 debug_meta["download_parts"] = []
129 for p in download_parts:
130 chapters = [
131 {"title": m.title, "start": m.start_second, "end": m.end_second}
132 for m in p["chapters"]
133 ]
134 debug_meta["download_parts"].append(
135 {
136 "url": p["url"],
137 "audio-duration": p["audio-duration"],
138 "file-length": p["file-length"],
139 "spine-position": p["spine-position"],
140 "chapters": chapters,
141 }
142 )
144 logger.info(
145 f'Downloading "{colored(title, "blue", attrs=["bold"])}" '
146 f'by "{colored(", ".join(authors), "blue", attrs=["bold"])}" '
147 f'in {len(download_parts)} {ps(len(download_parts), "part")}...'
148 )
150 book_folder, book_filename = generate_names(
151 title=title,
152 series=series,
153 series_reading_order=series_reading_order,
154 authors=authors,
155 edition=loan.get("edition") or "",
156 title_id=loan["id"],
157 args=args,
158 logger=logger,
159 )
160 book_m4b_filename = book_filename.with_suffix(".m4b")
162 # check early if a merged file is already saved
163 if (
164 args.merge_output
165 and (
166 book_filename if args.merge_format == "mp3" else book_m4b_filename
167 ).exists()
168 ):
169 logger.warning(
170 'Already saved "%s"',
171 colored(
172 str(book_filename if args.merge_format == "mp3" else book_m4b_filename),
173 "magenta",
174 ),
175 )
176 return
178 if args.is_debug_mode:
179 with book_folder.joinpath("loan.json").open("w", encoding="utf-8") as f:
180 json.dump(loan, f, indent=2)
182 with book_folder.joinpath("openbook.json").open("w", encoding="utf-8") as f:
183 json.dump(openbook, f, indent=2)
185 cover_filename, cover_bytes = generate_cover(
186 book_folder=book_folder,
187 cover_url=cover_url,
188 session=session,
189 timeout=args.timeout,
190 logger=logger,
191 )
193 keep_cover = args.always_keep_cover
194 file_tracks = []
195 audio_bitrate = 0
196 for p in download_parts:
197 part_number = p["spine-position"] + 1
198 part_filename = book_folder.joinpath(
199 f"{slugify(f'{title} - Part {part_number:02d}', allow_unicode=True)}.mp3"
200 )
201 part_tmp_filename = part_filename.with_suffix(".part")
202 part_file_size = p["file-length"]
203 part_download_url = p["url"]
205 if part_filename.exists():
206 logger.warning("Already saved %s", colored(str(part_filename), "magenta"))
207 else:
208 try:
209 already_downloaded_len = 0
210 if part_tmp_filename.exists():
211 already_downloaded_len = part_tmp_filename.stat().st_size
213 part_download_res = session.get(
214 part_download_url,
215 headers={
216 "User-Agent": USER_AGENT,
217 "Range": f"bytes={already_downloaded_len}-"
218 if already_downloaded_len
219 else None,
220 },
221 timeout=args.timeout,
222 stream=True,
223 )
224 part_download_res.raise_for_status()
226 with tqdm.wrapattr(
227 part_download_res.raw,
228 "read",
229 total=part_file_size,
230 initial=already_downloaded_len,
231 desc=f"Part {part_number:2d}",
232 disable=args.hide_progress,
233 ) as res_raw:
234 with part_tmp_filename.open(
235 "ab" if already_downloaded_len else "wb"
236 ) as outfile:
237 shutil.copyfileobj(res_raw, outfile)
239 # try to remux file to remove mp3 lame tag errors
240 remux_mp3(
241 part_tmp_filename=part_tmp_filename,
242 part_filename=part_filename,
243 ffmpeg_loglevel=ffmpeg_loglevel,
244 logger=logger,
245 )
247 except HTTPError as he:
248 logger.error(f"HTTPError: {str(he)}")
249 logger.debug(he.response.content)
250 raise OdmpyRuntimeError("HTTP Error while downloading part file.")
252 except ConnectionError as ce:
253 logger.error(f"ConnectionError: {str(ce)}")
254 raise OdmpyRuntimeError("Connection Error while downloading part file.")
256 # Save id3 info only on new download, ref #42
257 # This also makes handling of part files consistent with merged files
258 try:
259 # Fill id3 info for mp3 part
260 audiofile = eyed3.load(part_filename)
261 variable_bitrate, audio_bitrate = audiofile.info.bit_rate
262 if variable_bitrate:
263 # don't use vbr
264 audio_bitrate = 0
265 write_tags(
266 audiofile=audiofile,
267 title=title,
268 sub_title=sub_title,
269 authors=authors,
270 narrators=narrators,
271 publisher=publisher,
272 description=description,
273 cover_bytes=cover_bytes,
274 genres=subjects,
275 languages=languages,
276 published_date=publish_date,
277 series=series,
278 part_number=part_number,
279 total_parts=len(download_parts),
280 overdrive_id=overdrive_media_id,
281 isbn=extract_isbn(
282 loan.get("formats", []), [LibbyFormats.AudioBookMP3]
283 ),
284 always_overwrite=args.overwrite_tags,
285 delimiter=args.tag_delimiter,
286 )
287 audiofile.tag.save(version=id3v2_version)
289 if (
290 args.add_chapters
291 and not args.merge_output
292 and (args.overwrite_tags or not audiofile.tag.table_of_contents)
293 ):
294 if args.overwrite_tags and audiofile.tag.table_of_contents:
295 # Clear existing toc to prevent "There may only be one top-level table of contents.
296 # Toc 'b'toc'' is current top-level." error
297 for f in list(audiofile.tag.table_of_contents):
298 audiofile.tag.table_of_contents.remove(f.element_id) # type: ignore[attr-defined]
300 toc = audiofile.tag.table_of_contents.set(
301 "toc".encode("ascii"),
302 toplevel=True,
303 ordered=True,
304 child_ids=[],
305 description="Table of Contents",
306 )
307 chapter_marks = p["chapters"]
308 for i, m in enumerate(chapter_marks):
309 title_frameset = eyed3.id3.frames.FrameSet()
310 title_frameset.setTextFrame(eyed3.id3.frames.TITLE_FID, m.title)
311 chap = audiofile.tag.chapters.set(
312 f"ch{i:02d}".encode("ascii"),
313 times=(
314 round(m.start_second * 1000),
315 round(m.end_second * 1000),
316 ),
317 sub_frames=title_frameset,
318 )
319 toc.child_ids.append(chap.element_id)
320 start_time = datetime.timedelta(seconds=m.start_second)
321 end_time = datetime.timedelta(seconds=m.end_second)
322 logger.debug(
323 'Added chap tag => %s: %s-%s "%s" to "%s"',
324 colored(f"ch{i:02d}", "cyan"),
325 start_time,
326 end_time,
327 colored(m.title, "cyan"),
328 colored(str(part_filename), "blue"),
329 )
330 audiofile.tag.save(version=id3v2_version)
332 except Exception as e: # pylint: disable=broad-except
333 logger.warning(
334 "Error saving ID3: %s", colored(str(e), "red", attrs=["bold"])
335 )
336 keep_cover = True
338 logger.info('Saved "%s"', colored(str(part_filename), "magenta"))
340 file_tracks.append({"file": part_filename})
342 debug_meta["file_tracks"] = [{"file": str(ft["file"])} for ft in file_tracks]
343 if args.merge_output:
344 logger.info(
345 'Generating "%s"...',
346 colored(
347 str(book_filename if args.merge_format == "mp3" else book_m4b_filename),
348 "magenta",
349 ),
350 )
352 merge_into_mp3(
353 book_filename=book_filename,
354 file_tracks=file_tracks,
355 audio_bitrate=audio_bitrate,
356 ffmpeg_loglevel=ffmpeg_loglevel,
357 hide_progress=args.hide_progress,
358 logger=logger,
359 )
361 audiofile = eyed3.load(book_filename)
362 write_tags(
363 audiofile=audiofile,
364 title=title,
365 sub_title=sub_title,
366 authors=authors,
367 narrators=narrators,
368 publisher=publisher,
369 description=description,
370 cover_bytes=cover_bytes,
371 genres=subjects,
372 languages=languages,
373 published_date=publish_date,
374 series=series,
375 part_number=0,
376 total_parts=0,
377 overdrive_id=overdrive_media_id,
378 isbn=extract_isbn(loan.get("formats", []), [LibbyFormats.AudioBookMP3]),
379 always_overwrite=args.overwrite_tags,
380 delimiter=args.tag_delimiter,
381 )
383 if args.add_chapters and (
384 args.overwrite_tags or not audiofile.tag.table_of_contents
385 ):
386 if args.overwrite_tags and audiofile.tag.table_of_contents:
387 # Clear existing toc to prevent "There may only be one top-level table of contents.
388 # Toc 'b'toc'' is current top-level." error
389 for f in list(audiofile.tag.table_of_contents):
390 audiofile.tag.table_of_contents.remove(f.element_id) # type: ignore[attr-defined]
392 toc = audiofile.tag.table_of_contents.set(
393 "toc".encode("ascii"),
394 toplevel=True,
395 ordered=True,
396 child_ids=[],
397 description="Table of Contents",
398 )
399 merged_markers = merge_toc(parsed_toc)
400 debug_meta["merged_markers"] = [
401 {"title": m.title, "start": m.start_second, "end": m.end_second}
402 for m in merged_markers
403 ]
405 for i, m in enumerate(merged_markers):
406 title_frameset = eyed3.id3.frames.FrameSet()
407 title_frameset.setTextFrame(eyed3.id3.frames.TITLE_FID, m.title)
408 chap = audiofile.tag.chapters.set(
409 f"ch{i}".encode("ascii"),
410 times=(round(m.start_second * 1000), round(m.end_second * 1000)),
411 sub_frames=title_frameset,
412 )
413 toc.child_ids.append(chap.element_id)
414 start_time = datetime.timedelta(seconds=m.start_second)
415 end_time = datetime.timedelta(seconds=m.end_second)
416 logger.debug(
417 'Added chap tag => %s: %s-%s "%s" to "%s"',
418 colored(f"ch{i}", "cyan"),
419 start_time,
420 end_time,
421 colored(m.title, "cyan"),
422 colored(str(book_filename), "blue"),
423 )
425 audiofile.tag.save(version=id3v2_version)
427 if args.merge_format == "mp3":
428 logger.info(
429 'Merged files into "%s"',
430 colored(
431 str(
432 book_filename
433 if args.merge_format == "mp3"
434 else book_m4b_filename
435 ),
436 "magenta",
437 ),
438 )
440 if args.merge_format == "m4b":
441 convert_to_m4b(
442 book_filename=book_filename,
443 book_m4b_filename=book_m4b_filename,
444 cover_filename=cover_filename,
445 merge_codec=args.merge_codec,
446 audio_bitrate=audio_bitrate,
447 ffmpeg_loglevel=ffmpeg_loglevel,
448 hide_progress=args.hide_progress,
449 logger=logger,
450 )
452 if not args.keep_mp3:
453 for file_track in file_tracks:
454 try:
455 file_track["file"].unlink()
456 except Exception as e: # pylint: disable=broad-except
457 logger.warning(f'Error deleting "{file_track["file"]}": {str(e)}')
459 if not keep_cover and cover_filename.exists():
460 try:
461 cover_filename.unlink()
462 except Exception as e: # pylint: disable=broad-except
463 logger.warning(f'Error deleting "{cover_filename}": {str(e)}')
465 if args.generate_opf:
466 if args.merge_output:
467 opf_file_path = book_filename.with_suffix(".opf")
468 else:
469 opf_file_path = book_folder.joinpath(
470 f"{slugify(title, allow_unicode=True)}.opf"
471 )
472 if not opf_file_path.exists():
473 od_client = OverDriveClient(
474 user_agent=USER_AGENT, timeout=args.timeout, retry=args.retries
475 )
476 media_info = od_client.media(loan["id"])
477 create_opf(
478 media_info,
479 cover_filename if keep_cover else None,
480 file_tracks
481 if not args.merge_output
482 else [
483 {
484 "file": book_filename
485 if args.merge_format == "mp3"
486 else book_m4b_filename
487 }
488 ],
489 opf_file_path,
490 logger,
491 )
492 else:
493 logger.info("Already saved %s", colored(str(opf_file_path), "magenta"))
495 if args.write_json:
496 with book_folder.joinpath("debug.json").open("w", encoding="utf-8") as outfile:
497 json.dump(debug_meta, outfile, indent=2)
499 if not args.is_debug_mode:
500 # clean up
501 for file_name in (
502 "openbook.json",
503 "loan.json",
504 ):
505 target = book_folder.joinpath(file_name)
506 if target.exists():
507 target.unlink()