Commit f1a6ff59 authored by TheTechRobo's avatar TheTechRobo
Browse files

Add Distributed YouTube Archive

parent 6d764b62
Loading
Loading
Loading
Loading
+9 −3
Original line number Diff line number Diff line
@@ -23,6 +23,15 @@ methods:
    username:
    password:

  distributed_youtube_archive:
    title: "Distributed YouTube Archive"
    enabled: false
    key: ""

  hobune_stream:
    title: Hobune.stream
    enabled: true

  filmot:
    title: Filmot
    enabled: false
@@ -32,6 +41,3 @@ methods:
    title: Playboard.co
    enabled: true
  hobune_stream:
    title: Hobune.stream
    enabled: true
 No newline at end of file
+86 −39
Original line number Diff line number Diff line
@@ -2,7 +2,7 @@
All the Service implementations live here.
"""

import random, time, urllib.parse, aiohttp
import random, time, urllib.parse, aiohttp, asyncio
from switch import Switch
from .types import YouTubeService, T, methods

@@ -170,6 +170,8 @@ class GhostArchive(YouTubeService):
                archived = True
            elif case(404):
                archived = False
            elif case(500):
                archived = False
            elif case.default:
                raise AssertionError(f"bad status code (expected one of (200, 404, 500), got {code})")
            else:
@@ -215,6 +217,88 @@ class HackintYa(YouTubeService):
            note=cls.note if archived else "", rawraw=rawraw, metaonly=False
        )

FYT_UA = "FindYoutubeVideo/1.0 operated by TheTechRobo"

class DistributedYoutubeArchive(YouTubeService):
    """
    Queries DYA for the video in question.
    """
    name = methods['distributed_youtube_archive']['title']
    configId = "distributed_youtube_archive"

    @classmethod
    async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True):
        token = methods[cls.configId]['key']
        user_agent = FYT_UA
        lastupdated = time.time()
        async with session.get(f"https://dya-t-api.strangled.net/api/video/{id}", headers={"Authorization": token}) as resp:
            status = resp.status
            if status not in (200, 404):
                raise RuntimeError(f"DYA returned bad status code {status}")
            j = await resp.json()
            if "contributions" not in j:
                j['contributions_length'] = None
                assert "error" in j, "No error or contributions field returned"
                archived = False
            elif not j['contributions']:
                j['contributions_length'] = len(j['contributions'])
                del j['contributions']
                archived = False
            else:
                j['contributions_length'] = len(j['contributions'])
                del j['contributions']
                archived = True
        capcount = j['contributions_length']
        note = "One or more contributors to the Distributed YouTube Archive have the video. Join their Discord server and ask for help." if archived else ""
        metaonly = False
        comments = None # we can't tell whether there are comments or not
        available = "https://discord.gg/YNeVJ72NS4" if archived else None
        return cls(
            archived=archived, capcount=capcount, lastupdated=lastupdated,
            name=cls.getName(), note=note, rawraw=j, metaonly=metaonly,
            comments=comments, available=available
        )

class Hobune(YouTubeService):
    """
    Queries Hobune.stream for the video in question.
    """
    name = methods["hobune_stream"]["title"]
    configId = "hobune_stream"
    lastretrieved = 0
    cooldown = 0.5

    @classmethod
    async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True):
        while time.time() - cls.lastretrieved < cls.cooldown:
            await asyncio.sleep(0.1)
        user_agent = "FindYoutubeVideo/1.0 operated by thetechrobo@proton.me"
        urls_to_try = ("https://hobune.stream/videos/{}", "https://hobune.stream/tpa-h/videos/{}")
        raw = []
        archived = False
        available = None
        lastupdated = time.time()
        cls.lastretrieved = lastupdated
        for url in urls_to_try:
            url = url.format(id)
            async with session.head(url, headers={"User-Agent": user_agent}, timeout=5) as resp:
                code = resp.status
                raw.append(code)
            if code == 200:
                archived = True
                available = url
                break
            elif code == 404:
                archived = False
                available = None
            else:
                raise RuntimeError("Hobune.stream returned invalid status code %s" % code)
        return cls(
            archived=archived, capcount=1 if archived else 0,
            lastupdated=lastupdated, name=cls.getName(), note="",
            rawraw=raw, metaonly=False, comments=False, available=available
        )

# TODO: Make a YouTubeServiceWithCooldown or something

class Filmot(YouTubeService):
@@ -231,10 +315,9 @@ class Filmot(YouTubeService):
        key = methods[cls.configId]["api_key"]

        while time.time() - cls.lastretrieved < cls.cooldown:
            time.sleep(0.1)
            await asyncio.sleep(0.1)
        lastupdated = time.time()
        cls.lastretrieved = time.time()
        lastupdated = time.time()
        async with session.get(f"https://filmot.com/api/getvideos?key={key}&id={id}&flags=1") as resp:
            metadata = await resp.json(content_type=None)
        rawraw = metadata if includeRaw else None
@@ -283,39 +366,3 @@ class Playboard(YouTubeService):
                rawraw=rawraw, metaonly=True, comments=False,
                available=available
        )

class Hobune(YouTubeService):
    """
    Queries Hobune.stream for the video in question.
    """
    name = methods["hobune_stream"]["title"]
    configId = "hobune_stream"

    @classmethod
    async def _run(cls, id, session: aiohttp.ClientSession, includeRaw=True):
        user_agent = "FindYoutubeVideo/1.0 operated by thetechrobo@proton.me"
        urls_to_try = ("https://hobune.stream/videos/{}", "https://hobune.stream/tpa-h/videos/{}")
        raw = []
        archived = False
        available = None
        lastupdated = time.time()
        for url in urls_to_try:
            url = url.format(id)
            async with session.head(url, headers={"User-Agent": user_agent}, timeout=5) as resp:
                code = resp.status
                raw.append(code)
            if code == 200:
                archived = True
                available = url
                break
            elif code == 404:
                archived = False
                available = None
            else:
                raise RuntimeError("Hobune.stream returned invalid status code %s" % code)
        return cls(
            archived=archived, capcount=1 if archived else 0,
            lastupdated=lastupdated, name=cls.getName(), note="",
            rawraw=raw, metaonly=False, comments=False, available=available
        )