Move to standard module structure

This commit is contained in:
Serene-Arc
2021-02-07 11:05:18 +10:00
committed by Ali Parlakci
parent 185335e60b
commit d8a1204d8b
26 changed files with 80 additions and 81 deletions

View File

View File

@@ -0,0 +1,341 @@
#!/usr/bin/env python3
"""
This program downloads imgur, gfycat and direct image and video links of
saved posts from a reddit account. It is written in Python 3.
"""
import logging
import os
import sys
import time
from io import StringIO
from pathlib import Path
from prawcore.exceptions import InsufficientScope
from bulkredditdownloader.arguments import Arguments
from bulkredditdownloader.config import Config
from bulkredditdownloader.downloaders.Direct import Direct
from bulkredditdownloader.downloaders.Erome import Erome
from bulkredditdownloader.downloaders.gallery import Gallery
from bulkredditdownloader.downloaders.Gfycat import Gfycat
from bulkredditdownloader.downloaders.gifDeliveryNetwork import GifDeliveryNetwork
from bulkredditdownloader.downloaders.Imgur import Imgur
from bulkredditdownloader.downloaders.redgifs import Redgifs
from bulkredditdownloader.downloaders.selfPost import SelfPost
from bulkredditdownloader.downloaders.vreddit import VReddit
from bulkredditdownloader.downloaders.youtube import Youtube
from bulkredditdownloader.errors import (AlbumNotDownloadedCompletely, DomainInSkip, FailedToDownload, FileAlreadyExistsError,
ImgurLimitError, ImgurLoginError, InvalidJSONFile, NoSuitablePost, NotADownloadableLinkError,
TypeInSkip, full_exc_info)
from bulkredditdownloader.jsonHelper import JsonFile
from bulkredditdownloader.programMode import ProgramMode
from bulkredditdownloader.reddit import Reddit
from bulkredditdownloader.searcher import getPosts
from bulkredditdownloader.store import Store
from bulkredditdownloader.utils import GLOBAL, createLogFile, nameCorrector, printToFile
from time import sleep
__author__ = "Ali Parlakci"
__license__ = "GPL"
__version__ = "1.10.0"
__maintainer__ = "Ali Parlakci"
__email__ = "parlakciali@gmail.com"
def postFromLog(filename):
"""Analyze a log file and return a list of dictionaries containing
submissions
"""
if Path.is_file(Path(filename)):
content = JsonFile(filename).read()
else:
print("File not found")
sys.exit()
try:
del content["HEADER"]
except KeyError:
pass
posts = []
for post in content:
if not content[post][-1]['TYPE'] is None:
posts.append(content[post][-1])
return posts
def isPostExists(post, directory):
"""Figure out a file's name and checks if the file already exists"""
filename = GLOBAL.config['filename'].format(**post)
possible_extensions = [".jpg", ".png", ".mp4", ".gif", ".webm", ".md", ".mkv", ".flv"]
for extension in possible_extensions:
path = directory / Path(filename + extension)
if path.exists():
return True
return False
def downloadPost(submission, directory):
downloaders = {
"imgur": Imgur, "gfycat": Gfycat, "erome": Erome, "direct": Direct, "self": SelfPost,
"redgifs": Redgifs, "gifdeliverynetwork": GifDeliveryNetwork,
"v.redd.it": VReddit, "youtube": Youtube, "gallery": Gallery
}
print()
if submission['TYPE'] in downloaders:
downloaders[submission['TYPE']](directory, submission)
else:
raise NoSuitablePost
def download(submissions):
"""Analyze list of submissions and call the right function
to download each one, catch errors, update the log files
"""
downloaded_count = 0
duplicates = 0
failed_file = createLogFile("FAILED")
if GLOBAL.arguments.unsave:
reddit = Reddit(GLOBAL.config['credentials']['reddit']).begin()
subs_length = len(submissions)
for i in range(len(submissions)):
print(f"\n({i+1}/{subs_length})", end="")
print(submissions[i]['POSTID'],
f"r/{submissions[i]['SUBREDDIT']}",
f"u/{submissions[i]['REDDITOR']}",
submissions[i]['FLAIR'] if submissions[i]['FLAIR'] else "",
sep="",
end="")
print(f" {submissions[i]['TYPE'].upper()}", end="", no_print=True)
directory = GLOBAL.directory / \
GLOBAL.config["folderpath"].format(**submissions[i])
details = {
**submissions[i],
**{"TITLE": nameCorrector(
submissions[i]['TITLE'],
reference=str(directory)
+ GLOBAL.config['filename'].format(**submissions[i])
+ ".ext")}
}
filename = GLOBAL.config['filename'].format(**details)
if isPostExists(details, directory):
print()
print(directory)
print(filename)
print("It already exists")
duplicates += 1
continue
if any(domain in submissions[i]['CONTENTURL'] for domain in GLOBAL.arguments.skip):
print()
print(submissions[i]['CONTENTURL'])
print("Domain found in skip domains, skipping post...")
continue
try:
downloadPost(details, directory)
GLOBAL.downloadedPosts.add(details['POSTID'])
try:
if GLOBAL.arguments.unsave:
reddit.submission(id=details['POSTID']).unsave()
except InsufficientScope:
reddit = Reddit().begin()
reddit.submission(id=details['POSTID']).unsave()
downloaded_count += 1
except FileAlreadyExistsError:
print("It already exists")
GLOBAL.downloadedPosts.add(details['POSTID'])
duplicates += 1
except ImgurLoginError:
print("Imgur login failed. \nQuitting the program as unexpected errors might occur.")
sys.exit()
except ImgurLimitError as exception:
failed_file.add({int(i + 1): [
"{class_name}: {info}".format(class_name=exception.__class__.__name__, info=str(exception)), details
]})
except NotADownloadableLinkError as exception:
print("{class_name}: {info}".format(class_name=exception.__class__.__name__, info=str(exception)))
failed_file.add({int(i + 1): [
"{class_name}: {info}".format(class_name=exception.__class__.__name__, info=str(exception)),
submissions[i]
]})
except TypeInSkip:
print()
print(submissions[i]['CONTENTURL'])
print("Skipping post...")
except DomainInSkip:
print()
print(submissions[i]['CONTENTURL'])
print("Skipping post...")
except NoSuitablePost:
print("No match found, skipping...")
except FailedToDownload:
print("Failed to download the posts, skipping...")
except AlbumNotDownloadedCompletely:
print("Album did not downloaded completely.")
failed_file.add({int(i + 1): [
"{class_name}: {info}".format(class_name=exc.__class__.__name__, info=str(exc)),
submissions[i]
]})
except Exception as exc:
print("{class_name}: {info}\nSee CONSOLE_LOG.txt for more information".format(
class_name=exc.__class__.__name__, info=str(exc))
)
logging.error(sys.exc_info()[0].__name__, exc_info=full_exc_info(sys.exc_info()))
print(GLOBAL.log_stream.getvalue(), no_print=True)
failed_file.add({int(i + 1): [
"{class_name}: {info}".format(class_name=exc.__class__.__name__, info=str(exc)),
submissions[i]
]})
if duplicates:
print(f"\nThere {'were' if duplicates > 1 else 'was'} {duplicates} duplicate{'s' if duplicates > 1 else ''}")
if downloaded_count == 0:
print("Nothing is downloaded :(")
else:
print(f"Total of {downloaded_count} link{'s' if downloaded_count > 1 else ''} downloaded!")
def printLogo():
VanillaPrint(f"\nBulk Downloader for Reddit v{__version__}\n"
f"Written by Ali PARLAKCI parlakciali@gmail.com\n\n"
f"https://github.com/aliparlakci/bulk-downloader-for-reddit/\n"
)
def main():
if Path("config.json").exists():
GLOBAL.configDirectory = Path("config.json")
else:
if not Path(GLOBAL.defaultConfigDirectory).is_dir():
os.makedirs(GLOBAL.defaultConfigDirectory)
GLOBAL.configDirectory = GLOBAL.defaultConfigDirectory / "config.json"
try:
GLOBAL.config = Config(GLOBAL.configDirectory).generate()
except InvalidJSONFile as exception:
VanillaPrint(str(exception.__class__.__name__), ">>", str(exception))
VanillaPrint("Resolve it or remove it to proceed")
sys.exit()
sys.argv = sys.argv + GLOBAL.config["options"].split()
arguments = Arguments.parse()
GLOBAL.arguments = arguments
if arguments.set_filename:
Config(GLOBAL.configDirectory).setCustomFileName()
sys.exit()
if arguments.set_folderpath:
Config(GLOBAL.configDirectory).setCustomFolderPath()
sys.exit()
if arguments.set_default_directory:
Config(GLOBAL.configDirectory).setDefaultDirectory()
sys.exit()
if arguments.set_default_options:
Config(GLOBAL.configDirectory).setDefaultOptions()
sys.exit()
if arguments.use_local_config:
JsonFile("config.json").add(GLOBAL.config)
sys.exit()
if arguments.directory:
GLOBAL.directory = Path(arguments.directory.strip())
elif "default_directory" in GLOBAL.config and GLOBAL.config["default_directory"] != "":
GLOBAL.directory = Path(
GLOBAL.config["default_directory"].format(time=GLOBAL.RUN_TIME))
else:
GLOBAL.directory = Path(input("\ndownload directory: ").strip())
if arguments.downloaded_posts:
GLOBAL.downloadedPosts = Store(arguments.downloaded_posts)
else:
GLOBAL.downloadedPosts = Store()
printLogo()
print("\n", " ".join(sys.argv), "\n", no_print=True)
if arguments.log is not None:
log_dir = Path(arguments.log)
download(postFromLog(log_dir))
sys.exit()
program_mode = ProgramMode(arguments).generate()
try:
posts = getPosts(program_mode)
except Exception as exc:
logging.error(sys.exc_info()[0].__name__, exc_info=full_exc_info(sys.exc_info()))
print(GLOBAL.log_stream.getvalue(), no_print=True)
print(exc)
sys.exit()
if posts is None:
print("I could not find any posts in that URL")
sys.exit()
if GLOBAL.arguments.no_download:
pass
else:
download(posts)
if __name__ == "__main__":
GLOBAL.log_stream = StringIO()
logging.basicConfig(stream=GLOBAL.log_stream, level=logging.INFO)
try:
VanillaPrint = print
print = printToFile
GLOBAL.RUN_TIME = str(time.strftime("%d-%m-%Y_%H-%M-%S", time.localtime(time.time())))
main()
except KeyboardInterrupt:
if GLOBAL.directory is None:
GLOBAL.directory = Path("../..\\")
except Exception as exception:
if GLOBAL.directory is None:
GLOBAL.directory = Path("../..\\")
logging.error(sys.exc_info()[0].__name__, exc_info=full_exc_info(sys.exc_info()))
print(GLOBAL.log_stream.getvalue())
if not GLOBAL.arguments.quit:
input("\nPress enter to quit\n")

