Coverage for odmpy/processing/audiobook.py: 84.6%

169 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-14 08:51 +0000

1# Copyright (C) 2023 github.com/ping 

2# 

3# This file is part of odmpy. 

4# 

5# odmpy is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# 

10# odmpy is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14# 

15# You should have received a copy of the GNU General Public License 

16# along with odmpy. If not, see <http://www.gnu.org/licenses/>. 

17# 

18 

19import argparse 

20import datetime 

21import json 

22import logging 

23import shutil 

24from typing import Optional, Any, Dict, List 

25from typing import OrderedDict as OrderedDictType 

26 

27import eyed3 # type: ignore[import] 

28import requests 

29from eyed3.id3 import ID3_DEFAULT_VERSION, ID3_V2_3, ID3_V2_4 # type: ignore[import] 

30from requests.exceptions import HTTPError, ConnectionError 

31from termcolor import colored 

32from tqdm import tqdm 

33 

34from .shared import ( 

35 generate_names, 

36 write_tags, 

37 generate_cover, 

38 remux_mp3, 

39 merge_into_mp3, 

40 convert_to_m4b, 

41 create_opf, 

42 get_best_cover_url, 

43 extract_isbn, 

44) 

45from ..errors import OdmpyRuntimeError 

46from ..libby import USER_AGENT, merge_toc, PartMeta, LibbyFormats 

47from ..overdrive import OverDriveClient 

48from ..utils import slugify, plural_or_singular_noun as ps 

49 

50 

51# 

52# Main processing logic for libby direct audiobook loans 

53# 

54 

55 

