Skip to content

Commit

Permalink
refactor: Block type processors are integrated into the script workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
myhailo-chernyshov-rg committed Jan 7, 2025
1 parent e214c7d commit 805a4a1
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 1,166 deletions.
28 changes: 13 additions & 15 deletions src/cc2olx/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
import shutil
import sys
import tempfile

from pathlib import Path

from cc2olx import filesystem
from cc2olx import olx
from cc2olx import filesystem, olx, settings
from cc2olx.cli import parse_args, RESULT_TYPE_FOLDER, RESULT_TYPE_ZIP
from cc2olx.models import Cartridge, OLX_STATIC_DIR
from cc2olx.settings import collect_settings
from cc2olx.constants import OLX_STATIC_DIR
from cc2olx.models import Cartridge
from cc2olx.parser import parse_options


def convert_one_file(input_file, workspace, link_file=None, passport_file=None):
Expand Down Expand Up @@ -47,32 +46,31 @@ def convert_one_file(input_file, workspace, link_file=None, passport_file=None):


def main():
parsed_args = parse_args()
settings = collect_settings(parsed_args)
args = parse_args()
options = parse_options(args)

workspace = settings["workspace"]
link_file = settings["link_file"]
passport_file = settings["passport_file"]
workspace = options["workspace"]
link_file = options["link_file"]
passport_file = options["passport_file"]

# setup logger
logging_config = settings["logging_config"]
logging.basicConfig(level=logging_config["level"], format=logging_config["format"])
logging.basicConfig(level=options["log_level"], format=settings.LOG_FORMAT)
logger = logging.getLogger()

with tempfile.TemporaryDirectory() as tmpdirname:
temp_workspace = Path(tmpdirname) / workspace.stem

for input_file in settings["input_files"]:
for input_file in options["input_files"]:
try:
convert_one_file(input_file, temp_workspace, link_file, passport_file)
except Exception:
logger.exception("Error while converting %s file", input_file)

if settings["output_format"] == RESULT_TYPE_FOLDER:
if options["output_format"] == RESULT_TYPE_FOLDER:
shutil.rmtree(str(workspace), ignore_errors=True)
shutil.copytree(str(temp_workspace), str(workspace))

if settings["output_format"] == RESULT_TYPE_ZIP:
if options["output_format"] == RESULT_TYPE_ZIP:
shutil.make_archive(str(workspace), "zip", str(temp_workspace))

logger.info("Conversion completed")
Expand Down
233 changes: 32 additions & 201 deletions src/cc2olx/models.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import imghdr
import logging
import os.path
import re
from textwrap import dedent
import zipfile
from pathlib import Path
from textwrap import dedent
from typing import List, Optional

from cc2olx import filesystem
from cc2olx.external.canvas import ModuleMeta
from cc2olx.qti import QtiParser
from cc2olx.utils import clean_file_name

from .utils import simple_slug

logger = logging.getLogger()

MANIFEST = "imsmanifest.xml"
Expand All @@ -24,22 +22,6 @@
DIFFUSE_SHALLOW_SECTIONS = False
DIFFUSE_SHALLOW_SUBSECTIONS = True

# Name of the OLX directory for static assets; "/<OLX_STATIC_DIR>/<file>"
# paths are built from it when linking web content.
OLX_STATIC_DIR = "static"

# Directory names that make up the OLX layout (the static directory included).
# NOTE(review): presumably the folders created in the exported course; the
# consumer of this list is not visible here, so confirm against the caller.
OLX_DIRECTORIES = [
"about",
"assets",
"chapter",
"course",
"html",
"info",
"policies",
"problem",
"sequential",
OLX_STATIC_DIR,
"vertical",
]


def is_leaf(container):
    """
    Tell whether ``container`` is a leaf.

    A container counts as a leaf when ``"identifierref"`` is found in it
    (plain ``in`` membership test).
    """
    has_resource_reference = "identifierref" in container
    return has_resource_reference
Expand Down Expand Up @@ -86,7 +68,7 @@ def __init__(self, cartridge_file, workspace):
self.module_meta = {}

# List of static files that are outside of `web_resources` directory, but still required
self.extra_static_files = []
self._extra_static_files = []

self.workspace = workspace

Expand All @@ -99,6 +81,16 @@ def __repr__(self):
)
return text

@property
def extra_static_files(self) -> List[str]:
    """
    Return the extra static files collected so far.

    The internal list object itself is handed back, not a copy.
    """
    collected_files = self._extra_static_files
    return collected_files

def add_extra_static_file(self, value: str) -> None:
    """
    Register one more extra static file.

    :param value: entry appended to the end of the extra static files list.
    """
    static_files = self._extra_static_files
    static_files.append(value)