View File

@@ -0,0 +1,153 @@
import argparse
import sys
class Arguments:
@staticmethod
def parse(arguments=None):
"""Initialize argparse and add arguments"""
if arguments is None:
arguments = []
parser = argparse.ArgumentParser(allow_abbrev=False,
description="This program downloads media from reddit posts")
parser.add_argument("--directory", "-d",
help="Specifies the directory where posts will be downloaded to",
metavar="DIRECTORY")
parser.add_argument("--verbose", "-v",
help="Verbose Mode",
action="store_true",
default=False)
parser.add_argument("--quit", "-q",
help="Auto quit afer the process finishes",
action="store_true",
default=False)
parser.add_argument("--link", "-l",
help="Get posts from link",
metavar="link")
parser.add_argument("--saved",
action="store_true",
required="--unsave" in sys.argv,
help="Triggers saved mode")
parser.add_argument("--unsave",
action="store_true",
help="Unsaves downloaded posts")
parser.add_argument("--submitted",
action="store_true",
help="Gets posts of --user")
parser.add_argument("--upvoted",
action="store_true",
help="Gets upvoted posts of --user")
parser.add_argument("--log",
help="Takes a log file which created by itself (json files),reads posts and tries "
"downloading them again.",
# type=argparse.FileType('r'),
metavar="LOG FILE")
parser.add_argument("--subreddit",
nargs="+",
help="Triggers subreddit mode and takes subreddit's name without r/. use \"frontpage\" "
"for frontpage",
metavar="SUBREDDIT",
type=str)
parser.add_argument("--multireddit",
help="Triggers multireddit mode and takes multireddit's name without m",
metavar="MULTIREDDIT",
type=str)
parser.add_argument("--user",
help="reddit username if needed. use \"me\" for current user",
required="--multireddit" in sys.argv or "--submitted" in sys.argv,
metavar="redditor",
type=str)
parser.add_argument(
"--search",
help="Searches for given query in given subreddits",
metavar="query",
type=str)
parser.add_argument("--sort",
help="Either hot, top, new, controversial, rising or relevance default: hot",
choices=["hot", "top", "new", "controversial", "rising", "relevance"],
metavar="SORT TYPE",
type=str)
parser.add_argument("--limit",
help="default: unlimited",
metavar="Limit",
type=int)
parser.add_argument("--time",
help="Either hour, day, week, month, year or all. default: all",
choices=["all", "hour", "day", "week", "month", "year"],
metavar="TIME_LIMIT",
type=str)
parser.add_argument("--skip",
nargs="+",
help="Skip posts with given type",
type=str,
choices=["images", "videos", "gifs", "self"],
default=[])
parser.add_argument("--skip-domain",
nargs="+",
help="Skip posts with given domain",
type=str,
default=[])
parser.add_argument("--set-folderpath",
action="store_true",
help="Set custom folderpath"
)
parser.add_argument("--set-filename",
action="store_true",
help="Set custom filename",
)
parser.add_argument("--set-default-directory",
action="store_true",
help="Set a default directory to be used in case no directory is given",
)
parser.add_argument("--set-default-options",
action="store_true",
help="Set default options to use everytime program runs",
)
parser.add_argument("--use-local-config",
action="store_true",
help="Creates a config file in the program's directory"
" and uses it. Useful for having multiple configs",
)
parser.add_argument("--no-dupes",
action="store_true",
help="Do not download duplicate posts on different subreddits",
)
parser.add_argument("--downloaded-posts",
help="Use a hash file to keep track of downloaded files",
type=str
)
parser.add_argument("--no-download",
action="store_true",
help="Just saved posts into a the POSTS.json file without downloading"
)
if not arguments:
return parser.parse_args()
else:
return parser.parse_args(arguments)

View File

@@ -0,0 +1,109 @@
from bulkredditdownloader.reddit import Reddit
from bulkredditdownloader.jsonHelper import JsonFile
from bulkredditdownloader.utils import nameCorrector
class Config:
def __init__(self, filename: str):
self.filename = filename
self.file = JsonFile(self.filename)
def generate(self) -> dict:
self._validateCredentials()
self._readCustomFileName()
self._readCustomFolderPath()
self._readDefaultOptions()
return self.file.read()
def setCustomFileName(self):
print("""
IMPORTANT: Do not change the filename structure frequently.
If you did, the program could not find duplicates and
would download the already downloaded files again.
This would not create any duplicates in the directory but
the program would not be as snappy as it should be.
Type a template file name for each post.
You can use SUBREDDIT, REDDITOR, POSTID, TITLE, UPVOTES, FLAIR, DATE in curly braces
The text in curly braces will be replaced with the corresponding property of an each post
For example: {FLAIR}_{SUBREDDIT}_{REDDITOR}
Existing filename template:""", None if "filename" not in self.file.read() else self.file.read()["filename"])
filename = nameCorrector(input(">> ").upper())
self.file.add({"filename": filename})
def _readCustomFileName(self):
content = self.file.read()
if "filename" not in content:
self.file.add({"filename": "{REDDITOR}_{TITLE}_{POSTID}"})
content = self.file.read()
if "{POSTID}" not in content["filename"]:
self.file.add({"filename": content["filename"] + "_{POSTID}"})
def setCustomFolderPath(self):
print("""
Type a folder structure (generic folder path)
Use slash or DOUBLE backslash to separate folders
You can use SUBREDDIT, REDDITOR, POSTID, TITLE, UPVOTES, FLAIR, DATE in curly braces
The text in curly braces will be replaced with the corresponding property of an each post
For example: {REDDITOR}/{SUBREDDIT}/{FLAIR}
Existing folder structure""", None if "folderpath" not in self.file.read() else self.file.read()["folderpath"])
folderpath = nameCorrector(input(">> ").strip("\\").strip("/").upper())
self.file.add({"folderpath": folderpath})
def _readCustomFolderPath(self, path=None):
content = self.file.read()
if "folderpath" not in content:
self.file.add({"folderpath": "{SUBREDDIT}"})
def setDefaultOptions(self):
print("""
Type options to be used everytime script runs
For example: --no-dupes --quit --limit 100 --skip youtube.com
Existing default options:""", None if "options" not in self.file.read() else self.file.read()["options"])
options = input(">> ").strip("")
self.file.add({"options": options})
def _readDefaultOptions(self):
content = self.file.read()
if "options" not in content:
self.file.add({"options": ""})
def _validateCredentials(self):
"""Read credentials from config.json file"""
try:
content = self.file.read()["credentials"]
except BaseException:
self.file.add({"credentials": {}})
content = self.file.read()["credentials"]
if "reddit" in content and len(content["reddit"]) != 0:
pass
else:
Reddit().begin()
print()
def setDefaultDirectory(self):
print("""Set a default directory to use in case no directory is given
Leave blank to reset it. You can use {time} in foler names to use to timestamp it
For example: D:/archive/BDFR_{time}
""")
print("Current default directory:", self.file.read()[
"default_directory"] if "default_directory" in self.file.read() else "")
self.file.add({"default_directory": input(">> ")})

View File

@@ -0,0 +1,17 @@
import os
import pathlib
from bulkredditdownloader.downloaders.downloaderUtils import getExtension, getFile
from bulkredditdownloader.utils import GLOBAL
class Direct:
def __init__(self, directory: pathlib.Path, post: dict):
post['EXTENSION'] = getExtension(post['CONTENTURL'])
if not os.path.exists(directory):
os.makedirs(directory)
filename = GLOBAL.config['filename'].format(**post) + post["EXTENSION"]
short_filename = post['POSTID'] + post['EXTENSION']
getFile(filename, short_filename, directory, post['CONTENTURL'])

View File

