Format according to the black standard

This commit is contained in:
Serene-Arc
2022-12-03 15:11:17 +10:00
parent 96cd7d7147
commit 0873a4a2b2
60 changed files with 2160 additions and 1790 deletions

View File

@@ -31,7 +31,7 @@ class BaseDownloader(ABC):
res = requests.get(url, cookies=cookies, headers=headers)
except requests.exceptions.RequestException as e:
logger.exception(e)
raise SiteDownloaderError(f'Failed to get page {url}')
raise SiteDownloaderError(f"Failed to get page {url}")
if res.status_code != 200:
raise ResourceNotFound(f'Server responded with {res.status_code} to {url}')
raise ResourceNotFound(f"Server responded with {res.status_code} to {url}")
return res

View File

@@ -5,8 +5,8 @@ from typing import Optional
from praw.models import Submission
from bdfr.site_authenticator import SiteAuthenticator
from bdfr.resource import Resource
from bdfr.site_authenticator import SiteAuthenticator
from bdfr.site_downloaders.base_downloader import BaseDownloader
logger = logging.getLogger(__name__)

View File

@@ -4,8 +4,8 @@ from typing import Optional
from praw.models import Submission
from bdfr.site_authenticator import SiteAuthenticator
from bdfr.resource import Resource
from bdfr.site_authenticator import SiteAuthenticator
from bdfr.site_downloaders.base_downloader import BaseDownloader

View File

@@ -26,62 +26,63 @@ class DownloadFactory:
@staticmethod
def pull_lever(url: str) -> Type[BaseDownloader]:
    """Select the downloader class responsible for ``url``.

    The URL is first reduced with ``DownloadFactory.sanitise_url`` and then
    tested against each site pattern in order; the first match wins, so the
    order of the checks below is significant.

    Args:
        url: The submission URL to classify.

    Returns:
        The downloader class (not an instance) that handles the URL.

    Raises:
        NotADownloadableLinkError: If no pattern matches and the yt-dlp
            fallback reports that it cannot handle the link.
    """
    sanitised_url = DownloadFactory.sanitise_url(url)
    if re.match(r"(i\.|m\.)?imgur", sanitised_url):
        return Imgur
    if re.match(r"(i\.)?(redgifs|gifdeliverynetwork)", sanitised_url):
        return Redgifs
    # Anything ending in a 3-4 character extension is fetched directly,
    # unless that extension denotes a web page/script rather than media.
    looks_like_file = re.match(r".*/.*\.\w{3,4}(\?[\w;&=]*)?$", sanitised_url)
    if looks_like_file and not DownloadFactory.is_web_resource(sanitised_url):
        return Direct
    if re.match(r"erome\.com.*", sanitised_url):
        return Erome
    if re.match(r"delayforreddit\.com", sanitised_url):
        return DelayForReddit
    if re.match(r"reddit\.com/gallery/.*", sanitised_url):
        return Gallery
    # NOTE(review): patreon links are routed to the Reddit gallery
    # downloader, exactly as before - confirm this is intentional.
    if re.match(r"patreon\.com.*", sanitised_url):
        return Gallery
    if re.match(r"gfycat\.", sanitised_url):
        return Gfycat
    if re.match(r"reddit\.com/r/", sanitised_url):
        return SelfPost
    if re.match(r"(m\.)?youtu\.?be", sanitised_url):
        return Youtube
    if re.match(r"i\.redd\.it.*", sanitised_url):
        return Direct
    if re.match(r"v\.redd\.it.*", sanitised_url):
        return VReddit
    if re.match(r"pornhub\.com.*", sanitised_url):
        return PornHub
    if re.match(r"vidble\.com", sanitised_url):
        return Vidble
    if YtdlpFallback.can_handle_link(sanitised_url):
        return YtdlpFallback
    raise NotADownloadableLinkError(f"No downloader module exists for url {url}")
