Add tentative typing

This commit is contained in:
Serene-Arc
2021-02-06 22:29:13 +10:00
committed by Ali Parlakci
parent 4143b86467
commit 185335e60b
20 changed files with 84 additions and 69 deletions

View File

@@ -87,7 +87,6 @@ def isPostExists(post, directory):
def downloadPost(submission, directory):
downloaders = {
"imgur": Imgur, "gfycat": Gfycat, "erome": Erome, "direct": Direct, "self": SelfPost,
"redgifs": Redgifs, "gifdeliverynetwork": GifDeliveryNetwork,
@@ -101,7 +100,6 @@ def downloadPost(submission, directory):
raise NoSuitablePost
def download(submissions):
"""Analyze list of submissions and call the right function
to download each one, catch errors, update the log files

View File

@@ -5,11 +5,11 @@ from src.utils import nameCorrector
class Config:
def __init__(self, filename):
def __init__(self, filename: str):
self.filename = filename
self.file = JsonFile(self.filename)
def generate(self):
def generate(self) -> dict:
self._validateCredentials()
self._readCustomFileName()
self._readCustomFolderPath()
@@ -80,7 +80,7 @@ Existing default options:""", None if "options" not in self.file.read() else sel
self.file.add({"options": options})
def _readDefaultOptions(self, path=None):
def _readDefaultOptions(self):
content = self.file.read()
if "options" not in content:
self.file.add({"options": ""})

View File

@@ -1,11 +1,12 @@
import os
import pathlib
from src.downloaders.downloaderUtils import getExtension, getFile
from src.utils import GLOBAL
class Direct:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
post['EXTENSION'] = getExtension(post['CONTENTURL'])
if not os.path.exists(directory):
os.makedirs(directory)

View File

@@ -1,4 +1,5 @@
import os
import pathlib
import urllib.error
import urllib.request
from html.parser import HTMLParser
@@ -10,7 +11,7 @@ from src.utils import printToFile as print
class Erome:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
try:
images = self.getLinks(post['CONTENTURL'])
except urllib.error.HTTPError:
@@ -80,7 +81,7 @@ class Erome:
elif how_many_downloaded + duplicates < images_length:
raise AlbumNotDownloadedCompletely("Album Not Downloaded Completely")
def getLinks(self, url):
def getLinks(self, url: str) -> list[str]:
content = []
line_number = None

View File

@@ -8,11 +8,12 @@ from src.downloaders.downloaderUtils import getExtension, getFile
from src.downloaders.gifDeliveryNetwork import GifDeliveryNetwork
from src.errors import NotADownloadableLinkError
from src.utils import GLOBAL
import pathlib
class Gfycat:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
try:
post['MEDIAURL'] = self.getLink(post['CONTENTURL'])
except IndexError:
@@ -29,7 +30,7 @@ class Gfycat:
getFile(filename, short_filename, directory, post['MEDIAURL'])
@staticmethod
def getLink(url):
def getLink(url: str) -> str:
"""Extract direct link to the video from page's source
and return it
"""

View File

@@ -1,5 +1,7 @@
import json
import os
import pathlib
import requests
from src.downloaders.Direct import Direct
@@ -14,13 +16,13 @@ class Imgur:
imgur_image_domain = "https://i.imgur.com/"
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
link = post['CONTENTURL']
if link.endswith(".gifv"):
link = link.replace(".gifv", ".mp4")
Direct(directory, {**post, 'CONTENTURL': link})
return None
return
self.raw_data = self.getData(link)
@@ -35,7 +37,7 @@ class Imgur:
else:
self.download(self.raw_data)
def downloadAlbum(self, images):
def downloadAlbum(self, images: dict):
folder_name = GLOBAL.config['filename'].format(**self.post)
folder_dir = self.directory / folder_name
@@ -92,7 +94,7 @@ class Imgur:
elif how_many_downloaded + duplicates < images_length:
raise AlbumNotDownloadedCompletely("Album Not Downloaded Completely")
def download(self, image):
def download(self, image: dict):
extension = self.validateExtension(image["ext"])
image_url = self.imgur_image_domain + image["hash"] + extension
@@ -102,11 +104,11 @@ class Imgur:
getFile(filename, short_filename, self.directory, image_url)
@property
def isAlbum(self):
def isAlbum(self) -> bool:
return "album_images" in self.raw_data
@staticmethod
def getData(link):
def getData(link: str) -> dict:
cookies = {"over18": "1", "postpagebeta": "0"}
res = requests.get(link, cookies=cookies)
if res.status_code != 200:
@@ -135,7 +137,7 @@ class Imgur:
return json.loads(data)
@staticmethod
def validateExtension(string):
def validateExtension(string: str) -> str:
possible_extensions = [".jpg", ".png", ".mp4", ".gif"]
for extension in possible_extensions:

View File

@@ -9,7 +9,7 @@ from src.utils import GLOBAL
from src.utils import printToFile as print
def dlProgress(count, block_size, total_size):
def dlProgress(count: int, block_size: int, total_size: int):
"""Function for writing download progress to console
"""
download_mbs = int(count * block_size * (10 ** (-6)))
@@ -18,7 +18,7 @@ def dlProgress(count, block_size, total_size):
sys.stdout.flush()
def getExtension(link):
def getExtension(link: str):
"""Extract file extension from image link.
If didn't find any, return '.jpg'
"""
@@ -34,7 +34,7 @@ def getExtension(link):
return '.mp4'
def getFile(filename, short_filename, folder_dir, image_url, indent=0, silent=False):
def getFile(filename: str, short_filename: str, folder_dir: Path, image_url: str, indent: int = 0, silent: bool = False):
formats = {
"videos": [".mp4", ".webm"],
"images": [".jpg", ".jpeg", ".png", ".bmp"],
@@ -101,7 +101,7 @@ def getFile(filename, short_filename, folder_dir, image_url, indent=0, silent=Fa
raise FailedToDownload
def createHash(filename):
def createHash(filename: str) -> str:
hash_md5 = hashlib.md5()
with open(filename, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):

View File

@@ -3,6 +3,7 @@ import os
import urllib
import requests
import pathlib
from src.downloaders.downloaderUtils import getFile
from src.errors import (AlbumNotDownloadedCompletely, FileAlreadyExistsError, ImageNotFound, NotADownloadableLinkError,
@@ -12,7 +13,7 @@ from src.utils import printToFile as print
class Gallery:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post):
link = post['CONTENTURL']
self.raw_data = self.getData(link)
@@ -36,7 +37,7 @@ class Gallery:
self.downloadAlbum(images, count)
@staticmethod
def getData(link):
def getData(link: str) -> dict:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36 OPR/54.0.2952.64",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
@@ -59,7 +60,7 @@ class Gallery:
data = json.loads(page_source[start_index - 1:end_index + 1].strip()[:-1])
return data
def downloadAlbum(self, images, count):
def downloadAlbum(self, images: dict, count: int):
folder_name = GLOBAL.config['filename'].format(**self.post)
folder_dir = self.directory / folder_name

View File

@@ -1,4 +1,5 @@
import os
import pathlib
import urllib.request
from bs4 import BeautifulSoup
@@ -9,7 +10,7 @@ from src.utils import GLOBAL
class GifDeliveryNetwork:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
try:
post['MEDIAURL'] = self.getLink(post['CONTENTURL'])
except IndexError:
@@ -26,7 +27,7 @@ class GifDeliveryNetwork:
getFile(filename, short_filename, directory, post['MEDIAURL'])
@staticmethod
def getLink(url):
def getLink(url: str) -> str:
"""Extract direct link to the video from page's source
and return it
"""

View File

@@ -1,5 +1,6 @@
import json
import os
import pathlib
import urllib.request
from bs4 import BeautifulSoup
@@ -10,7 +11,7 @@ from src.utils import GLOBAL
class Redgifs:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
try:
post['MEDIAURL'] = self.getLink(post['CONTENTURL'])
except IndexError:
@@ -27,7 +28,7 @@ class Redgifs:
getFile(filename, short_filename, directory, post['MEDIAURL'])
@staticmethod
def getLink(url):
def getLink(url: str) -> str:
"""Extract direct link to the video from page's source
and return it
"""

View File

@@ -1,6 +1,7 @@
from src.utils import printToFile as print
import io
import os
import pathlib
from pathlib import Path
from src.errors import FileAlreadyExistsError, TypeInSkip
@@ -11,7 +12,7 @@ VanillaPrint = print
class SelfPost:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
if "self" in GLOBAL.arguments.skip:
raise TypeInSkip
@@ -36,7 +37,7 @@ class SelfPost:
self.writeToFile(file_dir, post)
@staticmethod
def writeToFile(directory, post):
def writeToFile(directory: pathlib.Path, post: dict):
"""Self posts are formatted here"""
content = ("## ["
+ post["TITLE"]

View File

@@ -1,4 +1,5 @@
import os
import pathlib
import subprocess
from src.downloaders.downloaderUtils import getFile
@@ -7,7 +8,7 @@ from src.utils import printToFile as print
class VReddit:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
extension = ".mp4"
if not os.path.exists(directory):
os.makedirs(directory)
@@ -39,7 +40,12 @@ class VReddit:
os.rename(directory / video_name, directory / filename)
@staticmethod
def _mergeAudio(video, audio, filename, short_filename, directory):
def _mergeAudio(
video: pathlib.Path,
audio: pathlib.Path,
filename: pathlib.Path,
short_filename,
directory: pathlib.Path):
input_video = str(directory / video)
input_audio = str(directory / audio)

View File

@@ -1,4 +1,5 @@
import os
import pathlib
import sys
import youtube_dl
@@ -11,7 +12,7 @@ from src.utils import printToFile as print
class Youtube:
def __init__(self, directory, post):
def __init__(self, directory: pathlib.Path, post: dict):
if not os.path.exists(directory):
os.makedirs(directory)
@@ -20,7 +21,7 @@ class Youtube:
self.download(filename, directory, post['CONTENTURL'])
def download(self, filename, directory, url):
def download(self, filename: str, directory: pathlib.Path, url: str):
ydl_opts = {
"format": "best",
"outtmpl": str(directory / (filename + ".%(ext)s")),
@@ -36,7 +37,7 @@ class Youtube:
if GLOBAL.arguments.no_dupes:
try:
file_hash = createHash(location)
file_hash = createHash(str(location))
except FileNotFoundError:
return None
if file_hash in GLOBAL.downloadedPosts():

View File

@@ -1,5 +1,5 @@
import json
from os import path, remove
import os
from src.errors import InvalidJSONFile
@@ -12,19 +12,19 @@ class JsonFile:
file_dir = ""
def __init__(self, file_dir):
def __init__(self, file_dir: str):
self.file_dir = file_dir
if not path.exists(self.file_dir):
if not os.path.exists(self.file_dir):
self.__writeToFile({}, create=True)
def read(self):
def read(self) -> dict:
try:
with open(self.file_dir, 'r') as f:
return json.load(f)
except json.decoder.JSONDecodeError:
raise InvalidJSONFile(f"{self.file_dir} cannot be read")
def add(self, to_be_added, sub=None):
def add(self, to_be_added: dict, sub=None) -> dict:
"""Takes a dictionary and merges it with json file.
It uses new key's value if a key already exists.
Returns the new content as a dictionary.
@@ -37,7 +37,7 @@ class JsonFile:
self.__writeToFile(data)
return self.read()
def delete(self, *delete_keys):
def delete(self, *delete_keys: str):
"""Delete given keys from JSON file.
Returns the new content as a dictionary.
"""
@@ -50,8 +50,8 @@ class JsonFile:
return False
self.__writeToFile(data)
def __writeToFile(self, content, create=False):
def __writeToFile(self, content: (dict, list, tuple), create: bool = False):
if not create:
remove(self.file_dir)
os.remove(self.file_dir)
with open(self.file_dir, 'w') as f:
json.dump(content, f, indent=4)

View File

@@ -6,7 +6,7 @@ except ModuleNotFoundError:
from errors import InvalidRedditLink
def QueryParser(passed_queries, index):
def QueryParser(passed_queries: str) -> dict:
extracted_queries = {}
question_mark_index = passed_queries.index("?")
@@ -26,7 +26,7 @@ def QueryParser(passed_queries, index):
return extracted_queries
def LinkParser(link):
def LinkParser(link: str) -> dict:
result = {}
short_link = False
@@ -81,9 +81,7 @@ def LinkParser(link):
result["user"] = "me"
for index in range(len(splitted_link)):
if splitted_link[index] in [
"hot", "top", "new", "controversial", "rising"
]:
if splitted_link[index] in ["hot", "top", "new", "controversial", "rising"]:
result["sort"] = splitted_link[index]
@@ -101,7 +99,7 @@ def LinkParser(link):
result["upvoted"] = True
elif "?" in splitted_link[index]:
parsed_query = QueryParser(splitted_link[index], index)
parsed_query = QueryParser(splitted_link[index])
if parsed_query["HEADER"] == "search":
del parsed_query["HEADER"]
result["search"] = parsed_query
@@ -124,7 +122,7 @@ def LinkParser(link):
return result
def LinkDesigner(link):
def LinkDesigner(link) -> dict:
attributes = LinkParser(link)
mode = {}

View File

@@ -3,15 +3,16 @@ from pathlib import Path
from src.errors import InvalidSortingType, ProgramModeError, RedditorNameError, SearchModeError
from src.parser import LinkDesigner
import argparse
class ProgramMode:
def __init__(self, arguments):
def __init__(self, arguments: argparse.Namespace):
self.arguments = arguments
def generate(self):
def generate(self) -> dict:
try:
self._validateProgramMode()
except ProgramModeError:
@@ -82,7 +83,7 @@ class ProgramMode:
return program_mode
@staticmethod
def _chooseFrom(choices):
def _chooseFrom(choices: list[str]):
print()
choices_by_index = list(str(x) for x in range(len(choices) + 1))
for i in range(len(choices)):

View File

@@ -13,7 +13,7 @@ from src.utils import GLOBAL
class Reddit:
def __init__(self, refresh_token=None):
def __init__(self, refresh_token: str = None):
self.SCOPES = ['identity', 'history', 'read', 'save']
self.PORT = 7634
self.refresh_token = refresh_token
@@ -24,7 +24,7 @@ class Reddit:
"user_agent": str(socket.gethostname())
}
def begin(self):
def begin(self) -> praw.Reddit:
if self.refresh_token:
self.arguments["refresh_token"] = self.refresh_token
self.redditInstance = praw.Reddit(**self.arguments)
@@ -46,7 +46,7 @@ class Reddit:
reddit.user.me()), "reddit": refresh_token}, "credentials")
return self.redditInstance
def recieve_connection(self):
def recieve_connection(self) -> socket:
"""Wait for and then return a connected socket..
Opens a TCP connection on port 8080, and waits for a single client.
"""
@@ -58,13 +58,12 @@ class Reddit:
server.close()
return client
@staticmethod
def send_message(client, message):
def send_message(self, client: socket, message: str):
"""Send message to client and close the connection."""
client.send('HTTP/1.1 200 OK\r\n\r\n{}'.format(message).encode('utf-8'))
client.close()
def getRefreshToken(self, *scopes):
def getRefreshToken(self, scopes: list[str]) -> tuple[praw.Reddit, str]:
state = str(random.randint(0, 65000))
url = self.redditInstance.auth.url(scopes, state, 'permanent')
print("---Setting up the Reddit API---\n")

View File

@@ -8,12 +8,14 @@ from prawcore.exceptions import Forbidden, NotFound
from src.errors import (InsufficientPermission, InvalidSortingType, MultiredditNotFound, NoMatchingSubmissionFound,
NoPrawSupport)
from src.reddit import Reddit
from praw.models.listing.generator import ListingGenerator
from src.utils import GLOBAL, createLogFile, printToFile
from praw.models import Submission
print = printToFile
def getPosts(program_mode):
def getPosts(program_mode: dict) -> list[dict]:
"""Call PRAW regarding to arguments and pass it to extractDetails.
Return what extractDetails has returned.
"""
@@ -180,7 +182,7 @@ def getPosts(program_mode):
return extractDetails(reddit.submission(url=program_mode["post"]), single_post=True)
def extractDetails(posts, single_post=False):
def extractDetails(posts: (ListingGenerator, Submission), single_post=False) -> list[dict]:
"""Check posts and decide if it can be downloaded.
If so, create a dictionary with post details and append them to a list.
Write all of posts to file. Return the list
@@ -270,7 +272,7 @@ def extractDetails(posts, single_post=False):
raise NoMatchingSubmissionFound("No matching submission was found")
def matchWithDownloader(submission):
def matchWithDownloader(submission: Submission) -> dict[str, str]:
direct_link = extractDirectLink(submission.url)
if direct_link:
return {'TYPE': 'direct', 'CONTENTURL': direct_link}
@@ -320,7 +322,7 @@ def matchWithDownloader(submission):
'CONTENT': submission.selftext}
def extractDirectLink(url):
def extractDirectLink(url: str) -> (bool, str):
"""Check if link is a direct image link.
If so, return URL,
if not, return False

View File

@@ -2,7 +2,7 @@ from os import path
class Store:
def __init__(self, directory=None):
def __init__(self, directory: str = None):
self.directory = directory
if self.directory:
if path.exists(directory):
@@ -15,10 +15,10 @@ class Store:
else:
self.list = []
def __call__(self):
def __call__(self) -> list:
return self.list
def add(self, data):
def add(self, data: dict):
self.list.append(data)
if self.directory:
with open(self.directory, 'a') as f:

View File

@@ -2,6 +2,7 @@ import io
import sys
from os import makedirs, path
from pathlib import Path
from typing import Optional
from src.jsonHelper import JsonFile
@@ -20,11 +21,11 @@ class GLOBAL:
log_stream = None
@staticmethod
def downloadedPosts():
def downloadedPosts() -> list:
return []
def createLogFile(title):
def createLogFile(title: str) -> JsonFile:
"""Create a log file with given name
inside a folder time stampt in its name and
put given arguments inside \"HEADER\" key
@@ -60,7 +61,7 @@ def printToFile(*args, no_print=False, **kwargs):
print(*args, file=FILE, **kwargs)
def nameCorrector(string, reference=None):
def nameCorrector(string: str, reference: Optional[str] = None) -> str:
"""Swap strange characters from given string
with underscore (_) and shorten it.
Return the string