Coverage for odmpy/processing/odm.py: 84.3%

337 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-14 08:51 +0000

1# Copyright (C) 2023 github.com/ping 

2# 

3# This file is part of odmpy. 

4# 

5# odmpy is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# 

10# odmpy is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with odmpy. If not, see <http://www.gnu.org/licenses/>. 

17# 

18 

19import argparse 

20import base64 

21import datetime 

22import hashlib 

23import json 

24import logging 

25import math 

26import re 

27import shutil 

28import uuid 

29import xml.etree.ElementTree as ET 

30from collections import OrderedDict 

31from functools import reduce 

32from html import unescape as unescape_html 

33from pathlib import Path 

34from typing import Any, Union, Dict, List, Optional 

35 

36import eyed3 # type: ignore[import] 

37from eyed3.id3 import ID3_DEFAULT_VERSION, ID3_V2_3, ID3_V2_4 # type: ignore[import] 

38from requests.exceptions import HTTPError, ConnectionError 

39from termcolor import colored 

40from tqdm import tqdm 

41 

42from .shared import ( 

43 generate_names, 

44 write_tags, 

45 generate_cover, 

46 remux_mp3, 

47 merge_into_mp3, 

48 convert_to_m4b, 

49 create_opf, 

50 init_session, 

51) 

52from ..cli_utils import OdmpyCommands 

53from ..constants import OMC, OS, UA, UNSUPPORTED_PARSER_ENTITIES, UA_LONG 

54from ..errors import OdmpyRuntimeError 

55from ..libby import USER_AGENT 

56from ..overdrive import OverDriveClient 

57from ..utils import ( 

58 slugify, 

59 mp3_duration_ms, 

60 parse_duration_to_seconds, 

61 parse_duration_to_milliseconds, 

62 get_element_text, 

63 plural_or_singular_noun as ps, 

64) 

65 

# Matches the lowercase 8-4-4-4-12 hex UUID that OverDrive uses as a title
# ("reserve") ID, captured as the named group "reserve_id".
RESERVE_ID_RE = re.compile(
    r"(?P<reserve_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
)

69 

70# 

71# Main processing logic for odm-based downloads 

72# 

73 

74 

def _patch_for_parse_error(text: str) -> str:
    """
    Prefix ``text`` with a DOCTYPE that declares the HTML entities which
    ElementTree's parser does not know about, so that a subsequent
    ``ET.fromstring`` call can succeed.

    :param text: The XML text that failed to parse.
    :return: The same text, prefixed with entity declarations.
    """
    # [TODO]: Find a more generic solution instead of patching entities, maybe lxml?
    # Ref: https://github.com/ping/odmpy/issues/19
    entity_decls = "".join(
        f'<!ENTITY {entity} "{replacement}">'
        for entity, replacement in UNSUPPORTED_PARSER_ENTITIES.items()
    )
    return f"<!DOCTYPE xml [{entity_decls}]>{text}"

87 

88 