@staticmethod
def sanitise_url(url: str) -> str:
    """Reduce a URL to ``netloc + path`` without a leading ``www.``.

    The scheme, query string and fragment are discarded by ``urlsplit``;
    leading whitespace and an optional ``www``/``www.`` prefix are then
    removed from the front of the result.

    Args:
        url: The URL to normalise.

    Returns:
        The host and path portion of ``url`` with the prefix stripped.
    """
    # Anchored with ^ so that only the beginning is touched; an unanchored
    # pattern would also delete "www" (and whitespace) occurring later in
    # the host or path, e.g. turning "/wwwroot/" into "/root/".
    beginning_regex = re.compile(r"^\s*(www\.?)?")
    parts = urllib.parse.urlsplit(url)
    host_and_path = parts.netloc + parts.path
    return beginning_regex.sub("", host_and_path)
@staticmethod
def is_web_resource(url: str) -> bool:
web_extensions = (
'asp',
'aspx',
'cfm',
'cfml',
'css',
'htm',
'html',
'js',
'php',
'php3',
'xhtml',
"asp",
"aspx",
"cfm",
"cfml",
"css",
"htm",
"html",
"js",
"php",
"php3",
"xhtml",
)
if re.match(rf'(?i).*/.*\.({"|".join(web_extensions)})$', url):
return True

View File

@@ -23,34 +23,34 @@ class Erome(BaseDownloader):
links = self._get_links(self.post.url)
if not links:
raise SiteDownloaderError('Erome parser could not find any links')
raise SiteDownloaderError("Erome parser could not find any links")
out = []
for link in links:
if not re.match(r'https?://.*', link):
link = 'https://' + link
if not re.match(r"https?://.*", link):
link = "https://" + link
out.append(Resource(self.post, link, self.erome_download(link)))
return out
@staticmethod
def _get_links(url: str) -> set[str]:
    """Collect the media links found on an Erome page.

    Fetches ``url`` via ``Erome.retrieve_url``, parses the HTML, and gathers
    the ``data-src`` attribute of every ``<img class="lasyload">`` element
    plus the ``src`` attribute of every ``<source>`` element.

    Args:
        url: Address of the Erome page to scrape.

    Returns:
        The set of distinct, non-empty link strings found on the page.
    """
    page = Erome.retrieve_url(url)
    soup = bs4.BeautifulSoup(page.text, "html.parser")

    # NOTE(review): the class name "lasyload" is kept exactly as it was;
    # presumably it mirrors the site's own markup - confirm before changing.
    front_images = soup.find_all("img", attrs={"class": "lasyload"})
    videos = soup.find_all("source")

    # Tag.get returns None when the attribute is absent; such entries are
    # dropped so callers never receive a non-string link.
    links = {im.get("data-src") for im in front_images}
    links.update(vid.get("src") for vid in videos)
    return {link for link in links if link}
@staticmethod
def erome_download(url: str) -> Callable:
    """Build a download callable for a single Erome media URL.

    Args:
        url: The media URL that the returned callable will fetch.

    Returns:
        A one-argument callable. It takes a dict of global download
        parameters, merges the Erome-specific request headers over it (the
        Erome values win on key clashes), and passes the result to
        ``Resource.http_download`` together with ``url``.
    """
    download_parameters = {
        "headers": {
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
            " Chrome/88.0.4324.104 Safari/537.36",
            "Referer": "https://www.erome.com/",
        },
    }
    return lambda global_params: Resource.http_download(url, global_params | download_parameters)

View File

@@ -7,7 +7,6 @@ from bdfr.site_downloaders.base_downloader import BaseDownloader
class BaseFallbackDownloader(BaseDownloader, ABC):
@staticmethod
@abstractmethod
def can_handle_link(url: str) -> bool:

View File

@@ -9,7 +9,9 @@ from praw.models import Submission
from bdfr.exceptions import NotADownloadableLinkError
from bdfr.resource import Resource
from bdfr.site_authenticator import SiteAuthenticator
from bdfr.site_downloaders.fallback_downloaders.fallback_downloader import BaseFallbackDownloader
from bdfr.site_downloaders.fallback_downloaders.fallback_downloader import (
BaseFallbackDownloader,
)
from bdfr.site_downloaders.youtube import Youtube
logger = logging.getLogger(__name__)
@@ -24,7 +26,7 @@ class YtdlpFallback(BaseFallbackDownloader, Youtube):
self.post,
self.post.url,
super()._download_video({}),
super().get_video_attributes(self.post.url)['ext'],
super().get_video_attributes(self.post.url)["ext"],
)
return [out]

