Refactor Imgur class to be hardier

This commit is contained in:
Serene-Arc
2021-03-21 11:10:06 +10:00
committed by Ali Parlakci
parent ba2ab25c2c
commit 1215bc69de

View File

@@ -1,99 +1,73 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
import logging import re
from typing import Optional from typing import Optional
import bs4
import requests import requests
from praw.models import Submission from praw.models import Submission
from bulkredditdownloader.site_authenticator import SiteAuthenticator
from bulkredditdownloader.exceptions import NotADownloadableLinkError, ResourceNotFound, SiteDownloaderError from bulkredditdownloader.exceptions import NotADownloadableLinkError, ResourceNotFound, SiteDownloaderError
from bulkredditdownloader.resource import Resource from bulkredditdownloader.resource import Resource
from bulkredditdownloader.site_authenticator import SiteAuthenticator
from bulkredditdownloader.site_downloaders.base_downloader import BaseDownloader from bulkredditdownloader.site_downloaders.base_downloader import BaseDownloader
from bulkredditdownloader.site_downloaders.direct import Direct
logger = logging.getLogger(__name__)
class Imgur(BaseDownloader):
    """Downloader for imgur.com links; scrapes the gallery JSON embedded in the page."""

    def __init__(self, post: Submission):
        super().__init__(post)
        # Parsed page data for the post's link; filled in by find_resources()
        self.raw_data = {}

    def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
        """Return one Resource per image referenced by the submission's imgur link."""
        self.raw_data = self._get_data(self.post.url)
        if 'album_images' in self.raw_data:
            # Album page: one resource per contained image
            return [self._download_image(image) for image in self.raw_data['album_images']['images']]
        # Single-image page: the page data itself describes the image
        return [self._download_image(self.raw_data)]

    def _download_image(self, image: dict) -> Resource:
        """Build a Resource pointing at the direct i.imgur.com URL for *image*."""
        image_url = 'https://i.imgur.com/' + image['hash'] + self._validate_extension(image['ext'])
        return Resource(self.post, image_url)

    @staticmethod
    def _get_data(link: str) -> dict:
        """Fetch *link* and extract the image/gallery JSON from the page's scripts.

        Raises:
            ResourceNotFound: on any non-200 HTTP response.
            NotADownloadableLinkError: when the expected gallery script or its
                embedded JSON cannot be located in the page source.
        """
        # The over18 cookie bypasses imgur's NSFW interstitial page
        res = requests.get(link, cookies={'over18': '1', 'postpagebeta': '0'})
        if res.status_code != 200:
            raise ResourceNotFound(f'Server responded with {res.status_code} to {link}')

        # Name the parser explicitly: avoids bs4's GuessedAtParserWarning and
        # keeps parsing behaviour consistent across environments
        soup = bs4.BeautifulSoup(res.text, 'html.parser')
        scripts = soup.find_all('script', attrs={'type': 'text/javascript'})
        scripts = [script.string.replace('\n', '') for script in scripts if script.string]

        # The gallery config lives in exactly one widgetFactory.mergeConfig script
        script_regex = re.compile(r'\s*\(function\(widgetFactory\)\s*{\s*widgetFactory\.mergeConfig\(\'gallery\'')
        chosen_scripts = [script for script in scripts if script_regex.search(script)]
        if len(chosen_scripts) != 1:
            raise NotADownloadableLinkError(f'Could not read page source from {link}')
        chosen_script = chosen_scripts[0]

        # Pull the whole config object out of the script, then the 'image'
        # entry out of the config; guard each match so an unexpected page
        # raises our error type rather than AttributeError on None
        outer_regex = re.compile(r'widgetFactory\.mergeConfig\(\'gallery\', ({.*})\);')
        outer_match = outer_regex.search(chosen_script)
        if outer_match is None:
            raise NotADownloadableLinkError(f'Could not read page source from {link}')

        inner_regex = re.compile(r'image\s*:(.*),\s*group')
        inner_match = inner_regex.search(outer_match.group(1))
        if inner_match is None:
            raise NotADownloadableLinkError(f'Could not read page source from {link}')

        return json.loads(inner_match.group(1))

    @staticmethod
    def _validate_extension(extension_suffix: str) -> str:
        """Return *extension_suffix* unchanged if it is a known imgur extension.

        Raises:
            SiteDownloaderError: for any unrecognised extension.
        """
        possible_extensions = ('.jpg', '.png', '.mp4', '.gif')
        # Exact membership test replaces the original build-a-list-and-count dance
        if extension_suffix in possible_extensions:
            return extension_suffix
        raise SiteDownloaderError(f'"{extension_suffix}" is not recognized as a valid extension for Imgur')