Skip to content

Commit

Permalink
[#93] Added more tests for Task, Shot and Version classes.
Browse files Browse the repository at this point in the history
  • Loading branch information
eoyilmaz committed Nov 7, 2024
1 parent 9f76002 commit 28b729d
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 13 deletions.
16 changes: 4 additions & 12 deletions src/stalker/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1405,12 +1405,6 @@ def _validate_task_depends_on(self, key: str, task_depends_on: "Task") -> "Task"
f"change the dependencies of a {self.status.code} task"
)

if self.is_container and self.status == cmpl:
raise StatusError(
f"This is a {self.status.code} container task and it is not allowed to "
f"change the dependency in {self.status.code} container tasks"
)

# check for the circular dependency
with DBSession.no_autoflush:
check_circular_dependency(depends_on, self, "depends_on")
Expand Down Expand Up @@ -2373,20 +2367,18 @@ def update_schedule_info(self) -> None:

@property
def percent_complete(self) -> float:
"""Calcualte and return the percent_complete value.
"""Calculate and return the percent_complete value.
The percent_complete value is based on the total_logged_seconds and
schedule_seconds of the task.
Container tasks will use info from their children.
Returns:
float: The percent complet value between 0 and 1.
float: The percent complete value between 0 and 1.
"""
if (
self.is_container
and self.total_logged_seconds is None
or self.schedule_seconds is None
if self.is_container and (
self._total_logged_seconds is None or self._schedule_seconds is None
):
self.update_schedule_info()

Expand Down
20 changes: 20 additions & 0 deletions tests/models/test_shot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1478,3 +1478,23 @@ def test__init_on_load__works_as_expected(setup_shot_db_tests):
# the following should call Shot.__init_on_load__()
shot = Shot.query.filter(Shot.name == "SH123").first()
assert isinstance(shot, Shot)


def test_template_variables_include_scenes_for_shots(setup_shot_db_tests):
"""_template_variables include scenes for shots."""
data = setup_shot_db_tests
assert isinstance(data["test_shot"], Shot)
template_variables = data["test_shot"]._template_variables()
assert "scenes" in template_variables
assert data["test_shot"].scenes != []
assert template_variables["scenes"] == data["test_shot"].scenes


def test_template_variables_include_sequences_for_shots(setup_shot_db_tests):
"""_template_variables include sequences for shots."""
data = setup_shot_db_tests
assert isinstance(data["test_shot"], Shot)
template_variables = data["test_shot"]._template_variables()
assert "sequences" in template_variables
assert data["test_shot"].sequences != []
assert template_variables["sequences"] == data["test_shot"].sequences
205 changes: 205 additions & 0 deletions tests/models/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4135,6 +4135,20 @@ def test_computed_resources_attr_dont_update_with_resources_if_is_scheduled_is_t
assert new_task.computed_resources != test_value


def test_computed_resources_is_not_a_user_instance(setup_task_tests):
"""computed_resource is not a User instance raises TypeError."""
data = setup_task_tests
kwargs = copy.copy(data["kwargs"])
new_task = Task(**kwargs)
with pytest.raises(TypeError) as cm:
new_task.computed_resources.append("not a user")

assert str(cm.value) == (
"Task.computed_resources should be a list of stalker.models.auth.User "
"instances, not str: 'not a user'"
)


def test_persistent_allocation_arg_is_skipped(setup_task_tests):
"""persistent_allocation defaults if the persistent_allocation arg is skipped."""
data = setup_task_tests
Expand Down Expand Up @@ -4745,6 +4759,114 @@ def test_percent_complete_attr_is_working_as_expected_for_a_container_task(
assert parent_task.percent_complete == pytest.approx(77.7777778)


def test_percent_complete_attr_is_working_as_expected_for_a_container_task_with_no_data_1(
setup_task_db_tests,
):
"""percent complete attr is working as expected for a container task with no data."""
data = setup_task_db_tests
kwargs = copy.copy(data["kwargs"])
kwargs["depends_on"] = [] # remove dependencies just to make it
# easy to create time logs after stalker
# v0.2.6.1

new_task = Task(**kwargs)
new_task.status = data["status_rts"]

dt = datetime.datetime
td = datetime.timedelta
now = dt.now(pytz.utc)

defaults["timing_resolution"] = td(hours=1)
defaults["daily_working_hours"] = 9

parent_task = Task(**kwargs)

new_task.time_logs = []
tlog1 = TimeLog(
task=new_task,
resource=new_task.resources[0],
start=now - td(hours=4),
end=now - td(hours=2),
)

assert tlog1 in new_task.time_logs

tlog2 = TimeLog(
task=new_task,
resource=new_task.resources[1],
start=now - td(hours=4),
end=now + td(hours=1),
)

DBSession.commit()

new_task.parent = parent_task
DBSession.commit()

assert tlog2 in new_task.time_logs
assert new_task.total_logged_seconds == 7 * 3600
assert new_task.schedule_seconds == 9 * 3600
assert new_task.percent_complete == pytest.approx(77.7777778)

parent_task._total_logged_seconds = None
# parent_task._schedule_seconds = None
assert parent_task.percent_complete == pytest.approx(77.7777778)


def test_percent_complete_attr_is_working_as_expected_for_a_container_task_with_no_data_2(
setup_task_db_tests,
):
"""percent complete attr is working as expected for a container task with no data."""
data = setup_task_db_tests
kwargs = copy.copy(data["kwargs"])
kwargs["depends_on"] = [] # remove dependencies just to make it
# easy to create time logs after stalker
# v0.2.6.1

new_task = Task(**kwargs)
new_task.status = data["status_rts"]

dt = datetime.datetime
td = datetime.timedelta
now = dt.now(pytz.utc)

defaults["timing_resolution"] = td(hours=1)
defaults["daily_working_hours"] = 9

parent_task = Task(**kwargs)

new_task.time_logs = []
tlog1 = TimeLog(
task=new_task,
resource=new_task.resources[0],
start=now - td(hours=4),
end=now - td(hours=2),
)

assert tlog1 in new_task.time_logs

tlog2 = TimeLog(
task=new_task,
resource=new_task.resources[1],
start=now - td(hours=4),
end=now + td(hours=1),
)

DBSession.commit()

new_task.parent = parent_task
DBSession.commit()

assert tlog2 in new_task.time_logs
assert new_task.total_logged_seconds == 7 * 3600
assert new_task.schedule_seconds == 9 * 3600
assert new_task.percent_complete == pytest.approx(77.7777778)

# parent_task._total_logged_seconds = None
parent_task._schedule_seconds = None
assert parent_task.percent_complete == pytest.approx(77.7777778)


def test_percent_complete_attr_working_okay_for_a_task_w_effort_and_duration_children(
setup_task_db_tests,
):
Expand Down Expand Up @@ -5038,6 +5160,89 @@ def test_total_logged_seconds_is_the_sum_of_all_time_logs(setup_task_db_tests):
assert new_task.total_logged_seconds == 20 * 3600


def test_total_logged_seconds_calls_update_schedule_info(
setup_task_db_tests,
):
"""total_logged_seconds is the sum of all time_logs of the child tasks."""
data = setup_task_db_tests
kwargs = copy.copy(data["kwargs"])
kwargs["depends_on"] = []
new_task = Task(**kwargs)
dt = datetime.datetime
td = datetime.timedelta
now = dt.now(pytz.utc)
kwargs.pop("schedule_timing")
kwargs.pop("schedule_unit")
parent_task = Task(**kwargs)
new_task.parent = parent_task
new_task.time_logs = []
tlog1 = TimeLog(
task=new_task, resource=new_task.resources[0], start=now, end=now + td(hours=8)
)
DBSession.add(tlog1)
DBSession.commit()
assert tlog1 in new_task.time_logs
tlog2 = TimeLog(
task=new_task, resource=new_task.resources[1], start=now, end=now + td(hours=12)
)
DBSession.add(tlog2)
DBSession.commit()
# set the total_logged_seconds to None
# so the getter calls the update_schedule_info
parent_task._total_logged_seconds = None
assert tlog2 in new_task.time_logs
assert new_task.total_logged_seconds == 20 * 3600
assert parent_task.total_logged_seconds == 20 * 3600


def test_update_schedule_info_on_a_container_of_containers_task(
setup_task_db_tests,
):
"""total_logged_seconds is the sum of all time_logs of the child tasks."""
data = setup_task_db_tests
kwargs = copy.copy(data["kwargs"])
kwargs["depends_on"] = []
new_task = Task(**kwargs)
dt = datetime.datetime
td = datetime.timedelta
now = dt.now(pytz.utc)
kwargs.pop("schedule_timing")
kwargs.pop("schedule_unit")
parent_task = Task(**kwargs)
root_task = Task(**kwargs)
new_task.parent = parent_task
parent_task.parent = root_task
new_task.time_logs = []
tlog1 = TimeLog(
task=new_task, resource=new_task.resources[0], start=now, end=now + td(hours=8)
)
DBSession.add(new_task)
DBSession.add(parent_task)
DBSession.add(root_task)
DBSession.add(tlog1)
DBSession.commit()
assert tlog1 in new_task.time_logs
tlog2 = TimeLog(
task=new_task, resource=new_task.resources[1], start=now, end=now + td(hours=12)
)
DBSession.add(tlog2)
DBSession.commit()
# set the total_logged_seconds to None
# so the getter calls the update_schedule_info
root_task.update_schedule_info()


def test_update_schedule_info_with_leaf_tasks(
setup_task_db_tests,
):
"""total_logged_seconds is the sum of all time_logs of the child tasks."""
data = setup_task_db_tests
kwargs = copy.copy(data["kwargs"])
kwargs["depends_on"] = []
new_task = Task(**kwargs)
new_task.update_schedule_info()


def test_total_logged_seconds_is_the_sum_of_all_time_logs_of_children(
setup_task_db_tests,
):
Expand Down
19 changes: 18 additions & 1 deletion tests/models/test_task_status_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,6 @@ def test_container_cmpl_task_dependency_cannot_be_updated(
data["test_task1"].status = data["status_cmpl"]
assert data["test_task1"].status == data["status_cmpl"]
# create dependency
# with DBSession.no_autoflush:
with pytest.raises(StatusError) as cm:
data["test_task1"].depends_on.append(data["test_task8"])

Expand Down Expand Up @@ -2275,6 +2274,24 @@ def test_leaf_wip_task_with_no_dependency_and_no_timelogs_update_status_with_dep
assert data["status_rts"] == data["test_task5"].status


def test_container_task_update_status_with_dependent_status_will_skip(
setup_task_status_workflow_tests,
):
"""update_status_with_dependent_status() will skip container tasks."""
data = setup_task_status_workflow_tests
# the following should do nothing
data["test_task1"].update_status_with_dependent_statuses()


def test_update_status_with_children_statuses_with_leaf_task(
setup_task_status_workflow_tests,
):
"""update_status_with_children_statuses will skip leaf tasks."""
data = setup_task_status_workflow_tests
# the following should do nothing
data["test_task4"].update_status_with_children_statuses()


@pytest.fixture(scope="function")
def setup_task_status_workflow_db_tests(setup_postgresql_db):
"""Set up the Task status workflow tests with a database."""
Expand Down
Loading

0 comments on commit 28b729d

Please sign in to comment.