@@ -0,0 +1,121 @@
import os
import pathlib
import urllib.error
import urllib.request
from html.parser import HTMLParser
from bulkredditdownloader.downloaders.downloaderUtils import getExtension, getFile
from bulkredditdownloader.errors import AlbumNotDownloadedCompletely, FileAlreadyExistsError, NotADownloadableLinkError
from bulkredditdownloader.utils import GLOBAL
from bulkredditdownloader.utils import printToFile as print
class Erome:
def __init__(self, directory: pathlib.Path, post: dict):
try:
images = self.getLinks(post['CONTENTURL'])
except urllib.error.HTTPError:
raise NotADownloadableLinkError("Not a downloadable link")
images_length = len(images)
how_many_downloaded = images_length
duplicates = 0
if images_length == 1:
extension = getExtension(images[0])
"""Filenames are declared here"""
filename = GLOBAL.config['filename'].format(**post) + post["EXTENSION"]
short_filename = post['POSTID'] + extension
image_url = images[0]
if 'https://' not in image_url or 'http://' not in image_url:
image_url = "https://" + image_url
getFile(filename, short_filename, directory, image_url)
else:
filename = GLOBAL.config['filename'].format(**post)
print(filename)
folder_dir = directory / filename
try:
if not os.path.exists(folder_dir):
os.makedirs(folder_dir)
except FileNotFoundError:
folder_dir = directory / post['POSTID']
os.makedirs(folder_dir)
for i in range(images_length):
extension = getExtension(images[i])
filename = str(i + 1) + extension
image_url = images[i]
if 'https://' not in image_url and 'http://' not in image_url:
image_url = "https://" + image_url
print(" ({}/{})".format(i + 1, images_length))
print(" {}".format(filename))
try:
getFile(filename, filename, folder_dir, image_url, indent=2)
print()
except FileAlreadyExistsError:
print(" The file already exists" + " " * 10, end="\n\n")
duplicates += 1
how_many_downloaded -= 1
except Exception as exception:
# raise exception
print("\n Could not get the file")
print(
" "
+ "{class_name}: {info}".format(class_name=exception.__class__.__name__, info=str(exception))
+ "\n"
)
how_many_downloaded -= 1
if duplicates == images_length:
raise FileAlreadyExistsError
elif how_many_downloaded + duplicates < images_length:
raise AlbumNotDownloadedCompletely("Album Not Downloaded Completely")
def getLinks(self, url: str) -> list[str]:
content = []
line_number = None
class EromeParser(HTMLParser):
tag = None
def handle_starttag(self, tag, attrs):
self.tag = {tag: {attr[0]: attr[1] for attr in attrs}}
page_source = (urllib.request.urlopen(url).read().decode().split('\n'))
""" FIND WHERE ALBUM STARTS IN ORDER NOT TO GET WRONG LINKS"""
for i in range(len(page_source)):
obj = EromeParser()
obj.feed(page_source[i])
tag = obj.tag
if tag is not None:
if "div" in tag:
if "id" in tag["div"]:
if tag["div"]["id"] == "album":
line_number = i
break
for line in page_source[line_number:]:
obj = EromeParser()
obj.feed(line)
tag = obj.tag
if tag is not None:
if "img" in tag:
if "class" in tag["img"]:
if tag["img"]["class"] == "img-front":
content.append(tag["img"]["src"])
elif "source" in tag:
content.append(tag["source"]["src"])
return [link for link in content if link.endswith("_480p.mp4") or not link.endswith(".mp4")]

View File

@@ -0,0 +1,54 @@
import json
import os
import urllib.request
from bs4 import BeautifulSoup
from bulkredditdownloader.downloaders.downloaderUtils import getExtension, getFile
from bulkredditdownloader.downloaders.gifDeliveryNetwork import GifDeliveryNetwork
from bulkredditdownloader.errors import NotADownloadableLinkError
from bulkredditdownloader.utils import GLOBAL
import pathlib
class Gfycat:
def __init__(self, directory: pathlib.Path, post: dict):
try:
post['MEDIAURL'] = self.getLink(post['CONTENTURL'])
except IndexError:
raise NotADownloadableLinkError("Could not read the page source")
post['EXTENSION'] = getExtension(post['MEDIAURL'])
if not os.path.exists(directory):
os.makedirs(directory)
filename = GLOBAL.config['filename'].format(**post) + post["EXTENSION"]
short_filename = post['POSTID'] + post['EXTENSION']
getFile(filename, short_filename, directory, post['MEDIAURL'])
@staticmethod
def getLink(url: str) -> str:
"""Extract direct link to the video from page's source
and return it
"""
if '.webm' in url or '.mp4' in url or '.gif' in url:
return url
if url[-1:] == '/':
url = url[:-1]
url = "https://gfycat.com/" + url.split('/')[-1]
page_source = (urllib.request.urlopen(url).read().decode())
soup = BeautifulSoup(page_source, "html.parser")
attributes = {"data-react-helmet": "true", "type": "application/ld+json"}
content = soup.find("script", attrs=attributes)
if content is None:
return GifDeliveryNetwork.getLink(url)
return json.loads(content.contents[0])["video"]["contentUrl"]

View File

@@ -0,0 +1,147 @@
import json
import os
import pathlib
import requests
from bulkredditdownloader.downloaders.Direct import Direct
from bulkredditdownloader.downloaders.downloaderUtils import getFile
from bulkredditdownloader.errors import (AlbumNotDownloadedCompletely, ExtensionError, FileAlreadyExistsError, ImageNotFound,
NotADownloadableLinkError, TypeInSkip)
from bulkredditdownloader.utils import GLOBAL, nameCorrector
from bulkredditdownloader.utils import printToFile as print
class Imgur:
imgur_image_domain = "https://i.imgur.com/"
def __init__(self, directory: pathlib.Path, post: dict):
link = post['CONTENTURL']
if link.endswith(".gifv"):
link = link.replace(".gifv", ".mp4")
Direct(directory, {**post, 'CONTENTURL': link})
return
self.raw_data = self.getData(link)
self.directory = directory
self.post = post
if self.isAlbum:
if self.raw_data["album_images"]["count"] != 1:
self.downloadAlbum(self.raw_data["album_images"])
else:
self.download(self.raw_data["album_images"]["images"][0])
else:
self.download(self.raw_data)
def downloadAlbum(self, images: dict):
folder_name = GLOBAL.config['filename'].format(**self.post)
folder_dir = self.directory / folder_name
images_length = images["count"]
how_many_downloaded = 0
duplicates = 0
try:
if not os.path.exists(folder_dir):
os.makedirs(folder_dir)
except FileNotFoundError:
folder_dir = self.directory / self.post['POSTID']
os.makedirs(folder_dir)
print(folder_name)
for i in range(images_length):
extension = self.validateExtension(images["images"][i]["ext"])
image_url = self.imgur_image_domain + images["images"][i]["hash"] + extension
filename = "_".join([str(i + 1),
nameCorrector(images["images"][i]['title']),
images["images"][i]['hash']]) + extension
short_filename = str(i + 1) + "_" + images["images"][i]['hash']
print("\n ({}/{})".format(i + 1, images_length))
try:
getFile(filename, short_filename, folder_dir, image_url, indent=2)
how_many_downloaded += 1
print()
except FileAlreadyExistsError:
print(" The file already exists" + " " * 10, end="\n\n")
duplicates += 1
except TypeInSkip:
print(" Skipping...")
how_many_downloaded += 1
except Exception as exception:
print("\n Could not get the file")
print(
" " +
"{class_name}: {info}\nSee CONSOLE_LOG.txt for more information".format(
class_name=exception.__class__.__name__,
info=str(exception)
)
+ "\n"
)
print(GLOBAL.log_stream.getvalue(), no_print=True)
if duplicates == images_length:
raise FileAlreadyExistsError
elif how_many_downloaded + duplicates < images_length:
raise AlbumNotDownloadedCompletely("Album Not Downloaded Completely")
def download(self, image: dict):
extension = self.validateExtension(image["ext"])
image_url = self.imgur_image_domain + image["hash"] + extension
filename = GLOBAL.config['filename'].format(**self.post) + extension
short_filename = self.post['POSTID'] + extension
getFile(filename, short_filename, self.directory, image_url)
@property
def isAlbum(self) -> bool:
return "album_images" in self.raw_data
@staticmethod
def getData(link: str) -> dict:
cookies = {"over18": "1", "postpagebeta": "0"}
res = requests.get(link, cookies=cookies)
if res.status_code != 200:
raise ImageNotFound(f"Server responded with {res.status_code} to {link}")
page_source = requests.get(link, cookies=cookies).text
starting_string = "image : "
ending_string = "group :"
starting_string_lenght = len(starting_string)
try:
start_index = page_source.index(starting_string) + starting_string_lenght
end_index = page_source.index(ending_string, start_index)
except ValueError:
raise NotADownloadableLinkError(
f"Could not read the page source on {link}")
while page_source[end_index] != "}":
end_index -= 1
try:
data = page_source[start_index:end_index + 2].strip()[:-1]
except Exception:
page_source[end_index + 1] = '}'
data = page_source[start_index:end_index + 3].strip()[:-1]
return json.loads(data)
@staticmethod
def validateExtension(string: str) -> str:
possible_extensions = [".jpg", ".png", ".mp4", ".gif"]
for extension in possible_extensions:
if extension in string:
return extension
else:
raise ExtensionError(f"\"{string}\" is not recognized as a valid extension.")

View File

