nebula-rss/nebula_rss/nebula_loader.py

67 lines
2.1 KiB
Python
Raw Normal View History

2022-01-11 14:17:33 +00:00
import datetime
2022-01-11 13:09:24 +00:00
import os
2022-01-11 14:17:33 +00:00
from typing import List, Optional
2022-01-11 13:09:24 +00:00
2022-01-12 11:29:27 +00:00
import requests
2022-01-11 13:09:24 +00:00
from nebula_rss import NebulaVideo
class NebulaLoader:
    """Fetch the signed-in user's Nebula library through the web API.

    The loader logs in with an e-mail/password pair, exchanges the resulting
    login token for a bearer token, and then reads the first page of the
    library video listing.
    """

    # Endpoints used by ``load_from_api``.
    API_BASE = 'https://api.watchnebula.com/api/v1'
    LIBRARY_URL = 'https://content.watchnebula.com/library/video/?page=1'

    # Product tokens in a User-Agent are space separated, so every
    # continuation fragment starts with a space.
    USER_AGENT = (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)'
        ' AppleWebKit/605.1.15 (KHTML, like Gecko)'
        ' Version/15.2 Safari/605.1.15'
    )

    # Seconds to wait on each HTTP request; without a timeout a stalled
    # connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(
            self,
            username: str,
            password: str,
            driver_path: Optional[os.PathLike] = None
    ):
        """Store the credentials and create the HTTP session.

        Args:
            username: Account e-mail, sent as ``email`` in the login request.
            password: Account password.
            driver_path: Accepted for backward compatibility; this class
                does not use it.
        """
        self.username = username
        self.password = password
        self.session = requests.Session()

    @staticmethod
    def _parse_api_response(api_video: dict) -> NebulaVideo:
        """Convert one entry of the library listing into a ``NebulaVideo``.

        Args:
            api_video: Mapping with the keys ``title``, ``channel_title``,
                ``share_url`` and ``published_at``.

        Returns:
            A ``NebulaVideo`` built from title, creator, URL and the parsed
            publication time.

        Raises:
            KeyError: If one of the expected keys is missing.
            ValueError: If ``published_at`` does not match
                ``%Y-%m-%dT%H:%M:%S%z``.
        """
        title = api_video['title']
        creator = api_video['channel_title']
        url = api_video['share_url']
        published_at = datetime.datetime.strptime(
            api_video['published_at'],
            "%Y-%m-%dT%H:%M:%S%z")
        return NebulaVideo(title, creator, url, published_at)

    def load_from_api(self) -> List[NebulaVideo]:
        """Log in and return the videos on the first library page.

        The session headers are updated in place, so the session keeps the
        bearer token after this call.

        Returns:
            One ``NebulaVideo`` per entry in the response's ``results`` list.

        Raises:
            requests.HTTPError: If any of the three requests returns an
                error status (for example, rejected credentials).
            requests.Timeout: If a request exceeds ``REQUEST_TIMEOUT``.
            KeyError: If a response lacks the expected ``key``, ``token`` or
                ``results`` field.
        """
        self.session.headers.update({
            'User-Agent': self.USER_AGENT,
            'Nebula-Platform': 'web',
        })

        # Step 1: credentials -> login token.
        login_response = self.session.post(
            self.API_BASE + '/auth/login/',
            json={'email': self.username, 'password': self.password},
            timeout=self.REQUEST_TIMEOUT)
        login_response.raise_for_status()
        login_token = login_response.json()['key']

        # Step 2: login token -> bearer token.
        self.session.headers.update(
            {'Authorization': 'Token ' + login_token})
        auth_response = self.session.post(
            self.API_BASE + '/authorization/',
            json={},
            timeout=self.REQUEST_TIMEOUT)
        auth_response.raise_for_status()
        jwt = auth_response.json()['token']

        # Step 3: read the library with the bearer token. Only page 1 is
        # requested; later pages are not followed.
        self.session.headers.update({'Authorization': 'Bearer ' + jwt})
        videos_response = self.session.get(
            self.LIBRARY_URL,
            timeout=self.REQUEST_TIMEOUT)
        videos_response.raise_for_status()
        return [self._parse_api_response(v)
                for v in videos_response.json()['results']]

    def load(self) -> List[NebulaVideo]:
        """Return the user's library videos (delegates to ``load_from_api``)."""
        return self.load_from_api()