Implement callbacks for downloading
This commit is contained in:
@@ -6,7 +6,7 @@ import logging
|
||||
import re
|
||||
import time
|
||||
import urllib.parse
|
||||
from typing import Optional
|
||||
from typing import Callable, Optional
|
||||
|
||||
import _hashlib
|
||||
import requests
|
||||
@@ -18,40 +18,44 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Resource:
|
||||
def __init__(self, source_submission: Submission, url: str, extension: str = None):
|
||||
def __init__(self, source_submission: Submission, url: str, download_function: Callable, extension: str = None):
    """Set up a resource whose content has not been fetched yet.

    Args:
        source_submission: The submission this resource was found in.
        url: Location of the resource.
        download_function: Callable stored on the instance; ``download``
            invokes it with no arguments to obtain the content.
        extension: File extension for the resource. When it is missing
            or otherwise falsy, ``_determine_extension()`` supplies it.
    """
    self.url = url
    self.source_submission = source_submission
    self.download_function = download_function

    # Populated later; nothing has been downloaded or hashed at this point.
    self.content: Optional[bytes] = None
    self.hash: Optional[_hashlib.HASH] = None

    # Assign first so the attribute exists before the fallback lookup runs.
    self.extension = extension
    if self.extension:
        return
    self.extension = self._determine_extension()
|
||||
|
||||
@staticmethod
def retry_download(url: str, max_wait_time: int, current_wait_time: int = 60) -> Optional[bytes]:
    """Fetch ``url`` and return its body, retrying on transient failures.

    A 2xx response with a non-empty body is returned directly. HTTP 408
    and 429 are converted into connection errors so they go through the
    same retry path as real connection problems. Any other outcome raises
    ``BulkDownloaderException``.

    On a retryable error the function sleeps ``current_wait_time`` seconds
    and then, if that wait is still below ``max_wait_time``, calls itself
    again with the wait increased by 60 seconds. Otherwise the error is
    logged and re-raised.
    """
    try:
        reply = requests.get(url)
        status = reply.status_code
        if re.match(r'^2\d{2}', str(status)) and reply.content:
            return reply.content
        if status in (408, 429):
            # Route timeout / rate-limit responses into the retry handler below.
            raise requests.exceptions.ConnectionError(f'Response code {status}')
        raise BulkDownloaderException(
            f'Unrecoverable error requesting resource: HTTP Code {status}')
    except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as err:
        logger.warning(f'Error occured downloading from {url}, waiting {current_wait_time} seconds: {err}')
        time.sleep(current_wait_time)
        if current_wait_time >= max_wait_time:
            logger.error(f'Max wait time exceeded for resource at url {url}')
            raise
        # Still under the limit: try again with a longer wait.
        return Resource.retry_download(url, max_wait_time, current_wait_time + 60)
|
||||
def retry_download(url: str, max_wait_time: int) -> Callable[[], Optional[bytes]]:
    """Build a zero-argument callable that downloads ``url`` with retries.

    Args:
        url: Address to request with ``requests.get``.
        max_wait_time: Once the current wait (starting at 60 seconds and
            growing by 60 after each failed attempt) reaches this value,
            the next retryable failure is re-raised instead of retried.

    Returns:
        A function taking no arguments that returns the response body.

    The returned function raises:
        BulkDownloaderException: For any response that is neither a 2xx
            with a non-empty body nor a 408/429.
        requests.exceptions.ConnectionError or
        requests.exceptions.ChunkedEncodingError: When a retryable failure
            occurs after the wait limit has been reached.
    """
    def http_download() -> Optional[bytes]:
        current_wait_time = 60
        while True:
            try:
                response = requests.get(url)
                if 200 <= response.status_code < 300 and response.content:
                    return response.content
                elif response.status_code in (408, 429):
                    # Treat timeout / rate-limit responses as transient so
                    # they are handled by the retry branch below.
                    raise requests.exceptions.ConnectionError(f'Response code {response.status_code}')
                else:
                    raise BulkDownloaderException(
                        f'Unrecoverable error requesting resource: HTTP Code {response.status_code}')
            except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as e:
                # Check the limit before sleeping: no further attempt would
                # follow, so waiting here would only delay the failure.
                if current_wait_time >= max_wait_time:
                    logger.error(f'Max wait time exceeded for resource at url {url}')
                    raise
                logger.warning(f'Error occured downloading from {url}, waiting {current_wait_time} seconds: {e}')
                time.sleep(current_wait_time)
                current_wait_time += 60

    return http_download
|
||||
|
||||
def download(self, max_wait_time: int):
|
||||
def download(self):
|
||||
if not self.content:
|
||||
try:
|
||||
content = self.retry_download(self.url, max_wait_time)
|
||||
content = self.download_function()
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
raise BulkDownloaderException(f'Could not download resource: {e}')
|
||||
except BulkDownloaderException:
|
||||
|
||||
Reference in New Issue
Block a user