#!/usr/bin/env python3
"""
CHUNKED KINDLE SCANNER - Bulletproof solution for long books
Splits scanning into 2-minute chunks to avoid timeouts
"""
import asyncio
import argparse
import json
import os
import re
import time
from pathlib import Path

from playwright.async_api import async_playwright

# SECURITY NOTE(review): these credentials were previously hard-coded inline.
# Supply them via environment variables instead; the literal fallbacks are kept
# only for backward compatibility and should be removed (and the password
# rotated) since they have been committed to source control.
KINDLE_EMAIL = os.environ.get("KINDLE_EMAIL", "ondrej.glaser@gmail.com")
KINDLE_PASSWORD = os.environ.get("KINDLE_PASSWORD", "csjXgew3In")

# Direct link to the one book this script scans (ASIN is hard-coded).
BOOK_URL = "https://read.amazon.com/?asin=B0DJP2C8M6&ref_=kwl_kr_iv_rec_1"

# Where resume state is persisted between chunk runs.
PROGRESS_FILE = Path("scan_progress.json")


async def _login(page):
    """Open the book URL and, if redirected to Amazon sign-in, complete login."""
    print("šŸ” Step 1: Logging in...")
    await page.goto(BOOK_URL)
    await page.wait_for_timeout(5000)

    if "signin" in page.url:
        email_field = await page.wait_for_selector("#ap_email", timeout=10000)
        await email_field.fill(KINDLE_EMAIL)

        continue_btn = await page.wait_for_selector("#continue", timeout=5000)
        await continue_btn.click()
        await page.wait_for_timeout(3000)

        password_field = await page.wait_for_selector("#ap_password", timeout=10000)
        await password_field.fill(KINDLE_PASSWORD)

        signin_btn = await page.wait_for_selector("#signInSubmit", timeout=5000)
        await signin_btn.click()
        await page.wait_for_timeout(5000)

    print("āœ… Login completed")


async def _navigate_to_start(page, start_page):
    """Position the reader on `start_page`.

    For the first chunk this uses the Table of Contents to jump to the cover;
    for later chunks it pages forward with ArrowRight, which is slow but the
    only reliable way to reach an arbitrary page.
    """
    print(f"šŸŽÆ Step 3: Navigating to page {start_page}...")

    if start_page == 1:
        # For first chunk, use TOC navigation to beginning
        try:
            toc_button = await page.wait_for_selector(
                "[aria-label='Table of Contents']", timeout=5000
            )
            await toc_button.click()
            await page.wait_for_timeout(2000)

            cover_link = await page.wait_for_selector("text=Cover", timeout=5000)
            await cover_link.click()
            await page.wait_for_timeout(3000)

            # Close TOC: Escape presses plus a click back into the page body.
            for _ in range(3):
                await page.keyboard.press("Escape")
                await page.wait_for_timeout(500)
            await page.click("body", position={"x": 600, "y": 400})
            await page.wait_for_timeout(1000)
            print("   āœ… Navigated to book beginning")
        except Exception as e:
            # Best-effort: the reader usually opens at the last-read position,
            # so a failed TOC jump is reported but not fatal.
            print(f"   āš ļø TOC navigation failed: {e}")
    else:
        # For subsequent chunks, navigate to the starting page
        print(f"   šŸ”„ Navigating to page {start_page} (this may take time)...")
        for _ in range(start_page - 1):
            await page.keyboard.press("ArrowRight")
            await page.wait_for_timeout(100)  # Fast navigation to start position


async def _scan_chunk(page, start_page, end_page):
    """Screenshot pages `start_page`..`end_page` into scanned_pages/.

    Uses screenshot file size as a cheap duplicate detector: five near-identical
    sizes in a row is taken to mean the book has ended.  Returns the number of
    the last page actually screenshotted (may be < end_page on early stop).
    """
    output_dir = Path("scanned_pages")
    output_dir.mkdir(exist_ok=True)

    pages_to_scan = end_page - start_page + 1
    print(f"šŸš€ Step 4: Scanning {pages_to_scan} pages ({start_page} to {end_page})...")

    consecutive_identical = 0
    last_file_size = 0
    last_scanned = start_page - 1

    for page_offset in range(pages_to_scan):
        current_page_num = start_page + page_offset
        print(f"šŸ“ø Scanning page {current_page_num}...")

        # Take screenshot
        filename = output_dir / f"page_{current_page_num:03d}.png"
        await page.screenshot(path=str(filename), full_page=False)
        last_scanned = current_page_num

        # Check file size for duplicate detection (within 3 KB counts as same).
        file_size = filename.stat().st_size
        if abs(file_size - last_file_size) < 3000:
            consecutive_identical += 1
            print(f"   āš ļø Possible duplicate ({consecutive_identical}/5)")
        else:
            consecutive_identical = 0
            print(f"   āœ… New content ({file_size} bytes)")
        last_file_size = file_size

        # Stop if too many identical pages (end of book)
        if consecutive_identical >= 5:
            print("šŸ“– Detected end of book")
            break

        # Navigate to next page (except for last page in chunk)
        if page_offset < pages_to_scan - 1:
            await page.keyboard.press("ArrowRight")
            await page.wait_for_timeout(800)  # Reduced timing for efficiency

    return last_scanned


