#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import hashlib
import json
import logging.handlers
import os
import tempfile
import time
from collections.abc import Iterable
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
from time import sleep

import praw
import praw.exceptions
import praw.models
import prawcore

from bdfr import exceptions as errors
from bdfr.configuration import Configuration
from bdfr.connector import RedditConnector
from bdfr.site_downloaders.download_factory import DownloadFactory

logger = logging.getLogger(__name__)


def _calc_hash(existing_file: Path):
    chunk_size = 1024 * 1024
    md5_hash = hashlib.md5()
    with existing_file.open("rb") as file:
        chunk = file.read(chunk_size)
        while chunk:
            md5_hash.update(chunk)
            chunk = file.read(chunk_size)
    file_hash = md5_hash.hexdigest()
    return existing_file, file_hash


class RedditDownloader(RedditConnector):
    def __init__(self, args: Configuration, logging_handlers: Iterable[logging.Handler] = ()):
        super(RedditDownloader, self).__init__(args, logging_handlers)
        self.master_hash_list = {}
        self.url_list = {}  # Maps resource URL -> file hash for the simple-check fast path

        # Load existing hashes if no_dupes is enabled or search_existing is requested.
        if self.args.no_dupes or self.args.search_existing:
            # _load_hash_list understands both the legacy and the enhanced hash file formats,
            # returns a flat {hash: Path} mapping, and populates self.url_list itself.
            self.master_hash_list = self._load_hash_list()

            # If search_existing is also enabled, scan for any files not yet in the hash list.
            if self.args.search_existing:
                existing_hashes = set(self.master_hash_list.keys())
                all_files_hashes = self.scan_existing_files(self.download_directory)
                # Add any new files found by scanning.
                for hash_value, file_path in all_files_hashes.items():
                    if hash_value not in existing_hashes:
                        self.master_hash_list[hash_value] = file_path
                logger.info(
                    f"Loaded {len(self.master_hash_list)} total hashes "
                    f"({len(existing_hashes)} from file, "
                    f"{len(self.master_hash_list) - len(existing_hashes)} new)"
                )

    def download(self):
        for generator in self.reddit_lists:
            last_submission_id = None
            try:
                for submission in generator:
                    last_submission_id = submission.id
                    try:
                        self._download_submission(submission)
                    except prawcore.PrawcoreException as e:
                        logger.error(f"Submission {submission.id} failed to download due to a PRAW exception: {e}")
            except prawcore.PrawcoreException as e:
                submission_id = last_submission_id or "unknown"
                logger.error(f"The submission after {submission_id} failed to download due to a PRAW exception: {e}")
                logger.debug("Waiting 60 seconds to continue")
                sleep(60)
        # Always save the hash list when no_dupes is enabled, even if it is empty,
        # so the hash file exists for future runs.
        if self.args.no_dupes:
            self._save_hash_list()
    def _download_submission(self, submission: praw.models.Submission):
        if submission.id in self.excluded_submission_ids:
            logger.debug(f"Object {submission.id} in exclusion list, skipping")
            return
        elif submission.subreddit.display_name.lower() in self.args.skip_subreddit:
            logger.debug(f"Submission {submission.id} in {submission.subreddit.display_name} in skip list")
            return
        elif (submission.author and submission.author.name in self.args.ignore_user) or (
            submission.author is None and "DELETED" in self.args.ignore_user
        ):
            logger.debug(
                f"Submission {submission.id} in {submission.subreddit.display_name} skipped"
                f' due to {submission.author.name if submission.author else "DELETED"} being an ignored user'
            )
            return
        elif self.args.min_score and submission.score < self.args.min_score:
            logger.debug(
                f"Submission {submission.id} filtered due to score {submission.score} < [{self.args.min_score}]"
            )
            return
        elif self.args.max_score and self.args.max_score < submission.score:
            logger.debug(
                f"Submission {submission.id} filtered due to score {submission.score} > [{self.args.max_score}]"
            )
            return
        elif (self.args.min_score_ratio and submission.upvote_ratio < self.args.min_score_ratio) or (
            self.args.max_score_ratio and self.args.max_score_ratio < submission.upvote_ratio
        ):
            logger.debug(f"Submission {submission.id} filtered due to score ratio ({submission.upvote_ratio})")
            return
        elif not isinstance(submission, praw.models.Submission):
            logger.warning(f"{submission.id} is not a submission")
            return
        elif not self.download_filter.check_url(submission.url):
            logger.debug(f"Submission {submission.id} filtered due to URL {submission.url}")
            return

        logger.debug(f"Attempting to download submission {submission.id}")
        try:
            downloader_class = DownloadFactory.pull_lever(submission.url)
            downloader = downloader_class(submission)
            logger.debug(f"Using {downloader_class.__name__} with url {submission.url}")
        except errors.NotADownloadableLinkError as e:
            logger.error(f"Could not download submission {submission.id}: {e}")
            return
        if downloader_class.__name__.lower() in self.args.disable_module:
            logger.debug(f"Submission {submission.id} skipped due to disabled module {downloader_class.__name__}")
            return
        try:
            content = downloader.find_resources(self.authenticator)
        except errors.SiteDownloaderError as e:
            logger.error(f"Site {downloader_class.__name__} failed to download submission {submission.id}: {e}")
            return

        files_processed = 0
        for destination, res in self.file_name_formatter.format_resource_paths(content, self.download_directory):
            if destination.exists():
                # Check whether we already have this file's hash.
                if destination in self.master_hash_list.values():
                    logger.debug(f"File {destination} from submission {submission.id} already exists, continuing")
                    continue
                else:
                    # The file exists but is not in our hash list, so calculate its hash.
                    try:
                        existing_file_hash = _calc_hash(destination)[1]
                        self.master_hash_list[existing_file_hash] = destination
                        # Store the URL mapping for simple-check if the resource URL is available.
                        if hasattr(res, "url") and self.args.simple_check:
                            self.url_list[res.url] = existing_file_hash
                        logger.debug(f"Added hash for existing file: {existing_file_hash}")
                        files_processed += 1
                        if self.args.no_dupes:
                            self._save_hash_list()
                    except Exception as e:
                        logger.warning(f"Failed to calculate hash for existing file {destination}: {e}")
                    continue
            elif not self.download_filter.check_resource(res):
                logger.debug(f"Download filter removed {submission.id} file with URL {submission.url}")
                continue
            try:
                res.download({"max_wait_time": self.args.max_wait_time})
                logger.debug(f"Successfully downloaded resource {res.url}")
            except errors.BulkDownloaderException as e:
                logger.error(
                    f"Failed to download resource {res.url} in submission {submission.id} "
                    f"with downloader {downloader_class.__name__}: {e}"
                )
                return
            resource_hash = res.hash.hexdigest()
            destination.parent.mkdir(parents=True, exist_ok=True)

            # Simple-check: URL-based duplicate detection (fast path).
            if self.args.simple_check and hasattr(res, "url") and res.url in self.url_list:
                stored_hash = self.url_list[res.url]
                if stored_hash in self.master_hash_list:
                    logger.info(f"URL {res.url} from submission {submission.id} already downloaded (simple-check)")
                    return

            # Full hash-based duplicate detection.
            if resource_hash in self.master_hash_list:
                if self.args.no_dupes:
                    logger.info(f"Resource hash {resource_hash} from submission {submission.id} downloaded elsewhere")
                    return
                elif self.args.make_hard_links:
                    try:
                        destination.hardlink_to(self.master_hash_list[resource_hash])
                    except AttributeError:
                        self.master_hash_list[resource_hash].link_to(destination)
                    logger.info(
                        f"Hard link made linking {destination} to {self.master_hash_list[resource_hash]}"
                        f" in submission {submission.id}"
                    )
                    files_processed += 1
                    # Save the hash list after a successful hard link if no_dupes is enabled.
                    if self.args.no_dupes:
                        self._save_hash_list()
                    return
            try:
                with destination.open("wb") as file:
                    file.write(res.content)
                logger.debug(f"Written file to {destination}")
                files_processed += 1
            except OSError as e:
                logger.exception(e)
                logger.error(f"Failed to write file in submission {submission.id} to {destination}: {e}")
                return
            creation_time = time.mktime(datetime.fromtimestamp(submission.created_utc).timetuple())
            os.utime(destination, (creation_time, creation_time))
            self.master_hash_list[resource_hash] = destination
            # Store the URL mapping for the simple-check functionality.
            if hasattr(res, "url") and self.args.simple_check:
                self.url_list[res.url] = resource_hash
            logger.debug(f"Hash added to master list: {resource_hash}")
            logger.debug(f"Master hash list now contains {len(self.master_hash_list)} entries")
            # Save the hash list after each successful download if no_dupes is enabled.
            if self.args.no_dupes:
                self._save_hash_list()

        # Only log "Downloaded submission" if files were actually processed.
        if files_processed > 0:
            logger.info(f"Downloaded submission {submission.id} from {submission.subreddit.display_name}")
        else:
            logger.info(f"Skipped submission {submission.id} from {submission.subreddit.display_name} (no new files)")

    @staticmethod
    def scan_existing_files(directory: Path) -> dict[str, Path]:
        files = []
        for (dirpath, _dirnames, filenames) in os.walk(directory):
            files.extend([Path(dirpath, file) for file in filenames])
        logger.info(f"Calculating hashes for {len(files)} files")
        pool = Pool(15)
        results = pool.map(_calc_hash, files)
        pool.close()

        hash_list = {res[1]: res[0] for res in results}
        return hash_list

    def get_master_hash_list(self) -> dict[str, Path]:
        """Get the current master hash list for testing purposes."""
        return self.master_hash_list
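    # On-disk layout of .bdfr_hashes.json, as handled by _load_hash_list/_save_hash_list below.
    # The concrete paths and URLs are illustrative examples, not real data:
    #
    #   Legacy format (written when simple_check is disabled):
    #     {"<md5-hex>": "subreddit/filename.jpg"}
    #
    #   Enhanced format (written when simple_check is enabled):
    #     {
    #       "files": {"<md5-hex>": {"path": "subreddit/filename.jpg",
    #                               "url": "https://i.redd.it/example.jpg",
    #                               "check_method": "hash"}},
    #       "urls": {"https://i.redd.it/example.jpg": "<md5-hex>"},
    #       "metadata": {"version": "2.0", "created_with": "simple_check",
    #                    "url_count": 1, "hash_count": 1}
    #     }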
    def _load_hash_list(self) -> dict[str, Path]:
        """Load an existing hash list from .bdfr_hashes.json in the download directory."""
        logger.debug(f"Loading hash list from directory: {self.download_directory}")
        hash_file_path = self.download_directory / ".bdfr_hashes.json"

        if not hash_file_path.exists():
            logger.debug(f"No existing hash file found at {hash_file_path}")
            return {}

        try:
            with open(hash_file_path, "r", encoding="utf-8") as f:
                hash_data = json.load(f)

            if not isinstance(hash_data, dict):
                logger.warning(f"Hash file {hash_file_path} contains invalid data format")
                return {}

            # Handle the new enhanced format.
            if "files" in hash_data and isinstance(hash_data["files"], dict):
                files_data = hash_data["files"]
                loaded_hashes = {}
                urls_data = hash_data.get("urls", {})

                for hash_value, file_info in files_data.items():
                    if isinstance(file_info, dict) and "path" in file_info:
                        # Enhanced entry: {"hash": {"path": "relative/path", "url": "http://..."}}
                        relative_path = file_info["path"]
                        absolute_path = self.download_directory / relative_path
                        if absolute_path.exists():
                            loaded_hashes[hash_value] = absolute_path
                        else:
                            logger.debug(f"File {absolute_path} from hash file no longer exists")
                        # Load the URL mapping for simple-check.
                        if "url" in file_info and file_info["url"]:
                            self.url_list[file_info["url"]] = hash_value
                    elif isinstance(file_info, str):
                        # Legacy entry within the new structure: {"hash": "relative/path"}
                        absolute_path = self.download_directory / file_info
                        if absolute_path.exists():
                            loaded_hashes[hash_value] = absolute_path
                        else:
                            logger.debug(f"File {absolute_path} from hash file no longer exists")

                logger.info(f"Loaded {len(loaded_hashes)} hashes and {len(urls_data)} URLs from enhanced hash file")
                return loaded_hashes
            else:
                # Legacy format: {"hash": "relative/path"}
                loaded_hashes = {}
                for hash_value, relative_path in hash_data.items():
                    absolute_path = self.download_directory / relative_path
                    if absolute_path.exists():
                        loaded_hashes[hash_value] = absolute_path
                    else:
                        logger.debug(f"File {absolute_path} from hash file no longer exists")
                logger.info(f"Loaded {len(loaded_hashes)} hashes from legacy hash file")
                return loaded_hashes

        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse hash file {hash_file_path}: {e}")
            return {}
        except (OSError, IOError) as e:
            logger.warning(f"Failed to read hash file {hash_file_path}: {e}")
            return {}

    def _save_hash_list(self) -> None:
        """Save the current hash list to .bdfr_hashes.json in the download directory using an atomic write."""
        hash_file_path = self.download_directory / ".bdfr_hashes.json"

        if self.args.simple_check:
            # Enhanced format with URLs and metadata.
            hash_data = {
                "files": {},
                "urls": self.url_list.copy(),
                "metadata": {
                    "version": "2.0",
                    "created_with": "simple_check",
                    "url_count": len(self.url_list),
                    "hash_count": len(self.master_hash_list),
                },
            }
            # Convert absolute paths to relative paths for portability.
            for hash_value, absolute_path in self.master_hash_list.items():
                try:
                    relative_path = absolute_path.relative_to(self.download_directory)
                    hash_data["files"][hash_value] = {
                        "path": str(relative_path),
                        "url": next((url for url, h in self.url_list.items() if h == hash_value), None),
                        "check_method": "hash",
                    }
                except ValueError:
                    # The file is not inside the download directory, so skip it.
                    logger.debug(f"Skipping file {absolute_path} as it's not relative to download directory")
                    continue
        else:
            # Legacy format for backward compatibility.
            hash_data = {}
            for hash_value, absolute_path in self.master_hash_list.items():
                try:
                    relative_path = absolute_path.relative_to(self.download_directory)
                    hash_data[hash_value] = str(relative_path)
                except ValueError:
                    # The file is not inside the download directory, so skip it.
                    logger.debug(f"Skipping file {absolute_path} as it's not relative to download directory")
                    continue

        # Atomic write: write to a temporary file first, then rename it over the target.
        try:
            with tempfile.NamedTemporaryFile(
                mode="w", dir=self.download_directory, suffix=".tmp", delete=False, encoding="utf-8"
            ) as temp_file:
                json.dump(hash_data, temp_file, indent=2)
                temp_file_path = temp_file.name
            # os.replace atomically overwrites an existing target on both POSIX and Windows,
            # so no platform-specific handling is needed.
            os.replace(temp_file_path, hash_file_path)
            logger.debug(f"Saved {len(self.master_hash_list)} hashes to {hash_file_path}")
        except (OSError, IOError) as e:
            logger.error(f"Failed to save hash file {hash_file_path}: {e}")
        except Exception as e:
            logger.error(f"Unexpected error saving hash file {hash_file_path}: {e}")
            # Clean up the temporary file if it was created but never renamed.
            try:
                if "temp_file_path" in locals():
                    os.unlink(temp_file_path)
            except (OSError, IOError):
                pass
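
# Illustrative usage sketch only; in BDFR this class is normally driven by the CLI entry
# point, which builds the Configuration from command-line options. The attribute values
# below (directory, subreddit, no_dupes) are assumptions chosen for the example:
#
#   config = Configuration()
#   config.directory = "./downloads"
#   config.subreddit = ["EarthPorn"]
#   config.no_dupes = True  # enables the persistent .bdfr_hashes.json handling above
#   RedditDownloader(config).download()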