def process_canvas_cc(self, elements):
"""
Perform canvas cc specific processing.
Expand Down Expand Up @@ -310,102 +302,15 @@ def flatten(self, container):
output.extend(leaves)
return output

def get_resource_content(self, identifier):
def define_resource(self, idref: Optional[str]) -> dict:
"""
Get the resource named by `identifier`.
If the resource can be retrieved, returns a tuple: the first element
indicates the type of content, either "html" or "link". The second
element is a dict with details, which vary by the type.
If the resource can't be retrieved, returns a tuple of None, None.
Define a resource by its identifier.
"""
res = self.resources_by_id.get(identifier)
if res is None and self.is_canvas_flavor:
res = self.resources_by_id.get(self.module_meta.get_identifierref(identifier))
if res is None:
logger.info("Missing resource: %s", identifier)
return None, None

res_type = res["type"]

if res_type == "webcontent":
res_relative_path = res["children"][0].href
res_filename = self._res_filename(res_relative_path)
if res_filename.suffix == ".html":
try:
with open(str(res_filename), encoding="utf-8") as res_file:
html = res_file.read()
except: # noqa: E722
logger.error("Failure reading %s from id %s", res_filename, identifier) # noqa: E722
raise
return "html", {"html": html}
elif "web_resources" in str(res_filename) and imghdr.what(str(res_filename)):
static_filename = str(res_filename).split("web_resources/")[1]
olx_static_path = "/{}/{}".format(OLX_STATIC_DIR, static_filename)
html = (
'<html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>'
'</head><body><p><img src="{}" alt="{}"></p></body></html>'.format(olx_static_path, static_filename)
)
return "html", {"html": html}
elif "web_resources" not in str(res_filename):
# This webcontent is outside of ``web_resources`` directory
# So we need to manually copy it to OLX_STATIC_DIR
self.extra_static_files.append(res_relative_path)
olx_static_path = "/{}/{}".format(OLX_STATIC_DIR, res_relative_path)
html = (
'<html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>'
'</head><body><p><a href="{}" alt="{}">{}<a></p></body></html>'.format(
olx_static_path, res_relative_path, res_relative_path
)
)
return "html", {"html": html}
else:
logger.info("Skipping webcontent: %s", res_filename)
return None, None

# Match any of imswl_xmlv1p1, imswl_xmlv1p2 etc
elif re.match(r"^imswl_xmlv\d+p\d+$", res_type):
tree = filesystem.get_xml_tree(self._res_filename(res["children"][0].href))
root = tree.getroot()
namespaces = {
"imswl_xmlv1p1": "http://www.imsglobal.org/xsd/imsccv1p1/imswl_v1p1",
"imswl_xmlv1p2": "http://www.imsglobal.org/xsd/imsccv1p2/imswl_v1p2",
"imswl_xmlv1p3": "http://www.imsglobal.org/xsd/imsccv1p3/imswl_v1p3",
}
ns = {"wl": namespaces[res_type]}
title = root.find("wl:title", ns).text
url = root.find("wl:url", ns).get("href")
return "link", {"href": url, "text": title}

# Match any of imsbasiclti_xmlv1p0, imsbasiclti_xmlv1p3 etc
elif re.match(r"^imsbasiclti_xmlv\d+p\d+$", res_type):
data = self._parse_lti(res)
# Canvas flavored courses have correct url in module meta for lti links
if self.is_canvas_flavor:
item_data = self.module_meta.get_external_tool_item_data(identifier)
if item_data:
data["launch_url"] = item_data.get("url", data["launch_url"])
return "lti", data

# Match any of imsqti_xmlv1p2/imscc_xmlv1p1/assessment, imsqti_xmlv1p3/imscc_xmlv1p3/assessment etc
elif re.match(r"^imsqti_xmlv\d+p\d+/imscc_xmlv\d+p\d+/assessment$", res_type):
res_filename = self._res_filename(res["children"][0].href)
qti_parser = QtiParser(res_filename)
return "qti", qti_parser.parse_qti()

# Match any of imsdt_xmlv1p1, imsdt_xmlv1p2, imsdt_xmlv1p3 etc
elif re.match(r"^imsdt_xmlv\d+p\d+$", res_type):
data = self._parse_discussion(res, res_type)
return "discussion", data

else:
text = f"Unimported content: type = {res_type!r}"
if "href" in res:
text += ", href = {!r}".format(res["href"])
logger.info("%s", text)
return "html", {"html": text}
resource = self.resources_by_id.get(idref)
if resource is None and self.is_canvas_flavor:
module_item_idref = self.module_meta.get_identifierref(idref)
resource = self.resources_by_id.get(module_item_idref)
return resource

def load_manifest_extracted(self):
manifest = self._extract()
Expand Down Expand Up @@ -480,6 +385,12 @@ def get_course_run(self):
# TODO: find a better value for this; lifecycle.contribute_date?
return "run"

def build_res_file_path(self, file_name: str) -> Path:
    """
    Return the path of ``file_name`` joined onto ``self.directory``.
    """
    resource_path = self.directory / file_name
    return resource_path

def _extract(self):
path_extracted = filesystem.unzip_directory(self.file_path, self.workspace)
self.directory = path_extracted
Expand Down Expand Up @@ -511,11 +422,11 @@ def _update_namespaces(self, root):
)

def _parse_manifest(self, node):
data = dict()
data["metadata"] = self._parse_metadata(node)
data["organizations"] = self._parse_organizations(node)
data["resources"] = self._parse_resources(node)
return data
return {
"metadata": self._parse_metadata(node),
"organizations": self._parse_organizations(node),
"resources": self._parse_resources(node),
}

def _clean_manifest(self, node):
"""
Expand Down Expand Up @@ -716,83 +627,3 @@ def _parse_dependency(self, node):
def _parse_resource_metadata(self, node):
# TODO: this
return None