View File

@@ -20,27 +20,27 @@ class Gallery(BaseDownloader):
def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
    """Resolve a Reddit gallery submission into downloadable resources.

    The gallery items are read from the submission itself; if that fails
    with ``AttributeError`` or ``TypeError`` the first crosspost parent is
    tried instead.

    Args:
        authenticator: Not used by this method.

    Returns:
        One ``Resource`` per image URL, each using ``Resource.retry_download``.

    Raises:
        SiteDownloaderError: If no gallery data can be located, or if the
            gallery yields no image URLs.
    """
    try:
        image_urls = self._get_links(self.post.gallery_data["items"])
    except (AttributeError, TypeError):
        try:
            image_urls = self._get_links(self.post.crosspost_parent_list[0]["gallery_data"]["items"])
        except (AttributeError, IndexError, TypeError, KeyError) as e:
            logger.error(f"Could not find gallery data in submission {self.post.id}")
            logger.exception("Gallery image find failure")
            # Chain the cause so the traceback shows what was missing.
            raise SiteDownloaderError("No images found in Reddit gallery") from e

    if not image_urls:
        raise SiteDownloaderError("No images found in Reddit gallery")
    return [Resource(self.post, url, Resource.retry_download(url)) for url in image_urls]
@ staticmethod
@staticmethod
def _get_links(id_dict: list[dict]) -> list[str]:
out = []
for item in id_dict:
image_id = item['media_id']
possible_extensions = ('.jpg', '.png', '.gif', '.gifv', '.jpeg')
image_id = item["media_id"]
possible_extensions = (".jpg", ".png", ".gif", ".gifv", ".jpeg")
for extension in possible_extensions:
test_url = f'https://i.redd.it/{image_id}{extension}'
test_url = f"https://i.redd.it/{image_id}{extension}"
response = requests.head(test_url)
if response.status_code == 200:
out.append(test_url)

View File

@@ -22,21 +22,23 @@ class Gfycat(Redgifs):
@staticmethod
def _get_link(url: str) -> set[str]:
    """Resolve a Gfycat URL to the direct media URL(s).

    The last path segment is taken as the Gfycat ID and the canonical
    ``https://gfycat.com/<id>`` page is fetched. If the response URL points
    at redgifs/gifdeliverynetwork, resolution is delegated to
    ``Redgifs._get_link``; otherwise the video URL is read from the page's
    ``application/ld+json`` script tag.

    Args:
        url: The Gfycat link from the submission.

    Returns:
        A set of media URLs (a single element for native Gfycat pages).

    Raises:
        SiteDownloaderError: If the ID cannot be extracted, the expected
            script tag or keys are missing, or the embedded JSON is invalid.
    """
    id_match = re.match(r".*/(.*?)/?$", url)
    if id_match is None:
        # Previously this surfaced as a bare AttributeError on ``.group``.
        raise SiteDownloaderError(f"Could not extract Gfycat ID from {url}")
    url = "https://gfycat.com/" + id_match.group(1)

    response = Gfycat.retrieve_url(url)
    if re.search(r"(redgifs|gifdeliverynetwork)", response.url):
        url = url.lower()  # Fixes error with old gfycat/redgifs links
        return Redgifs._get_link(url)

    soup = BeautifulSoup(response.text, "html.parser")
    content = soup.find("script", attrs={"data-react-helmet": "true", "type": "application/ld+json"})

    try:
        # ``content`` is None when the tag is absent -> AttributeError below.
        out = json.loads(content.contents[0])["video"]["contentUrl"]
    except (IndexError, KeyError, AttributeError) as e:
        raise SiteDownloaderError(f"Failed to download Gfycat link {url}: {e}") from e
    except json.JSONDecodeError as e:
        raise SiteDownloaderError(f"Did not receive valid JSON data: {e}") from e
    return {out}

View File