def process_audiobook_loan(
    loan: Dict,
    openbook: Dict,
    parsed_toc: OrderedDictType[str, PartMeta],
    session: requests.Session,
    args: argparse.Namespace,
    logger: logging.Logger,
) -> None:
    """
    Download the audiobook loan directly via Libby without the use of
    an odm file

    Each part listed in ``parsed_toc`` is downloaded (resuming from an existing
    ``.part`` file when possible), remuxed and tagged. Depending on ``args`` the
    parts are then merged into a single mp3/m4b, an OPF file is generated and
    debug metadata is written to ``debug.json``.

    :param loan: Loan details. Keys read here: ``title``, ``id``, ``subtitle``,
        ``subjects``, ``publishDate``, ``publisherAccount``, ``series``,
        ``detailedSeries``, ``edition`` and ``formats``.
    :param openbook: Openbook details. Keys read here: ``creator``, ``language``
        and ``description``.
    :param parsed_toc: Ordered mapping whose values describe the parts to download.
        Each value is read for ``url``, ``audio-duration``, ``file-length``,
        ``spine-position`` and ``chapters``.
    :param session: From `LibbyClient.libby_session` because it contains a needed auth cookie
    :param args: Parsed command line options. Attributes read here: ``id3v2_version``,
        ``merge_output``, ``merge_format``, ``merge_codec``, ``is_debug_mode``,
        ``timeout``, ``retries``, ``always_keep_cover``, ``hide_progress``,
        ``overwrite_tags``, ``tag_delimiter``, ``add_chapters``, ``keep_mp3``,
        ``generate_opf`` and ``write_json``. The namespace is also passed on to
        ``generate_names``.
    :param logger:
    :raises OdmpyRuntimeError: If a part download fails with an HTTP or connection error.
    :return:
    """

    # Only let ffmpeg be chatty when we are debugging ourselves
    ffmpeg_loglevel = "info" if logger.level == logging.DEBUG else "fatal"
    id3v2_version = ID3_DEFAULT_VERSION
    if args.id3v2_version == 3:
        id3v2_version = ID3_V2_3
    if args.id3v2_version == 4:
        id3v2_version = ID3_V2_4

    title = loan["title"]
    overdrive_media_id = loan["id"]
    sub_title = loan.get("subtitle", None)
    cover_url = get_best_cover_url(loan)

    # Prefer creators with the "author" role, then "editor", then anyone listed
    authors = [
        c["name"] for c in openbook.get("creator", []) if c.get("role", "") == "author"
    ]
    if not authors:
        authors = [
            c["name"]
            for c in openbook.get("creator", [])
            if c.get("role", "") == "editor"
        ]
    if not authors:
        authors = [c["name"] for c in openbook.get("creator", [])]
    narrators = [
        c["name"]
        for c in openbook.get("creator", [])
        if c.get("role", "") == "narrator"
    ]
    languages: Optional[List[str]] = (
        [str(openbook.get("language"))] if openbook.get("language") else []
    )
    subjects = [subj["name"] for subj in loan.get("subjects", []) if subj.get("name")]
    publish_date = loan.get("publishDate", None)
    # `or {}` guards against the key being present with a null value, which
    # `.get(key, {})` alone does not protect against
    publisher = (loan.get("publisherAccount") or {}).get("name", "") or ""
    series = loan.get("series", "")
    series_reading_order = (loan.get("detailedSeries") or {}).get("readingOrder", "")
    openbook_description = openbook.get("description") or {}
    description = (
        openbook_description.get("full", "")
        or openbook_description.get("short")
        or ""
    )
    debug_meta: Dict[str, Any] = {
        "meta": {
            "title": title,
            "coverUrl": cover_url,
            "authors": authors,
            "publisher": publisher,
            "description": description,
        }
    }

    download_parts: List[PartMeta] = list(parsed_toc.values())  # noqa
    debug_meta["download_parts"] = []
    for p in download_parts:
        chapters = [
            {"title": m.title, "start": m.start_second, "end": m.end_second}
            for m in p["chapters"]
        ]
        debug_meta["download_parts"].append(
            {
                "url": p["url"],
                "audio-duration": p["audio-duration"],
                "file-length": p["file-length"],
                "spine-position": p["spine-position"],
                "chapters": chapters,
            }
        )

    logger.info(
        f'Downloading "{colored(title, "blue", attrs=["bold"])}" '
        f'by "{colored(", ".join(authors), "blue", attrs=["bold"])}" '
        f'in {len(download_parts)} {ps(len(download_parts), "part")}...'
    )

    book_folder, book_filename = generate_names(
        title=title,
        series=series,
        series_reading_order=series_reading_order,
        authors=authors,
        edition=loan.get("edition") or "",
        title_id=loan["id"],
        args=args,
        logger=logger,
    )
    book_m4b_filename = book_filename.with_suffix(".m4b")

    # Final merged output path; only resolved (and args.merge_format only read)
    # when merging was requested
    merged_filename = None
    if args.merge_output:
        merged_filename = (
            book_filename if args.merge_format == "mp3" else book_m4b_filename
        )
        # check early if a merged file is already saved
        if merged_filename.exists():
            logger.warning('Already saved "%s"', colored(str(merged_filename), "magenta"))
            return

    if args.is_debug_mode:
        with book_folder.joinpath("loan.json").open("w", encoding="utf-8") as f:
            json.dump(loan, f, indent=2)

        with book_folder.joinpath("openbook.json").open("w", encoding="utf-8") as f:
            json.dump(openbook, f, indent=2)

    cover_filename, cover_bytes = generate_cover(
        book_folder=book_folder,
        cover_url=cover_url,
        session=session,
        timeout=args.timeout,
        logger=logger,
    )

    keep_cover = args.always_keep_cover
    file_tracks = []
    audio_bitrate = 0
    for p in download_parts:
        part_number = p["spine-position"] + 1
        part_filename = book_folder.joinpath(
            f"{slugify(f'{title} - Part {part_number:02d}', allow_unicode=True)}.mp3"
        )
        part_tmp_filename = part_filename.with_suffix(".part")
        part_file_size = p["file-length"]
        part_download_url = p["url"]

        if part_filename.exists():
            logger.warning("Already saved %s", colored(str(part_filename), "magenta"))
        else:
            try:
                # Resume from a previous incomplete download if there is one
                already_downloaded_len = 0
                if part_tmp_filename.exists():
                    already_downloaded_len = part_tmp_filename.stat().st_size

                part_download_res = session.get(
                    part_download_url,
                    headers={
                        "User-Agent": USER_AGENT,
                        # A None value leaves the Range header out of the request
                        # (per the requests documentation)
                        "Range": f"bytes={already_downloaded_len}-"
                        if already_downloaded_len
                        else None,
                    },
                    timeout=args.timeout,
                    stream=True,
                )
                part_download_res.raise_for_status()

                if already_downloaded_len and part_download_res.status_code != 206:
                    # Anything other than 206 Partial Content means the body is the
                    # complete file, so appending it would corrupt the part.
                    # Start over instead.
                    logger.debug(
                        "Range request not honoured (HTTP %s), restarting download of %s",
                        part_download_res.status_code,
                        part_tmp_filename,
                    )
                    already_downloaded_len = 0

                # Close the streamed response even if copying fails midway
                with part_download_res:
                    with tqdm.wrapattr(
                        part_download_res.raw,
                        "read",
                        total=part_file_size,
                        initial=already_downloaded_len,
                        desc=f"Part {part_number:2d}",
                        disable=args.hide_progress,
                    ) as res_raw:
                        with part_tmp_filename.open(
                            "ab" if already_downloaded_len else "wb"
                        ) as outfile:
                            shutil.copyfileobj(res_raw, outfile)

                # try to remux file to remove mp3 lame tag errors
                remux_mp3(
                    part_tmp_filename=part_tmp_filename,
                    part_filename=part_filename,
                    ffmpeg_loglevel=ffmpeg_loglevel,
                    logger=logger,
                )

            except HTTPError as he:
                logger.error(f"HTTPError: {str(he)}")
                if he.response is not None:
                    logger.debug(he.response.content)
                raise OdmpyRuntimeError(
                    "HTTP Error while downloading part file."
                ) from he

            except ConnectionError as ce:
                logger.error(f"ConnectionError: {str(ce)}")
                raise OdmpyRuntimeError(
                    "Connection Error while downloading part file."
                ) from ce

            # Save id3 info only on new download, ref #42
            # This also makes handling of part files consistent with merged files
            try:
                # Fill id3 info for mp3 part
                audiofile = eyed3.load(part_filename)
                variable_bitrate, audio_bitrate = audiofile.info.bit_rate
                if variable_bitrate:
                    # don't use vbr
                    audio_bitrate = 0
                write_tags(
                    audiofile=audiofile,
                    title=title,
                    sub_title=sub_title,
                    authors=authors,
                    narrators=narrators,
                    publisher=publisher,
                    description=description,
                    cover_bytes=cover_bytes,
                    genres=subjects,
                    languages=languages,
                    published_date=publish_date,
                    series=series,
                    part_number=part_number,
                    total_parts=len(download_parts),
                    overdrive_id=overdrive_media_id,
                    isbn=extract_isbn(
                        loan.get("formats", []), [LibbyFormats.AudioBookMP3]
                    ),
                    always_overwrite=args.overwrite_tags,
                    delimiter=args.tag_delimiter,
                )
                audiofile.tag.save(version=id3v2_version)

                # Per-part chapters are only written when the parts are the final output
                if (
                    args.add_chapters
                    and not args.merge_output
                    and (args.overwrite_tags or not audiofile.tag.table_of_contents)
                ):
                    _add_chapter_tags(
                        audiofile=audiofile,
                        markers=p["chapters"],
                        chapter_id_template="ch{:02d}",
                        overwrite_tags=args.overwrite_tags,
                        target=part_filename,
                        logger=logger,
                    )
                    audiofile.tag.save(version=id3v2_version)

            except Exception as e:  # pylint: disable=broad-except
                # Tagging is best-effort: warn and keep the cover file around
                logger.warning(
                    "Error saving ID3: %s", colored(str(e), "red", attrs=["bold"])
                )
                keep_cover = True

            logger.info('Saved "%s"', colored(str(part_filename), "magenta"))

        file_tracks.append({"file": part_filename})

    debug_meta["file_tracks"] = [{"file": str(ft["file"])} for ft in file_tracks]
    if args.merge_output:
        logger.info('Generating "%s"...', colored(str(merged_filename), "magenta"))

        merge_into_mp3(
            book_filename=book_filename,
            file_tracks=file_tracks,
            audio_bitrate=audio_bitrate,
            ffmpeg_loglevel=ffmpeg_loglevel,
            hide_progress=args.hide_progress,
            logger=logger,
        )

        audiofile = eyed3.load(book_filename)
        write_tags(
            audiofile=audiofile,
            title=title,
            sub_title=sub_title,
            authors=authors,
            narrators=narrators,
            publisher=publisher,
            description=description,
            cover_bytes=cover_bytes,
            genres=subjects,
            languages=languages,
            published_date=publish_date,
            series=series,
            part_number=0,
            total_parts=0,
            overdrive_id=overdrive_media_id,
            isbn=extract_isbn(loan.get("formats", []), [LibbyFormats.AudioBookMP3]),
            always_overwrite=args.overwrite_tags,
            delimiter=args.tag_delimiter,
        )

        if args.add_chapters and (
            args.overwrite_tags or not audiofile.tag.table_of_contents
        ):
            merged_markers = merge_toc(parsed_toc)
            debug_meta["merged_markers"] = [
                {"title": m.title, "start": m.start_second, "end": m.end_second}
                for m in merged_markers
            ]
            _add_chapter_tags(
                audiofile=audiofile,
                markers=merged_markers,
                chapter_id_template="ch{}",
                overwrite_tags=args.overwrite_tags,
                target=book_filename,
                logger=logger,
            )

        audiofile.tag.save(version=id3v2_version)

        if args.merge_format == "mp3":
            logger.info(
                'Merged files into "%s"', colored(str(book_filename), "magenta")
            )

        if args.merge_format == "m4b":
            convert_to_m4b(
                book_filename=book_filename,
                book_m4b_filename=book_m4b_filename,
                cover_filename=cover_filename,
                merge_codec=args.merge_codec,
                audio_bitrate=audio_bitrate,
                ffmpeg_loglevel=ffmpeg_loglevel,
                hide_progress=args.hide_progress,
                logger=logger,
            )

        if not args.keep_mp3:
            # The individual parts are no longer needed once merged
            for file_track in file_tracks:
                try:
                    file_track["file"].unlink()
                except Exception as e:  # pylint: disable=broad-except
                    logger.warning(f'Error deleting "{file_track["file"]}": {str(e)}')

    if not keep_cover and cover_filename.exists():
        try:
            cover_filename.unlink()
        except Exception as e:  # pylint: disable=broad-except
            logger.warning(f'Error deleting "{cover_filename}": {str(e)}')

    if args.generate_opf:
        if args.merge_output:
            opf_file_path = book_filename.with_suffix(".opf")
        else:
            opf_file_path = book_folder.joinpath(
                f"{slugify(title, allow_unicode=True)}.opf"
            )
        if not opf_file_path.exists():
            od_client = OverDriveClient(
                user_agent=USER_AGENT, timeout=args.timeout, retry=args.retries
            )
            media_info = od_client.media(loan["id"])
            create_opf(
                media_info,
                cover_filename if keep_cover else None,
                file_tracks if not args.merge_output else [{"file": merged_filename}],
                opf_file_path,
                logger,
            )
        else:
            logger.info("Already saved %s", colored(str(opf_file_path), "magenta"))

    if args.write_json:
        with book_folder.joinpath("debug.json").open("w", encoding="utf-8") as outfile:
            json.dump(debug_meta, outfile, indent=2)

    if not args.is_debug_mode:
        # clean up
        for file_name in (
            "openbook.json",
            "loan.json",
        ):
            target = book_folder.joinpath(file_name)
            if target.exists():
                target.unlink()