@@ -0,0 +1,109 @@
import hashlib
import os
import sys
import urllib.request
from pathlib import Path
from bulkredditdownloader.errors import DomainInSkip, FailedToDownload, FileAlreadyExistsError, TypeInSkip
from bulkredditdownloader.utils import GLOBAL
from bulkredditdownloader.utils import printToFile as print
def dlProgress(count: int, block_size: int, total_size: int):
"""Function for writing download progress to console
"""
download_mbs = int(count * block_size * (10 ** (-6)))
file_size = int(total_size * (10 ** (-6)))
sys.stdout.write("{}Mb/{}Mb\r".format(download_mbs, file_size))
sys.stdout.flush()
def getExtension(link: str):
"""Extract file extension from image link.
If didn't find any, return '.jpg'
"""
image_types = ['jpg', 'png', 'mp4', 'webm', 'gif']
parsed = link.split('.')
for fileType in image_types:
if fileType in parsed:
return "." + parsed[-1]
else:
if "v.redd.it" not in link:
return '.jpg'
else:
return '.mp4'
def getFile(filename: str, short_filename: str, folder_dir: Path, image_url: str, indent: int = 0, silent: bool = False):
formats = {
"videos": [".mp4", ".webm"],
"images": [".jpg", ".jpeg", ".png", ".bmp"],
"gifs": [".gif"],
"self": []
}
for file_type in GLOBAL.arguments.skip:
for extension in formats[file_type]:
if extension in filename:
raise TypeInSkip
if any(domain in image_url for domain in GLOBAL.arguments.skip_domain):
raise DomainInSkip
headers = [
("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 "
"Safari/537.36 OPR/54.0.2952.64"),
("Accept", "text/html,application/xhtml+xml,application/xml;"
"q=0.9,image/webp,image/apng,*/*;q=0.8"),
("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.3"),
("Accept-Encoding", "none"),
("Accept-Language", "en-US,en;q=0.8"),
("Connection", "keep-alive")
]
if not os.path.exists(folder_dir):
os.makedirs(folder_dir)
opener = urllib.request.build_opener()
if "imgur" not in image_url:
opener.addheaders = headers
urllib.request.install_opener(opener)
if not silent:
print(" " * indent + str(folder_dir), " " * indent + str(filename), sep="\n")
for i in range(3):
file_dir = Path(folder_dir) / filename
temp_dir = Path(folder_dir) / (filename + ".tmp")
if not (os.path.isfile(file_dir)):
try:
urllib.request.urlretrieve(image_url, temp_dir, reporthook=dlProgress)
file_hash = createHash(temp_dir)
if GLOBAL.arguments.no_dupes:
if file_hash in GLOBAL.downloadedPosts():
os.remove(temp_dir)
raise FileAlreadyExistsError
GLOBAL.downloadedPosts.add(file_hash)
os.rename(temp_dir, file_dir)
if not silent:
print(" " * indent + "Downloaded" + " " * 10)
return None
except ConnectionResetError:
raise FailedToDownload
except FileNotFoundError:
filename = short_filename
else:
raise FileAlreadyExistsError
raise FailedToDownload
def createHash(filename: str) -> str:
hash_md5 = hashlib.md5()
with open(filename, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()

View File

@@ -0,0 +1,111 @@
import json
import os
import urllib
import requests
import pathlib
from bulkredditdownloader.downloaders.downloaderUtils import getFile
from bulkredditdownloader.errors import (AlbumNotDownloadedCompletely, FileAlreadyExistsError, ImageNotFound, NotADownloadableLinkError,
TypeInSkip)
from bulkredditdownloader.utils import GLOBAL
from bulkredditdownloader.utils import printToFile as print
class Gallery:
def __init__(self, directory: pathlib.Path, post):
link = post['CONTENTURL']
self.raw_data = self.getData(link)
self.directory = directory
self.post = post
images = {}
count = 0
for model in self.raw_data['posts']['models']:
try:
for item in self.raw_data['posts']['models'][model]['media']['gallery']['items']:
try:
images[count] = {'id': item['mediaId'], 'url': self.raw_data['posts']
['models'][model]['media']['mediaMetadata'][item['mediaId']]['s']['u']}
count += 1
except Exception:
continue
except Exception:
continue
self.downloadAlbum(images, count)
@staticmethod
def getData(link: str) -> dict:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36 OPR/54.0.2952.64",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
}
res = requests.get(link, headers=headers)
if res.status_code != 200:
raise ImageNotFound(f"Server responded with {res.status_code} to {link}")
page_source = res.text
starting_string = "_r = {"
ending_string = "</script>"
starting_string_lenght = len(starting_string)
try:
start_index = page_source.index(starting_string) + starting_string_lenght
end_index = page_source.index(ending_string, start_index)
except ValueError:
raise NotADownloadableLinkError(f"Could not read the page source on {link}")
data = json.loads(page_source[start_index - 1:end_index + 1].strip()[:-1])
return data
def downloadAlbum(self, images: dict, count: int):
folder_name = GLOBAL.config['filename'].format(**self.post)
folder_dir = self.directory / folder_name
how_many_downloaded = 0
duplicates = 0
try:
if not os.path.exists(folder_dir):
os.makedirs(folder_dir)
except FileNotFoundError:
folder_dir = self.directory / self.post['POSTID']
os.makedirs(folder_dir)
print(folder_name)
for i in range(count):
path = urllib.parse.urlparse(images[i]['url']).path
extension = os.path.splitext(path)[1]
filename = "_".join([str(i + 1), images[i]['id']]) + extension
short_filename = str(i + 1) + "_" + images[i]['id']
print("\n ({}/{})".format(i + 1, count))
try:
getFile(filename, short_filename, folder_dir, images[i]['url'], indent=2)
how_many_downloaded += 1
print()
except FileAlreadyExistsError:
print(" The file already exists" + " " * 10, end="\n\n")
duplicates += 1
except TypeInSkip:
print(" Skipping...")
how_many_downloaded += 1
except Exception as exception:
print("\n Could not get the file")
print(" " + "{class_name}: {info}\nSee CONSOLE_LOG.txt for more information".format(
class_name=exception.__class__.__name__, info=str(exception)) + "\n"
)
print(GLOBAL.log_stream.getvalue(), no_print=True)
if duplicates == count:
raise FileAlreadyExistsError
elif how_many_downloaded + duplicates < count:
raise AlbumNotDownloadedCompletely("Album Not Downloaded Completely")

View File

@@ -0,0 +1,50 @@
import os
import pathlib
import urllib.request
from bs4 import BeautifulSoup
from bulkredditdownloader.downloaders.downloaderUtils import getExtension, getFile
from bulkredditdownloader.errors import NotADownloadableLinkError
from bulkredditdownloader.utils import GLOBAL
class GifDeliveryNetwork:
def __init__(self, directory: pathlib.Path, post: dict):
try:
post['MEDIAURL'] = self.getLink(post['CONTENTURL'])
except IndexError:
raise NotADownloadableLinkError("Could not read the page source")
post['EXTENSION'] = getExtension(post['MEDIAURL'])
if not os.path.exists(directory):
os.makedirs(directory)
filename = GLOBAL.config['filename'].format(**post) + post["EXTENSION"]
short_filename = post['POSTID'] + post['EXTENSION']
getFile(filename, short_filename, directory, post['MEDIAURL'])
@staticmethod
def getLink(url: str) -> str:
"""Extract direct link to the video from page's source
and return it
"""
if '.webm' in url.split('/')[-1] or '.mp4' in url.split('/')[-1] or '.gif' in url.split('/')[-1]:
return url
if url[-1:] == '/':
url = url[:-1]
url = "https://www.gifdeliverynetwork.com/" + url.split('/')[-1]
page_source = (urllib.request.urlopen(url).read().decode())
soup = BeautifulSoup(page_source, "html.parser")
attributes = {"id": "mp4Source", "type": "video/mp4"}
content = soup.find("source", attrs=attributes)
if content is None:
raise NotADownloadableLinkError("Could not read the page source")
return content["src"]

View File