@@ -14,7 +14,6 @@ from bdfr.site_downloaders.base_downloader import BaseDownloader
class Imgur(BaseDownloader):
def __init__(self, post: Submission):
super().__init__(post)
self.raw_data = {}
@@ -23,63 +22,63 @@ class Imgur(BaseDownloader):
self.raw_data = self._get_data(self.post.url)
out = []
if 'album_images' in self.raw_data:
images = self.raw_data['album_images']
for image in images['images']:
if "album_images" in self.raw_data:
images = self.raw_data["album_images"]
for image in images["images"]:
out.append(self._compute_image_url(image))
else:
out.append(self._compute_image_url(self.raw_data))
return out
def _compute_image_url(self, image: dict) -> Resource:
    """Build the ``Resource`` for one Imgur image record.

    Args:
        image: A dict that must contain ``"ext"`` and ``"hash"``; if it has
            a truthy ``"prefer_video"`` entry the extension is forced to
            ``.mp4``.

    Returns:
        A ``Resource`` pointing at ``https://i.imgur.com/<hash><ext>`` that
        downloads through ``Resource.retry_download``.
    """
    # The extension is validated even when it is about to be overridden, so
    # whatever ``_validate_extension`` does with a bad value still applies.
    ext = self._validate_extension(image["ext"])
    if image.get("prefer_video", False):
        ext = ".mp4"

    image_url = "https://i.imgur.com/" + image["hash"] + ext
    return Resource(self.post, image_url, Resource.retry_download(image_url))
@staticmethod
def _get_data(link: str) -> dict:
    """Fetch an Imgur page and extract its embedded image dictionary.

    The Imgur ID is taken from the last path segment of ``link`` (any
    extension dropped), the canonical page is requested with age-gate
    cookies, and the JSON object passed to
    ``widgetFactory.mergeConfig('gallery', ...)`` in the page's inline
    script is located and parsed.

    Args:
        link: The Imgur URL from the submission.

    Returns:
        The parsed ``image`` dictionary from the page source.

    Raises:
        SiteDownloaderError: If the ID cannot be extracted, the expected
            script is not found exactly once, the image dictionary cannot
            be located, or it is not valid JSON.
    """
    id_match = re.match(r".*/(.*?)(\..{0,})?$", link)
    if id_match is None:
        raise SiteDownloaderError(f"Could not extract Imgur ID from {link}")
    imgur_id = id_match.group(1)
    gallery = "a/" if re.search(r".*/(.*?)(gallery/|a/)", link) else ""
    link = f"https://imgur.com/{gallery}{imgur_id}"

    res = Imgur.retrieve_url(link, cookies={"over18": "1", "postpagebeta": "0"})

    soup = bs4.BeautifulSoup(res.text, "html.parser")
    scripts = soup.find_all("script", attrs={"type": "text/javascript"})
    # Newlines are removed so the single-line regexes below can span the
    # whole script body.
    scripts = [script.string.replace("\n", "") for script in scripts if script.string]

    script_regex = re.compile(r"\s*\(function\(widgetFactory\)\s*{\s*widgetFactory\.mergeConfig\(\'gallery\'")
    chosen_scripts = [s for s in scripts if script_regex.search(s)]
    if len(chosen_scripts) != 1:
        raise SiteDownloaderError(f"Could not read page source from {link}")
    chosen_script = chosen_scripts[0]

    outer_regex = re.compile(r"widgetFactory\.mergeConfig\(\'gallery\', ({.*})\);")
    inner_regex = re.compile(r"image\s*:(.*),\s*group")
    outer_match = outer_regex.search(chosen_script)
    inner_match = inner_regex.search(outer_match.group(1)) if outer_match else None
    if inner_match is None:
        raise SiteDownloaderError("Could not find image dictionary in page source")

    try:
        image_dict = json.loads(inner_match.group(1))
    except json.JSONDecodeError as e:
        raise SiteDownloaderError(f"Could not parse received dict as JSON: {e}") from e

    return image_dict
@staticmethod
def _validate_extension(extension_suffix: str) -> str:
extension_suffix = re.sub(r'\?.*', '', extension_suffix)
possible_extensions = ('.jpg', '.png', '.mp4', '.gif')
extension_suffix = re.sub(r"\?.*", "", extension_suffix)
possible_extensions = (".jpg", ".png", ".mp4", ".gif")
selection = [ext for ext in possible_extensions if ext == extension_suffix]
if len(selection) == 1:
return selection[0]

