#!/usr/bin/env python3
"""
PERSISTENT SESSION SCANNER - Uses storageState to maintain session across chunks

Workflow:
    1. Run with ``--init`` once: opens the reader, signs in if redirected to a
       sign-in page, tries to jump to the cover via the table of contents and
       saves the browser storage state to ``kindle_session_state.json``.
    2. Run with ``--start-page`` / ``--chunk-size`` to screenshot one chunk of
       pages into ``scanned_pages/`` using the saved storage state.

Sign-in credentials are read from the ``KINDLE_EMAIL`` and ``KINDLE_PASSWORD``
environment variables; they must never be committed to source control.
"""

import argparse
import asyncio
import json
import os
import time
import traceback
from pathlib import Path

BOOK_URL = "https://read.amazon.com/?asin=B0DJP2C8M6&ref_=kwl_kr_iv_rec_1"
SESSION_STATE_PATH = "kindle_session_state.json"
EMAIL_ENV_VAR = "KINDLE_EMAIL"
PASSWORD_ENV_VAR = "KINDLE_PASSWORD"

# NOTE(review): --disable-web-security switches off same-origin checks in the
# launched browser; kept from the original, confirm it is actually required.
LAUNCH_ARGS = [
    "--disable-blink-features=AutomationControlled",
    "--disable-web-security",
    "--disable-features=VizDisplayCompositor",
]
VIEWPORT = {"width": 1920, "height": 1080}
USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Two consecutive screenshots whose file sizes differ by less than this many
# bytes are treated as showing the same page.
DUPLICATE_SIZE_TOLERANCE = 5000
# This many probable duplicates in a row is taken to mean the book has ended.
MAX_CONSECUTIVE_DUPLICATES = 7


def chunk_end_page(start_page, chunk_size, total_pages):
    """Return the last page number of the chunk, clamped to ``total_pages``."""
    return min(start_page + chunk_size - 1, total_pages)


def is_probable_duplicate(file_size, last_file_size):
    """Return True when two screenshot sizes are within the duplicate tolerance."""
    return abs(file_size - last_file_size) < DUPLICATE_SIZE_TOLERANCE


def chunk_succeeded(result, start_page):
    """
    Tell whether a ``scan_chunk_with_persistence`` return value means success.

    The scanner returns ``False`` when no session state exists and
    ``start_page - 1`` when an exception aborted the chunk. The latter is
    truthy for every ``start_page > 1``, so a plain truthiness test would
    report a failed chunk as completed. A chunk counts as successful only if
    at least ``start_page`` itself was completed.
    """
    if isinstance(result, bool):
        return False
    return result >= start_page


def _login_credentials():
    """
    Return ``(email, password)`` from the environment.

    Raises:
        RuntimeError: if either environment variable is missing or empty.
    """
    email = os.environ.get(EMAIL_ENV_VAR)
    password = os.environ.get(PASSWORD_ENV_VAR)
    if not email or not password:
        raise RuntimeError(
            f"Sign-in required: set {EMAIL_ENV_VAR} and {PASSWORD_ENV_VAR} "
            "in the environment."
        )
    return email, password


