Switched to tge nebula api
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
__version__ = '0.1.0'
|
||||
__version__ = '0.2.0'
|
||||
|
||||
from nebula_rss.nebula_video import NebulaVideo
|
||||
from nebula_rss.nebula_loader import NebulaLoader
|
||||
|
@ -1,18 +1,11 @@
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.firefox.options import Options
|
||||
from selenium.webdriver.firefox.service import Service
|
||||
from selenium.webdriver.support.wait import WebDriverWait
|
||||
from selenium.webdriver.remote.remote_connection import LOGGER
|
||||
from bs4 import BeautifulSoup
|
||||
import requests
|
||||
|
||||
|
||||
from nebula_rss import NebulaVideo
|
||||
|
||||
@ -26,77 +19,48 @@ class NebulaLoader:
|
||||
):
|
||||
self.username = username
|
||||
self.password = password
|
||||
|
||||
LOGGER.setLevel(logging.FATAL)
|
||||
service = None
|
||||
if driver_path:
|
||||
service = Service(driver_path)
|
||||
options = Options()
|
||||
options.headless = True
|
||||
self.driver = webdriver.Firefox(
|
||||
service=service,
|
||||
options=options,
|
||||
log_path=os.devnull,
|
||||
service_log_path=os.devnull)
|
||||
self.driver.implicitly_wait(10) # seconds
|
||||
self.session = requests.Session()
|
||||
|
||||
@staticmethod
|
||||
def _parse_anchor(anchor) -> NebulaVideo:
|
||||
info_div = anchor.next_sibling
|
||||
details_anchor = info_div.find_all('a')[1]
|
||||
divs = details_anchor.find_all('div')
|
||||
title_div = divs[0]
|
||||
details_div = divs[1]
|
||||
creator = details_div.find('span').string
|
||||
release_text = details_div.find('time').get('datetime')
|
||||
release_date = datetime.datetime.fromisoformat(release_text.replace('Z', '+00:00'))
|
||||
return NebulaVideo(
|
||||
title=title_div.string,
|
||||
creator=creator,
|
||||
url='https://nebula.app' + anchor.get('href'),
|
||||
release_at=release_date
|
||||
)
|
||||
def _parse_api_response(api_video) -> NebulaVideo:
|
||||
title = api_video['title']
|
||||
creator = api_video['channel_title']
|
||||
url = api_video['share_url']
|
||||
published_at = datetime.datetime.strptime(api_video['published_at'], "%Y-%m-%dT%H:%M:%S%z")
|
||||
return NebulaVideo(title, creator, url, published_at)
|
||||
|
||||
def load_from_api(self):
|
||||
api_base = 'https://api.watchnebula.com/api/v1'
|
||||
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko)'\
|
||||
'Version/15.2 Safari/605.1.15'
|
||||
headers = {
|
||||
'User-Agent': user_agent,
|
||||
'Nebula-Platform': 'web'
|
||||
}
|
||||
login_data = {
|
||||
'email': self.username,
|
||||
'password': self.password
|
||||
}
|
||||
self.session.headers.update(headers)
|
||||
login_response = self.session.post(api_base+'/auth/login/', json=login_data)
|
||||
login_token = login_response.json()['key']
|
||||
token_header = {
|
||||
'Authorization': 'Token ' + login_token
|
||||
}
|
||||
self.session.headers.update(token_header)
|
||||
|
||||
auth_response = self.session.post(api_base+'/authorization/', json={})
|
||||
jwt = auth_response.json()['token']
|
||||
token_header['Authorization'] = 'Bearer ' + jwt
|
||||
self.session.headers.update(token_header)
|
||||
|
||||
videos_response = self.session.get('https://content.watchnebula.com/library/video/?page=1')
|
||||
return [NebulaLoader._parse_api_response(v) for v in videos_response.json()['results']]
|
||||
|
||||
def load(self) -> List[NebulaVideo]:
|
||||
videos = []
|
||||
try:
|
||||
videos = self._load()
|
||||
videos = self.load_from_api()
|
||||
finally:
|
||||
self.driver.quit()
|
||||
return videos
|
||||
|
||||
def _load(self) -> List[NebulaVideo]:
|
||||
self.driver.get('https://nebula.app/login')
|
||||
|
||||
username_input = '//*[@name="email"]'
|
||||
password_input = '//*[@name="password"]'
|
||||
login_submit = '//*[@id="NebulaApp"]/div[2]/div[2]/div[1]/div/form/button'
|
||||
|
||||
self.driver.find_element(By.XPATH, username_input).send_keys(self.username)
|
||||
self.driver.find_element(By.XPATH, password_input).send_keys(self.password)
|
||||
self.driver.find_element(By.XPATH, login_submit).click()
|
||||
|
||||
delay = 3
|
||||
# wait for "My shows" link
|
||||
wait = WebDriverWait(self.driver, delay)
|
||||
myshows = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'menu a[href|="/myshows"]')))
|
||||
myshows.click()
|
||||
myshows = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'h2')))
|
||||
|
||||
video_links = []
|
||||
count_remaining = 5
|
||||
follower_error_re = re.compile("You aren't following any creators yet.*")
|
||||
while not video_links and count_remaining > 0:
|
||||
time.sleep(2)
|
||||
soup = BeautifulSoup(self.driver.page_source, features="lxml")
|
||||
follower_error = False
|
||||
follower_error = [p for p in soup.find_all('p') if p.find(string=follower_error_re)]
|
||||
if follower_error:
|
||||
print('Error loading videos, reloading page')
|
||||
self.driver.refresh()
|
||||
count_remaining = 5
|
||||
else:
|
||||
all_anchors = soup.find_all('a')
|
||||
video_links = [a for a in all_anchors if a.get('href').startswith('/videos/') and a.get('aria-hidden')]
|
||||
count_remaining -= 1
|
||||
return [NebulaLoader._parse_anchor(v) for v in video_links]
|
||||
|
Reference in New Issue
Block a user