View File

@@ -20,11 +20,11 @@ class PornHub(Youtube):
def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
ytdl_options = {
'format': 'best',
'nooverwrites': True,
"format": "best",
"nooverwrites": True,
}
if video_attributes := super().get_video_attributes(self.post.url):
extension = video_attributes['ext']
extension = video_attributes["ext"]
else:
raise SiteDownloaderError()

View File

@@ -2,9 +2,9 @@
import json
import re
import requests
from typing import Optional
import requests
from praw.models import Submission
from bdfr.exceptions import SiteDownloaderError
@@ -24,52 +24,53 @@ class Redgifs(BaseDownloader):
@staticmethod
def _get_link(url: str) -> set[str]:
    """Resolve a Redgifs URL to its direct media URL(s) via the v2 API.

    Steps: extract the ID from the last path segment, obtain a temporary
    API token, query the gif endpoint, then pick URLs depending on the
    reported type (1 = video, 2 = image, possibly part of a gallery).

    Args:
        url: The Redgifs link from the submission.

    Returns:
        The set of media URLs, with ``thumbs2``/``thumbs3`` subdomains
        rewritten to ``thumbs4``.

    Raises:
        SiteDownloaderError: If the ID or token cannot be obtained, or any
            API response is not valid JSON or lacks the expected fields.
    """
    id_match = re.match(r".*/(.*?)(\..{0,})?$", url)
    if id_match is None:
        raise SiteDownloaderError(f"Could not extract Redgifs ID from {url}")
    redgif_id = id_match.group(1)

    try:
        auth_token = json.loads(Redgifs.retrieve_url("https://api.redgifs.com/v2/auth/temporary").text)["token"]
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        # A malformed token response previously escaped as a raw exception.
        raise SiteDownloaderError("Unable to retrieve Redgifs API token") from e
    if not auth_token:
        raise SiteDownloaderError("Unable to retrieve Redgifs API token")

    headers = {
        "referer": "https://www.redgifs.com/",
        "origin": "https://www.redgifs.com",
        "content-type": "application/json",
        "Authorization": f"Bearer {auth_token}",
    }

    content = Redgifs.retrieve_url(f"https://api.redgifs.com/v2/gifs/{redgif_id}", headers=headers)

    if content is None:
        raise SiteDownloaderError("Could not read the page source")

    try:
        response_json = json.loads(content.text)
    except json.JSONDecodeError as e:
        raise SiteDownloaderError(f"Received data was not valid JSON: {e}") from e

    out = set()
    try:
        gif = response_json["gif"]
        if gif["type"] == 1:  # type 1 is a video
            # Prefer the HD rendition when the server will actually serve it.
            if requests.get(gif["urls"]["hd"], headers=headers).ok:
                out.add(gif["urls"]["hd"])
            else:
                out.add(gif["urls"]["sd"])
        elif gif["type"] == 2:  # type 2 is an image
            gallery_id = gif["gallery"]
            if gallery_id:
                content = Redgifs.retrieve_url(f"https://api.redgifs.com/v2/gallery/{gallery_id}")
                response_json = json.loads(content.text)
                out = {p["urls"]["hd"] for p in response_json["gifs"]}
            else:
                out.add(gif["urls"]["hd"])
        else:
            raise KeyError
    except (KeyError, AttributeError, TypeError, json.JSONDecodeError) as e:
        raise SiteDownloaderError("Failed to find JSON data in page") from e

    # Update subdomain if old one is returned
    out = {re.sub("thumbs2", "thumbs3", link) for link in out}
    out = {re.sub("thumbs3", "thumbs4", link) for link in out}
    return out

View File

