Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove strict attribute #664

Merged
merged 1 commit into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions auto_editor/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ class Levels:
bar: Bar
no_cache: bool
log: Log
strict: bool # This is for `edit_audio()` for Palet functions.

@property
def media_length(self) -> int:
Expand Down Expand Up @@ -391,17 +390,12 @@ def subtitle(


def initLevels(
src: FileInfo,
tb: Fraction,
bar: Bar,
no_cache: bool,
log: Log,
strict: bool = False,
src: FileInfo, tb: Fraction, bar: Bar, no_cache: bool, log: Log
) -> Levels:
try:
container = av.open(src.path)
except av.FFmpegError as e:
log.error(e)

mod_time = int(src.path.stat().st_mtime)
return Levels(container, src.path.name, mod_time, tb, bar, no_cache, log, strict)
return Levels(container, src.path.name, mod_time, tb, bar, no_cache, log)
3 changes: 1 addition & 2 deletions auto_editor/cmds/repl.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,11 @@ def main(sys_args: list[str] = sys.argv[1:]) -> None:

if args.input:
log = Log(quiet=True, temp_dir=args.temp_dir)
strict = len(args.input) < 2
sources = [initFileInfo(path, log) for path in args.input]
src = sources[0]
tb = src.get_fps() if args.timebase is None else args.timebase
env["timebase"] = tb
env["@levels"] = initLevels(src, tb, initBar("modern"), False, log, strict)
env["@levels"] = initLevels(src, tb, initBar("modern"), False, log)

env.update(make_standard_env())
print(f"Auto-Editor {auto_editor.__version__}")
Expand Down
4 changes: 2 additions & 2 deletions auto_editor/cmds/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,11 +545,11 @@ def edit_positive_tests():
def edit_negative_tests():
run.check(
["resources/wav/example-cut-s16le.wav", "--edit", "motion"],
"video stream '0' does not ",
"video stream",
)
run.check(
["resources/only-video/man-on-green-screen.gif", "--edit", "audio"],
"audio stream '0' does not ",
"audio stream",
)

def yuv442p():
Expand Down
41 changes: 19 additions & 22 deletions auto_editor/lang/palet.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, NoReturn
from typing import Any, NoReturn, TypeGuard

from numpy.typing import NDArray

Expand Down Expand Up @@ -510,15 +510,16 @@ def p_slice(
return seq[start:end:step]


def is_boolean_array(v: object) -> TypeGuard[np.ndarray]:
return isinstance(v, np.ndarray) and v.dtype.kind == "b"


is_iterable = Contract(
"iterable?",
lambda v: type(v) in {str, range, list, tuple, dict, Quoted}
or isinstance(v, np.ndarray),
)
is_boolarr = Contract(
"bool-array?",
lambda v: isinstance(v, np.ndarray) and v.dtype.kind == "b",
)
is_boolarr = Contract("bool-array?", is_boolean_array)


def raise_(msg: str | Exception) -> NoReturn:
Expand Down Expand Up @@ -569,8 +570,6 @@ def edit_audio(
raise MyError("Can't use `audio` if there's no input media")

levels = cast(Levels, env["@levels"])
strict = levels.strict

stream_data: NDArray[np.bool_] | None = None
if stream == Sym("all"):
stream_range = range(0, len(levels.container.streams.audio))
Expand All @@ -585,17 +584,15 @@ def edit_audio(
stream_data = audio_list
else:
stream_data = boolop(stream_data, audio_list, np.logical_or)
except LevelError as e:
raise_(e) if strict else levels.all()
except LevelError:
return np.array([], dtype=np.bool_)

if stream_data is not None:
mut_remove_small(stream_data, minclip, replace=1, with_=0)
mut_remove_small(stream_data, mincut, replace=0, with_=1)
if stream_data is None:
return np.array([], dtype=np.bool_)

return stream_data

stream = 0 if stream == Sym("all") else stream
return raise_(f"audio stream '{stream}' does not exist") if strict else levels.all()
mut_remove_small(stream_data, minclip, replace=1, with_=0)
mut_remove_small(stream_data, mincut, replace=0, with_=1)
return stream_data


def edit_motion(
Expand All @@ -607,18 +604,18 @@ def edit_motion(
if "@levels" not in env:
raise MyError("Can't use `motion` if there's no input media")

levels = env["@levels"]
levels = cast(Levels, env["@levels"])
try:
return levels.motion(stream, blur, width) >= threshold
except LevelError as e:
return raise_(e) if levels.strict else levels.all()
except LevelError:
return np.array([], dtype=np.bool_)


def edit_subtitle(pattern, stream=0, **kwargs):
if "@levels" not in env:
raise MyError("Can't use `subtitle` if there's no input media")

levels = env["@levels"]
levels = cast(Levels, env["@levels"])
if "ignore-case" not in kwargs:
kwargs["ignore-case"] = False
if "max-count" not in kwargs:
Expand All @@ -627,8 +624,8 @@ def edit_subtitle(pattern, stream=0, **kwargs):
max_count = kwargs["max-count"]
try:
return levels.subtitle(pattern, stream, ignore_case, max_count)
except LevelError as e:
return raise_(e) if levels.strict else levels.all()
except LevelError:
return np.array([], dtype=np.bool_)


class StackTraceManager:
Expand Down
45 changes: 27 additions & 18 deletions auto_editor/make_layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from auto_editor.analyze import initLevels
from auto_editor.ffwrapper import FileInfo
from auto_editor.lang.palet import Lexer, Parser, env, interpret, is_boolarr
from auto_editor.lang.palet import Lexer, Parser, env, interpret, is_boolean_array
from auto_editor.lib.data_structs import print_str
from auto_editor.lib.err import MyError
from auto_editor.timeline import ASpace, TlAudio, TlVideo, VSpace, v1, v3
Expand Down Expand Up @@ -122,7 +122,6 @@ def make_timeline(

has_loud = np.array([], dtype=np.bool_)
src_index = np.array([], dtype=np.int32)
concat = np.concatenate

try:
stdenv = __import__("auto_editor.lang.stdenv", fromlist=["lang"])
Expand All @@ -137,41 +136,51 @@ def make_timeline(
parser = Parser(Lexer("config.pal", file.read()))
interpret(env, parser)

results = []
for i, src in enumerate(sources):
try:
parser = Parser(Lexer("`--edit`", args.edit))
if log.is_debug:
log.debug(f"edit: {parser}")

env["timebase"] = tb
env["src"] = f"{src.path}"
env["@levels"] = initLevels(
src, tb, bar, args.no_cache, log, len(sources) < 2
)

results = interpret(env, parser)
env["@levels"] = initLevels(src, tb, bar, args.no_cache, log)

if len(results) == 0:
inter_result = interpret(env, parser)
if len(inter_result) == 0:
log.error("Expression in --edit must return a bool-array, got nothing")

result = results[-1]
result = inter_result[-1]
if callable(result):
result = result()
except MyError as e:
log.error(e)

if not is_boolarr(result):
if not is_boolean_array(result):
log.error(
f"Expression in --edit must return a bool-array, got {print_str(result)}"
)
assert isinstance(result, np.ndarray)

mut_margin(result, start_margin, end_margin)

has_loud = concat((has_loud, result))
src_index = concat((src_index, np.full(len(result), i, dtype=np.int32)))

assert len(has_loud) > 0
results.append(result)

if all(len(result) == 0 for result in results):
if "subtitle" in args.edit:
log.error("No file(s) have the selected subtitle stream.")
if "motion" in args.edit:
log.error("No file(s) have the selected video stream.")
if "audio" in args.edit:
log.error("No file(s) have the selected audio stream.")

src_indexes = []
for i in range(0, len(results)):
if len(results[i]) == 0:
results[i] = initLevels(sources[i], tb, bar, args.no_cache, log).all()
src_indexes.append(np.full(len(results[i]), i, dtype=np.int32))

has_loud = np.concatenate(results)
src_index = np.concatenate(src_indexes)
if len(has_loud) == 0:
log.error("Empty timeline. Nothing to do.")

# Setup for handling custom speeds
speed_index = has_loud.astype(np.uint)
Expand Down