#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import hashlib
import json
import logging.handlers
import os
import tempfile
import time
from collections.abc import Iterable
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
from time import sleep

import praw
import praw.exceptions
import praw.models
import prawcore

from bdfr import exceptions as errors
from bdfr.configuration import Configuration
from bdfr.connector import RedditConnector
from bdfr.site_downloaders.download_factory import DownloadFactory

logger = logging.getLogger(__name__)


def _calc_hash(existing_file: Path):
    chunk_size = 1024 * 1024
    md5_hash = hashlib.md5()
    with existing_file.open("rb") as file:
        chunk = file.read(chunk_size)
        while chunk:
            md5_hash.update(chunk)
            chunk = file.read(chunk_size)
    file_hash = md5_hash.hexdigest()
    return existing_file, file_hash

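
# Illustrative sketch only -- nothing in this module calls it. It shows why _calc_hash
# returns a (path, hexdigest) tuple: results coming back from a multiprocessing Pool can
# be folded straight into the {hash: Path} index that the downloader keeps. The function
# name and the pool size are assumptions made for the example, not part of the
# downloader's interface.
def _example_build_hash_index(paths: list[Path]) -> dict[str, Path]:
    with Pool(4) as pool:
        results = pool.map(_calc_hash, paths)
    return {file_hash: file_path for file_path, file_hash in results}

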
class RedditDownloader(RedditConnector):
    def __init__(self, args: Configuration, logging_handlers: Iterable[logging.Handler] = ()):
        super().__init__(args, logging_handlers)
        self.master_hash_list = {}
        self.url_list = {}  # URL-to-hash mapping used by the simple-check fast path

        # Load existing hashes if no_dupes is enabled or search_existing is requested
        if self.args.no_dupes or self.args.search_existing:
            # _load_hash_list() handles both the legacy and the enhanced hash file formats,
            # returns a {hash: Path} mapping, and populates self.url_list with any stored URLs
            self.master_hash_list = self._load_hash_list()

            # If search_existing is also enabled, scan for any files not yet in the hash list
            if self.args.search_existing:
                existing_hashes = set(self.master_hash_list.keys())
                all_files_hashes = self.scan_existing_files(self.download_directory)

                # Add any new files found by scanning
                new_hashes = {
                    hash_value: file_path
                    for hash_value, file_path in all_files_hashes.items()
                    if hash_value not in existing_hashes
                }
                self.master_hash_list.update(new_hashes)

                logger.info(
                    f"Loaded {len(self.master_hash_list)} total hashes "
                    f"({len(existing_hashes)} from file, {len(new_hashes)} new from scanning)"
                )

    def download(self):
        for generator in self.reddit_lists:
            last_submission_id = None
            try:
                for submission in generator:
                    last_submission_id = submission.id
                    try:
                        self._download_submission(submission)
                    except prawcore.PrawcoreException as e:
                        logger.error(f"Submission {submission.id} failed to download due to a PRAW exception: {e}")
            except prawcore.PrawcoreException as e:
                submission_id = last_submission_id or "unknown"
                logger.error(f"The submission after {submission_id} failed to download due to a PRAW exception: {e}")
                logger.debug("Waiting 60 seconds to continue")
                sleep(60)

        # Save the hash list after completion if no_dupes is enabled. This runs even when the
        # hash list is empty, so the hash file exists for future runs.
        if self.args.no_dupes:
            self._save_hash_list()

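    # Hedged sketch of the simple-check fast path used by _download_submission below: a
    # resource URL that is already recorded in url_list, and whose stored hash still has a
    # file in master_hash_list, can be skipped without re-hashing any content. The helper
    # name is illustrative only and is not called anywhere in this class.
    def _example_url_already_downloaded(self, url: str) -> bool:
        stored_hash = self.url_list.get(url)
        return stored_hash is not None and stored_hash in self.master_hash_list
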
    def _download_submission(self, submission: praw.models.Submission):
        if submission.id in self.excluded_submission_ids:
            logger.debug(f"Object {submission.id} in exclusion list, skipping")
            return
        elif submission.subreddit.display_name.lower() in self.args.skip_subreddit:
            logger.debug(f"Submission {submission.id} in {submission.subreddit.display_name} in skip list")
            return
        elif (submission.author and submission.author.name in self.args.ignore_user) or (
            submission.author is None and "DELETED" in self.args.ignore_user
        ):
            logger.debug(
                f"Submission {submission.id} in {submission.subreddit.display_name} skipped"
                f' due to {submission.author.name if submission.author else "DELETED"} being an ignored user'
            )
            return
        elif self.args.min_score and submission.score < self.args.min_score:
            logger.debug(
                f"Submission {submission.id} filtered due to score {submission.score} < [{self.args.min_score}]"
            )
            return
        elif self.args.max_score and self.args.max_score < submission.score:
            logger.debug(
                f"Submission {submission.id} filtered due to score {submission.score} > [{self.args.max_score}]"
            )
            return
        elif (self.args.min_score_ratio and submission.upvote_ratio < self.args.min_score_ratio) or (
            self.args.max_score_ratio and self.args.max_score_ratio < submission.upvote_ratio
        ):
            logger.debug(f"Submission {submission.id} filtered due to score ratio ({submission.upvote_ratio})")
            return
        elif not isinstance(submission, praw.models.Submission):
            logger.warning(f"{submission.id} is not a submission")
            return
        elif not self.download_filter.check_url(submission.url):
            logger.debug(f"Submission {submission.id} filtered due to URL {submission.url}")
            return

logger.debug(f"Attempting to download submission {submission.id}")
|
|
try:
|
|
downloader_class = DownloadFactory.pull_lever(submission.url)
|
|
downloader = downloader_class(submission)
|
|
logger.debug(f"Using {downloader_class.__name__} with url {submission.url}")
|
|
except errors.NotADownloadableLinkError as e:
|
|
logger.error(f"Could not download submission {submission.id}: {e}")
|
|
return
|
|
if downloader_class.__name__.lower() in self.args.disable_module:
|
|
logger.debug(f"Submission {submission.id} skipped due to disabled module {downloader_class.__name__}")
|
|
return
|
|
try:
|
|
content = downloader.find_resources(self.authenticator)
|
|
except errors.SiteDownloaderError as e:
|
|
logger.error(f"Site {downloader_class.__name__} failed to download submission {submission.id}: {e}")
|
|
return
|
|
        files_processed = 0
        for destination, res in self.file_name_formatter.format_resource_paths(content, self.download_directory):
            if destination.exists():
                # Check if we already have this file's hash
                if destination in self.master_hash_list.values():
                    logger.debug(f"File {destination} from submission {submission.id} already exists, continuing")
                    continue
                else:
                    # File exists but is not in the hash list - record its hash and skip re-downloading
                    try:
                        existing_file_hash = _calc_hash(destination)[1]
                        self.master_hash_list[existing_file_hash] = destination

                        # Store URL mapping for simple-check functionality if a URL is available
                        if hasattr(res, 'url') and self.args.simple_check:
                            self.url_list[res.url] = existing_file_hash

                        logger.debug(f"Added hash for existing file: {existing_file_hash}")
                        files_processed += 1
                        if self.args.no_dupes:
                            self._save_hash_list()
                    except Exception as e:
                        logger.warning(f"Failed to calculate hash for existing file {destination}: {e}")
                    continue
            elif not self.download_filter.check_resource(res):
                logger.debug(f"Download filter removed {submission.id} file with URL {submission.url}")
                continue
            try:
                res.download({"max_wait_time": self.args.max_wait_time})
                logger.debug(f"Successfully downloaded resource {res.url}")
            except errors.BulkDownloaderException as e:
                logger.error(
                    f"Failed to download resource {res.url} in submission {submission.id} "
                    f"with downloader {downloader_class.__name__}: {e}"
                )
                return
            resource_hash = res.hash.hexdigest()
            destination.parent.mkdir(parents=True, exist_ok=True)

            # Simple-check: URL-based duplicate detection (fast path)
            if self.args.simple_check and hasattr(res, 'url') and res.url in self.url_list:
                stored_hash = self.url_list[res.url]
                if stored_hash in self.master_hash_list:
                    logger.info(f"URL {res.url} from submission {submission.id} already downloaded (simple-check)")
                    return

            # Full hash-based duplicate detection
            if resource_hash in self.master_hash_list:
                if self.args.no_dupes:
                    logger.info(f"Resource hash {resource_hash} from submission {submission.id} downloaded elsewhere")
                    return
                elif self.args.make_hard_links:
                    try:
                        destination.hardlink_to(self.master_hash_list[resource_hash])
                    except AttributeError:
                        # Path.hardlink_to() requires Python 3.10+; fall back to the older link_to()
                        self.master_hash_list[resource_hash].link_to(destination)
                    logger.info(
                        f"Hard link made linking {destination} to {self.master_hash_list[resource_hash]}"
                        f" in submission {submission.id}"
                    )
                    files_processed += 1
                    # Save hash list after successful hard link creation if no_dupes is enabled
                    if self.args.no_dupes:
                        self._save_hash_list()
                    return
            try:
                with destination.open("wb") as file:
                    file.write(res.content)
                logger.debug(f"Written file to {destination}")
                files_processed += 1
            except OSError as e:
                logger.exception(e)
                logger.error(f"Failed to write file in submission {submission.id} to {destination}: {e}")
                return
            creation_time = time.mktime(datetime.fromtimestamp(submission.created_utc).timetuple())
            os.utime(destination, (creation_time, creation_time))
            self.master_hash_list[resource_hash] = destination

            # Store URL mapping for simple-check functionality
            if hasattr(res, 'url') and self.args.simple_check:
                self.url_list[res.url] = resource_hash

            logger.debug(f"Hash added to master list: {resource_hash}")
            logger.debug(f"Master hash list now contains {len(self.master_hash_list)} entries")

            # Save hash list after successful download if no_dupes is enabled
            if self.args.no_dupes:
                self._save_hash_list()

        # Only log "Downloaded submission" if files were actually processed
        if files_processed > 0:
            logger.info(f"Downloaded submission {submission.id} from {submission.subreddit.display_name}")
        else:
            logger.info(f"Skipped submission {submission.id} from {submission.subreddit.display_name} (no new files)")

    @staticmethod
    def scan_existing_files(directory: Path) -> dict[str, Path]:
        files = []
        for (dirpath, _dirnames, filenames) in os.walk(directory):
            files.extend([Path(dirpath, file) for file in filenames])
        logger.info(f"Calculating hashes for {len(files)} files")

        pool = Pool(15)
        results = pool.map(_calc_hash, files)
        pool.close()

        hash_list = {res[1]: res[0] for res in results}
        return hash_list

    def get_master_hash_list(self) -> dict[str, Path]:
        """Get the current master hash list for testing purposes."""
        return self.master_hash_list

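    # For reference, the two .bdfr_hashes.json layouts handled by _load_hash_list() and
    # _save_hash_list() below (paths are stored relative to the download directory):
    #
    #   Legacy format:
    #       {"<md5>": "relative/path/to/file.jpg"}
    #
    #   Enhanced format (written when simple_check is enabled):
    #       {
    #           "files": {"<md5>": {"path": "relative/path", "url": "https://...", "check_method": "hash"}},
    #           "urls": {"https://...": "<md5>"},
    #           "metadata": {"version": "2.0", "created_with": "simple_check", "url_count": 1, "hash_count": 1}
    #       }
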
    def _load_hash_list(self) -> dict[str, Path]:
        """Load existing hash list from .bdfr_hashes.json in download directory."""
        logger.debug(f"Loading hash list from directory: {self.download_directory}")
        hash_file_path = self.download_directory / '.bdfr_hashes.json'

        if not hash_file_path.exists():
            logger.debug(f"No existing hash file found at {hash_file_path}")
            return {}

        try:
            with open(hash_file_path, 'r', encoding='utf-8') as f:
                hash_data = json.load(f)

            if not isinstance(hash_data, dict):
                logger.warning(f"Hash file {hash_file_path} contains invalid data format")
                return {}

            # Handle the enhanced format
            if 'files' in hash_data and isinstance(hash_data['files'], dict):
                files_data = hash_data['files']
                loaded_hashes = {}
                urls_data = hash_data.get('urls', {})

                for hash_value, file_info in files_data.items():
                    if isinstance(file_info, dict) and 'path' in file_info:
                        # Enhanced entry: {"hash": {"path": "relative/path", "url": "http://..."}}
                        relative_path = file_info['path']
                        absolute_path = self.download_directory / relative_path
                        if absolute_path.exists():
                            loaded_hashes[hash_value] = absolute_path
                        else:
                            logger.debug(f"File {absolute_path} from hash file no longer exists")

                        # Load the per-file URL mapping for simple-check
                        if 'url' in file_info and file_info['url']:
                            self.url_list[file_info['url']] = hash_value
                    elif isinstance(file_info, str):
                        # Legacy-style entry within the enhanced structure: {"hash": "relative/path"}
                        absolute_path = self.download_directory / file_info
                        if absolute_path.exists():
                            loaded_hashes[hash_value] = absolute_path
                        else:
                            logger.debug(f"File {absolute_path} from hash file no longer exists")

                # Restore the top-level URL index as well, keeping only entries whose hash was loaded
                for url, hash_value in urls_data.items():
                    if hash_value in loaded_hashes:
                        self.url_list[url] = hash_value

                logger.info(f"Loaded {len(loaded_hashes)} hashes and {len(self.url_list)} URLs from enhanced hash file")
                return loaded_hashes

            else:
                # Legacy format: {"hash": "relative/path"}
                loaded_hashes = {}
                for hash_value, relative_path in hash_data.items():
                    absolute_path = self.download_directory / relative_path
                    if absolute_path.exists():
                        loaded_hashes[hash_value] = absolute_path
                    else:
                        logger.debug(f"File {absolute_path} from hash file no longer exists")

                logger.info(f"Loaded {len(loaded_hashes)} hashes from legacy hash file")
                return loaded_hashes

        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse hash file {hash_file_path}: {e}")
            return {}
        except (OSError, IOError) as e:
            logger.warning(f"Failed to read hash file {hash_file_path}: {e}")
            return {}

    def _save_hash_list(self) -> None:
        """Save current hash list to .bdfr_hashes.json in download directory using atomic write."""
        hash_file_path = self.download_directory / '.bdfr_hashes.json'

        # Build enhanced data structure for the new format
        if self.args.simple_check:
            # New enhanced format with URLs and metadata
            hash_data = {
                'files': {},
                'urls': self.url_list.copy(),
                'metadata': {
                    'version': '2.0',
                    'created_with': 'simple_check' if self.args.simple_check else 'standard',
                    'url_count': len(self.url_list),
                    'hash_count': len(self.master_hash_list)
                }
            }

            # Convert absolute paths to relative paths for portability
            for hash_value, absolute_path in self.master_hash_list.items():
                try:
                    relative_path = absolute_path.relative_to(self.download_directory)
                    hash_data['files'][hash_value] = {
                        'path': str(relative_path),
                        'url': next((url for url, h in self.url_list.items() if h == hash_value), None),
                        'check_method': 'hash'
                    }
                except ValueError:
                    # File is not inside the download directory, so skip it
                    logger.debug(f"Skipping file {absolute_path} as it's not relative to download directory")
                    continue
        else:
            # Legacy format for backward compatibility
            hash_data = {}
            for hash_value, absolute_path in self.master_hash_list.items():
                try:
                    relative_path = absolute_path.relative_to(self.download_directory)
                    hash_data[hash_value] = str(relative_path)
                except ValueError:
                    # File is not inside the download directory, so skip it
                    logger.debug(f"Skipping file {absolute_path} as it's not relative to download directory")
                    continue

        # Atomic write: write to a temporary file in the same directory, then replace the target
        try:
            with tempfile.NamedTemporaryFile(
                mode='w',
                dir=self.download_directory,
                suffix='.tmp',
                delete=False,
                encoding='utf-8'
            ) as temp_file:
                json.dump(hash_data, temp_file, indent=2)
                temp_file_path = temp_file.name

            # os.replace() overwrites the destination atomically on both Windows and POSIX
            os.replace(temp_file_path, hash_file_path)

            saved_count = len(hash_data['files']) if self.args.simple_check else len(hash_data)
            logger.debug(f"Saved {saved_count} hashes to {hash_file_path}")

        except (OSError, IOError) as e:
            logger.error(f"Failed to save hash file {hash_file_path}: {e}")
        except Exception as e:
            logger.error(f"Unexpected error saving hash file {hash_file_path}: {e}")
            # Clean up temp file if it still exists
            try:
                if 'temp_file_path' in locals():
                    os.unlink(temp_file_path)
            except (OSError, IOError):
                pass
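

# Hedged usage sketch, not part of the downloader itself. The Configuration attributes set
# below (directory, subreddit, no_dupes) are assumptions based on the options referenced in
# this module; real invocations normally go through the bdfr command-line entry point.
if __name__ == "__main__":
    example_config = Configuration()
    example_config.directory = "./downloads"
    example_config.subreddit = ["pics"]
    example_config.no_dupes = True
    RedditDownloader(example_config).download()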