def _save_progress(last_completed_page, total_pages, chunk_size):
    """Persist resume state to PROGRESS_FILE so the next run can auto-resume."""
    progress_data = {
        "last_completed_page": last_completed_page,
        "total_pages": total_pages,
        "chunk_size": chunk_size,
        "timestamp": time.time(),
    }
    with open(PROGRESS_FILE, "w") as f:
        json.dump(progress_data, f, indent=2)


async def chunked_kindle_scanner(start_page=1, chunk_size=40, total_pages=226):
    """
    Scan a chunk of Kindle pages with bulletproof timeout management.

    Args:
        start_page:  first page (1-based) of this chunk.
        chunk_size:  maximum number of pages to scan in this run.
        total_pages: total page count of the book (caps the chunk end).

    Returns:
        The number of the last page actually scanned — callers resume from the
        page after this.  On error, returns ``start_page - 1`` (last known good
        position).  BUGFIX(review): the original recorded/returned the chunk's
        planned ``end_page`` even when scanning stopped early at end-of-book,
        which would make a resume skip pages that were never captured.
    """
    async with async_playwright() as p:
        # Headful Chromium with automation fingerprints reduced, since the
        # Kindle web reader blocks obvious automated browsers.
        browser = await p.chromium.launch(
            headless=False,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-web-security",
                "--disable-features=VizDisplayCompositor",
            ],
        )
        context = await browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent=(
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            ),
        )
        # Hide navigator.webdriver, the most common headless-detection signal.
        await context.add_init_script(
            """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined,
            });
            """
        )
        page = await context.new_page()

        try:
            end_page = min(start_page + chunk_size - 1, total_pages)
            print(f"šŸŽÆ CHUNKED SCANNER - Pages {start_page} to {end_page}")
            print("=" * 70)

            # STEP 1: LOGIN
            await _login(page)

            # STEP 2: WAIT FOR READER TO LOAD
            print("šŸ“– Step 2: Waiting for reader to load...")
            await page.wait_for_selector("#reader-header", timeout=30000)
            await page.wait_for_timeout(3000)

            # STEP 3: NAVIGATE TO STARTING POSITION
            await _navigate_to_start(page, start_page)

            # STEP 4: SCAN CHUNK
            last_scanned = await _scan_chunk(page, start_page, end_page)

            # Save progress (actual last page scanned, not the planned end).
            _save_progress(last_scanned, total_pages, chunk_size)

            print(f"\nšŸŽ‰ CHUNK COMPLETED!")
            print(f"šŸ“Š Pages scanned: {start_page} to {last_scanned}")
            print(f"šŸ“ Progress saved to: {PROGRESS_FILE}")

            if last_scanned >= total_pages:
                print("šŸ ENTIRE BOOK COMPLETED!")
            else:
                next_end = min(last_scanned + chunk_size, total_pages)
                print(f"ā–¶ļø Next chunk: pages {last_scanned + 1} to {next_end}")

            return last_scanned

        except Exception as e:
            print(f"āŒ Error: {e}")
            import traceback
            traceback.print_exc()
            return start_page - 1  # Return last known good position

        finally:
            await browser.close()


def get_last_completed_page():
    """Get the last completed page from the progress file (0 if unavailable)."""
    if PROGRESS_FILE.exists():
        try:
            with open(PROGRESS_FILE, "r") as f:
                data = json.load(f)
            return data.get("last_completed_page", 0)
        except (OSError, json.JSONDecodeError):
            # A missing/corrupt progress file just means: start from scratch.
            pass
    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Chunked Kindle Scanner")
    parser.add_argument(
        "--start-page", type=int, help="Starting page (default: auto-resume)"
    )
    parser.add_argument(
        "--chunk-size", type=int, default=40, help="Pages per chunk (default: 40)"
    )
    parser.add_argument(
        "--total-pages", type=int, default=226, help="Total pages in book"
    )
    args = parser.parse_args()

    # Auto-resume if no start page specified
    if args.start_page is None:
        last_page = get_last_completed_page()
        start_page = last_page + 1
        print(f"šŸ“‹ Auto-resuming from page {start_page}")
    else:
        start_page = args.start_page

    if start_page > args.total_pages:
        print("āœ… All pages have been completed!")
    else:
        asyncio.run(
            chunked_kindle_scanner(start_page, args.chunk_size, args.total_pages)
        )