async def initialize_session():
    """
    Initialize the browser session, handle auth, and save storageState

    Returns:
        True when the storage state and the verification screenshot were
        written, False when any step raised (the error is printed).
    """
    # Imported here so the module can be imported (and its pure helpers
    # tested) on machines where Playwright is not installed.
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False, args=LAUNCH_ARGS)
        context = await browser.new_context(
            viewport=VIEWPORT,
            user_agent=USER_AGENT,
        )

        # Make navigator.webdriver read as undefined in every page of this context.
        await context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined,
            });
        """)

        page = await context.new_page()

        try:
            print("šŸš€ INITIALIZING PERSISTENT SESSION")
            print("=" * 50)

            # LOGIN AND NAVIGATE TO BEGINNING
            print("šŸ” Step 1: Logging in...")
            await page.goto(BOOK_URL)
            await page.wait_for_timeout(5000)

            if "signin" in page.url:
                email, password = _login_credentials()

                email_field = await page.wait_for_selector("#ap_email", timeout=10000)
                await email_field.fill(email)
                continue_btn = await page.wait_for_selector("#continue", timeout=5000)
                await continue_btn.click()
                await page.wait_for_timeout(3000)

                password_field = await page.wait_for_selector("#ap_password", timeout=10000)
                await password_field.fill(password)
                signin_btn = await page.wait_for_selector("#signInSubmit", timeout=5000)
                await signin_btn.click()
                await page.wait_for_timeout(5000)
                print("āœ… Login completed")

            # WAIT FOR READER AND NAVIGATE TO BEGINNING
            await page.wait_for_timeout(8000)
            print("šŸ“– Step 2: Navigating to book beginning...")

            # Best effort: a failure here is reported but does not abort the
            # initialization, the session state is still saved below.
            try:
                toc_button = await page.wait_for_selector(
                    "[aria-label='Table of Contents']", timeout=5000
                )
                await toc_button.click()
                await page.wait_for_timeout(2000)

                cover_link = await page.wait_for_selector("text=Cover", timeout=5000)
                await cover_link.click()
                await page.wait_for_timeout(3000)

                # Close TOC
                for _ in range(5):
                    await page.keyboard.press("Escape")
                    await page.wait_for_timeout(500)

                await page.click("body", position={"x": 600, "y": 400})
                await page.wait_for_timeout(2000)
                print(" āœ… Navigated to beginning")
            except Exception as e:
                print(f" āš ļø TOC navigation failed: {e}")

            # SAVE SESSION STATE
            print("šŸ’¾ Step 3: Saving session state...")
            await context.storage_state(path=SESSION_STATE_PATH)
            print(f" āœ… Session saved to: {SESSION_STATE_PATH}")

            # TAKE INITIAL SCREENSHOT TO VERIFY POSITION
            await page.screenshot(path="session_init_position.png")
            print(" šŸ“ø Initial position screenshot saved")

            print("\nāœ… SESSION INITIALIZATION COMPLETE")
            print("Ready for chunked scanning with persistent state!")
            return True

        except Exception as e:
            print(f"āŒ Initialization error: {e}")
            return False
        finally:
            await browser.close()


async def scan_chunk_with_persistence(start_page, chunk_size, total_pages=226):
    """
    Scan a chunk using persistent session state

    Args:
        start_page: 1-based number of the first page to screenshot.
        chunk_size: maximum number of pages to screenshot in this run.
        total_pages: upper bound for the last page of the chunk.

    Returns:
        False when the session state file does not exist; the last completed
        page number on success; ``start_page - 1`` when an exception aborted
        the chunk. Use ``chunk_succeeded`` to interpret the value.
    """
    if not Path(SESSION_STATE_PATH).exists():
        print("āŒ No session state found. Run initialize_session first.")
        return False

    # Imported here so the module can be imported (and its pure helpers
    # tested) on machines where Playwright is not installed.
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False, args=LAUNCH_ARGS)

        # LOAD PERSISTENT SESSION STATE
        context = await browser.new_context(
            storage_state=SESSION_STATE_PATH,
            viewport=VIEWPORT,
            user_agent=USER_AGENT,
        )
        page = await context.new_page()

        try:
            end_page = chunk_end_page(start_page, chunk_size, total_pages)
            print(f"šŸŽÆ SCANNING CHUNK: Pages {start_page} to {end_page}")
            print("=" * 50)

            # NAVIGATE TO BOOK (should maintain position due to session state)
            print("šŸ“– Loading book with persistent session...")
            await page.goto(BOOK_URL)
            await page.wait_for_timeout(5000)

            # Without this check an expired session would screenshot the
            # sign-in page repeatedly and record bogus progress.
            if "signin" in page.url:
                raise RuntimeError(
                    "Saved session is no longer signed in; re-run with --init."
                )

            # NAVIGATE TO TARGET START PAGE
            if start_page > 1:
                print(f"šŸŽÆ Navigating to page {start_page}...")
                # Use fast navigation to reach target page
                for i in range(start_page - 1):
                    await page.keyboard.press("ArrowRight")
                    if i % 10 == 9:  # Progress indicator every 10 pages
                        print(f" šŸ“ Navigated {i + 1} pages...")
                    await page.wait_for_timeout(200)  # Fast navigation
                print(f" āœ… Reached target page {start_page}")

            # SCAN THE CHUNK
            output_dir = Path("scanned_pages")
            output_dir.mkdir(exist_ok=True)

            print(f"šŸš€ Scanning pages {start_page} to {end_page}...")

            consecutive_identical = 0
            last_file_size = 0
            # Assume the whole chunk completes; lowered below if the end of
            # the book is detected first.
            actual_end = end_page

            for page_num in range(start_page, end_page + 1):
                print(f"šŸ“ø Scanning page {page_num}...")

                # Take screenshot
                filename = output_dir / f"page_{page_num:03d}.png"
                await page.screenshot(path=str(filename))

                # Check file size
                file_size = filename.stat().st_size
                if is_probable_duplicate(file_size, last_file_size):
                    consecutive_identical += 1
                    print(
                        f" āš ļø Possible duplicate "
                        f"({consecutive_identical}/{MAX_CONSECUTIVE_DUPLICATES})"
                    )
                else:
                    consecutive_identical = 0
                    print(f" āœ… New content ({file_size} bytes)")
                last_file_size = file_size

                # Stop if too many duplicates
                if consecutive_identical >= MAX_CONSECUTIVE_DUPLICATES:
                    print("šŸ“– Detected end of book")
                    # The duplicates all repeat the page just before them.
                    actual_end = page_num - consecutive_identical
                    break

                # Navigate to next page (except last)
                if page_num < end_page:
                    await page.keyboard.press("ArrowRight")
                    await page.wait_for_timeout(1000)

            # SAVE PROGRESS
            progress_file = Path("scan_progress.json")
            progress_data = {
                "last_completed_page": actual_end,
                "total_pages": total_pages,
                "chunk_size": chunk_size,
                "timestamp": time.time(),
                "session_state_file": SESSION_STATE_PATH,
            }
            with open(progress_file, "w") as f:
                json.dump(progress_data, f, indent=2)

            print("\nšŸŽ‰ CHUNK COMPLETED!")
            print(f"šŸ“Š Scanned: {start_page} to {actual_end}")
            print(f"šŸ“ Progress saved to: {progress_file}")
            return actual_end

        except Exception as e:
            print(f"āŒ Scanning error: {e}")
            traceback.print_exc()
            return start_page - 1
        finally:
            await browser.close()


def main():
    """Parse the command line and run either initialization or one chunk scan."""
    parser = argparse.ArgumentParser(description="Persistent Session Kindle Scanner")
    parser.add_argument("--init", action="store_true", help="Initialize session")
    parser.add_argument("--start-page", type=int, default=1, help="Starting page")
    parser.add_argument("--chunk-size", type=int, default=40, help="Pages per chunk")
    parser.add_argument("--total-pages", type=int, default=226, help="Total pages")
    args = parser.parse_args()

    if args.init:
        print("Initializing session...")
        success = asyncio.run(initialize_session())
        if success:
            print("āœ… Ready to start chunked scanning!")
        else:
            print("āŒ Initialization failed")
        return

    result = asyncio.run(
        scan_chunk_with_persistence(args.start_page, args.chunk_size, args.total_pages)
    )
    # A plain truthiness test would accept the error value start_page - 1.
    if chunk_succeeded(result, args.start_page):
        print(f"āœ… Chunk completed up to page {result}")
    else:
        print("āŒ Chunk failed")


if __name__ == "__main__":
    main()