import datetime
import os
import re
import time
from typing import List, Optional

import requests

from nebula_rss import NebulaVideo


class NebulaLoader:
    """Fetch the first page of the Nebula video library via the web API.

    A single ``requests.Session`` is kept on the instance and reused for the
    login, authorization and library requests.
    """

    _API_BASE = 'https://api.watchnebula.com/api/v1'
    _LIBRARY_URL = 'https://content.watchnebula.com/library/video/?page=1'
    # Note the space before "Version/": the two literals are concatenated
    # implicitly, so the separator has to be part of one of them.
    _USER_AGENT = (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
        'AppleWebKit/605.1.15 (KHTML, like Gecko) '
        'Version/15.2 Safari/605.1.15'
    )
    _PUBLISHED_AT_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
    # Seconds to wait on each HTTP request; without a timeout ``requests``
    # may block indefinitely on an unresponsive server.
    _REQUEST_TIMEOUT = 30

    def __init__(
        self,
        username: str,
        password: str,
        driver_path: Optional[os.PathLike] = None
    ):
        """Store the credentials and create the HTTP session.

        Args:
            username: Sent as the ``email`` field of the login request.
            password: Sent as the ``password`` field of the login request.
            driver_path: Unused by this class; accepted only so existing
                callers that still pass it keep working.
        """
        self.username = username
        self.password = password
        self.session = requests.Session()

    @staticmethod
    def _parse_api_response(api_video) -> NebulaVideo:
        """Build a ``NebulaVideo`` from one entry of the library response.

        Args:
            api_video: Mapping with the keys ``title``, ``channel_title``,
                ``share_url`` and ``published_at``.

        Returns:
            A ``NebulaVideo`` built from title, creator, URL and the parsed
            publication timestamp.

        Raises:
            KeyError: If one of the expected keys is missing.
            ValueError: If ``published_at`` does not match
                ``%Y-%m-%dT%H:%M:%S%z``.
        """
        title = api_video['title']
        creator = api_video['channel_title']
        url = api_video['share_url']
        published_at = datetime.datetime.strptime(
            api_video['published_at'], NebulaLoader._PUBLISHED_AT_FORMAT
        )
        return NebulaVideo(title, creator, url, published_at)

    def load_from_api(self) -> List[NebulaVideo]:
        """Log in, obtain a bearer token and fetch page 1 of the library.

        Returns:
            One ``NebulaVideo`` per entry in the ``results`` list of the
            library response.

        Raises:
            requests.HTTPError: If any of the three requests returns an
                error status.
            requests.RequestException: On connection problems or timeouts.
            KeyError: If a response lacks an expected field.
        """
        session = self.session
        session.headers.update({
            'User-Agent': self._USER_AGENT,
            'Nebula-Platform': 'web',
        })
        # Drop any token left over from a previous call so the login
        # request is not sent with stale credentials.
        session.headers.pop('Authorization', None)

        # Step 1: exchange e-mail/password for a login key.
        login_response = session.post(
            self._API_BASE + '/auth/login/',
            json={'email': self.username, 'password': self.password},
            timeout=self._REQUEST_TIMEOUT,
        )
        login_response.raise_for_status()
        login_token = login_response.json()['key']
        session.headers.update({'Authorization': 'Token ' + login_token})

        # Step 2: exchange the login key for the token used as bearer
        # credential on the content host.
        auth_response = session.post(
            self._API_BASE + '/authorization/',
            json={},
            timeout=self._REQUEST_TIMEOUT,
        )
        auth_response.raise_for_status()
        jwt = auth_response.json()['token']
        session.headers.update({'Authorization': 'Bearer ' + jwt})

        # Step 3: fetch the first page of the video library.
        videos_response = session.get(
            self._LIBRARY_URL, timeout=self._REQUEST_TIMEOUT
        )
        videos_response.raise_for_status()
        return [
            NebulaLoader._parse_api_response(v)
            for v in videos_response.json()['results']
        ]

    def load(self) -> List[NebulaVideo]:
        """Return the videos from the first page of the library.

        Thin wrapper around :meth:`load_from_api`; any exception raised
        there propagates to the caller.
        """
        # The previous implementation called ``self.driver.quit()`` in a
        # ``finally`` clause, but no ``driver`` attribute is ever set on
        # this class, so every call ended in AttributeError.
        return self.load_from_api()