def _res_filename(self, file_name):
return self.directory / file_name

def _parse_lti(self, resource):
    """
    Parse an LTI resource into a dict of launch settings.

    Reads the XML file referenced by the first child of ``resource`` and
    returns a dict with the keys ``title``, ``description``, ``launch_url``,
    ``height``, ``width``, ``custom_parameters`` and ``lti_id``.
    """
    lti_file_path = self._res_filename(resource["children"][0].href)
    root = filesystem.get_xml_tree(lti_file_path).getroot()
    ns = {
        "blti": "http://www.imsglobal.org/xsd/imsbasiclti_v1p0",
        "lticp": "http://www.imsglobal.org/xsd/imslticp_v1p0",
        "lticm": "http://www.imsglobal.org/xsd/imslticm_v1p0",
    }

    title = root.find("blti:title", ns).text
    description = root.find("blti:description", ns).text

    # Prefer the secure launch URL, fall back to the plain one, then to "".
    url_element = root.find("blti:secure_launch_url", ns)
    if url_element is None:
        url_element = root.find("blti:launch_url", ns)
    launch_url = "" if url_element is None else url_element.text

    # Both dimensions default to "500" when the property is absent.
    width_element = root.find("blti:extensions/lticm:property[@name='selection_width']", ns)
    width = "500" if width_element is None else width_element.text

    height_element = root.find("blti:extensions/lticm:property[@name='selection_height']", ns)
    height = "500" if height_element is None else height_element.text

    custom = root.find("blti:custom", ns)
    parameters = {} if custom is None else {option.get("name"): option.text for option in custom}

    # For Canvas flavored CC, tool_id can be used as lti_id if present;
    # otherwise a simple slug of the title is used.
    tool_id_element = root.find("blti:extensions/lticm:property[@name='tool_id']", ns)
    lti_id = simple_slug(title) if tool_id_element is None else tool_id_element.text

    return {
        "title": title,
        "description": description,
        "launch_url": launch_url,
        "height": height,
        "width": width,
        "custom_parameters": parameters,
        "lti_id": lti_id,
    }

def _parse_discussion(self, res, res_type):
    """
    Parse a discussion resource.

    Walks ``res["children"]`` in order. A resource file child supplies
    ``title`` and ``text`` (a later file overwrites an earlier one). For a
    dependency child, the result of ``get_resource_content`` is appended
    to ``dependencies``.
    """
    namespaces = {
        "imsdt_xmlv1p1": "http://www.imsglobal.org/xsd/imsccv1p1/imsdt_v1p1",
        "imsdt_xmlv1p2": "http://www.imsglobal.org/xsd/imsccv1p2/imsdt_v1p2",
        "imsdt_xmlv1p3": "http://www.imsglobal.org/xsd/imsccv1p3/imsdt_v1p3",
    }

    dependencies = []
    data = {"dependencies": dependencies}
    for child in res["children"]:
        if isinstance(child, ResourceFile):
            root = filesystem.get_xml_tree(self._res_filename(child.href)).getroot()
            # Looked up only once a file child is seen, so an unknown
            # ``res_type`` fails here and not earlier.
            ns = {"dt": namespaces[res_type]}
            data["title"] = root.find("dt:title", ns).text
            data["text"] = root.find("dt:text", ns).text
            continue
        if isinstance(child, ResourceDependency):
            dependencies.append(self.get_resource_content(child.identifierref))
    return data
Loading

0 comments on commit 805a4a1

Please sign in to comment.