diff --git a/src/cc2olx/main.py b/src/cc2olx/main.py
index 524a7ab..197ebec 100644
--- a/src/cc2olx/main.py
+++ b/src/cc2olx/main.py
@@ -2,14 +2,13 @@
import shutil
import sys
import tempfile
-
from pathlib import Path
-from cc2olx import filesystem
-from cc2olx import olx
+from cc2olx import filesystem, olx, settings
from cc2olx.cli import parse_args, RESULT_TYPE_FOLDER, RESULT_TYPE_ZIP
-from cc2olx.models import Cartridge, OLX_STATIC_DIR
-from cc2olx.settings import collect_settings
+from cc2olx.constants import OLX_STATIC_DIR
+from cc2olx.models import Cartridge
+from cc2olx.parser import parse_options
def convert_one_file(input_file, workspace, link_file=None, passport_file=None):
@@ -47,32 +46,31 @@ def convert_one_file(input_file, workspace, link_file=None, passport_file=None):
def main():
- parsed_args = parse_args()
- settings = collect_settings(parsed_args)
+ args = parse_args()
+ options = parse_options(args)
- workspace = settings["workspace"]
- link_file = settings["link_file"]
- passport_file = settings["passport_file"]
+ workspace = options["workspace"]
+ link_file = options["link_file"]
+ passport_file = options["passport_file"]
# setup logger
- logging_config = settings["logging_config"]
- logging.basicConfig(level=logging_config["level"], format=logging_config["format"])
+ logging.basicConfig(level=options["log_level"], format=settings.LOG_FORMAT)
logger = logging.getLogger()
with tempfile.TemporaryDirectory() as tmpdirname:
temp_workspace = Path(tmpdirname) / workspace.stem
- for input_file in settings["input_files"]:
+ for input_file in options["input_files"]:
try:
convert_one_file(input_file, temp_workspace, link_file, passport_file)
except Exception:
logger.exception("Error while converting %s file", input_file)
- if settings["output_format"] == RESULT_TYPE_FOLDER:
+ if options["output_format"] == RESULT_TYPE_FOLDER:
shutil.rmtree(str(workspace), ignore_errors=True)
shutil.copytree(str(temp_workspace), str(workspace))
- if settings["output_format"] == RESULT_TYPE_ZIP:
+ if options["output_format"] == RESULT_TYPE_ZIP:
shutil.make_archive(str(workspace), "zip", str(temp_workspace))
logger.info("Conversion completed")
diff --git a/src/cc2olx/models.py b/src/cc2olx/models.py
index c8510d0..8d2051d 100644
--- a/src/cc2olx/models.py
+++ b/src/cc2olx/models.py
@@ -1,17 +1,15 @@
-import imghdr
import logging
import os.path
import re
-from textwrap import dedent
import zipfile
+from pathlib import Path
+from textwrap import dedent
+from typing import List, Optional
from cc2olx import filesystem
from cc2olx.external.canvas import ModuleMeta
-from cc2olx.qti import QtiParser
from cc2olx.utils import clean_file_name
-from .utils import simple_slug
-
logger = logging.getLogger()
MANIFEST = "imsmanifest.xml"
@@ -24,22 +22,6 @@
DIFFUSE_SHALLOW_SECTIONS = False
DIFFUSE_SHALLOW_SUBSECTIONS = True
-OLX_STATIC_DIR = "static"
-
-OLX_DIRECTORIES = [
- "about",
- "assets",
- "chapter",
- "course",
- "html",
- "info",
- "policies",
- "problem",
- "sequential",
- OLX_STATIC_DIR,
- "vertical",
-]
-
def is_leaf(container):
return "identifierref" in container
@@ -86,7 +68,7 @@ def __init__(self, cartridge_file, workspace):
self.module_meta = {}
# List of static files that are outside of `web_resources` directory, but still required
- self.extra_static_files = []
+ self._extra_static_files = []
self.workspace = workspace
@@ -99,6 +81,16 @@ def __repr__(self):
)
return text
+ @property
+ def extra_static_files(self) -> List[str]:
+ """
+ Provide the list of extra static files.
+ """
+ return self._extra_static_files
+
+ def add_extra_static_file(self, value: str) -> None:
+ self._extra_static_files.append(value)
+
def process_canvas_cc(self, elements):
"""
Perform canvas cc specific processing.
@@ -310,102 +302,15 @@ def flatten(self, container):
output.extend(leaves)
return output
- def get_resource_content(self, identifier):
+ def define_resource(self, idref: Optional[str]) -> Optional[dict]:
"""
- Get the resource named by `identifier`.
-
- If the resource can be retrieved, returns a tuple: the first element
- indicates the type of content, either "html" or "link". The second
- element is a dict with details, which vary by the type.
-
- If the resource can't be retrieved, returns a tuple of None, None.
-
+ Define a resource by its identifier.
"""
- res = self.resources_by_id.get(identifier)
- if res is None and self.is_canvas_flavor:
- res = self.resources_by_id.get(self.module_meta.get_identifierref(identifier))
- if res is None:
- logger.info("Missing resource: %s", identifier)
- return None, None
-
- res_type = res["type"]
-
- if res_type == "webcontent":
- res_relative_path = res["children"][0].href
- res_filename = self._res_filename(res_relative_path)
- if res_filename.suffix == ".html":
- try:
- with open(str(res_filename), encoding="utf-8") as res_file:
- html = res_file.read()
- except: # noqa: E722
- logger.error("Failure reading %s from id %s", res_filename, identifier) # noqa: E722
- raise
- return "html", {"html": html}
- elif "web_resources" in str(res_filename) and imghdr.what(str(res_filename)):
- static_filename = str(res_filename).split("web_resources/")[1]
- olx_static_path = "/{}/{}".format(OLX_STATIC_DIR, static_filename)
- html = (
- '<html><head></head><body><p>'
- '<img src="{}" alt="{}"></p></body></html>'.format(olx_static_path, static_filename)
- )
- return "html", {"html": html}
- elif "web_resources" not in str(res_filename):
- # This webcontent is outside of ``web_resources`` directory
- # So we need to manually copy it to OLX_STATIC_DIR
- self.extra_static_files.append(res_relative_path)
- olx_static_path = "/{}/{}".format(OLX_STATIC_DIR, res_relative_path)
- html = (
- '<html><head></head><body>'
- '<p><a href="{}" alt="{}">{}</a></p></body></html>'.format(
- olx_static_path, res_relative_path, res_relative_path
- )
- )
- return "html", {"html": html}
- else:
- logger.info("Skipping webcontent: %s", res_filename)
- return None, None
-
- # Match any of imswl_xmlv1p1, imswl_xmlv1p2 etc
- elif re.match(r"^imswl_xmlv\d+p\d+$", res_type):
- tree = filesystem.get_xml_tree(self._res_filename(res["children"][0].href))
- root = tree.getroot()
- namespaces = {
- "imswl_xmlv1p1": "http://www.imsglobal.org/xsd/imsccv1p1/imswl_v1p1",
- "imswl_xmlv1p2": "http://www.imsglobal.org/xsd/imsccv1p2/imswl_v1p2",
- "imswl_xmlv1p3": "http://www.imsglobal.org/xsd/imsccv1p3/imswl_v1p3",
- }
- ns = {"wl": namespaces[res_type]}
- title = root.find("wl:title", ns).text
- url = root.find("wl:url", ns).get("href")
- return "link", {"href": url, "text": title}
-
- # Match any of imsbasiclti_xmlv1p0, imsbasiclti_xmlv1p3 etc
- elif re.match(r"^imsbasiclti_xmlv\d+p\d+$", res_type):
- data = self._parse_lti(res)
- # Canvas flavored courses have correct url in module meta for lti links
- if self.is_canvas_flavor:
- item_data = self.module_meta.get_external_tool_item_data(identifier)
- if item_data:
- data["launch_url"] = item_data.get("url", data["launch_url"])
- return "lti", data
-
- # Match any of imsqti_xmlv1p2/imscc_xmlv1p1/assessment, imsqti_xmlv1p3/imscc_xmlv1p3/assessment etc
- elif re.match(r"^imsqti_xmlv\d+p\d+/imscc_xmlv\d+p\d+/assessment$", res_type):
- res_filename = self._res_filename(res["children"][0].href)
- qti_parser = QtiParser(res_filename)
- return "qti", qti_parser.parse_qti()
-
- # Match any of imsdt_xmlv1p1, imsdt_xmlv1p2, imsdt_xmlv1p3 etc
- elif re.match(r"^imsdt_xmlv\d+p\d+$", res_type):
- data = self._parse_discussion(res, res_type)
- return "discussion", data
-
- else:
- text = f"Unimported content: type = {res_type!r}"
- if "href" in res:
- text += ", href = {!r}".format(res["href"])
- logger.info("%s", text)
- return "html", {"html": text}
+ resource = self.resources_by_id.get(idref)
+ if resource is None and self.is_canvas_flavor:
+ module_item_idref = self.module_meta.get_identifierref(idref)
+ resource = self.resources_by_id.get(module_item_idref)
+ return resource
def load_manifest_extracted(self):
manifest = self._extract()
@@ -480,6 +385,12 @@ def get_course_run(self):
# TODO: find a better value for this; lifecycle.contribute_date?
return "run"
+ def build_res_file_path(self, file_name: str) -> Path:
+ """
+ Build the resource file path.
+ """
+ return self.directory / file_name
+
def _extract(self):
path_extracted = filesystem.unzip_directory(self.file_path, self.workspace)
self.directory = path_extracted
@@ -511,11 +422,11 @@ def _update_namespaces(self, root):
)
def _parse_manifest(self, node):
- data = dict()
- data["metadata"] = self._parse_metadata(node)
- data["organizations"] = self._parse_organizations(node)
- data["resources"] = self._parse_resources(node)
- return data
+ return {
+ "metadata": self._parse_metadata(node),
+ "organizations": self._parse_organizations(node),
+ "resources": self._parse_resources(node),
+ }
def _clean_manifest(self, node):
"""
@@ -716,83 +627,3 @@ def _parse_dependency(self, node):
def _parse_resource_metadata(self, node):
# TODO: this
return None
-
- def _res_filename(self, file_name):
- return self.directory / file_name
-
- def _parse_lti(self, resource):
- """
- Parses LTI resource.
- """
-
- tree = filesystem.get_xml_tree(self._res_filename(resource["children"][0].href))
- root = tree.getroot()
- ns = {
- "blti": "http://www.imsglobal.org/xsd/imsbasiclti_v1p0",
- "lticp": "http://www.imsglobal.org/xsd/imslticp_v1p0",
- "lticm": "http://www.imsglobal.org/xsd/imslticm_v1p0",
- }
- title = root.find("blti:title", ns).text
- description = root.find("blti:description", ns).text
- launch_url = root.find("blti:secure_launch_url", ns)
- if launch_url is None:
- launch_url = root.find("blti:launch_url", ns)
- if launch_url is not None:
- launch_url = launch_url.text
- else:
- launch_url = ""
- width = root.find("blti:extensions/lticm:property[@name='selection_width']", ns)
- if width is None:
- width = "500"
- else:
- width = width.text
- height = root.find("blti:extensions/lticm:property[@name='selection_height']", ns)
- if height is None:
- height = "500"
- else:
- height = height.text
- custom = root.find("blti:custom", ns)
- if custom is None:
- parameters = dict()
- else:
- parameters = {option.get("name"): option.text for option in custom}
- # For Canvas flavored CC, tool_id can be used as lti_id if present
- tool_id = root.find("blti:extensions/lticm:property[@name='tool_id']", ns)
- if tool_id is None:
- # Create a simple slug lti_id from title
- lti_id = simple_slug(title)
- else:
- lti_id = tool_id.text
- data = {
- "title": title,
- "description": description,
- "launch_url": launch_url,
- "height": height,
- "width": width,
- "custom_parameters": parameters,
- "lti_id": lti_id,
- }
- return data
-
- def _parse_discussion(self, res, res_type):
- """
- Parses discussion content.
- """
-
- namespaces = {
- "imsdt_xmlv1p1": "http://www.imsglobal.org/xsd/imsccv1p1/imsdt_v1p1",
- "imsdt_xmlv1p2": "http://www.imsglobal.org/xsd/imsccv1p2/imsdt_v1p2",
- "imsdt_xmlv1p3": "http://www.imsglobal.org/xsd/imsccv1p3/imsdt_v1p3",
- }
-
- data = {"dependencies": []}
- for child in res["children"]:
- if isinstance(child, ResourceFile):
- tree = filesystem.get_xml_tree(self._res_filename(child.href))
- root = tree.getroot()
- ns = {"dt": namespaces[res_type]}
- data["title"] = root.find("dt:title", ns).text
- data["text"] = root.find("dt:text", ns).text
- elif isinstance(child, ResourceDependency):
- data["dependencies"].append(self.get_resource_content(child.identifierref))
- return data
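
A minimal usage sketch of the slimmed-down `Cartridge` helpers introduced above (`define_resource`, `build_res_file_path`, `add_extra_static_file`). The cartridge path, identifier, and file layout are placeholders; the sketch only assumes what this diff shows, namely that `define_resource` returns the resource dict (or `None`) and that webcontent resources expose `children[0].href`.

```python
from pathlib import Path

from cc2olx.models import Cartridge

# Hypothetical inputs: the cartridge file and identifier below are placeholders.
cartridge = Cartridge(Path("course.imscc"), workspace=Path("output"))
cartridge.load_manifest_extracted()

resource = cartridge.define_resource("i_example_identifierref")
if resource is not None and resource["type"] == "webcontent":
    relative_path = resource["children"][0].href
    # Absolute path of the extracted file inside the unzipped cartridge.
    file_path = cartridge.build_res_file_path(relative_path)
    # Files outside `web_resources` must be registered for copying into static/.
    cartridge.add_extra_static_file(relative_path)
```
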
diff --git a/src/cc2olx/olx.py b/src/cc2olx/olx.py
index d178488..4f6627e 100644
--- a/src/cc2olx/olx.py
+++ b/src/cc2olx/olx.py
@@ -1,14 +1,13 @@
-import html as HTMLParser
import json
import logging
-import re
-import urllib
import xml.dom.minidom
-from lxml import html
-from cc2olx.iframe_link_parser import KalturaIframeLinkParser
+from typing import List, Type
-from cc2olx.qti import QtiExport
-from cc2olx.utils import element_builder, passport_file_parser
+from cc2olx import settings
+from cc2olx.content_processors import AbstractContentProcessor
+from cc2olx.dataclasses import OlxGeneratorContext
+from cc2olx.iframe_link_parser import KalturaIframeLinkParser
+from cc2olx.utils import import_string, passport_file_parser
logger = logging.getLogger()
@@ -41,11 +40,17 @@ def __init__(self, cartridge, link_file=None, passport_file=None):
self.doc = None
self.link_file = link_file
self.passport_file = passport_file
- self.iframe_link_parser = None
- if link_file:
- self.iframe_link_parser = KalturaIframeLinkParser(self.link_file)
+ self.iframe_link_parser = KalturaIframeLinkParser(self.link_file) if link_file else None
self.lti_consumer_present = False
self.lti_consumer_ids = set()
+ self._content_processor_types = self._load_content_processor_types()
+
+ @staticmethod
+ def _load_content_processor_types() -> List[Type[AbstractContentProcessor]]:
+ """
+ Load content processor types.
+ """
+ return [import_string(processor_path) for processor_path in settings.CONTENT_PROCESSORS]
def xml(self):
self.doc = xml.dom.minidom.Document()
@@ -107,7 +112,7 @@ def policy(self):
lti_passports = self._get_lti_passport_list()
- if self.lti_consumer_present:
+ if self.lti_consumer_ids:
policy["course/course"]["advanced_modules"] = ["lti_consumer"]
if len(lti_passports):
@@ -156,8 +161,7 @@ def _add_olx_nodes(self, element, course_data, tags):
leaf = not tags
for element_data in course_data:
if leaf:
- content_type, details = self._get_content(element_data)
- children = self._create_olx_nodes(content_type, details)
+ children = self._create_olx_nodes(element_data)
else:
children = [self.doc.createElement(tags[0])]
@@ -174,127 +178,13 @@ def _add_olx_nodes(self, element, course_data, tags):
if "children" in element_data:
self._add_olx_nodes(child, element_data["children"], tags[1:])
- def _get_content(self, element_data):
- """
- Gets content type and details from element's data.
- """
-
- content_type = None
- details = None
-
- if "identifierref" in element_data:
- idref = element_data["identifierref"]
- content_type, details = self.cartridge.get_resource_content(idref)
-
- if content_type is None or not details:
- content_type = self.HTML
- details = {
- "html": "MISSING CONTENT
",
- }
-
- if content_type == self.LINK:
- content_type, details = process_link(details)
-
- return content_type, details
-
- def _process_static_links(self, html):
- """
- Process static links like src and href to have appropriate links.
- """
- items = re.findall(r'(src|href)\s*=\s*"(.+?)"', html)
-
- def process_wiki_reference(item, html):
- """
- Replace $WIKI_REFERENCE$ with edx /jump_to_id/
- """
- search_key = urllib.parse.unquote(item).replace("$WIKI_REFERENCE$/pages/", "")
-
- # remove query params and add suffix .html to match with resource_id_by_href
- search_key = search_key.split("?")[0] + ".html"
- for key in self.cartridge.resource_id_by_href.keys():
- if key.endswith(search_key):
- replace_with = "/jump_to_id/{}".format(self.cartridge.resource_id_by_href[key])
- html = html.replace(item, replace_with)
- return html
- logger.warn("Unable to process Wiki link - %s", item)
- return html
-
- def process_canvas_reference(item, html):
- """
- Replace $CANVAS_OBJECT_REFERENCE$ with edx /jump_to_id/
- """
- object_id = urllib.parse.unquote(item).replace("$CANVAS_OBJECT_REFERENCE$/quizzes/", "/jump_to_id/")
- html = html.replace(item, object_id)
- return html
-
- def process_ims_cc_filebase(item, html):
- """
- Replace $IMS-CC-FILEBASE$ with /static
- """
- new_item = urllib.parse.unquote(item).replace("$IMS-CC-FILEBASE$", "/static")
- # skip query parameters for static files
- new_item = new_item.split("?")[0]
- # &amp; is not valid in an URL. But some files seem to have it when it should be &
- new_item = new_item.replace("&amp;", "&")
- html = html.replace(item, new_item)
- return html
-
- def process_external_tools_link(item, html):
- """
- Replace $CANVAS_OBJECT_REFERENCE$/external_tools/retrieve with appropriate external link
- """
- external_tool_query = urllib.parse.urlparse(item).query
- # unescape query that has been HTML encoded so it can be parsed correctly
- unescaped_external_tool_query = HTMLParser.unescape(external_tool_query)
- external_tool_url = urllib.parse.parse_qs(unescaped_external_tool_query).get("url", [""])[0]
- html = html.replace(item, external_tool_url)
- return html
-
- for _, item in items:
- if "IMS-CC-FILEBASE" in item:
- html = process_ims_cc_filebase(item, html)
- elif "WIKI_REFERENCE" in item:
- html = process_wiki_reference(item, html)
- elif "external_tools" in item:
- html = process_external_tools_link(item, html)
- elif "CANVAS_OBJECT_REFERENCE" in item:
- html = process_canvas_reference(item, html)
-
- return html
-
- def _process_static_links_from_details(self, details):
- """
- Take a variable and recursively find & escape all static links within strings
-
- Args:
- self: self
- details: A dictionary or list of dictionaries containing node data.
-
- Returns:
- details: Returns detail data with static link
- escaped to an OLX-friendly format.
- """
-
- if isinstance(details, str):
- return self._process_static_links(details)
-
- if isinstance(details, list):
- for index, value in enumerate(details):
- details[index] = self._process_static_links_from_details(value)
- elif isinstance(details, dict):
- for key, value in details.items():
- details[key] = self._process_static_links_from_details(value)
-
- return details
-
- def _create_olx_nodes(self, content_type, details):
+ def _create_olx_nodes(self, element_data: dict):
"""
This helps to create OLX node of different type. For eg HTML, VIDEO, QTI, LTI,
Discussion.
Args:
- content_type ([str]): The type of node that has to be created.
- details (Dict[str, str]): Dictionary of the element and content of the element.
+ element_data (dict): a normalized CC element data.
Raises:
OlxExportException: Exception when nodes are not able to be created.
@@ -302,155 +192,16 @@ def _create_olx_nodes(self, content_type, details):
Returns:
[List]: List of OLX nodes that needs to be written.
"""
-
- nodes = []
- details = self._process_static_links_from_details(details)
-
- if content_type == self.HTML:
- nodes += self._process_html(details)
-
- elif content_type == self.VIDEO:
- nodes += self._create_video_node(details)
-
- elif content_type == self.LTI:
- # There is an LTI resource
- # Add lti_consumer in policy with lti_passports
- self.lti_consumer_present = True
- self.lti_consumer_ids.add(details["lti_id"])
- nodes.append(self._create_lti_node(details))
-
- elif content_type == self.QTI:
- qti_export = QtiExport(self.doc)
- nodes += qti_export.create_qti_node(details)
-
- elif content_type == self.DISCUSSION:
- nodes += self._create_discussion_node(details)
-
- else:
- raise OlxExportException(f'Content type "{content_type}" is not supported.')
-
- return nodes
-
- def _create_video_node(self, details):
- """
- This function creates Video OLX nodes.
-
- Args:
- details (Dict[str, str]): Dictionary that has Video tag value.
-
- Returns:
- [OLX Element]: Video OLX element.
- """
- xml_element = element_builder(self.doc)
- attributes = {"youtube": "1.00:" + details["youtube"], "youtube_id_1_0": details["youtube"]}
- child = xml_element("video", children=None, attributes=attributes)
- return [child]
-
- def _process_html(self, details):
- """
- This function helps to process the html and gives out
- corresponding HTML or Video OLX nodes.
-
- Args:
- details (Dict[str, str]): Dictionary that has HTML tag value.
-
- Returns:
- List[OLX Element]: List of html/Video OLX element.
- """
- video_olx = []
- nodes = []
- child = self.doc.createElement("html")
- html = self._process_static_links(details["html"])
- if self.link_file:
- html, video_olx = self._process_html_for_iframe(html)
- txt = self.doc.createCDATASection(html)
- child.appendChild(txt)
- nodes.append(child)
- for olx in video_olx:
- nodes.append(olx)
- return nodes
-
- def _process_html_for_iframe(self, html_str):
- """
- This function helps to parse the iframe with
- embedded video, to be converted into video xblock.
-
- Args:
- html_str ([str]): Html file content.
-
- Returns:
- html_str [str]: The html content of the file, if iframe is present
- and converted into xblock then iframe is removed
- from the HTML.
- video_olx [List[xml]]: List of xml children, i.e video xblock.
- """
- video_olx = []
- parsed_html = html.fromstring(html_str)
- iframes = parsed_html.xpath("//iframe")
- if not iframes:
- return html_str, video_olx
- video_olx, converted_iframes = self.iframe_link_parser.get_video_olx(self.doc, iframes)
- if video_olx:
- # If video xblock is present then we modify the HTML to remove the iframe
- # hence we need to convert the modified HTML back to string. We also remove
- # the parent if there are no other children.
- for iframe in converted_iframes:
- parent = iframe.getparent()
- parent.remove(iframe)
- if not parent.getchildren():
- parent.getparent().remove(parent)
- return html.tostring(parsed_html).decode("utf-8"), video_olx
- return html_str, video_olx
-
- def _create_lti_node(self, details):
- node = self.doc.createElement("lti_consumer")
- custom_parameters = "[{params}]".format(
- params=", ".join(
- [
- '"{key}={value}"'.format(
- key=key,
- value=value,
- )
- for key, value in details["custom_parameters"].items()
- ]
- ),
+ idref = element_data.get("identifierref")
+ context = OlxGeneratorContext(
+ iframe_link_parser=self.iframe_link_parser,
+ lti_consumer_ids=self.lti_consumer_ids,
)
- node.setAttribute("custom_parameters", custom_parameters)
- node.setAttribute("description", details["description"])
- node.setAttribute("display_name", details["title"])
- node.setAttribute("inline_height", details["height"])
- node.setAttribute("inline_width", details["width"])
- node.setAttribute("launch_url", details["launch_url"])
- node.setAttribute("modal_height", details["height"])
- node.setAttribute("modal_width", details["width"])
- node.setAttribute("xblock-family", "xblock.v1")
- node.setAttribute("lti_id", details["lti_id"])
- return node
-
- def _create_discussion_node(self, details):
- node = self.doc.createElement("discussion")
- node.setAttribute("display_name", "")
- node.setAttribute("discussion_category", details["title"])
- node.setAttribute("discussion_target", details["title"])
- html_node = self.doc.createElement("html")
- txt = "MISSING CONTENT" if details["text"] is None else details["text"]
- txt = self.doc.createCDATASection(txt)
- html_node.appendChild(txt)
- return [html_node, node]
-
-
-def process_link(details):
- """
- Possibly convert a link to a video.
- """
- # YouTube links can be like this: https://www.youtube.com/watch?v=gQ-cZRmHfs4&list=PL5B350D511278A56B
- ytmatch = re.search(r"youtube.com/watch\?v=([-\w]+)", details["href"])
- if ytmatch:
- return "video", {"youtube": ytmatch.group(1)}
+ for processor_type in self._content_processor_types:
+ processor = processor_type(self.cartridge, context)
- details = {
- "html": "{}".format(details["href"], details.get("text", "")),
- }
+ if olx_nodes := processor.process(idref):
+ return olx_nodes
- return "html", details
+ raise OlxExportException(f'The resource with "{idref}" identifier value is not supported.')
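
For context, `_create_olx_nodes` now delegates to the processor chain configured in `settings.CONTENT_PROCESSORS`. The exact `AbstractContentProcessor` interface lives in the new `cc2olx.content_processors` package and is not shown in this diff; the duck-typed sketch below only assumes what the dispatch code above requires — a `(cartridge, context)` constructor and a `process(idref)` method returning OLX nodes, or a falsy value to let the next processor try. The class name, the resource-type check, and the way the target `xml.dom.minidom.Document` is obtained are illustrative assumptions.

```python
from typing import List, Optional
import xml.dom.minidom


class StubHtmlContentProcessor:
    """Illustrative sketch; real processors derive from AbstractContentProcessor."""

    def __init__(self, cartridge, context) -> None:
        self._cartridge = cartridge
        self._context = context

    def process(self, idref: Optional[str]) -> Optional[List[xml.dom.minidom.Element]]:
        resource = self._cartridge.define_resource(idref)
        if resource is None or resource["type"] != "webcontent":
            # Returning a falsy value hands the resource to the next processor in the chain.
            return None

        # How real processors obtain the OLX document is not shown in this diff;
        # a throwaway Document is used here purely for illustration.
        doc = xml.dom.minidom.Document()
        node = doc.createElement("html")
        node.appendChild(doc.createCDATASection("<p>Placeholder content</p>"))
        return [node]
```
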
diff --git a/src/cc2olx/parser.py b/src/cc2olx/parser.py
new file mode 100644
index 0000000..9e5c8d2
--- /dev/null
+++ b/src/cc2olx/parser.py
@@ -0,0 +1,45 @@
+from pathlib import Path
+
+COMMON_CARTRIDGE_FILE_EXTENSION = ".imscc"
+
+
+def _is_cartridge_file(path):
+ return path.is_file() and path.suffix == COMMON_CARTRIDGE_FILE_EXTENSION
+
+
+def _get_files(parsed_args):
+ """
+ Collects all Common Cartridge files from list of files and directories.
+ """
+
+ files = set()
+
+ for path in parsed_args.inputs:
+ if not path.exists():
+ raise FileNotFoundError
+
+ if _is_cartridge_file(path):
+ files.add(path)
+
+ if path.is_dir():
+ for input_file in path.iterdir():
+ if _is_cartridge_file(input_file):
+ files.add(input_file)
+
+ return files
+
+
+def parse_options(args):
+ """
+ Parses script options from argparse arguments.
+ """
+ input_files = _get_files(args)
+
+ return {
+ "input_files": input_files,
+ "output_format": args.result,
+ "log_level": args.loglevel,
+ "workspace": Path.cwd() / args.output,
+ "link_file": args.link_file,
+ "passport_file": args.passport_file,
+ }
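
A quick sketch of how `parse_options` consumes the parsed CLI arguments. The `Namespace` below stands in for what `cc2olx.cli.parse_args()` returns; the attribute names mirror the ones read above, while the concrete values (including the `"folder"` result format and the existing `course.imscc` file) are assumptions for illustration.

```python
from argparse import Namespace
from pathlib import Path

from cc2olx.parser import parse_options

# Stand-in for cc2olx.cli.parse_args(); assumes course.imscc exists on disk,
# otherwise _get_files() raises FileNotFoundError.
args = Namespace(
    inputs=[Path("course.imscc")],
    result="folder",          # assumed value of cli.RESULT_TYPE_FOLDER
    loglevel="INFO",
    output="output",
    link_file=None,
    passport_file=None,
)

options = parse_options(args)
assert options["workspace"] == Path.cwd() / "output"
assert options["input_files"] == {Path("course.imscc")}
```
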
diff --git a/src/cc2olx/qti.py b/src/cc2olx/qti.py
deleted file mode 100644
index 444ab7a..0000000
--- a/src/cc2olx/qti.py
+++ /dev/null
@@ -1,624 +0,0 @@
-import logging
-import re
-import urllib.parse
-import xml.dom.minidom
-from collections import OrderedDict
-from html import unescape
-
-from lxml import etree, html
-
-from cc2olx import filesystem
-
-from .utils import element_builder
-
-logger = logging.getLogger()
-
-# problem types
-MULTIPLE_CHOICE = "cc.multiple_choice.v0p1"
-MULTIPLE_RESPONSE = "cc.multiple_response.v0p1"
-FILL_IN_THE_BLANK = "cc.fib.v0p1"
-ESSAY = "cc.essay.v0p1"
-BOOLEAN = "cc.true_false.v0p1"
-PATTERN_MATCH = "cc.pattern_match.v0p1"
-RESPROCESSING_TYPES = ["general_fb", "correct_fb", "general_incorrect_fb"]
-
-
-class QtiError(Exception):
- """
- Exception type for Qti parsing/conversion errors.
- """
-
-
-class QtiExport:
- """
- Contains methods for processing and conversion
- IMS Question & Test Interoperability (QTI) <= v1.2 into OLX markup
- """
-
- FIB_PROBLEM_TEXTLINE_SIZE_BUFFER = 10
-
- def __init__(self, root_xml_doc):
- self.doc = root_xml_doc
-
- def create_qti_node(self, details):
- """
- Creates OLX xml node, that represents content of unit with problems.
-
- Args:
- details: list of dictionaries, where each contains data to
- render problem.
- """
-
- problems = []
-
- for problem_data in details:
- cc_profile = problem_data["cc_profile"]
- create_problem = self._problem_creators_map.get(cc_profile)
-
- if create_problem is None:
- raise QtiError('Unknown cc_profile: "{}"'.format(problem_data["cc_profile"]))
-
- problem = create_problem(problem_data)
-
- # sometimes we might want to have additional items from one cc item
- if isinstance(problem, list) or isinstance(problem, tuple):
- problems += problem
- else:
- problems.append(problem)
-
- return problems
-
- @property
- def _problem_creators_map(self):
- """
- Returns: mapping between Common Cartridge profile value and function
- that creates actual problem node.
-
- Note: Since True/False problems in OLX are constructed identically to
- OLX Multiple Choice problems, we reuse `_create_multiple_choice_problem`
- for BOOLEAN type problems
- """
- return {
- MULTIPLE_CHOICE: self._create_multiple_choice_problem,
- MULTIPLE_RESPONSE: self._create_multiple_response_problem,
- FILL_IN_THE_BLANK: self._create_fib_problem,
- ESSAY: self._create_essay_problem,
- BOOLEAN: self._create_multiple_choice_problem,
- PATTERN_MATCH: self._create_pattern_match_problem,
- }
-
- def _create_problem_description(self, description_html_str):
- """
- Material texts can come in the form of escaped HTML markup, which
- can't be considered valid XML. ``xml.dom.minidom`` has no
- features to convert HTML to XML, so we use lxml parser here.
-
- Args:
- description_html_str: escaped HTML string
-
- Returns: instance of ``xml.dom.minidom.Node``
- """
- description_html_str = unescape(description_html_str)
-
- description_html_str = urllib.parse.unquote(description_html_str)
-
- element = html.fromstring(description_html_str)
- xml_string = etree.tostring(element)
- description = xml.dom.minidom.parseString(xml_string).firstChild
-
- return description
-
- def _add_choice(self, parent, is_correct, text):
- """
- Appends choices to given ``checkboxgroup`` or ``choicegroup`` parent.
- """
- choice = self.doc.createElement("choice")
- choice.setAttribute("correct", "true" if is_correct else "false")
- self._set_text(choice, text)
- parent.appendChild(choice)
-
- def _set_text(self, node, new_text):
- text_node = self.doc.createTextNode(new_text)
- node.appendChild(text_node)
-
- def _create_multiple_choice_problem(self, problem_data):
- """
- Creates XML node of problem.
- """
-
- problem = self.doc.createElement("problem")
- problem_content = self.doc.createElement("multiplechoiceresponse")
-
- problem_description = self._create_problem_description(problem_data["problem_description"])
-
- choice_group = self.doc.createElement("choicegroup")
- choice_group.setAttribute("type", "MultipleChoice")
-
- for choice_data in problem_data["choices"].values():
- self._add_choice(choice_group, choice_data["correct"], choice_data["text"])
-
- problem_content.appendChild(problem_description)
- problem_content.appendChild(choice_group)
- problem.appendChild(problem_content)
-
- return problem
-
- def _create_multiple_response_problem(self, problem_data):
- """
- Create XML node for multiple response problem. Sets partial_credit to EDC by default.
- """
-
- el = element_builder(self.doc)
-
- problem_description = self._create_problem_description(problem_data["problem_description"])
-
- # fmt: off
- problem = el('problem', [
- el('choiceresponse', [
-
- problem_description,
-
- el('checkboxgroup', [
- el('choice',
- choice['text'],
- {'correct': 'true' if choice['correct'] else 'false'}
- )
- for choice in problem_data['choices'].values()
- ], {'type': 'MultipleChoice'})
-
- ], {'partial_credit': 'EDC'})
- ])
- # fmt: on
- return problem
-
- def _create_fib_problem(self, problem_data):
- """
- Creates XML node of fill in the blank problems
- """
-
- # Track maximum answer length for textline at the bottom
- max_answer_length = 0
-
- problem = self.doc.createElement("problem")
-
- # Set the primary answer on the stringresponse
- # and set the type to case insensitive
- problem_content = self.doc.createElement("stringresponse")
- problem_content.setAttribute("answer", problem_data["answer"])
- problem_content.setAttribute("type", self._build_fib_problem_type(problem_data))
-
- if len(problem_data["answer"]) > max_answer_length:
- max_answer_length = len(problem_data["answer"])
-
- problem_description = self._create_problem_description(problem_data["problem_description"])
- problem_content.appendChild(problem_description)
-
- # For any (optional) additional accepted answers, add an
- # additional_answer element with that answer
- for answer in problem_data.get("additional_answers", []):
- additional_answer = self.doc.createElement("additional_answer")
- additional_answer.setAttribute("answer", answer)
- problem_content.appendChild(additional_answer)
-
- if len(answer) > max_answer_length:
- max_answer_length = len(answer)
-
- # Add a textline element with the max answer length plus a buffer
- textline = self.doc.createElement("textline")
- textline.setAttribute("size", str(max_answer_length + self.FIB_PROBLEM_TEXTLINE_SIZE_BUFFER))
- problem_content.appendChild(textline)
-
- problem.appendChild(problem_content)
-
- return problem
-
- @staticmethod
- def _build_fib_problem_type(problem_data):
- """
- Build `stringresponse` OLX type for a fill in the blank problem.
- """
- problem_types = ["ci"]
-
- if problem_data["is_regexp"]:
- problem_types.append("regexp")
-
- return " ".join(problem_types)
-
- def _create_essay_problem(self, problem_data):
- """
- Given parsed essay problem data, returns an openassessment component. If a sample
- solution is provided, returns that as an HTML block before openassessment.
- """
-
- description = problem_data["problem_description"]
-
- el = element_builder(self.doc)
-
- if any(key in RESPROCESSING_TYPES for key in problem_data.keys()):
- resp_samples = [
- el("name", "Feedback"),
- el("label", "Feedback"),
- el("prompt", "Example Feedback"),
- ]
-
- for desc, key in zip(["General", "Correct", "Incorrect"], RESPROCESSING_TYPES):
- resp_samples.append(
- el(
- "option",
- [el("name", desc), el("label", desc), el("explanation", problem_data.get(key, desc))],
- {"points": "0"},
- )
- )
- criterion = el("criterion", resp_samples, {"feedback": "optional"})
- else:
- criterion = el(
- "criterion",
- [
- el("name", "Ideas"),
- el("label", "Ideas"),
- el("prompt", "Example criterion"),
- el(
- "option",
- [el("name", "Poor"), el("label", "Poor"), el("explanation", "Explanation")],
- {"points": "0"},
- ),
- el(
- "option",
- [el("name", "Good"), el("label", "Good"), el("explanation", "Explanation")],
- {"points": "1"},
- ),
- ],
- {"feedback": "optional"},
- )
-
- # fmt: off
- ora = el(
- 'openassessment',
- [
- el('title', 'Open Response Assessment'),
- el('assessments', [
- el(
- 'assessment',
- None,
- attributes={'name': 'staff-assessment', 'required': 'True'}
- )
- ]),
- el('prompts', [
- el('prompt', [
- el('description', description)
- ])
- ]),
- el('rubric', [
- criterion,
- el('feedbackprompt', 'Feedback prompt text'),
- el('feedback_default_text', 'Feedback prompt default text'),
- ])
- ],
- {
- 'url_name': problem_data['ident'],
- 'text_response': 'required',
- 'prompts_type': 'html'
- }
- )
- # fmt: on
-
- # if a sample solution exists add on top of ora, because
- # olx doesn't have a sample solution equivalent.
- if problem_data.get("sample_solution"):
- child = el("html", self.doc.createCDATASection(problem_data["sample_solution"]))
- return child, ora
-
- return ora
-
- def _create_pattern_match_problem(self, problem_data):
- raise NotImplementedError
-
-
-class QtiParser:
- """
- Used to parse Qti xml resource.
- """
-
- # Xml namespaces
- NS = {"qti": "http://www.imsglobal.org/xsd/ims_qtiasiv1p2"}
-
- def __init__(self, resource_filename):
- self.resource_filename = resource_filename
-
- def parse_qti(self):
- """
- Parses resource of ``imsqti_xmlv1p2/imscc_xmlv1p1/assessment`` type.
- """
-
- tree = filesystem.get_xml_tree(self.resource_filename)
- root = tree.getroot()
-
- # qti xml can contain multiple problems represented by <item/> elements
- problems = root.findall(".//qti:section/qti:item", self.NS)
-
- parsed_problems = []
-
- for i, problem in enumerate(problems):
- data = {}
-
- attributes = problem.attrib
-
- # We're adding unique string to identifier here to handle cases,
- # when we're getting malformed course (due to a weird Canvas behaviour)
- # with equal identifiers. LMS doesn't support blocks with the same identifiers.
- data["ident"] = attributes["ident"] + str(i)
- if title := attributes.get("title"):
- data["title"] = title
-
- cc_profile = self._parse_problem_profile(problem)
- data["cc_profile"] = cc_profile
-
- parse_problem = self._problem_parsers_map.get(cc_profile)
-
- if parse_problem is None:
- raise QtiError(f'Unknown cc_profile: "{cc_profile}"')
-
- try:
- data.update(parse_problem(problem))
- parsed_problems.append(data)
- except NotImplementedError:
- logger.info("Problem with ID %s can't be converted.", problem.attrib.get("ident"))
- logger.info(" Profile %s is not supported.", cc_profile)
- logger.info(" At file %s.", self.resource_filename)
-
- return parsed_problems
-
- def _parse_problem_profile(self, problem):
- """
- Returns ``cc_profile`` value from problem metadata. This field is mandatory for problem,
- so we throw exception if it's not present.
-
- Example of metadata structure:
- ```
- <itemmetadata>
-   <qtimetadata>
-     <qtimetadatafield>
-       <fieldlabel>cc_profile</fieldlabel>
-       <fieldentry>cc.true_false.v0p1</fieldentry>
-     </qtimetadatafield>
-   </qtimetadata>
- </itemmetadata>
- ```
- """
-
- metadata = problem.findall("qti:itemmetadata/qti:qtimetadata/qti:qtimetadatafield", self.NS)
-
- for field in metadata:
- label = field.find("qti:fieldlabel", self.NS).text
- entry = field.find("qti:fieldentry", self.NS).text
-
- if label == "cc_profile":
- return entry
-
- raise ValueError('Problem metadata must contain "cc_profile" field.')
-
- @property
- def _problem_parsers_map(self):
- """
- Returns: mapping between Common Cartridge profile value and function
- that parses actual problem node.
-
- Note: Since True/False problems in QTI are constructed identically to
- QTI Multiple Choice problems, we reuse `_parse_multiple_choice_problem`
- for BOOLEAN type problems
- """
- return {
- MULTIPLE_CHOICE: self._parse_multiple_choice_problem,
- MULTIPLE_RESPONSE: self._parse_multiple_response_problem,
- FILL_IN_THE_BLANK: self._parse_fib_problem,
- ESSAY: self._parse_essay_problem,
- BOOLEAN: self._parse_multiple_choice_problem,
- PATTERN_MATCH: self._parse_pattern_match_problem,
- }
-
- def _parse_fixed_answer_question_responses(self, presentation):
- """
- Returns dictionary where keys are response identifiers and values are
- response data.
-
- Example of ``<response_lid/>`` structure for the following profiles:
- - ``cc.multiple_choice.v0p1``
- - ``cc.multiple_response.v0p1``
- - ``cc.true_false.v0p1``
- ```
- <response_lid ident="response1" rcardinality="Single">
-   <render_choice>
-     <response_label ident="8157">
-       <material>
-         <mattext texttype="text/plain">Response 1</mattext>
-       </material>
-     </response_label>
-     <response_label ident="5534">
-       <material>
-         <mattext texttype="text/plain">Response 2</mattext>
-       </material>
-     </response_label>
-   </render_choice>
- </response_lid>
- ```
- """
- responses = OrderedDict()
-
- for response in presentation.findall("qti:response_lid/qti:render_choice/qti:response_label", self.NS):
- response_id = response.attrib["ident"]
- responses[response_id] = {
- "text": response.find("qti:material/qti:mattext", self.NS).text or "",
- "correct": False,
- }
-
- return responses
-
- def _mark_correct_responses(self, resprocessing, responses):
- """
- Example of ``<resprocessing/>`` structure for the following profiles:
- - ``cc.multiple_choice.v0p1``
- - ``cc.true_false.v0p1``
- ```
- <resprocessing>
-   <outcomes>
-     <decvar maxvalue="100" minvalue="0" varname="SCORE" vartype="Decimal"/>
-   </outcomes>
-   <respcondition continue="Yes">
-     <conditionvar>
-       <varequal respident="response1">8157</varequal>
-     </conditionvar>
-     <displayfeedback feedbacktype="Response" linkrefid="general_fb"/>
-   </respcondition>
-   <respcondition continue="Yes">
-     <conditionvar>
-       <varequal respident="response1">5534</varequal>
-     </conditionvar>
-     <displayfeedback feedbacktype="Response" linkrefid="general_fb"/>
-   </respcondition>
-   <respcondition continue="No">
-     <conditionvar>
-       <varequal respident="response1">4226</varequal>
-     </conditionvar>
-     <setvar action="Set" varname="SCORE">100</setvar>
-     <displayfeedback feedbacktype="Response" linkrefid="correct_fb"/>
-   </respcondition>
- </resprocessing>
- ```
-
- This XML is a sort of instruction about how responses should be evaluated. In this
- particular example we have three correct answers with ids: 8157, 5534, 4226.
-
- Example of ``<resprocessing/>`` structure for ``cc.multiple_response.v0p1``:
- ```
- <resprocessing>
-   <outcomes>
-     <decvar maxvalue="100" minvalue="0" varname="SCORE" vartype="Decimal"/>
-   </outcomes>
-   <respcondition continue="No">
-     <conditionvar>
-       <and>
-         <varequal respident="response1">1759</varequal>
-         <not>
-           <varequal respident="response1">5954</varequal>
-         </not>
-         <varequal respident="response1">8170</varequal>
-         <varequal respident="response1">9303</varequal>
-         <not>
-           <varequal respident="response1">15</varequal>
-         </not>
-       </and>
-     </conditionvar>
-   </respcondition>
- </resprocessing>
- ```
- Above example is for a multiple response type problem. In this example 1759, 8170 and
- 9303 are correct answers while 15 and 5954 are not. Note that this code also supports
- the ``or`` operator.
-
- For now, we just consider these responses correct in OLX, but according to the specification,
- conditions can be arbitrarily nested, and score can be computed by some formula, so to
- implement 100% conversion we need to write a new XBlock.
- """
-
- for respcondition in resprocessing.findall("qti:respcondition", self.NS):
- correct_answers = respcondition.findall("qti:conditionvar/qti:varequal", self.NS)
-
- if len(correct_answers) == 0:
- correct_answers = respcondition.findall("qti:conditionvar/qti:and/qti:varequal", self.NS)
- correct_answers += respcondition.findall("qti:conditionvar/qti:or/qti:varequal", self.NS)
-
- for ans in correct_answers:
- responses[ans.text]["correct"] = True
-
- if respcondition.attrib.get("continue", "No") == "No":
- break
-
- def _parse_multiple_choice_problem(self, problem):
- """
- Returns ``problem_description``, ``choices`` and marks the correct answer
- """
- data = {}
-
- presentation = problem.find("qti:presentation", self.NS)
- resprocessing = problem.find("qti:resprocessing", self.NS)
-
- data["problem_description"] = presentation.find("qti:material/qti:mattext", self.NS).text
-
- data["choices"] = self._parse_fixed_answer_question_responses(presentation)
- self._mark_correct_responses(resprocessing, data["choices"])
-
- return data
-
- def _parse_multiple_response_problem(self, problem):
- """
- Returns ``problem_description``, ``choices`` and marks all the correct answers.
- """
- return self._parse_multiple_choice_problem(problem)
-
- def _parse_fib_problem(self, problem):
- """
- Returns ``problem_description``, ``answer``, and ``additional_answers``
- """
- data = {}
-
- presentation = problem.find("qti:presentation", self.NS)
- resprocessing = problem.find("qti:resprocessing", self.NS)
-
- data["problem_description"] = presentation.find("qti:material/qti:mattext", self.NS).text
-
- answers = []
- patterns = []
- for respcondition in resprocessing.findall("qti:respcondition", self.NS):
- for varequal in respcondition.findall("qti:conditionvar/qti:varequal", self.NS):
- answers.append(varequal.text)
-
- for varsubstring in respcondition.findall("qti:conditionvar/qti:varsubstring", self.NS):
- patterns.append(varsubstring.text)
-
- if respcondition.attrib.get("continue", "No") == "No":
- break
-
- data["is_regexp"] = bool(patterns)
- if data["is_regexp"]:
- data["answer"] = patterns.pop(0)
- answers = [re.escape(answer) for answer in answers]
- data["additional_answers"] = [*patterns, *answers]
- else:
- # Primary answer is the first one, additional answers are what is left
- data["answer"] = answers.pop(0)
- data["additional_answers"] = answers
-
- return data
-
- def _parse_essay_problem(self, problem):
- """
- Parses `cc.essay.v0p1` problem type and returns dictionary with
- presentation & sample solution if exists.
- """
-
- data = {}
- presentation = problem.find("qti:presentation", self.NS)
- itemfeedback = problem.find("qti:itemfeedback", self.NS)
- solution = problem.find("qti:itemfeedback/qti:solution", self.NS)
-
- data["problem_description"] = presentation.find("qti:material/qti:mattext", self.NS).text
-
- if solution is not None:
- sample_solution_selector = "qti:solutionmaterial//qti:material//qti:mattext"
- data["sample_solution"] = solution.find(sample_solution_selector, self.NS).text
-
- if itemfeedback is not None:
- for resp_type in RESPROCESSING_TYPES:
- response_text = self._essay_response_processing(problem, resp_type)
- if response_text:
- data[resp_type] = response_text
- return data
-
- def _essay_response_processing(self, problem, resp_type):
- respconditions = problem.find("qti:resprocessing/qti:respcondition", self.NS)
- if respconditions.find(f"qti:displayfeedback[@linkrefid='{resp_type}']", self.NS) is not None:
- text_selector = f"qti:itemfeedback[@ident='{resp_type}']/qti:flow_mat/qti:material/qti:mattext"
- return problem.find(text_selector, self.NS).text
-
- def _parse_pattern_match_problem(self, problem):
- raise NotImplementedError
diff --git a/src/cc2olx/settings.py b/src/cc2olx/settings.py
index 6435581..28b1e5a 100644
--- a/src/cc2olx/settings.py
+++ b/src/cc2olx/settings.py
@@ -1,51 +1,14 @@
from pathlib import Path
-COMMON_CARTRIDGE_FILE_EXTENSION = ".imscc"
-
-
-def _is_cartridge_file(path):
- return path.is_file() and path.suffix == COMMON_CARTRIDGE_FILE_EXTENSION
-
-
-def _get_files(parsed_args):
- """
- Collects all Common Cartridge files from list of files and directories.
- """
-
- files = set()
-
- for path in parsed_args.inputs:
- if not path.exists():
- raise FileNotFoundError
-
- if _is_cartridge_file(path):
- files.add(path)
-
- if path.is_dir():
- for input_file in path.iterdir():
- if _is_cartridge_file(input_file):
- files.add(input_file)
-
- return files
-
-
-def collect_settings(parsed_args):
- """
- Collects settings dictionary from argparse arguments.
- """
-
- input_files = _get_files(parsed_args)
- log_level = parsed_args.loglevel
- logging_config = {
- "level": log_level,
- "format": "{%(filename)s:%(lineno)d} - %(message)s",
- }
- settings = {
- "input_files": input_files,
- "output_format": parsed_args.result,
- "logging_config": logging_config,
- "workspace": Path.cwd() / parsed_args.output,
- "link_file": parsed_args.link_file,
- "passport_file": parsed_args.passport_file,
- }
- return settings
+BASE_DIR = Path(__file__).resolve().parent
+TEMPLATES_DIR = BASE_DIR / "templates"
+
+LOG_FORMAT = "{%(filename)s:%(lineno)d} - %(message)s"
+
+CONTENT_PROCESSORS = [
+ "cc2olx.content_processors.VideoContentProcessor",
+ "cc2olx.content_processors.LtiContentProcessor",
+ "cc2olx.content_processors.QtiContentProcessor",
+ "cc2olx.content_processors.DiscussionContentProcessor",
+ "cc2olx.content_processors.HtmlContentProcessor",
+]
diff --git a/src/cc2olx/utils.py b/src/cc2olx/utils.py
index e5c4fbf..74b6965 100644
--- a/src/cc2olx/utils.py
+++ b/src/cc2olx/utils.py
@@ -4,6 +4,9 @@
import string
import csv
import re
+import sys
+from importlib import import_module
+from typing import Type
logger = logging.getLogger()
@@ -108,3 +111,38 @@ def clean_file_name(filename: str):
cleaned_name = re.sub(special_characters, "_", filename)
return cleaned_name
+
+
+def cached_import(module_path: str, class_name: str) -> Type:
+ """
+ Provide the module from the cache or import it if it is not already loaded.
+ """
+ # Check whether module is loaded and fully initialized.
+ if not (
+ (module := sys.modules.get(module_path))
+ and (spec := getattr(module, "__spec__", None))
+ and getattr(spec, "_initializing", False) is False
+ ):
+ module = import_module(module_path)
+ return getattr(module, class_name)
+
+
+def import_string(dotted_path: str) -> Type:
+ """
+ Import a dotted module path.
+
+ Provide the attribute/class designated by the last name in the path.
+ Raise ImportError if the import failed.
+ """
+ try:
+ module_path, class_name = dotted_path.rsplit(".", 1)
+ except ValueError as err:
+ raise ImportError("%s doesn't look like a module path" % dotted_path) from err
+
+ try:
+ return cached_import(module_path, class_name)
+ except AttributeError as err:
+ raise ImportError(
+ 'Module "%s" does not define a "%s" attribute/class'
+ % (module_path, class_name)
+ ) from err
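
Finally, a short usage sketch for the new `import_string` helper that backs `settings.CONTENT_PROCESSORS`. The happy path assumes the new `cc2olx.content_processors` package (not part of this diff) is importable; the error path follows directly from the code above.

```python
from cc2olx.utils import import_string

# Resolves one of the dotted paths listed in settings.CONTENT_PROCESSORS
# (assumes the cc2olx.content_processors package introduced by this change is installed).
processor_cls = import_string("cc2olx.content_processors.HtmlContentProcessor")

# A path without a dot is rejected before any import is attempted.
try:
    import_string("nonsense")
except ImportError as exc:
    print(exc)  # nonsense doesn't look like a module path
```
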