@@ -17,27 +17,29 @@ class SelfPost(BaseDownloader):
super().__init__(post)
def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
    """Produce a single text resource holding the formatted self post.

    Args:
        authenticator: Not used by this method.

    Returns:
        A one-element list. The ``Resource`` has a ``.txt`` extension, its
        content preset to the UTF-8 encoded output of
        ``export_to_string``, and its hash already created.
    """
    # The download callable is a no-op because the content is filled in
    # directly below instead of being fetched.
    out = Resource(self.post, self.post.url, lambda: None, ".txt")
    out.content = self.export_to_string().encode("utf-8")
    out.create_hash()
    return [out]
def export_to_string(self) -> str:
    """Self posts are formatted here.

    Builds a Markdown document: a heading linking the post's fullname to
    its URL, the self text, a horizontal rule, and a footer linking the
    subreddit and the author.

    Returns:
        The formatted Markdown string. ``DELETED`` stands in for the
        author name when the post has no author.
    """
    post = self.post
    # NOTE(review): the subreddit's ``title`` attribute is used for both the
    # label and the /r/ link, as before - confirm this is the intended field.
    subreddit = post.subreddit.title
    author = post.author.name if post.author else "DELETED"
    return (
        f"## [{post.fullname}]({post.url})\n"
        f"{post.selftext}"
        "\n\n---\n\n"
        f"submitted to [r/{subreddit}](https://www.reddit.com/r/{subreddit})"
        f" by [u/{author}](https://www.reddit.com/user/{author})"
    )

View File

@@ -25,30 +25,30 @@ class Vidble(BaseDownloader):
try:
res = self.get_links(self.post.url)
except AttributeError:
raise SiteDownloaderError(f'Could not read page at {self.post.url}')
raise SiteDownloaderError(f"Could not read page at {self.post.url}")
if not res:
raise SiteDownloaderError(rf'No resources found at {self.post.url}')
raise SiteDownloaderError(rf"No resources found at {self.post.url}")
res = [Resource(self.post, r, Resource.retry_download(r)) for r in res]
return res
@staticmethod
def get_links(url: str) -> set[str]:
    """Scrape a Vidble page for its image and video URLs.

    URLs that are not already ``show/``, ``album/`` or ``watch?v`` links
    have ``show/`` inserted before their final path segment. The page is
    then fetched and every ``<img>`` and ``<source type="video/mp4">``
    inside the ``ContentPlaceHolder1_divContent`` div is collected.

    Args:
        url: The Vidble link from the submission.

    Returns:
        Absolute ``https://www.vidble.com`` URLs with any ``_med`` size
        suffix removed (see ``Vidble.change_med_url``).

    Raises:
        AttributeError: If the content div is not present in the page.
    """
    # The dot is escaped so that only a literal "vidble.com" matches.
    if not re.search(r"vidble\.com/(show/|album/|watch\?v)", url):
        url = re.sub(r"/(\w*?)$", r"/show/\1", url)

    page = requests.get(url)
    soup = bs4.BeautifulSoup(page.text, "html.parser")
    # ``content_div`` is None when the div is missing; the resulting
    # AttributeError is deliberately left to propagate to the caller.
    content_div = soup.find("div", attrs={"id": "ContentPlaceHolder1_divContent"})
    image_sources = [img.get("src") for img in content_div.find_all("img")]
    video_sources = [vid.get("src") for vid in content_div.find_all("source", attrs={"type": "video/mp4"})]

    # Skip tags without a src, then make the links absolute and full-size.
    return {
        Vidble.change_med_url("https://www.vidble.com" + src)
        for src in itertools.chain(image_sources, video_sources)
        if src
    }
@staticmethod
def change_med_url(url: str) -> str:
    """Strip a trailing ``_med`` marker that precedes the file extension.

    Args:
        url: A media URL, e.g. ``.../abc_med.jpg``.

    Returns:
        ``url`` with ``_med`` removed when it sits directly before a final
        3-4 character extension; otherwise ``url`` unchanged.
    """
    return re.sub(r"_med(\..{3,4})$", r"\1", url)

View File

