Loading REQUIREMENTS.txt +1 −0 Original line number Diff line number Diff line snscrape==0.4.3.20220106 aiohttp[speedups] requests switch nest_asyncio Loading lostmediafinder/__main__.py +3 −1 Original line number Diff line number Diff line Loading @@ -3,6 +3,8 @@ The CLI interface of LostMediaFinder. None of this is public API! """ import asyncio import click from switch import Switch Loading @@ -27,7 +29,7 @@ def youtube(ctx, id: str, format: str) -> int: """ click.echo("\033[1m\033[4m\033[1;31mUsing LostMediaFinder from the command-line is unstable!\033[0m", err=True) click.echo("Generating report, this could take some time...", err=True) response = YouTubeResponse.generate(id) response = asyncio.run(YouTubeResponse.generate(id)) if response.status == "bad.id": raise ValueError("Bad video ID - does not match regex") with Switch(format) as case: Loading lostmediafinder/finder.py +32 −25 Original line number Diff line number Diff line Loading @@ -6,8 +6,7 @@ import random import time import urllib.parse import requests from requests.auth import HTTPBasicAuth import aiohttp from switch import Switch from .types import YouTubeService, T Loading @@ -19,22 +18,24 @@ class WaybackMachine(YouTubeService): name = "Wayback Machine" @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: ismeta = False lien = f"https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/{id}" response = requests.get(lien, allow_redirects=False, timeout=15) archived = bool(response.headers.get("location")) # if there's a redirect, it's archived async with session.get(lien, allow_redirects=False, timeout=15) as response: redirect = response.headers.get("location") archived = bool(redirect) # if there's a redirect, it's archived response2 = None if not archived: lien = None check = urllib.parse.quote(f"https://youtube.com/watch?v={id}", safe="") # not exhaustive but... response2 = requests.get(f"https://archive.org/wayback/available?url={check}", timeout=8).json() async with session.get(f"https://archive.org/wayback/available?url={check}", timeout=8) as resp: response2 = await resp.json() if response2["archived_snapshots"]: archived = True ismeta = True lien = response2["archived_snapshots"]["closest"]["url"] rawraw = (response.headers.get("location"), response2) if includeRaw else None rawraw = (redirect, response2) if includeRaw else None return cls( archived=archived, capcount=int(archived), rawraw=rawraw, available=lien, lastupdated=time.time(), name=cls.getName(), Loading @@ -53,12 +54,13 @@ class InternetArchive(YouTubeService): ] @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: responses = [] is_dark = False for template in cls.items_tried: ident = template % id metadata = requests.get(f"https://archive.org/metadata/{ident}", timeout=12).json() async with session.get(f"https://archive.org/metadata/{ident}", timeout=12) as resp: metadata = await resp.json() responses.append(metadata) if metadata.get("is_dark"): is_dark = True Loading @@ -84,9 +86,10 @@ class GhostArchive(YouTubeService): Queries GhostArchive for the video you requested. """ @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: link = f"https://ghostarchive.org/varchive/{id}" code = requests.get(link).status_code async with session.get(link) as resp: code = resp.status rawraw = code if includeRaw else None archived = None with Switch(code) as case: Loading @@ -95,7 +98,7 @@ class GhostArchive(YouTubeService): elif case(404): archived = False elif case.default: raise AssertionError(f"bad status code (expected one of (200, 404), got {code})") raise AssertionError(f"bad status code (expected one of (200, 404, 500), got {code})") else: raise RuntimeError("We should never be here!") capcount = int(archived) Loading @@ -116,16 +119,18 @@ class Ya(YouTubeService): ) @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False): async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True): vid = id assert cls._getFromConfig("ya", "enabled"), "#youtubearchive API access is not enabled" auth = HTTPBasicAuth(cls._getFromConfig("ya", "username"), cls._getFromConfig("ya", "password")) auth = aiohttp.BasicAuth(cls._getFromConfig("ya", "username"), cls._getFromConfig("ya", "password")) comments = False count = requests.get("https://ya.borg.xyz/cgi-bin/capture-count?v=" + vid, auth=auth, timeout=5).text async with session.get("https://ya.borg.xyz/cgi-bin/capture-count?v=" + vid, auth=auth, timeout=5) as resp: count = await resp.text() if not count: raise ValueError("Server returned empty response!") commentcount = requests.get("https://ya.borg.xyz/cgi-bin/capture-comment-counts?v="+vid, auth=auth).text count = int(count) async with session.get("https://ya.borg.xyz/cgi-bin/capture-comment-counts?v=" + vid, auth=auth) as resp: commentcount = await resp.text() archived = (count > 0) comments = [i for i in commentcount.split("\n") if i.strip("∅\n") and i.strip() != "0"] rawraw = (count, commentcount) if includeRaw else None Loading @@ -144,7 +149,7 @@ class Filmot(YouTubeService): cooldown: int = 2 @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: enabled = cls._getFromConfig("filmot", "enabled") assert enabled, "Filmot API access is not enabled." key = cls._getFromConfig("filmot", "key") Loading @@ -153,7 +158,8 @@ class Filmot(YouTubeService): lastupdated = time.time() cls.lastretrieved = time.time() lastupdated = time.time() metadata = requests.get(f"https://filmot.com/api/getvideos?key={key}&id={id}&flags=1").json() async with session.get(f"https://filmot.com/api/getvideos?key={key}&id={id}&flags=1") as resp: metadata = await resp.json() rawraw = metadata if includeRaw else None if len(metadata) > 0: # pylint: disable=simplifiable-if-statement archived = True Loading @@ -177,11 +183,12 @@ class Playboard(YouTubeService): note = "The Playboard scraper is unreliable; please verify values yourself." @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False): async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True): user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.%s.0.0 Safari/537.36" user_agent = user_agent % random.randint(0, 100) url = f"https://playboard.co/en/video/{id}" code = requests.get(url, headers={"User-Agent": user_agent}).status_code async with session.get(url, headers={"User-Agent": user_agent}) as resp: code = resp.status rawraw = {"status_code": code, "ua_used": user_agent} lastupdated = time.time() available = None Loading lostmediafinder/types.py +17 −11 Original line number Diff line number Diff line Loading @@ -7,6 +7,8 @@ import time import typing import re import asyncio import aiohttp import cachetools import asyncache Loading Loading @@ -69,12 +71,12 @@ class Service(JSONDataclass): return val @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: raise NotImplementedError("Subclass Service and impl the _run function") @classmethod @asyncache.cached(cachetools.TTLCache(1024, 600)) async def run(cls, id: str, includeRaw=True, **kwargs): async def run(cls, id: str, session: aiohttp.ClientSession, includeRaw=True, **kwargs): """ Retrieves the data from the service. Arguments: Loading @@ -82,7 +84,7 @@ class Service(JSONDataclass): includeRaw (bool): Whether or not to include the raw data as sent from the service. If you don't need this data, turn this off; it's only the default for compatibility. """ try: return await cls._run(id, includeRaw=includeRaw, **kwargs) return await cls._run(id, session, includeRaw=includeRaw, **kwargs) except Exception as ename: # pylint: disable=broad-except note = f"An error occured while retrieving data from {cls.getName()}." print(ename) Loading @@ -109,7 +111,9 @@ class Service(JSONDataclass): Archived? {self.archived} {meta} {lien} \t{self.note.strip()} """ return string if self.error: string += f"\t{self.error}\n" return string + "\n" class YouTubeService(Service): # pylint: disable=abstract-method pass Loading @@ -132,14 +136,13 @@ class YouTubeResponse(JSONDataclass): verdict: dict api_version: int = 3 def coerce_to_api_version(selfNEW, target): def coerce_to_api_version(selfNEW, target): # pylint: disable=no-self-argument """ Downgrades the API version to one of your choice, then returns it. Arguments: target (int): The target API version. Must be lower than self.api_version """ import copy self = copy.deepcopy(selfNEW) currentApiVersion = self.api_version if currentApiVersion < target: Loading @@ -152,7 +155,7 @@ class YouTubeResponse(JSONDataclass): assert self.api_version == target return self def _convert_v3_to_v2(selfNEW): def _convert_v3_to_v2(selfNEW): # pylint: disable=no-self-argument self = copy.deepcopy(selfNEW) assert self.api_version == 3 self.api_version = 2 Loading Loading @@ -190,7 +193,7 @@ class YouTubeResponse(JSONDataclass): return verdict @classmethod async def generate(cls, id, asyncio=False): async def generate(cls, id): """ Runs all the Services. Arguments: Loading @@ -200,9 +203,12 @@ class YouTubeResponse(JSONDataclass): return cls(status="bad.id", id=id, keys=[], verdict={"video":False,"comments":False,"metaonly":False,"human_friendly":"Invalid video ID. "}) keys = [] services = cls._get_services() for subclass in services: result = None result = await subclass.run(id) coroutines = [] async with aiohttp.ClientSession() as session: for service in services: coroutines.append(service.run(id, session)) results = await asyncio.gather(*coroutines) for result in results: keys.append(result) any_comments_archived = any(map(lambda e : e.comments, keys)) any_metaonly_archived = any(map(lambda e : e.metaonly and e.archived, keys)) Loading Loading
REQUIREMENTS.txt +1 −0 Original line number Diff line number Diff line snscrape==0.4.3.20220106 aiohttp[speedups] requests switch nest_asyncio Loading
lostmediafinder/__main__.py +3 −1 Original line number Diff line number Diff line Loading @@ -3,6 +3,8 @@ The CLI interface of LostMediaFinder. None of this is public API! """ import asyncio import click from switch import Switch Loading @@ -27,7 +29,7 @@ def youtube(ctx, id: str, format: str) -> int: """ click.echo("\033[1m\033[4m\033[1;31mUsing LostMediaFinder from the command-line is unstable!\033[0m", err=True) click.echo("Generating report, this could take some time...", err=True) response = YouTubeResponse.generate(id) response = asyncio.run(YouTubeResponse.generate(id)) if response.status == "bad.id": raise ValueError("Bad video ID - does not match regex") with Switch(format) as case: Loading
lostmediafinder/finder.py +32 −25 Original line number Diff line number Diff line Loading @@ -6,8 +6,7 @@ import random import time import urllib.parse import requests from requests.auth import HTTPBasicAuth import aiohttp from switch import Switch from .types import YouTubeService, T Loading @@ -19,22 +18,24 @@ class WaybackMachine(YouTubeService): name = "Wayback Machine" @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: ismeta = False lien = f"https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/{id}" response = requests.get(lien, allow_redirects=False, timeout=15) archived = bool(response.headers.get("location")) # if there's a redirect, it's archived async with session.get(lien, allow_redirects=False, timeout=15) as response: redirect = response.headers.get("location") archived = bool(redirect) # if there's a redirect, it's archived response2 = None if not archived: lien = None check = urllib.parse.quote(f"https://youtube.com/watch?v={id}", safe="") # not exhaustive but... response2 = requests.get(f"https://archive.org/wayback/available?url={check}", timeout=8).json() async with session.get(f"https://archive.org/wayback/available?url={check}", timeout=8) as resp: response2 = await resp.json() if response2["archived_snapshots"]: archived = True ismeta = True lien = response2["archived_snapshots"]["closest"]["url"] rawraw = (response.headers.get("location"), response2) if includeRaw else None rawraw = (redirect, response2) if includeRaw else None return cls( archived=archived, capcount=int(archived), rawraw=rawraw, available=lien, lastupdated=time.time(), name=cls.getName(), Loading @@ -53,12 +54,13 @@ class InternetArchive(YouTubeService): ] @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: responses = [] is_dark = False for template in cls.items_tried: ident = template % id metadata = requests.get(f"https://archive.org/metadata/{ident}", timeout=12).json() async with session.get(f"https://archive.org/metadata/{ident}", timeout=12) as resp: metadata = await resp.json() responses.append(metadata) if metadata.get("is_dark"): is_dark = True Loading @@ -84,9 +86,10 @@ class GhostArchive(YouTubeService): Queries GhostArchive for the video you requested. """ @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: link = f"https://ghostarchive.org/varchive/{id}" code = requests.get(link).status_code async with session.get(link) as resp: code = resp.status rawraw = code if includeRaw else None archived = None with Switch(code) as case: Loading @@ -95,7 +98,7 @@ class GhostArchive(YouTubeService): elif case(404): archived = False elif case.default: raise AssertionError(f"bad status code (expected one of (200, 404), got {code})") raise AssertionError(f"bad status code (expected one of (200, 404, 500), got {code})") else: raise RuntimeError("We should never be here!") capcount = int(archived) Loading @@ -116,16 +119,18 @@ class Ya(YouTubeService): ) @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False): async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True): vid = id assert cls._getFromConfig("ya", "enabled"), "#youtubearchive API access is not enabled" auth = HTTPBasicAuth(cls._getFromConfig("ya", "username"), cls._getFromConfig("ya", "password")) auth = aiohttp.BasicAuth(cls._getFromConfig("ya", "username"), cls._getFromConfig("ya", "password")) comments = False count = requests.get("https://ya.borg.xyz/cgi-bin/capture-count?v=" + vid, auth=auth, timeout=5).text async with session.get("https://ya.borg.xyz/cgi-bin/capture-count?v=" + vid, auth=auth, timeout=5) as resp: count = await resp.text() if not count: raise ValueError("Server returned empty response!") commentcount = requests.get("https://ya.borg.xyz/cgi-bin/capture-comment-counts?v="+vid, auth=auth).text count = int(count) async with session.get("https://ya.borg.xyz/cgi-bin/capture-comment-counts?v=" + vid, auth=auth) as resp: commentcount = await resp.text() archived = (count > 0) comments = [i for i in commentcount.split("\n") if i.strip("∅\n") and i.strip() != "0"] rawraw = (count, commentcount) if includeRaw else None Loading @@ -144,7 +149,7 @@ class Filmot(YouTubeService): cooldown: int = 2 @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: enabled = cls._getFromConfig("filmot", "enabled") assert enabled, "Filmot API access is not enabled." key = cls._getFromConfig("filmot", "key") Loading @@ -153,7 +158,8 @@ class Filmot(YouTubeService): lastupdated = time.time() cls.lastretrieved = time.time() lastupdated = time.time() metadata = requests.get(f"https://filmot.com/api/getvideos?key={key}&id={id}&flags=1").json() async with session.get(f"https://filmot.com/api/getvideos?key={key}&id={id}&flags=1") as resp: metadata = await resp.json() rawraw = metadata if includeRaw else None if len(metadata) > 0: # pylint: disable=simplifiable-if-statement archived = True Loading @@ -177,11 +183,12 @@ class Playboard(YouTubeService): note = "The Playboard scraper is unreliable; please verify values yourself." @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False): async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True): user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.%s.0.0 Safari/537.36" user_agent = user_agent % random.randint(0, 100) url = f"https://playboard.co/en/video/{id}" code = requests.get(url, headers={"User-Agent": user_agent}).status_code async with session.get(url, headers={"User-Agent": user_agent}) as resp: code = resp.status rawraw = {"status_code": code, "ua_used": user_agent} lastupdated = time.time() available = None Loading
lostmediafinder/types.py +17 −11 Original line number Diff line number Diff line Loading @@ -7,6 +7,8 @@ import time import typing import re import asyncio import aiohttp import cachetools import asyncache Loading Loading @@ -69,12 +71,12 @@ class Service(JSONDataclass): return val @classmethod async def _run(cls, id, includeRaw=True, asynchronous=False) -> T: async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True) -> T: raise NotImplementedError("Subclass Service and impl the _run function") @classmethod @asyncache.cached(cachetools.TTLCache(1024, 600)) async def run(cls, id: str, includeRaw=True, **kwargs): async def run(cls, id: str, session: aiohttp.ClientSession, includeRaw=True, **kwargs): """ Retrieves the data from the service. Arguments: Loading @@ -82,7 +84,7 @@ class Service(JSONDataclass): includeRaw (bool): Whether or not to include the raw data as sent from the service. If you don't need this data, turn this off; it's only the default for compatibility. """ try: return await cls._run(id, includeRaw=includeRaw, **kwargs) return await cls._run(id, session, includeRaw=includeRaw, **kwargs) except Exception as ename: # pylint: disable=broad-except note = f"An error occured while retrieving data from {cls.getName()}." print(ename) Loading @@ -109,7 +111,9 @@ class Service(JSONDataclass): Archived? {self.archived} {meta} {lien} \t{self.note.strip()} """ return string if self.error: string += f"\t{self.error}\n" return string + "\n" class YouTubeService(Service): # pylint: disable=abstract-method pass Loading @@ -132,14 +136,13 @@ class YouTubeResponse(JSONDataclass): verdict: dict api_version: int = 3 def coerce_to_api_version(selfNEW, target): def coerce_to_api_version(selfNEW, target): # pylint: disable=no-self-argument """ Downgrades the API version to one of your choice, then returns it. Arguments: target (int): The target API version. Must be lower than self.api_version """ import copy self = copy.deepcopy(selfNEW) currentApiVersion = self.api_version if currentApiVersion < target: Loading @@ -152,7 +155,7 @@ class YouTubeResponse(JSONDataclass): assert self.api_version == target return self def _convert_v3_to_v2(selfNEW): def _convert_v3_to_v2(selfNEW): # pylint: disable=no-self-argument self = copy.deepcopy(selfNEW) assert self.api_version == 3 self.api_version = 2 Loading Loading @@ -190,7 +193,7 @@ class YouTubeResponse(JSONDataclass): return verdict @classmethod async def generate(cls, id, asyncio=False): async def generate(cls, id): """ Runs all the Services. Arguments: Loading @@ -200,9 +203,12 @@ class YouTubeResponse(JSONDataclass): return cls(status="bad.id", id=id, keys=[], verdict={"video":False,"comments":False,"metaonly":False,"human_friendly":"Invalid video ID. "}) keys = [] services = cls._get_services() for subclass in services: result = None result = await subclass.run(id) coroutines = [] async with aiohttp.ClientSession() as session: for service in services: coroutines.append(service.run(id, session)) results = await asyncio.gather(*coroutines) for result in results: keys.append(result) any_comments_archived = any(map(lambda e : e.comments, keys)) any_metaonly_archived = any(map(lambda e : e.metaonly and e.archived, keys)) Loading