def _add_chapter_tags(
    audiofile: Any,
    markers: List[Any],
    chapter_id_template: str,
    overwrite_tags: bool,
    target: Any,
    logger: logging.Logger,
) -> None:
    """
    Write a top-level table of contents and one chapter frame per marker
    into the ID3 tag of ``audiofile``. The tag is not saved here.

    :param audiofile: Loaded eyed3 audio file whose tag is modified
    :param markers: Chapter markers providing ``title``, ``start_second`` and ``end_second``
    :param chapter_id_template: ``str.format`` template that receives the chapter index,
        e.g. ``"ch{:02d}"``
    :param overwrite_tags: When true, any existing table of contents is removed first
    :param target: File the chapters belong to, used for debug logging only
    :param logger:
    :return:
    """
    if overwrite_tags and audiofile.tag.table_of_contents:
        # Clear existing toc to prevent "There may only be one top-level table of contents.
        # Toc 'b'toc'' is current top-level." error
        for f in list(audiofile.tag.table_of_contents):
            audiofile.tag.table_of_contents.remove(f.element_id)  # type: ignore[attr-defined]

    toc = audiofile.tag.table_of_contents.set(
        "toc".encode("ascii"),
        toplevel=True,
        ordered=True,
        child_ids=[],
        description="Table of Contents",
    )
    for i, m in enumerate(markers):
        chapter_id = chapter_id_template.format(i)
        title_frameset = eyed3.id3.frames.FrameSet()
        title_frameset.setTextFrame(eyed3.id3.frames.TITLE_FID, m.title)
        chap = audiofile.tag.chapters.set(
            chapter_id.encode("ascii"),
            # marker positions are in seconds, chapter times are passed in milliseconds
            times=(round(m.start_second * 1000), round(m.end_second * 1000)),
            sub_frames=title_frameset,
        )
        toc.child_ids.append(chap.element_id)
        start_time = datetime.timedelta(seconds=m.start_second)
        end_time = datetime.timedelta(seconds=m.end_second)
        logger.debug(
            'Added chap tag => %s: %s-%s "%s" to "%s"',
            colored(chapter_id, "cyan"),
            start_time,
            end_time,
            colored(m.title, "cyan"),
            colored(str(target), "blue"),
        )