@@ -22,18 +22,18 @@ class VReddit(Youtube):
def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
    """Create the resource for a v.redd.it video submission.

    Args:
        authenticator: Not used by this method.

    Returns:
        A one-element list holding a ``Resource`` whose download callable
        comes from ``_download_video`` and whose extension is the ``ext``
        entry reported by ``get_video_attributes`` for the post URL.
    """
    ytdl_options = {
        "playlistend": 1,
        "nooverwrites": True,
    }
    download_function = self._download_video(ytdl_options)
    extension = self.get_video_attributes(self.post.url)["ext"]
    return [Resource(self.post, self.post.url, download_function, extension)]
@staticmethod
def get_video_attributes(url: str) -> dict:
result = VReddit.get_video_data(url)
if 'ext' in result:
if "ext" in result:
return result
else:
try:
@@ -41,4 +41,4 @@ class VReddit(Youtube):
return result
except Exception as e:
logger.exception(e)
raise NotADownloadableLinkError(f'Video info extraction failed for {url}')
raise NotADownloadableLinkError(f"Video info extraction failed for {url}")

View File

@@ -22,57 +22,62 @@ class Youtube(BaseDownloader):
def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
    """Create the resource for a video submission handled by yt-dlp.

    Args:
        authenticator: Not used by this method.

    Returns:
        A one-element list holding a ``Resource`` whose download callable
        comes from ``_download_video`` and whose extension is the ``ext``
        entry reported by ``get_video_attributes`` for the post URL.
    """
    ytdl_options = {
        "format": "best",
        "playlistend": 1,
        "nooverwrites": True,
    }
    download_function = self._download_video(ytdl_options)
    extension = self.get_video_attributes(self.post.url)["ext"]
    return [Resource(self.post, self.post.url, download_function, extension)]
def _download_video(self, ytdl_options: dict) -> Callable:
yt_logger = logging.getLogger('youtube-dl')
yt_logger = logging.getLogger("youtube-dl")
yt_logger.setLevel(logging.CRITICAL)
ytdl_options['quiet'] = True
ytdl_options['logger'] = yt_logger
ytdl_options["quiet"] = True
ytdl_options["logger"] = yt_logger
def download(_: dict) -> bytes:
with tempfile.TemporaryDirectory() as temp_dir:
download_path = Path(temp_dir).resolve()
ytdl_options['outtmpl'] = str(download_path) + '/' + 'test.%(ext)s'
ytdl_options["outtmpl"] = str(download_path) + "/" + "test.%(ext)s"
try:
with yt_dlp.YoutubeDL(ytdl_options) as ydl:
ydl.download([self.post.url])
except yt_dlp.DownloadError as e:
raise SiteDownloaderError(f'Youtube download failed: {e}')
raise SiteDownloaderError(f"Youtube download failed: {e}")
downloaded_files = list(download_path.iterdir())
if downloaded_files:
downloaded_file = downloaded_files[0]
else:
raise NotADownloadableLinkError(f"No media exists in the URL {self.post.url}")
with downloaded_file.open('rb') as file:
with downloaded_file.open("rb") as file:
content = file.read()
return content
return download
@staticmethod
def get_video_data(url: str) -> dict:
    """Ask yt-dlp for the metadata of ``url`` without downloading it.

    Args:
        url: The video page URL.

    Returns:
        Whatever ``YoutubeDL.extract_info(url, download=False)`` returns.

    Raises:
        NotADownloadableLinkError: If extraction raises any exception; the
            original exception is logged and chained as the cause.
    """
    yt_logger = logging.getLogger("youtube-dl")
    yt_logger.setLevel(logging.CRITICAL)
    with yt_dlp.YoutubeDL({"logger": yt_logger}) as ydl:
        try:
            result = ydl.extract_info(url, download=False)
        except Exception as e:
            logger.exception(e)
            raise NotADownloadableLinkError(f"Video info extraction failed for {url}") from e
    return result
@staticmethod
def get_video_attributes(url: str) -> dict:
    """Return the yt-dlp metadata for ``url``, requiring an ``ext`` entry.

    Args:
        url: The video page URL.

    Returns:
        The metadata dict from ``Youtube.get_video_data``.

    Raises:
        NotADownloadableLinkError: If the metadata has no ``ext`` key.
    """
    result = Youtube.get_video_data(url)
    if "ext" not in result:
        raise NotADownloadableLinkError(f"Video info extraction failed for {url}")
    return result