def process_odm(
    odm_file: Optional[Path],
    loan: Dict,
    args: argparse.Namespace,
    logger: logging.Logger,
    cleanup_odm_license: bool = False,
) -> None:
    """
    Download the audiobook loan using the specified odm file.

    For the ``Information`` command this only prints the book info (text or
    json) and returns. Otherwise it downloads the one-time license, fetches
    every mp3 part (resuming partial downloads), writes ID3 tags and chapter
    marks, optionally merges the parts into a single mp3/m4b, and optionally
    writes an OPF metadata file and a debug json.

    :param odm_file: Path to the .odm loan file. No-op if falsy.
    :param loan: Libby loan dict (used for series/title-id hints); may be empty.
    :param args: Parsed CLI arguments.
    :param logger: Logger for progress/errors.
    :param cleanup_odm_license: If True, delete the .odm and .license files
        after a successful download.
    :return: None
    :raises ValueError: if the ODM has no Metadata/License/AcquisitionUrl.
    :raises OdmpyRuntimeError: on download failures.
    """
    if not odm_file:
        logger.warning("No odm file specified.")
        return

    # Keep ffmpeg quiet unless odmpy itself is running at debug verbosity
    ffmpeg_loglevel = "info" if logger.level == logging.DEBUG else "fatal"

    id3v2_version = ID3_DEFAULT_VERSION
    if hasattr(args, "id3v2_version"):
        if args.id3v2_version == 3:
            id3v2_version = ID3_V2_3
        if args.id3v2_version == 4:
            id3v2_version = ID3_V2_4

    xml_doc = ET.parse(odm_file)
    root = xml_doc.getroot()
    overdrive_media_id = root.attrib.get("id", "")

    # The book metadata is embedded as an escaped "<Metadata>..." text blob
    # inside the odm xml, so it has to be parsed out of the document text
    metadata = None
    for t in root.itertext():
        if not t.startswith("<Metadata>"):
            continue
        # remove invalid '&' char
        text = re.sub(r"\s&\s", " &amp; ", t)
        try:
            metadata = ET.fromstring(text)
        except ET.ParseError:
            metadata = ET.fromstring(_patch_for_parse_error(text))
        break

    # Compare explicitly against None: an ET.Element with no children is
    # falsy, so `if not metadata` could mis-fire on an empty Metadata element
    if metadata is None:
        raise ValueError("Unable to find Metadata in ODM")

    title = get_element_text(metadata.find("Title"))
    sub_title = get_element_text(metadata.find("SubTitle"))
    publisher = get_element_text(metadata.find("Publisher"))
    description = get_element_text(metadata.find("Description"))
    series = get_element_text(metadata.find("Series"))
    cover_url = get_element_text(metadata.find("CoverUrl"))
    authors = [
        unescape_html(get_element_text(c))
        for c in metadata.find("Creators") or []
        if "Author" in c.attrib.get("role", "")
    ]
    # Fall back to editors, then to any credited creator, so that the
    # folder/file naming always has something to work with
    if not authors:
        authors = [
            unescape_html(get_element_text(c))
            for c in metadata.find("Creators") or []
            if "Editor" in c.attrib.get("role", "")
        ]
    if not authors:
        authors = [
            unescape_html(get_element_text(c))
            for c in metadata.find("Creators") or []
            if c.text
        ]
    narrators = [
        unescape_html(get_element_text(c))
        for c in metadata.find("Creators") or []
        if "Narrator" in c.attrib.get("role", "")
    ]
    languages = [
        lang.attrib.get("code", "")
        for lang in metadata.find("Languages") or []
        if lang.attrib.get("code", "")
    ]
    subjects = [subj.text for subj in metadata.find("Subjects") or [] if subj.text]

    debug_meta: Dict[str, Any] = {
        "meta": {
            "title": title,
            "coverUrl": cover_url,
            "authors": authors,
            "publisher": publisher,
            "description": description,
        }
    }

    # View Book Info
    if args.command_name == OdmpyCommands.Information:
        if args.format == "text":
            logger.info(f'{"Title:":10} {colored(title, "blue")}')
            logger.info(
                "{:10} {}".format(
                    "Creators:",
                    colored(
                        ", ".join(
                            [
                                f"{c.text} ({c.attrib['role']})"
                                for c in metadata.find("Creators") or []
                            ]
                        ),
                        "blue",
                    ),
                )
            )
            logger.info(f"{'Publisher:':10} {publisher}")
            logger.info(f"{'Subjects:':10} {', '.join(subjects)}")
            logger.info(
                f"{'Languages:':10} {', '.join([c.text for c in metadata.find('Languages') or [] if c.text])}"
            )
            logger.info(f"{'Description:':10}\n{description}")
            for formats in root.findall("Formats"):
                for f in formats:
                    logger.info(f"\n{'Format:':10} {f.attrib['name']}")
                    parts = f.find("Parts") or []
                    for p in parts:
                        logger.info(
                            f"* {p.attrib['name']} - {p.attrib['duration']} ({math.ceil(1.0 * int(p.attrib['filesize']) / 1024):,.0f}kB)"
                        )

        elif args.format == "json":
            result: Dict[str, Any] = {
                "title": title,
                "creators": [
                    f"{c.text} ({c.attrib['role']})"
                    for c in metadata.find("Creators") or []
                ],
                "publisher": publisher,
                "subjects": [c.text for c in metadata.find("Subjects") or [] if c.text],
                "languages": [
                    c.text for c in metadata.find("Languages") or [] if c.text
                ],
                "description": description,
                "formats": [],
            }

            for formats in root.findall("Formats"):
                for f in formats:
                    parts = []
                    total_secs = 0
                    for p in f.find("Parts") or []:
                        part_duration = p.attrib["duration"]
                        # part duration can look like '%M:%S.%f' or '%H:%M:%S.%f'
                        # Accumulate: this was previously `total_secs = ...`,
                        # which reported only the last part's duration as the
                        # book's total duration
                        total_secs += parse_duration_to_seconds(part_duration)
                        parts.append(
                            {
                                "name": p.attrib["name"],
                                "duration": part_duration,
                                "filesize": f"{math.ceil(1.0 * int(p.attrib['filesize']) / 1024):,.0f}kB",
                            }
                        )
                    result["formats"].append(
                        {"format": f.attrib["name"], "parts": parts}
                    )
                    # in case there are multiple formats, only need to store it once
                    if "total_duration" not in result:
                        result["total_duration"] = {
                            "total_minutes": round(total_secs / 60),
                            "total_seconds": round(total_secs),
                        }

            logger.info(json.dumps(result))

        return

    session = init_session(max_retries=args.retries)

    # Download Book
    # Collect the base download url (first "download" protocol found) and
    # the attribute dicts of every audio part
    download_baseurl = ""
    download_parts = []
    for formats in root.findall("Formats"):
        for f in formats:
            protocols = f.find("Protocols") or []
            for p in protocols:
                if p.attrib.get("method", "") != "download":
                    continue
                download_baseurl = p.attrib["baseurl"]
                break
            parts = f.find("Parts") or []
            for p in parts:
                download_parts.append(p.attrib)
    debug_meta["download_parts"] = download_parts

    logger.info(
        f'Downloading "{colored(title, "blue", attrs=["bold"])}" '
        f'by "{colored(", ".join(authors), "blue", attrs=["bold"])}" '
        f'in {len(download_parts)} {ps(len(download_parts), "part")}...'
    )

    book_folder, book_filename = generate_names(
        title=title,
        series=series,
        series_reading_order=loan.get("detailedSeries", {}).get("readingOrder", ""),
        authors=authors,
        edition="",
        title_id=loan.get("id") or overdrive_media_id,
        args=args,
        logger=logger,
    )
    book_m4b_filename = book_filename.with_suffix(".m4b")

    # check early if a merged file is already saved
    if (
        args.merge_output
        and (
            book_filename if args.merge_format == "mp3" else book_m4b_filename
        ).exists()
    ):
        logger.warning(
            'Already saved "%s"',
            colored(
                str(book_filename if args.merge_format == "mp3" else book_m4b_filename),
                "magenta",
            ),
        )
        if cleanup_odm_license and odm_file.exists():
            try:
                odm_file.unlink()
            except Exception as e:  # pylint: disable=broad-except
                logger.warning(f'Error deleting "{odm_file}": {str(e)}')
        return

    debug_filename = book_folder.joinpath("debug.json")

    cover_filename, cover_bytes = generate_cover(
        book_folder=book_folder,
        cover_url=cover_url,
        session=session,
        timeout=args.timeout,
        logger=logger,
    )

    license_ele = root.find("License")
    if license_ele is None:
        raise ValueError("Unable to find License in ODM")

    acquisition_url = get_element_text(license_ele.find("AcquisitionUrl"))
    if not acquisition_url:
        raise ValueError("Unable to extract acquisition_url from ODM")

    media_id = root.attrib["id"]

    # The license request must be signed with a hash derived from a fresh
    # client id (OverDrive's legacy DRM handshake)
    client_id = str(uuid.uuid1()).upper()
    raw_hash = f"{client_id}|{OMC}|{OS}|ELOSNOC*AIDEM*EVIRDREVO"
    m = hashlib.sha1(raw_hash.encode("utf-16-le"))
    license_hash = base64.b64encode(m.digest())

    # Extract license:
    # License file is downloadable only once per odm,
    # so we keep it in case downloads fail
    license_file = Path(args.download_dir, odm_file.with_suffix(".license").name)
    if license_file.exists():
        logger.warning(f"Already downloaded license file: {license_file}")
    else:
        # download license file
        params = OrderedDict(
            [
                ("MediaID", media_id),
                ("ClientID", client_id),
                ("OMC", OMC),
                ("OS", OS),
                ("Hash", license_hash),
            ]
        )

        license_res = session.get(
            acquisition_url,
            params=params,
            headers={"User-Agent": UA},
            timeout=args.timeout,
            stream=True,
        )
        try:
            license_res.raise_for_status()
            with license_file.open("wb") as outfile:
                for chunk in license_res.iter_content(1024):
                    outfile.write(chunk)
            logger.debug(f"Saved license file {license_file}")

        except HTTPError as he:
            if he.response.status_code == 404:
                # odm file has expired
                # use the local `odm_file` param (always set here) instead of
                # `args.odm_file`, which is absent on some command namespaces
                logger.error(
                    f'The loan file "{odm_file}" has expired. Please download again.'
                )
            else:
                logger.error(he.response.content)
            raise OdmpyRuntimeError("HTTP Error while downloading license.")
        except ConnectionError as ce:
            logger.error(f"ConnectionError: {str(ce)}")
            raise OdmpyRuntimeError("Connection Error while downloading license.")

    license_xml_doc = ET.parse(license_file)
    license_root = license_xml_doc.getroot()

    ns = "{http://license.overdrive.com/2008/03/License.xsd}"

    signed_info_ele = license_root.find(f"{ns}SignedInfo")
    if signed_info_ele is None:
        raise ValueError("Unable to find SignedInfo in License")

    license_client_id = get_element_text(signed_info_ele.find(f"{ns}ClientID"))
    if not license_client_id:
        raise ValueError("Unable to find ClientID in License.SignedInfo")

    with license_file.open("r", encoding="utf-8") as lic_file:
        lic_file_contents = lic_file.read()

    track_count = 0
    file_tracks: List[Dict] = []
    keep_cover = args.always_keep_cover
    audio_lengths_ms = []
    audio_bitrate = 0
    for p in download_parts:
        part_number = int(p["number"])
        part_filename = book_folder.joinpath(
            f"{slugify(f'{title} - Part {part_number:02d}', allow_unicode=True)}.mp3"
        )
        part_tmp_filename = part_filename.with_suffix(".part")
        part_file_size = int(p["filesize"])
        part_url_filename = p["filename"]
        part_download_url = f"{download_baseurl}/{part_url_filename}"
        part_markers = []

        if part_filename.exists():
            logger.warning("Already saved %s", colored(str(part_filename), "magenta"))
        else:
            try:
                # Resume a previous partial download via a Range request
                already_downloaded_len = 0
                if part_tmp_filename.exists():
                    already_downloaded_len = part_tmp_filename.stat().st_size

                part_download_res = session.get(
                    part_download_url,
                    headers={
                        "User-Agent": UA,
                        "ClientID": license_client_id,
                        "License": lic_file_contents,
                        # requests drops headers whose value is None
                        "Range": f"bytes={already_downloaded_len}-"
                        if already_downloaded_len
                        else None,
                    },
                    timeout=args.timeout,
                    stream=True,
                )
                part_download_res.raise_for_status()

                with tqdm.wrapattr(
                    part_download_res.raw,
                    "read",
                    total=part_file_size,
                    initial=already_downloaded_len,
                    desc=f"Part {part_number:2d}",
                    disable=args.hide_progress,
                ) as res_raw:
                    with part_tmp_filename.open(
                        "ab" if already_downloaded_len else "wb"
                    ) as outfile:
                        shutil.copyfileobj(res_raw, outfile)

                # try to remux file to remove mp3 lame tag errors
                remux_mp3(
                    part_tmp_filename=part_tmp_filename,
                    part_filename=part_filename,
                    ffmpeg_loglevel=ffmpeg_loglevel,
                    logger=logger,
                )

            except HTTPError as he:
                logger.error(f"HTTPError: {str(he)}")
                logger.debug(he.response.content)
                raise OdmpyRuntimeError("HTTP Error while downloading part file.")

            except ConnectionError as ce:
                logger.error(f"ConnectionError: {str(ce)}")
                raise OdmpyRuntimeError("Connection Error while downloading part file.")

            # Save id3 info only on new download, ref #42
            # This also makes handling of part files consistent with merged files
            try:
                # Fill id3 info for mp3 part
                audiofile: eyed3.core.AudioFile = eyed3.load(part_filename)
                # bit_rate is an (is_vbr, bitrate) tuple
                _, audio_bitrate = audiofile.info.bit_rate

                write_tags(
                    audiofile=audiofile,
                    title=title,
                    sub_title=sub_title,
                    authors=authors,
                    narrators=narrators,
                    publisher=publisher,
                    description=description,
                    cover_bytes=cover_bytes,
                    genres=subjects,
                    languages=languages,
                    published_date=None,  # odm does not contain date info
                    series=series,
                    part_number=part_number,
                    total_parts=len(download_parts),
                    overdrive_id=overdrive_media_id,
                    always_overwrite=args.overwrite_tags,
                    delimiter=args.tag_delimiter,
                )
                audiofile.tag.save(version=id3v2_version)

                # Notes: Can't switch over to using eyed3 (audiofile.info.time_secs)
                # because it is completely off by about 10-20 seconds.
                # Also, can't rely on `p["duration"]` because it is also often off
                # by about 1 second.
                audio_lengths_ms.append(mp3_duration_ms(part_filename))

                # Extract OD chapter info from mp3s for use in merged file
                for frame in audiofile.tag.frame_set.get(
                    eyed3.id3.frames.USERTEXT_FID, []
                ):
                    if frame.description != "OverDrive MediaMarkers":
                        continue
                    if frame.text:
                        # remove invalid '&' char before parsing the marker xml
                        frame_text = re.sub(r"\s&\s", " &amp; ", frame.text)
                        try:
                            tree = ET.fromstring(frame_text)
                        except UnicodeEncodeError:
                            tree = ET.fromstring(
                                frame_text.encode("ascii", "ignore").decode("ascii")
                            )
                        except ET.ParseError:
                            tree = ET.fromstring(_patch_for_parse_error(frame_text))

                        for marker in tree.iter("Marker"):  # type: ET.Element
                            marker_name = get_element_text(marker.find("Name")).strip()
                            marker_timestamp = get_element_text(marker.find("Time"))

                            # 2 timestamp formats found ("%M:%S.%f", "%H:%M:%S.%f")
                            ts_mark = parse_duration_to_milliseconds(marker_timestamp)
                            track_count += 1
                            part_markers.append(
                                (f"ch{track_count:02d}", marker_name, ts_mark)
                            )
                    break

                if (
                    args.add_chapters
                    and not args.merge_output
                    and (args.overwrite_tags or not audiofile.tag.table_of_contents)
                ):
                    # set the chapter marks
                    generated_markers: List[Dict[str, Union[str, int]]] = []
                    for j, file_marker in enumerate(part_markers):
                        generated_markers.append(
                            {
                                "id": file_marker[0],
                                "text": file_marker[1],
                                "start_time": int(file_marker[2]),
                                # last chapter ends at the file's end
                                "end_time": int(
                                    round(audiofile.info.time_secs * 1000)
                                    if j == (len(part_markers) - 1)
                                    else part_markers[j + 1][2]
                                ),
                            }
                        )

                    if args.overwrite_tags and audiofile.tag.table_of_contents:
                        # Clear existing toc to prevent "There may only be one top-level table of contents.
                        # Toc 'b'toc'' is current top-level." error
                        for toc_entry in list(audiofile.tag.table_of_contents):
                            audiofile.tag.table_of_contents.remove(toc_entry.element_id)  # type: ignore[attr-defined]

                    toc = audiofile.tag.table_of_contents.set(
                        "toc".encode("ascii"),
                        toplevel=True,
                        ordered=True,
                        child_ids=[],
                        description="Table of Contents",
                    )

                    for gm in generated_markers:
                        title_frameset = eyed3.id3.frames.FrameSet()
                        title_frameset.setTextFrame(
                            eyed3.id3.frames.TITLE_FID, str(gm["text"])
                        )

                        chap = audiofile.tag.chapters.set(
                            str(gm["id"]).encode("ascii"),
                            times=(gm["start_time"], gm["end_time"]),
                            sub_frames=title_frameset,
                        )
                        toc.child_ids.append(chap.element_id)
                        start_time = datetime.timedelta(
                            milliseconds=float(gm["start_time"])
                        )
                        end_time = datetime.timedelta(
                            milliseconds=float(gm["end_time"])
                        )
                        logger.debug(
                            'Added chap tag => %s: %s-%s "%s" to "%s"',
                            colored(str(gm["id"]), "cyan"),
                            start_time,
                            end_time,
                            colored(str(gm["text"]), "cyan"),
                            colored(str(part_filename), "blue"),
                        )

                    audiofile.tag.save(version=id3v2_version)

            except Exception as e:  # pylint: disable=broad-except
                # best-effort: keep the audio even if tagging fails,
                # and keep the cover so the user can retag manually
                logger.warning(
                    "Error saving ID3: %s", colored(str(e), "red", attrs=["bold"])
                )
                keep_cover = True

            logger.info('Saved "%s"', colored(str(part_filename), "magenta"))

        file_tracks.append(
            {
                "file": part_filename,
                "markers": part_markers,
            }
        )
    # end loop: for p in download_parts:

    debug_meta["audio_lengths_ms"] = audio_lengths_ms
    debug_meta["file_tracks"] = [
        {"file": str(f["file"]), "markers": f["markers"]} for f in file_tracks
    ]

    if args.merge_output:
        logger.info(
            'Generating "%s"...',
            colored(
                str(book_filename if args.merge_format == "mp3" else book_m4b_filename),
                "magenta",
            ),
        )

        merge_into_mp3(
            book_filename=book_filename,
            file_tracks=file_tracks,
            audio_bitrate=audio_bitrate,
            ffmpeg_loglevel=ffmpeg_loglevel,
            hide_progress=args.hide_progress,
            logger=logger,
        )

        audiofile = eyed3.load(book_filename)
        write_tags(
            audiofile=audiofile,
            title=title,
            sub_title=sub_title,
            authors=authors,
            narrators=narrators,
            publisher=publisher,
            description=description,
            cover_bytes=cover_bytes,
            genres=subjects,
            languages=languages,
            published_date=None,  # odm does not contain date info
            series=series,
            part_number=0,
            total_parts=0,
            overdrive_id=overdrive_media_id,
            overwrite_title=True,
            always_overwrite=args.overwrite_tags,
            delimiter=args.tag_delimiter,
        )

        if args.add_chapters and (
            args.overwrite_tags or not audiofile.tag.table_of_contents
        ):
            # Rebase each part's chapter marks onto the merged timeline by
            # offsetting with the sum of all preceding parts' durations
            merged_markers: List[Dict[str, Union[str, int]]] = []
            for i, f in enumerate(file_tracks):
                prev_tracks_len_ms = (
                    0 if i == 0 else reduce(lambda x, y: x + y, audio_lengths_ms[0:i])
                )
                this_track_endtime_ms = int(
                    reduce(lambda x, y: x + y, audio_lengths_ms[0 : i + 1])
                )
                file_markers = f["markers"]
                for j, file_marker in enumerate(file_markers):
                    merged_markers.append(
                        {
                            "id": file_marker[0],
                            "text": str(file_marker[1]),
                            "start_time": int(file_marker[2]) + prev_tracks_len_ms,
                            "end_time": int(
                                this_track_endtime_ms
                                if j == (len(file_markers) - 1)
                                else file_markers[j + 1][2] + prev_tracks_len_ms
                            ),
                        }
                    )
            debug_meta["merged_markers"] = merged_markers

            if args.overwrite_tags and audiofile.tag.table_of_contents:
                # Clear existing toc to prevent "There may only be one top-level table of contents.
                # Toc 'b'toc'' is current top-level." error
                for toc_entry in list(audiofile.tag.table_of_contents):
                    audiofile.tag.table_of_contents.remove(toc_entry.element_id)  # type: ignore[attr-defined]

            toc = audiofile.tag.table_of_contents.set(
                "toc".encode("ascii"),
                toplevel=True,
                ordered=True,
                child_ids=[],
                description="Table of Contents",
            )

            for mm in merged_markers:  # type: Dict[str, Union[str, int]]
                title_frameset = eyed3.id3.frames.FrameSet()
                title_frameset.setTextFrame(eyed3.id3.frames.TITLE_FID, mm["text"])
                chap = audiofile.tag.chapters.set(
                    str(mm["id"]).encode("ascii"),
                    times=(mm["start_time"], mm["end_time"]),
                    sub_frames=title_frameset,
                )
                toc.child_ids.append(chap.element_id)
                start_time = datetime.timedelta(milliseconds=float(mm["start_time"]))
                end_time = datetime.timedelta(milliseconds=float(mm["end_time"]))
                logger.debug(
                    'Added chap tag => %s: %s-%s "%s" to "%s"',
                    colored(str(mm["id"]), "cyan"),
                    start_time,
                    end_time,
                    colored(str(mm["text"]), "cyan"),
                    colored(str(book_filename), "blue"),
                )

        audiofile.tag.save(version=id3v2_version)

        if args.merge_format == "mp3":
            logger.info(
                'Merged files into "%s"',
                colored(
                    str(
                        book_filename
                        if args.merge_format == "mp3"
                        else book_m4b_filename
                    ),
                    "magenta",
                ),
            )

        if args.merge_format == "m4b":
            convert_to_m4b(
                book_filename=book_filename,
                book_m4b_filename=book_m4b_filename,
                cover_filename=cover_filename,
                merge_codec=args.merge_codec,
                audio_bitrate=audio_bitrate,
                ffmpeg_loglevel=ffmpeg_loglevel,
                hide_progress=args.hide_progress,
                logger=logger,
            )

        if not args.keep_mp3:
            for f in file_tracks:
                try:
                    f["file"].unlink()
                except Exception as e:  # pylint: disable=broad-except
                    logger.warning(f'Error deleting "{f["file"]}": {str(e)}')

    if cleanup_odm_license:
        for target_file in [odm_file, license_file]:
            if target_file and target_file.exists():
                try:
                    target_file.unlink()
                except Exception as e:  # pylint: disable=broad-except
                    logger.warning(f'Error deleting "{target_file}": {str(e)}')

    if not keep_cover and cover_filename.exists():
        try:
            cover_filename.unlink()
        except Exception as e:  # pylint: disable=broad-except
            logger.warning(f'Error deleting "{cover_filename}": {str(e)}')

    if args.generate_opf:
        if args.merge_output:
            opf_file_path = book_filename.with_suffix(".opf")
        else:
            opf_file_path = book_folder.joinpath(
                f"{slugify(title, allow_unicode=True)}.opf"
            )

        if not opf_file_path.exists():
            mobj = RESERVE_ID_RE.match(overdrive_media_id)
            if not mobj:
                logger.warning(
                    f"Could not get a valid reserve ID: {overdrive_media_id}"
                )
            else:
                # Fetch full media info from OverDrive for a richer OPF
                reserve_id = mobj.group("reserve_id")
                od_client = OverDriveClient(
                    user_agent=USER_AGENT, timeout=args.timeout, retry=args.retries
                )
                media_info = od_client.media(reserve_id)
                create_opf(
                    media_info,
                    cover_filename if keep_cover else None,
                    file_tracks
                    if not args.merge_output
                    else [
                        {
                            "file": book_filename
                            if args.merge_format == "mp3"
                            else book_m4b_filename
                        }
                    ],
                    opf_file_path,
                    logger,
                )
        else:
            logger.info("Already saved %s", colored(str(opf_file_path), "magenta"))

    if args.write_json:
        with debug_filename.open("w", encoding="utf-8") as outfile:
            json.dump(debug_meta, outfile, indent=2)