@@ -0,0 +1,57 @@
import json
import os
import pathlib
import urllib.request
from bs4 import BeautifulSoup
from bulkredditdownloader.downloaders.downloaderUtils import getExtension, getFile
from bulkredditdownloader.errors import NotADownloadableLinkError
from bulkredditdownloader.utils import GLOBAL
class Redgifs:
def __init__(self, directory: pathlib.Path, post: dict):
try:
post['MEDIAURL'] = self.getLink(post['CONTENTURL'])
except IndexError:
raise NotADownloadableLinkError("Could not read the page source")
post['EXTENSION'] = getExtension(post['MEDIAURL'])
if not os.path.exists(directory):
os.makedirs(directory)
filename = GLOBAL.config['filename'].format(**post) + post["EXTENSION"]
short_filename = post['POSTID'] + post['EXTENSION']
getFile(filename, short_filename, directory, post['MEDIAURL'])
@staticmethod
def getLink(url: str) -> str:
"""Extract direct link to the video from page's source
and return it
"""
if '.webm' in url or '.mp4' in url or '.gif' in url:
return url
if url[-1:] == '/':
url = url[:-1]
url = urllib.request.Request(
"https://redgifs.com/watch/" + url.split('/')[-1])
url.add_header(
'User-Agent',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36 OPR/54.0.2952.64')
page_source = (urllib.request.urlopen(url).read().decode())
soup = BeautifulSoup(page_source, "html.parser")
attributes = {"data-react-helmet": "true", "type": "application/ld+json"}
content = soup.find("script", attrs=attributes)
if content is None:
raise NotADownloadableLinkError("Could not read the page source")
return json.loads(content.contents[0])["video"]["contentUrl"]

View File

@@ -0,0 +1,61 @@
from src.utils import printToFile as print
import io
import os
import pathlib
from pathlib import Path
from bulkredditdownloader.errors import FileAlreadyExistsError, TypeInSkip
from bulkredditdownloader.utils import GLOBAL
from bulkredditdownloader.utils import printToFile as print
VanillaPrint = print
class SelfPost:
def __init__(self, directory: pathlib.Path, post: dict):
if "self" in GLOBAL.arguments.skip:
raise TypeInSkip
if not os.path.exists(directory):
os.makedirs(directory)
filename = GLOBAL.config['filename'].format(**post)
file_dir = directory / (filename + ".md")
print(file_dir)
print(filename + ".md")
if Path.is_file(file_dir):
raise FileAlreadyExistsError
try:
self.writeToFile(file_dir, post)
except FileNotFoundError:
file_dir = post['POSTID'] + ".md"
file_dir = directory / file_dir
self.writeToFile(file_dir, post)
@staticmethod
def writeToFile(directory: pathlib.Path, post: dict):
"""Self posts are formatted here"""
content = ("## ["
+ post["TITLE"]
+ "]("
+ post["CONTENTURL"]
+ ")\n"
+ post["CONTENT"]
+ "\n\n---\n\n"
+ "submitted to [r/"
+ post["SUBREDDIT"]
+ "](https://www.reddit.com/r/"
+ post["SUBREDDIT"]
+ ") by [u/"
+ post["REDDITOR"]
+ "](https://www.reddit.com/user/"
+ post["REDDITOR"]
+ ")")
with io.open(directory, "w", encoding="utf-8") as FILE:
VanillaPrint(content, file=FILE)
print("Downloaded")

View File

@@ -0,0 +1,58 @@
import os
import pathlib
import subprocess
from bulkredditdownloader.downloaders.downloaderUtils import getFile
from bulkredditdownloader.utils import GLOBAL
from bulkredditdownloader.utils import printToFile as print
class VReddit:
def __init__(self, directory: pathlib.Path, post: dict):
extension = ".mp4"
if not os.path.exists(directory):
os.makedirs(directory)
filename = GLOBAL.config['filename'].format(**post) + extension
short_filename = post['POSTID'] + extension
try:
fnull = open(os.devnull, 'w')
subprocess.call("ffmpeg", stdout=fnull, stderr=subprocess.STDOUT)
except Exception:
getFile(filename, short_filename, directory, post['CONTENTURL'])
print("FFMPEG library not found, skipping merging video and audio")
else:
video_name = post['POSTID'] + "_video"
video_url = post['CONTENTURL']
audio_name = post['POSTID'] + "_audio"
audio_url = video_url[:video_url.rfind('/')] + '/DASH_audio.mp4'
print(directory, filename, sep="\n")
getFile(video_name, video_name, directory, video_url, silent=True)
getFile(audio_name, audio_name, directory, audio_url, silent=True)
try:
self._mergeAudio(video_name, audio_name, filename, short_filename, directory)
except KeyboardInterrupt:
os.remove(directory / filename)
os.remove(directory / audio_name)
os.rename(directory / video_name, directory / filename)
@staticmethod
def _mergeAudio(
video: pathlib.Path,
audio: pathlib.Path,
filename: pathlib.Path,
short_filename,
directory: pathlib.Path):
input_video = str(directory / video)
input_audio = str(directory / audio)
fnull = open(os.devnull, 'w')
cmd = "ffmpeg -i {} -i {} -c:v copy -c:a aac -strict experimental {}".format(
input_audio, input_video, str(directory / filename))
subprocess.call(cmd.split(), stdout=fnull, stderr=subprocess.STDOUT)
os.remove(directory / video)
os.remove(directory / audio)

View File

@@ -0,0 +1,55 @@
import os
import pathlib
import sys
import youtube_dl
from bulkredditdownloader.downloaders.downloaderUtils import createHash
from bulkredditdownloader.errors import FileAlreadyExistsError
from bulkredditdownloader.utils import GLOBAL
from bulkredditdownloader.utils import printToFile as print
class Youtube:
def __init__(self, directory: pathlib.Path, post: dict):
if not os.path.exists(directory):
os.makedirs(directory)
filename = GLOBAL.config['filename'].format(**post)
print(filename)
self.download(filename, directory, post['CONTENTURL'])
def download(self, filename: str, directory: pathlib.Path, url: str):
ydl_opts = {
"format": "best",
"outtmpl": str(directory / (filename + ".%(ext)s")),
"progress_hooks": [self._hook],
"playlistend": 1,
"nooverwrites": True,
"quiet": True
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
location = directory / (filename + ".mp4")
if GLOBAL.arguments.no_dupes:
try:
file_hash = createHash(str(location))
except FileNotFoundError:
return None
if file_hash in GLOBAL.downloadedPosts():
os.remove(location)
raise FileAlreadyExistsError
GLOBAL.downloadedPosts.add(file_hash)
@staticmethod
def _hook(d):
if d['status'] == 'finished':
return print("Downloaded")
downloaded_mbs = int(d['downloaded_bytes'] * (10**(-6)))
file_size = int(d['total_bytes'] * (10**(-6)))
sys.stdout.write("{}Mb/{}Mb\r".format(downloaded_mbs, file_size))
sys.stdout.flush()

View File

@@ -0,0 +1,137 @@
import sys
def full_exc_info(exc_info):
def current_stack(skip=0):
try:
1 / 0
except ZeroDivisionError:
f = sys.exc_info()[2].tb_frame
for i in range(skip + 2):
f = f.f_back
lst = []
while f is not None:
lst.append((f, f.f_lineno))
f = f.f_back
return lst
def extend_traceback(tb, stack):
class FauxTb():
def __init__(self, tb_frame, tb_lineno, tb_next):
self.tb_frame = tb_frame
self.tb_lineno = tb_lineno
self.tb_next = tb_next
"""Extend traceback with stack info."""
head = tb
for tb_frame, tb_lineno in stack:
head = FauxTb(tb_frame, tb_lineno, head)
return head
"""Like sys.exc_info, but includes the full traceback."""
t, v, tb = exc_info
full_tb = extend_traceback(tb, current_stack(1))
return t, v, full_tb
class RedditLoginFailed(Exception):
pass
class ImgurLoginError(Exception):
pass
class FileAlreadyExistsError(Exception):
pass
class NotADownloadableLinkError(Exception):
pass
class AlbumNotDownloadedCompletely(Exception):
pass
class FileNameTooLong(Exception):
pass
class InvalidRedditLink(Exception):
pass
class ProgramModeError(Exception):
pass
class SearchModeError(Exception):
pass
class RedditorNameError(Exception):
pass
class NoMatchingSubmissionFound(Exception):
pass
class NoPrawSupport(Exception):
pass
class NoRedditSupport(Exception):
pass
class MultiredditNotFound(Exception):
pass
class InsufficientPermission(Exception):
pass
class InvalidSortingType(Exception):
pass
class NoSuitablePost(Exception):
pass
class ImgurLimitError(Exception):
pass
class DirectLinkNotFound(Exception):
pass
class InvalidJSONFile(Exception):
pass
class FailedToDownload(Exception):
pass
class TypeInSkip(Exception):
pass
class DomainInSkip(Exception):
pass
class ImageNotFound(Exception):
pass
class ExtensionError(Exception):
pass

View File

@@ -0,0 +1,57 @@
import json
import os
from bulkredditdownloader.errors import InvalidJSONFile
class JsonFile:
""" Write and read JSON files
Use add(self,toBeAdded) to add to files
Use delete(self,*deletedKeys) to delete keys
"""
file_dir = ""
def __init__(self, file_dir: str):
self.file_dir = file_dir
if not os.path.exists(self.file_dir):
self.__writeToFile({}, create=True)
def read(self) -> dict:
try:
with open(self.file_dir, 'r') as f:
return json.load(f)
except json.decoder.JSONDecodeError:
raise InvalidJSONFile(f"{self.file_dir} cannot be read")
def add(self, to_be_added: dict, sub=None) -> dict:
"""Takes a dictionary and merges it with json file.
It uses new key's value if a key already exists.
Returns the new content as a dictionary.
"""
data = self.read()
if sub:
data[sub] = {**data[sub], **to_be_added}
else:
data = {**data, **to_be_added}
self.__writeToFile(data)
return self.read()
def delete(self, *delete_keys: str):
"""Delete given keys from JSON file.
Returns the new content as a dictionary.
"""
data = self.read()
for deleteKey in delete_keys:
if deleteKey in data:
del data[deleteKey]
found = True
if not found:
return False
self.__writeToFile(data)
def __writeToFile(self, content: (dict, list, tuple), create: bool = False):
if not create:
os.remove(self.file_dir)
with open(self.file_dir, 'w') as f:
json.dump(content, f, indent=4)

View File

@@ -0,0 +1,234 @@
from pprint import pprint
try:
from bulkredditdownloader.errors import InvalidRedditLink
except ModuleNotFoundError:
from errors import InvalidRedditLink
def QueryParser(passed_queries: str) -> dict:
extracted_queries = {}
question_mark_index = passed_queries.index("?")
header = passed_queries[:question_mark_index]
extracted_queries["HEADER"] = header
queries = passed_queries[question_mark_index + 1:]
parsed_queries = queries.split("&")
for query in parsed_queries:
query = query.split("=")
extracted_queries[query[0]] = query[1]
if extracted_queries["HEADER"] == "search":
extracted_queries["q"] = extracted_queries["q"].replace("%20", " ")
return extracted_queries
def LinkParser(link: str) -> dict:
result = {}
short_link = False
if "reddit.com" not in link:
raise InvalidRedditLink("Invalid reddit link")
splitted_link = link.split("/")
if splitted_link[0] == "https:" or splitted_link[0] == "http:":
splitted_link = splitted_link[2:]
try:
if (splitted_link[-2].endswith("reddit.com") and
splitted_link[-1] == "") or splitted_link[-1].endswith("reddit.com"):
result["sort"] = "best"
return result
except IndexError:
if splitted_link[0].endswith("reddit.com"):
result["sort"] = "best"
return result
if "redd.it" in splitted_link:
short_link = True
if splitted_link[0].endswith("reddit.com"):
splitted_link = splitted_link[1:]
if "comments" in splitted_link:
result = {"post": link}
return result
elif "me" in splitted_link or \
"u" in splitted_link or \
"user" in splitted_link or \
"r" in splitted_link or \
"m" in splitted_link:
if "r" in splitted_link:
result["subreddit"] = splitted_link[splitted_link.index("r") + 1]
elif "m" in splitted_link:
result["multireddit"] = splitted_link[splitted_link.index("m") + 1]
result["user"] = splitted_link[splitted_link.index("m") - 1]
else:
for index in range(len(splitted_link)):
if splitted_link[index] == "u" or splitted_link[index] == "user":
result["user"] = splitted_link[index + 1]
elif splitted_link[index] == "me":
result["user"] = "me"
for index in range(len(splitted_link)):
if splitted_link[index] in ["hot", "top", "new", "controversial", "rising"]:
result["sort"] = splitted_link[index]
if index == 0:
result["subreddit"] = "frontpage"
elif splitted_link[index] in ["submitted", "saved", "posts", "upvoted"]:
if splitted_link[index] == "submitted" or splitted_link[index] == "posts":
result["submitted"] = {}
elif splitted_link[index] == "saved":
result["saved"] = True
elif splitted_link[index] == "upvoted":
result["upvoted"] = True
elif "?" in splitted_link[index]:
parsed_query = QueryParser(splitted_link[index])
if parsed_query["HEADER"] == "search":
del parsed_query["HEADER"]
result["search"] = parsed_query
elif parsed_query["HEADER"] == "submitted" or \
parsed_query["HEADER"] == "posts":
del parsed_query["HEADER"]
result["submitted"] = parsed_query
else:
del parsed_query["HEADER"]
result["queries"] = parsed_query
if not ("upvoted" in result or
"saved" in result or
"submitted" in result or
"multireddit" in result) and "user" in result:
result["submitted"] = {}
return result
def LinkDesigner(link) -> dict:
attributes = LinkParser(link)
mode = {}
if "post" in attributes:
mode["post"] = attributes["post"]
mode["sort"] = ""
mode["time"] = ""
return mode
elif "search" in attributes:
mode["search"] = attributes["search"]["q"]
if "restrict_sr" in attributes["search"]:
if not (attributes["search"]["restrict_sr"] == 0 or
attributes["search"]["restrict_sr"] == "off" or
attributes["search"]["restrict_sr"] == ""):
if "subreddit" in attributes:
mode["subreddit"] = attributes["subreddit"]
elif "multireddit" in attributes:
mode["multreddit"] = attributes["multireddit"]
mode["user"] = attributes["user"]
else:
mode["subreddit"] = "all"
else:
mode["subreddit"] = "all"
if "t" in attributes["search"]:
mode["time"] = attributes["search"]["t"]
else:
mode["time"] = "all"
if "sort" in attributes["search"]:
mode["sort"] = attributes["search"]["sort"]
else:
mode["sort"] = "relevance"
if "include_over_18" in attributes["search"]:
if attributes["search"]["include_over_18"] == 1 or attributes["search"]["include_over_18"] == "on":
mode["nsfw"] = True
else:
mode["nsfw"] = False
else:
if "queries" in attributes:
if not ("submitted" in attributes or "posts" in attributes):
if "t" in attributes["queries"]:
mode["time"] = attributes["queries"]["t"]
else:
mode["time"] = "day"
else:
if "t" in attributes["queries"]:
mode["time"] = attributes["queries"]["t"]
else:
mode["time"] = "all"
if "sort" in attributes["queries"]:
mode["sort"] = attributes["queries"]["sort"]
else:
mode["sort"] = "new"
else:
mode["time"] = "day"
if "subreddit" in attributes and "search" not in attributes:
mode["subreddit"] = attributes["subreddit"]
elif "user" in attributes and "search" not in attributes:
mode["user"] = attributes["user"]
if "submitted" in attributes:
mode["submitted"] = True
if "sort" in attributes["submitted"]:
mode["sort"] = attributes["submitted"]["sort"]
elif "sort" in mode:
pass
else:
mode["sort"] = "new"
if "t" in attributes["submitted"]:
mode["time"] = attributes["submitted"]["t"]
else:
mode["time"] = "all"
elif "saved" in attributes:
mode["saved"] = True
elif "upvoted" in attributes:
mode["upvoted"] = True
elif "multireddit" in attributes:
mode["multireddit"] = attributes["multireddit"]
if "sort" in attributes:
mode["sort"] = attributes["sort"]
elif "sort" in mode:
pass
else:
mode["sort"] = "hot"
return mode
if __name__ == "__main__":
while True:
link = input("> ")
pprint(LinkDesigner(link))

View File

@@ -0,0 +1,241 @@
import sys
from pathlib import Path
from bulkredditdownloader.errors import InvalidSortingType, ProgramModeError, RedditorNameError, SearchModeError
from bulkredditdownloader.parser import LinkDesigner
import argparse
class ProgramMode:
def __init__(self, arguments: argparse.Namespace):
self.arguments = arguments
def generate(self) -> dict:
try:
self._validateProgramMode()
except ProgramModeError:
self._promptUser()
program_mode = {}
if self.arguments.user is not None:
program_mode["user"] = self.arguments.user
if self.arguments.search is not None:
program_mode["search"] = self.arguments.search
if self.arguments.sort == "hot" or \
self.arguments.sort == "controversial" or \
self.arguments.sort == "rising":
self.arguments.sort = "relevance"
if self.arguments.sort is not None:
program_mode["sort"] = self.arguments.sort
else:
if self.arguments.submitted:
program_mode["sort"] = "new"
else:
program_mode["sort"] = "hot"
if self.arguments.time is not None:
program_mode["time"] = self.arguments.time
else:
program_mode["time"] = "all"
if self.arguments.link is not None:
self.arguments.link = self.arguments.link.strip("\"")
program_mode = LinkDesigner(self.arguments.link)
if self.arguments.search is not None:
program_mode["search"] = self.arguments.search
if self.arguments.sort is not None:
program_mode["sort"] = self.arguments.sort
if self.arguments.time is not None:
program_mode["time"] = self.arguments.time
elif self.arguments.subreddit is not None:
if isinstance(self.arguments.subreddit, list):
self.arguments.subreddit = "+".join(self.arguments.subreddit)
program_mode["subreddit"] = self.arguments.subreddit
elif self.arguments.multireddit is not None:
program_mode["multireddit"] = self.arguments.multireddit
elif self.arguments.saved is True:
program_mode["saved"] = True
elif self.arguments.upvoted is True:
program_mode["upvoted"] = True
elif self.arguments.submitted is not None:
program_mode["submitted"] = True
if self.arguments.sort == "rising":
raise InvalidSortingType("Invalid sorting type has given")
program_mode["limit"] = self.arguments.limit
return program_mode
@staticmethod
def _chooseFrom(choices: list[str]):
print()
choices_by_index = list(str(x) for x in range(len(choices) + 1))
for i in range(len(choices)):
print("{indent}[{order}] {mode}".format(indent=" " * 4, order=i + 1, mode=choices[i]))
print(" " * 4 + "[0] exit\n")
choice = input("> ")
while not choice.lower() in choices + choices_by_index + ["exit"]:
print("Invalid input\n")
input("> ")
if choice == "0" or choice == "exit":
sys.exit()
elif choice in choices_by_index:
return choices[int(choice) - 1]
else:
return choice
def _promptUser(self):
print("select program mode:")
program_modes = ["search", "subreddit", "multireddit", "submitted", "upvoted", "saved", "log"]
program_mode = self._chooseFrom(program_modes)
if program_mode == "search":
self.arguments.search = input("\nquery: ")
self.arguments.subreddit = input("\nsubreddit: ")
print("\nselect sort type:")
sort_types = ["relevance", "top", "new"]
sort_type = self._chooseFrom(sort_types)
self.arguments.sort = sort_type
print("\nselect time filter:")
time_filters = ["hour", "day", "week", "month", "year", "all"]
time_filter = self._chooseFrom(time_filters)
self.arguments.time = time_filter
if program_mode == "subreddit":
subreddit_input = input("(type frontpage for all subscribed subreddits,\n"
" use plus to seperate multi subreddits:"
" pics+funny+me_irl etc.)\n\n"
"subreddit: ")
self.arguments.subreddit = subreddit_input
if " " in self.arguments.subreddit:
self.arguments.subreddit = "+".join(
self.arguments.subreddit.split())
# DELETE THE PLUS (+) AT THE END
if not subreddit_input.lower() == "frontpage" and self.arguments.subreddit[-1] == "+":
self.arguments.subreddit = self.arguments.subreddit[:-1]
print("\nselect sort type:")
sort_types = ["hot", "top", "new", "rising", "controversial"]
sort_type = self._chooseFrom(sort_types)
self.arguments.sort = sort_type
if sort_type in ["top", "controversial"]:
print("\nselect time filter:")
time_filters = ["hour", "day", "week", "month", "year", "all"]
time_filter = self._chooseFrom(time_filters)
self.arguments.time = time_filter
else:
self.arguments.time = "all"
elif program_mode == "multireddit":
self.arguments.user = input("\nmultireddit owner: ")
self.arguments.multireddit = input("\nmultireddit: ")
print("\nselect sort type:")
sort_types = ["hot", "top", "new", "rising", "controversial"]
sort_type = self._chooseFrom(sort_types)
self.arguments.sort = sort_type
if sort_type in ["top", "controversial"]:
print("\nselect time filter:")
time_filters = ["hour", "day", "week", "month", "year", "all"]
time_filter = self._chooseFrom(time_filters)
self.arguments.time = time_filter
else:
self.arguments.time = "all"
elif program_mode == "submitted":
self.arguments.submitted = True
self.arguments.user = input("\nredditor: ")
print("\nselect sort type:")
sort_types = ["hot", "top", "new", "controversial"]
sort_type = self._chooseFrom(sort_types)
self.arguments.sort = sort_type
if sort_type == "top":
print("\nselect time filter:")
time_filters = ["hour", "day", "week", "month", "year", "all"]
time_filter = self._chooseFrom(time_filters)
self.arguments.time = time_filter
else:
self.arguments.time = "all"
elif program_mode == "upvoted":
self.arguments.upvoted = True
self.arguments.user = input("\nredditor: ")
elif program_mode == "saved":
self.arguments.saved = True
elif program_mode == "log":
while True:
self.arguments.log = input("\nlog file directory:")
if Path(self.arguments.log).is_file():
break
while True:
try:
self.arguments.limit = int(input("\nlimit (0 for none): "))
if self.arguments.limit == 0:
self.arguments.limit = None
break
except ValueError:
pass
def _validateProgramMode(self):
"""Check if command-line self.arguments are given correcly,
if not, raise errors
"""
if self.arguments.user is None:
user = 0
else:
user = 1
search = 1 if self.arguments.search else 0
modes = ["saved", "subreddit", "submitted", "log", "link", "upvoted", "multireddit"]
values = {x: 0 if getattr(self.arguments, x) is None or
getattr(self.arguments, x) is False
else 1
for x in modes
}
if not sum(values[x] for x in values) == 1:
raise ProgramModeError("Invalid program mode")
if search + values["saved"] == 2:
raise SearchModeError("You cannot search in your saved posts")
if search + values["submitted"] == 2:
raise SearchModeError("You cannot search in submitted posts")
if search + values["upvoted"] == 2:
raise SearchModeError("You cannot search in upvoted posts")
if search + values["log"] == 2:
raise SearchModeError("You cannot search in log files")
if values["upvoted"] + values["submitted"] == 1 and user == 0:
raise RedditorNameError("No redditor name given")

View File

@@ -0,0 +1,91 @@
import random
import socket
import webbrowser
import praw
from prawcore.exceptions import ResponseException
from bulkredditdownloader.errors import RedditLoginFailed
from bulkredditdownloader.jsonHelper import JsonFile
from bulkredditdownloader.utils import GLOBAL
class Reddit:
def __init__(self, refresh_token: str = None):
self.SCOPES = ['identity', 'history', 'read', 'save']
self.PORT = 7634
self.refresh_token = refresh_token
self.redditInstance = None
self.arguments = {
"client_id": GLOBAL.reddit_client_id,
"client_secret": GLOBAL.reddit_client_secret,
"user_agent": str(socket.gethostname())
}
def begin(self) -> praw.Reddit:
if self.refresh_token:
self.arguments["refresh_token"] = self.refresh_token
self.redditInstance = praw.Reddit(**self.arguments)
try:
self.redditInstance.auth.scopes()
return self.redditInstance
except ResponseException:
self.arguments["redirect_uri"] = "http://localhost:" + \
str(self.PORT)
self.redditInstance = praw.Reddit(**self.arguments)
reddit, refresh_token = self.getRefreshToken(*self.SCOPES)
else:
self.arguments["redirect_uri"] = "http://localhost:" + \
str(self.PORT)
self.redditInstance = praw.Reddit(**self.arguments)
reddit, refresh_token = self.getRefreshToken(*self.SCOPES)
JsonFile(GLOBAL.configDirectory).add({"reddit_username": str(
reddit.user.me()), "reddit": refresh_token}, "credentials")
return self.redditInstance
def recieve_connection(self) -> socket:
"""Wait for and then return a connected socket..
Opens a TCP connection on port 8080, and waits for a single client.
"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', self.PORT))
server.listen(1)
client = server.accept()[0]
server.close()
return client
def send_message(self, client: socket, message: str):
"""Send message to client and close the connection."""
client.send('HTTP/1.1 200 OK\r\n\r\n{}'.format(message).encode('utf-8'))
client.close()
def getRefreshToken(self, scopes: list[str]) -> tuple[praw.Reddit, str]:
state = str(random.randint(0, 65000))
url = self.redditInstance.auth.url(scopes, state, 'permanent')
print("---Setting up the Reddit API---\n")
print("Go to this URL and login to reddit:\n", url, sep="\n", end="\n\n")
webbrowser.open(url, new=2)
client = self.recieve_connection()
data = client.recv(1024).decode('utf-8')
str(data)
param_tokens = data.split(' ', 2)[1].split('?', 1)[1].split('&')
params = {key: value for (key, value) in [token.split('=') for token in param_tokens]}
if state != params['state']:
self.send_message(client, 'State mismatch. Expected: {} Received: {}'.format(state, params['state']))
raise RedditLoginFailed
if 'error' in params:
self.send_message(client, params['error'])
raise RedditLoginFailed
refresh_token = self.redditInstance.auth.authorize(params['code'])
self.send_message(client,
"<script>"
"alert(\"You can go back to terminal window now.\");"
"</script>"
)
return self.redditInstance, refresh_token

View File

@@ -0,0 +1,341 @@
import sys
import time
import urllib.request
from urllib.error import HTTPError
from prawcore.exceptions import Forbidden, NotFound
from bulkredditdownloader.errors import (InsufficientPermission, InvalidSortingType, MultiredditNotFound, NoMatchingSubmissionFound,
NoPrawSupport)
from bulkredditdownloader.reddit import Reddit
from praw.models.listing.generator import ListingGenerator
from bulkredditdownloader.utils import GLOBAL, createLogFile, printToFile
from praw.models import Submission
print = printToFile
def getPosts(program_mode: dict) -> list[dict]:
"""Call PRAW regarding to arguments and pass it to extractDetails.
Return what extractDetails has returned.
"""
reddit = Reddit(GLOBAL.config["credentials"]["reddit"]).begin()
if program_mode["sort"] == "best":
raise NoPrawSupport("PRAW does not support that")
if "subreddit" in program_mode:
if "search" in program_mode:
if program_mode["subreddit"] == "frontpage":
program_mode["subreddit"] = "all"
if "user" in program_mode:
if program_mode["user"] == "me":
program_mode["user"] = str(reddit.user.me())
if "search" not in program_mode:
if program_mode["sort"] == "top" or program_mode["sort"] == "controversial":
keyword_params = {"time_filter": program_mode["time"], "limit": program_mode["limit"]}
# OTHER SORT TYPES DON'T TAKE TIME_FILTER
else:
keyword_params = {"limit": program_mode["limit"]}
else:
keyword_params = {"time_filter": program_mode["time"], "limit": program_mode["limit"]}
if "search" in program_mode:
if program_mode["sort"] in ["hot", "rising", "controversial"]:
raise InvalidSortingType("Invalid sorting type has given")
if "subreddit" in program_mode:
print(
"search for \"{search}\" in\n"
"subreddit: {subreddit}\nsort: {sort}\n"
"time: {time}\nlimit: {limit}\n".format(
search=program_mode["search"],
limit=program_mode["limit"],
sort=program_mode["sort"],
subreddit=program_mode["subreddit"],
time=program_mode["time"]
).upper(), no_print=True
)
return extractDetails(
reddit.subreddit(program_mode["subreddit"]).search(
program_mode["search"],
limit=program_mode["limit"],
sort=program_mode["sort"],
time_filter=program_mode["time"]
)
)
elif "multireddit" in program_mode:
raise NoPrawSupport("PRAW does not support that")
elif "user" in program_mode:
raise NoPrawSupport("PRAW does not support that")
elif "saved" in program_mode:
raise ("Reddit does not support that")
if program_mode["sort"] == "relevance":
raise InvalidSortingType("Invalid sorting type has given")
if "saved" in program_mode:
print("saved posts\nuser:{username}\nlimit={limit}\n".format(
username=reddit.user.me(),
limit=program_mode["limit"]).upper(),
no_print=True
)
return extractDetails(reddit.user.me().saved(limit=program_mode["limit"]))
if "subreddit" in program_mode:
if program_mode["subreddit"] == "frontpage":
print(
"subreddit: {subreddit}\nsort: {sort}\n"
"time: {time}\nlimit: {limit}\n".format(
limit=program_mode["limit"],
sort=program_mode["sort"],
subreddit=program_mode["subreddit"],
time=program_mode["time"]).upper(),
no_print=True
)
return extractDetails(getattr(reddit.front, program_mode["sort"])(**keyword_params))
else:
print(
"subreddit: {subreddit}\nsort: {sort}\n"
"time: {time}\nlimit: {limit}\n".format(
limit=program_mode["limit"],
sort=program_mode["sort"],
subreddit=program_mode["subreddit"],
time=program_mode["time"]).upper(),
no_print=True
)
return extractDetails(
getattr(reddit.subreddit(program_mode["subreddit"]), program_mode["sort"])(**keyword_params)
)
print(
"subreddit: {subreddit}\nsort: {sort}\n"
"time: {time}\nlimit: {limit}\n".format(
limit=programMode["limit"],
sort=programMode["sort"],
subreddit=programMode["subreddit"],
time=programMode["time"]
).upper(), noPrint=True
)
return extractDetails(
getattr(
reddit.subreddit(programMode["subreddit"]), programMode["sort"]
)(**keyword_params)
)
elif "multireddit" in program_mode:
print(
"user: {user}\n"
"multireddit: {multireddit}\nsort: {sort}\n"
"time: {time}\nlimit: {limit}\n".format(
user=program_mode["user"],
limit=program_mode["limit"],
sort=program_mode["sort"],
multireddit=program_mode["multireddit"],
time=program_mode["time"]).upper(),
no_print=True
)
try:
return extractDetails(
getattr(reddit.multireddit(program_mode["user"], program_mode["multireddit"]),
program_mode["sort"]
)(**keyword_params)
)
except NotFound:
raise MultiredditNotFound("Multireddit not found")
elif "submitted" in program_mode:
print(
"submitted posts of {user}\nsort: {sort}\n"
"time: {time}\nlimit: {limit}\n".format(
limit=program_mode["limit"],
sort=program_mode["sort"],
user=program_mode["user"],
time=program_mode["time"]).upper(),
no_print=True
)
return extractDetails(
getattr(reddit.redditor(program_mode["user"]).submissions, program_mode["sort"])(**keyword_params)
)
elif "upvoted" in program_mode:
print(
"upvoted posts of {user}\nlimit: {limit}\n".format(
user=program_mode["user"],
limit=program_mode["limit"]).upper(),
no_print=True
)
try:
return extractDetails(reddit.redditor(program_mode["user"]).upvoted(limit=program_mode["limit"]))
except Forbidden:
raise InsufficientPermission(
"You do not have permission to do that")
elif "post" in program_mode:
print("post: {post}\n".format(post=program_mode["post"]).upper(), no_print=True)
return extractDetails(reddit.submission(url=program_mode["post"]), single_post=True)
def extractDetails(posts: (ListingGenerator, Submission), single_post=False) -> list[dict]:
"""Check posts and decide if it can be downloaded.
If so, create a dictionary with post details and append them to a list.
Write all of posts to file. Return the list
"""
post_list = []
post_count = 1
all_posts = {}
print("\nGETTING POSTS")
posts_file = createLogFile("POSTS")
if single_post:
submission = posts
post_count += 1
try:
details = {'POSTID': submission.id,
'TITLE': submission.title,
'REDDITOR': str(submission.author),
'TYPE': None,
'CONTENTURL': submission.url,
'SUBREDDIT': submission.subreddit.display_name,
'UPVOTES': submission.score,
'FLAIR': submission.link_flair_text,
'DATE': str(time.strftime("%Y-%m-%d_%H-%M", time.localtime(submission.created_utc)))
}
except AttributeError:
pass
if not any(
domain in submission.domain for domain in GLOBAL.arguments.skip_domain):
result = matchWithDownloader(submission)
if result is not None:
details = {**details, **result}
post_list.append(details)
posts_file.add({post_count: details})
else:
try:
for submission in posts:
if post_count % 100 == 0:
sys.stdout.write("")
sys.stdout.flush()
if post_count % 1000 == 0:
sys.stdout.write("\n" + " " * 14)
sys.stdout.flush()
try:
details = {'POSTID': submission.id,
'TITLE': submission.title,
'REDDITOR': str(submission.author),
'TYPE': None,
'CONTENTURL': submission.url,
'SUBREDDIT': submission.subreddit.display_name,
'UPVOTES': submission.score,
'FLAIR': submission.link_flair_text,
'DATE': str(time.strftime("%Y-%m-%d_%H-%M", time.localtime(submission.created_utc)))
}
except AttributeError:
continue
if details['POSTID'] in GLOBAL.downloadedPosts():
continue
if not any(
domain in submission.domain for domain in GLOBAL.arguments.skip_domain):
result = matchWithDownloader(submission)
if result is not None:
details = {**details, **result}
post_list.append(details)
all_posts[post_count] = details
post_count += 1
except KeyboardInterrupt:
print("\nKeyboardInterrupt", no_print=True)
posts_file.add(all_posts)
if not len(post_list) == 0:
print()
return post_list
else:
raise NoMatchingSubmissionFound("No matching submission was found")
def matchWithDownloader(submission: Submission) -> dict[str, str]:
direct_link = extractDirectLink(submission.url)
if direct_link:
return {'TYPE': 'direct', 'CONTENTURL': direct_link}
if 'v.redd.it' in submission.domain:
bitrates = ["DASH_1080", "DASH_720", "DASH_600", "DASH_480", "DASH_360", "DASH_240"]
for bitrate in bitrates:
video_url = submission.url + "/" + bitrate + ".mp4"
try:
response_code = urllib.request.urlopen(video_url).getcode()
except urllib.error.HTTPError:
response_code = 0
if response_code == 200:
return {'TYPE': 'v.redd.it', 'CONTENTURL': video_url}
if 'gfycat' in submission.domain:
return {'TYPE': 'gfycat'}
if 'youtube' in submission.domain and 'watch' in submission.url:
return {'TYPE': 'youtube'}
if 'youtu.be' in submission.domain:
url = urllib.request.urlopen(submission.url).geturl()
if 'watch' in url:
return {'TYPE': 'youtube'}
elif 'imgur' in submission.domain:
return {'TYPE': 'imgur'}
elif 'erome' in submission.domain:
return {'TYPE': 'erome'}
elif 'redgifs' in submission.domain:
return {'TYPE': 'redgifs'}
elif 'gifdeliverynetwork' in submission.domain:
return {'TYPE': 'gifdeliverynetwork'}
if 'reddit.com/gallery' in submission.url:
return {'TYPE': 'gallery'}
elif submission.is_self and 'self' not in GLOBAL.arguments.skip:
return {'TYPE': 'self',
'CONTENT': submission.selftext}
def extractDirectLink(url: str) -> (bool, str):
"""Check if link is a direct image link.
If so, return URL,
if not, return False
"""
image_types = ['jpg', 'jpeg', 'png', 'mp4', 'webm', 'gif']
if url[-1] == "/":
url = url[:-1]
if "i.reddituploads.com" in url:
return url
for extension in image_types:
if extension == url.split(".")[-1]:
return url
else:
return None

View File

@@ -0,0 +1,25 @@
from os import path
class Store:
def __init__(self, directory: str = None):
self.directory = directory
if self.directory:
if path.exists(directory):
with open(directory, 'r') as f:
self.list = f.read().split("\n")
else:
with open(self.directory, 'a'):
pass
self.list = []
else:
self.list = []
def __call__(self) -> list:
return self.list
def add(self, data: dict):
self.list.append(data)
if self.directory:
with open(self.directory, 'a') as f:
f.write("{data}\n".format(data=data))

View File

@@ -0,0 +1,90 @@
import io
import sys
from os import makedirs, path
from pathlib import Path
from typing import Optional
from bulkredditdownloader.jsonHelper import JsonFile
class GLOBAL:
"""Declare global variables"""
RUN_TIME = ""
config = {'imgur_client_id': None, 'imgur_client_secret': None}
arguments = None
directory = None
defaultConfigDirectory = Path.home() / "Bulk Downloader for Reddit"
configDirectory = ""
reddit_client_id = "U-6gk4ZCh3IeNQ"
reddit_client_secret = "7CZHY6AmKweZME5s50SfDGylaPg"
printVanilla = print
log_stream = None
@staticmethod
def downloadedPosts() -> list:
return []
def createLogFile(title: str) -> JsonFile:
"""Create a log file with given name
inside a folder time stampt in its name and
put given arguments inside \"HEADER\" key
"""
folder_directory = GLOBAL.directory / "LOG_FILES" / GLOBAL.RUN_TIME
log_filename = title.upper() + '.json'
if not path.exists(folder_directory):
makedirs(folder_directory)
file = JsonFile(folder_directory / Path(log_filename))
header = " ".join(sys.argv)
file.add({"HEADER": header})
return file
def printToFile(*args, no_print=False, **kwargs):
"""Print to both CONSOLE and
CONSOLE LOG file in a folder time stampt in the name
"""
folder_directory = GLOBAL.directory / Path("LOG_FILES") / Path(GLOBAL.RUN_TIME)
if not no_print or GLOBAL.arguments.verbose or "file" in kwargs:
print(*args, **kwargs)
if not path.exists(folder_directory):
makedirs(folder_directory)
if "file" not in kwargs:
with io.open(folder_directory / "CONSOLE_LOG.txt", "a", encoding="utf-8") as FILE:
print(*args, file=FILE, **kwargs)
def nameCorrector(string: str, reference: Optional[str] = None) -> str:
"""Swap strange characters from given string
with underscore (_) and shorten it.
Return the string
"""
limit = 247
string_length = len(string)
if reference:
reference_length = len(reference)
total_lenght = reference_length
else:
total_lenght = string_length
if total_lenght > limit:
limit -= reference_length
string = string[:limit - 1]
string = string.replace(" ", "_")
if len(string.split('\n')) > 1:
string = "".join(string.split('\n'))
bad_chars = ['\\', '/', ':', '*', '?', '"', '<', '>', '|', '#', '.', '@', '', '', '\'', '!']
string = "".join([i if i not in bad_chars else "_" for i in string])
return string