From 0fef0b99a4a465ea0dd73d0a05a97c1199b1c974 Mon Sep 17 00:00:00 2001 From: Luca Martinelli Date: Fri, 29 Jan 2021 12:06:51 +0100 Subject: [PATCH] Moodle UniPD support --- serie_dl/__init__.py | 129 +++++++++++++----------- serie_dl/content_parser.py | 12 ++- serie_dl/cookie_saver.py | 42 ++++++++ serie_dl/parsers/moodle_unipd_parser.py | 119 ++++++++++++++++++++++ 4 files changed, 243 insertions(+), 59 deletions(-) create mode 100644 serie_dl/cookie_saver.py create mode 100644 serie_dl/parsers/moodle_unipd_parser.py diff --git a/serie_dl/__init__.py b/serie_dl/__init__.py index e63ffe0..84fd527 100644 --- a/serie_dl/__init__.py +++ b/serie_dl/__init__.py @@ -4,7 +4,7 @@ import argparse import json import csv -import os +from serie_dl.cookie_saver import CookieSaver args_parser = argparse.ArgumentParser( description="Download multiple files (serie's episodes or movies) using youtube-dl") @@ -20,6 +20,9 @@ args_parser.add_argument("-c", "--conf", dest="configname", help="Use custom config file") args_parser.set_defaults(configname="default.ini") +args_parser.add_argument("--cookie", dest="save_cookies", action="store_true", + help="Open browser and save cookie file") +args_parser.set_defaults(save_cookies=False) args = args_parser.parse_args() @@ -121,7 +124,8 @@ def get_configs(): # load config from config file, otherwise use defaults values view_log = config["GLOBAL"].getboolean("view_log") - chrome_location = config["PARSER"].get("chrome_location") if config["PARSER"].get("chrome_location") != "" else None + chrome_location = config["PARSER"].get( + "chrome_location") if config["PARSER"].get("chrome_location") != "" else None chromedriver_location = config["PARSER"].get("chromedriver_location") headless = config["PARSER"].getboolean("headless") elapse_time = config["PARSER"].getint("elapse_time") @@ -154,59 +158,70 @@ def main(): # get options from config file parser_options, downloader_options = get_configs() - # set custom download folder if passed to args - if args.outputfolder is not None: - downloader_options["download_folder"] = args.outputfolder - - # get already parse contents from input file, otherwise use None - contents_parsed = None - if args.parsedfile is not None: - try: - with open(args.parsedfile, "r") as f: - contents_parsed = json.load(f) - except Exception as e: - print("[WARNING]", e) - - # if no passed already parsed file, parse movies and series - if contents_parsed is None: - # setup parser - content_parser = ContentParser(options=parser_options) - - content_to_parse = [] - - # if csv file is given in args, get infos from it, otherwise ask to user - if args.sourcefile is None: - content_to_parse = get_input_contents() - else: - content_to_parse = parse_csv() - - # parse contents - contents_parsed = content_parser.parse_contents(content_to_parse) - - # if onlyparse, then save parsed contents to parsed_data.json, otherwise, start download files - if not args.onlyparse: - # setup downloader - downloader = ContentDownloader(options=downloader_options) - download_success, download_failed = downloader.download_contents( - contents_parsed) - - print("\n[DOWNLOADED] Successfull downloads:", len(download_success)) - print("[FAILED] Failed downloads (see log.txt):", len(download_failed)) - - # save failed downloads to log.txt - with open("log.txt", "a+") as f: - for failed in download_failed: - if failed["type"] == "movie": - f.write(downloader_options["movie_tmpl"].format( - movie_title=failed["title"])) - else: - f.write(downloader_options["serie_tmpl"].format(serie_name=failed["serie_title"], - season_num=failed["season"], - episode_num=failed["episode"], - episode_title=failed["title"])) - + if args.save_cookies: + cookie_saver = CookieSaver(parser_options) + save = input("When you are ready, press s to save cookies: ") + while save != "s": + save = input("When you are ready, press s to save cookies: ") + if save == "s": + cookie_saver.save_cookies() else: - # save parsed contents to parsed_data.json - with open('parsed_data.json', 'w') as f: - json.dump(contents_parsed, f) - print("\n[SUCCESS] Parsed data saved in parsed_data.json") + # set custom download folder if passed to args + if args.outputfolder is not None: + downloader_options["download_folder"] = args.outputfolder + + # get already parse contents from input file, otherwise use None + contents_parsed = None + if args.parsedfile is not None: + try: + with open(args.parsedfile, "r") as f: + contents_parsed = json.load(f) + except Exception as e: + print("[WARNING]", e) + + # if no passed already parsed file, parse movies and series + if contents_parsed is None: + # setup parser + content_parser = ContentParser(options=parser_options) + + content_to_parse = [] + + # if csv file is given in args, get infos from it, otherwise ask to user + if args.sourcefile is None: + content_to_parse = get_input_contents() + else: + content_to_parse = parse_csv() + + # parse contents + contents_parsed = content_parser.parse_contents(content_to_parse) + + # if onlyparse, then save parsed contents to parsed_data.json, otherwise, start download files + if not args.onlyparse: + # setup downloader + downloader = ContentDownloader(options=downloader_options) + download_success, download_failed = downloader.download_contents( + contents_parsed) + + print("\n[DOWNLOADED] Successfull downloads:", + len(download_success)) + print("[FAILED] Failed downloads (see log.txt):", + len(download_failed)) + + # save failed downloads to log.txt + with open("log.txt", "a+") as f: + for failed in download_failed: + print(failed) + if "type" in failed.keys() and failed["type"] == "movie": + f.write(downloader_options["movie_tmpl"].format( + movie_title=failed["title"])) + else: + f.write(downloader_options["serie_tmpl"].format(serie_name=failed["serie_title"], + season_num=failed["season"], + episode_num=failed["episode"], + episode_title=failed["title"])) + + else: + # save parsed contents to parsed_data.json + with open('parsed_data.json', 'w') as f: + json.dump(contents_parsed, f) + print("\n[SUCCESS] Parsed data saved in parsed_data.json") diff --git a/serie_dl/content_parser.py b/serie_dl/content_parser.py index e90576f..df7c4d1 100644 --- a/serie_dl/content_parser.py +++ b/serie_dl/content_parser.py @@ -1,5 +1,3 @@ -from serie_dl.parsers.animeunity_parser import AnimeUnityParser -from urllib import parse from selenium import webdriver from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from urllib.parse import urlparse @@ -7,6 +5,8 @@ from serie_dl.parsers.vvvvid_parser import VVVVIDParser from serie_dl.parsers.seriehd_parser import SerieHDParser from serie_dl.parsers.animeunity_parser import AnimeUnityParser +from serie_dl.parsers.moodle_unipd_parser import MoodleUniPDParser +import pickle class ContentParser: @@ -30,6 +30,7 @@ def __init__(self, options, custom_parser=None): "vvvvid": VVVVIDParser(self.__options), "guardaserie": GenioParser(self.__options), "animeunity": AnimeUnityParser(self.__options), + "elearning.dei.unipd": MoodleUniPDParser(self.__options), "seriehd": SerieHDParser(self.__options)} if custom_parser is not None: self.__site_parsers.update(custom_parser) @@ -239,3 +240,10 @@ def __setup_driver(self): # set driver self.__driver = webdriver.Chrome(executable_path=self.__options["chromedriver_location"], desired_capabilities=caps, options=chrome_options) + # load cookies if any + try: + cookies = pickle.load(open("cookies.pkl", "rb")) + for cookie in cookies: + self.__driver.add_cookie(cookie) + except: + pass diff --git a/serie_dl/cookie_saver.py b/serie_dl/cookie_saver.py new file mode 100644 index 0000000..83d45e7 --- /dev/null +++ b/serie_dl/cookie_saver.py @@ -0,0 +1,42 @@ +from selenium import webdriver +from selenium.webdriver.common.desired_capabilities import DesiredCapabilities +import pickle + + +class CookieSaver: + # default options (chrome_location: None get chrome binary automatically) + __options = {"chrome_location": None, + "chromedriver_location": "./chromedriver/chromedriver.exe", + "headless": True, + "elapse_time": 30, + "view_log": True} + __driver = None + + def __init__(self, options): + # update options with one given by the user (if there's) + if options is not None: + self.__options.update(options) + # open browser + self.__setup_driver() + + def set_options(self, options): + # update options + self.__options.update(options) + + def save_cookies(self): + pickle.dump(self.__driver.get_cookies(), open("cookies.pkl", "wb")) + self.__driver.quit() + + def __setup_driver(self): + # get network flow + caps = DesiredCapabilities.CHROME + caps['goog:loggingPrefs'] = {'performance': 'ALL'} + chrome_options = webdriver.ChromeOptions() + if self.__options["chrome_location"] is not None: + chrome_options.binary_location = self.__options["chrome_location"] + chrome_options.add_argument('--window-size=1080,720') + # hide info and warnings + chrome_options.add_argument('--log-level=3') + # set driver + self.__driver = webdriver.Chrome(executable_path=self.__options["chromedriver_location"], + desired_capabilities=caps, options=chrome_options) diff --git a/serie_dl/parsers/moodle_unipd_parser.py b/serie_dl/parsers/moodle_unipd_parser.py new file mode 100644 index 0000000..5a42e97 --- /dev/null +++ b/serie_dl/parsers/moodle_unipd_parser.py @@ -0,0 +1,119 @@ +import json +import time +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait + +class MoodleUniPDParser: + __options = {} + __content = {} + # ["movie"] for movie only, ["serie"] for serie only, + support = ["serie", "movie"] + __logged = False + + def __init__(self, options=None, content=None): + # update options with one given by the user (if there's) + if options is not None: + self.__options.update(options) + if content is not None: + self.__content.update(content) + + def __wait_login(self): + if self.__logged: + return True + logged = input("Press y when you're logged in: ") + while logged != "y": + logged = input("Press y when you're logged in: ") + if logged == "y": + self.__logged = True + return True + + # called to set content (eg. you can get page url) + def set_content(self, content): + self.__content.update(content) + + # driver is selenium webdriver + def parse_title(self, driver): + if self.__wait_login(): + return driver.find_element_by_css_selector(".heading-title").get_attribute("textContent").strip() + + def parse_movie_title(self, driver): + if self.__wait_login(): + return self.parse_title(driver) + + def parse_seasons(self, driver): + if self.__wait_login(): + return [driver.find_element_by_css_selector(".course-content")] + + # element is season element got by parse_seasons + def parse_episodes(self, driver, element): + ret_elements = [] + i = 1 + for episode_element in element.find_elements_by_css_selector(".modtype_kalvidres"): + ret_elements.append({ + "season": 1, + "episode": i, + "element": episode_element + }) + i += 1 + return ret_elements + + # element is episode element got by parse_episodes + def parse_episode_title(self, driver, element): + element_ep = element["element"] + return element_ep.find_element_by_css_selector(".instancename").text + + # element is episode element got by parse_episodes + def parse_episode_link(self, driver, element): + element_ep = element["element"] + return element_ep.find_element_by_css_selector("a").get_attribute("href") + + # element is episode element got by parse_episodes + def parse_ep_ss_num(self, driver, element): + return [element["season"], element["episode"]] + + def parse_dwn_url(self, driver): + # start video + wait = WebDriverWait(driver, 10) + videoplayer = wait.until(EC.presence_of_element_located( + (By.CSS_SELECTOR, "#contentframe"))) + driver.get(videoplayer.get_attribute("src")) + + player_btn = wait.until(EC.presence_of_element_located( + (By.CSS_SELECTOR, "#kplayer"))) + player_btn.click() + + # initialize video download url + video_dwl_url = None + + # get current time (if no responses after elapsed_time, exit and return None) + start_time = time.time() + elapsed_time = 0 # no time elapsed + + # while no video found or elapsed time not passed, try to get download link from network flow + while video_dwl_url is None and elapsed_time <= self.__options["elapse_time"]: + # get network flow + browser_log = driver.get_log("performance") + events = [] + for entry in browser_log: + events.append(json.loads(entry["message"])["message"]) + + # check each network request, if contains master.m3u8, it is the download link + for e in events: + try: + if e["params"]["response"]["url"].find("index.m3u8") >= 0: + video_dwl_url = e["params"]["response"]["url"] + except KeyError: + pass + # update elapsed time + elapsed_time = time.time() - start_time + + # if video download got, then return it, otherwise return exception + if video_dwl_url is not None: + return video_dwl_url + else: + raise Exception("Error on getting download link") + + def parse_dwl_url_movie(self, driver): + if self.__wait_login(): + return self.parse_dwn_url(driver)