810 

811 

def process_odm_return(args: argparse.Namespace, logger: logging.Logger) -> None:
    """
    Return (check back in) the audiobook loan using the specified odm file.

    :param args: Parsed CLI arguments; ``args.odm_file`` is the loan file.
    :param logger: Logger for progress/errors.
    :return: None
    :raises OdmpyRuntimeError: if the EarlyReturnURL is missing or the
        return request fails (other than an already-returned 403).
    """
    xml_doc = ET.parse(args.odm_file)
    root = xml_doc.getroot()

    logger.info(f"Returning {args.odm_file} ...")
    early_return_url = get_element_text(root.find("EarlyReturnURL"))
    if not early_return_url:
        raise OdmpyRuntimeError("Unable to get EarlyReturnURL")
    sess = init_session(args.retries)
    try:
        early_return_res = sess.get(
            early_return_url, headers={"User-Agent": UA_LONG}, timeout=args.timeout
        )
        early_return_res.raise_for_status()
        logger.info(f"Loan returned successfully: {args.odm_file}")
    except HTTPError as he:
        # 403 means the loan server no longer recognises the loan
        if he.response.status_code == 403:
            logger.warning("Loan is probably already returned.")
            return
        logger.error(f"HTTPError: {str(he)}")
        logger.debug(he.response.content)
        # fixed: was `{args.odm_file, }` which rendered a one-element tuple,
        # e.g. "HTTP error returning odm (PosixPath('book.odm'),)"
        raise OdmpyRuntimeError(f"HTTP error returning odm {args.odm_file}")
    except ConnectionError as ce:
        logger.error(f"ConnectionError: {str(ce)}")
        raise OdmpyRuntimeError(f"Connection error returning odm {args.odm_file}")