Better typing.

This commit is contained in:
Frédéric Tronel
2025-11-23 15:56:09 +01:00
parent bb5206d916
commit 9a3d04af7f

View File

@@ -220,7 +220,7 @@ def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None:
@typechecked @typechecked
def extract_srt(mkvextract:str, filename:str, subtitles:dict[str, list[int]], def extract_srt(mkvextract:str, filename:str, subtitles:dict[str, list[int]],
langs:dict[Lang,str]) -> list|None: langs:dict[Lang,str]) -> list[tuple[str,str,str,str]]|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
params = [mkvextract, filename, 'tracks'] params = [mkvextract, filename, 'tracks']
@@ -277,7 +277,8 @@ def extract_srt(mkvextract:str, filename:str, subtitles:dict[str, list[int]],
return res return res
def do_ocr(vobsubocr, idxs, duration, temporaries, dump_mem_fd=False): @typechecked
def do_ocr(vobsubocr:str, idxs: list[tuple[str,str,str,str]], duration:timedelta, temporaries:list[IO[bytes]], dump_mem_fd:bool=False):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
res = [] res = []
@@ -362,7 +363,8 @@ class SupportedFormat(IntEnum):
# Found codec private data using mkvinfo # Found codec private data using mkvinfo
@typechecked @typechecked
def get_codec_private_data_from_mkv(mkvinfo_path:str, input_file: IO[bytes]) -> tuple[int, bytes]: def get_codec_private_data_from_mkv(mkvinfo_path:str,
input_file: IO[bytes]) -> tuple[int, bytes]|tuple[None,None]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -625,7 +627,7 @@ def parse_scaling_list(buf:bytes, bit_position: int, size) -> tuple[int,list[int
# The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list # The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list
# is full of zeroes. # is full of zeroes.
@typechecked @typechecked
def write_scaling_list(buf:bytes, bit_position: int, size, matrix, def write_scaling_list(buf:bytes, bit_position: int, size, matrix:list[int],
optimized: bool = False) -> int: optimized: bool = False) -> int:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size) logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size)
@@ -1713,7 +1715,7 @@ def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]) -> dict[str,tuple[in
# value 0 to 2^56-2 # value 0 to 2^56-2
@typechecked @typechecked
def get_ebml_length(length) -> bytes|None: def get_ebml_length(length:int) -> bytes|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if 0 <= length <= 2**7-2: if 0 <= length <= 2**7-2:
@@ -1762,7 +1764,7 @@ def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration)
return res return res
@typechecked @typechecked
def change_ebml_element_size(input_file: IO[bytes], position:int, addendum) -> int: def change_ebml_element_size(input_file: IO[bytes], position:int, addendum:int) -> int:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
initial_position = position initial_position = position
@@ -1855,7 +1857,7 @@ def change_ebml_element_size(input_file: IO[bytes], position:int, addendum) -> i
return delta return delta
@typechecked @typechecked
def change_codec_private_data(mkvinfo_path:str, input_file: IO[bytes], codec_data) -> None: def change_codec_private_data(mkvinfo_path:str, input_file: IO[bytes], codec_data:bytes) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -2268,8 +2270,8 @@ def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:timedel
# TODO: Finish implementation of this function and use it. # TODO: Finish implementation of this function and use it.
@typechecked @typechecked
def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp, before: bool=True, def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp:timedelta,
delta: timedelta=timedelta(seconds=2)): before: bool=True, delta: timedelta=timedelta(seconds=2)):
# pylint: disable=W0613 # pylint: disable=W0613
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2438,7 +2440,7 @@ def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[byt
@typechecked @typechecked
def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:timedelta, nb_frames:int, def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:timedelta, nb_frames:int,
width:int=640, height:int=480) -> tuple[bytes,int]: width:int=640, height:int=480) -> tuple[bytes,int]|tuple[None,None]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -2477,7 +2479,8 @@ def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:timedelta, nb_
@typechecked @typechecked
def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:timedelta, output_filename:str, def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:timedelta, output_filename:str,
packet_duration:int, sub_channel:int=0, packet_duration:int, sub_channel:int=0,
nb_packets:int=0, sample_rate:int=48000, nb_channels:int=2) -> tuple[bytes,int]: nb_packets:int=0, sample_rate:int=48000,
nb_channels:int=2) -> tuple[bytes,int]|tuple[None,None]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
outfd = memfd_create(output_filename, flags=0) outfd = memfd_create(output_filename, flags=0)
@@ -2512,7 +2515,7 @@ def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:timedelta, outpu
return sound, outfd return sound, outfd
@typechecked @typechecked
def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None: def dump_ppm(pictures: bytes, prefix: str, temporaries: list[IO[bytes]]) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# "P6\nWIDTH HEIGHT\n255\n" # "P6\nWIDTH HEIGHT\n255\n"
@@ -2546,7 +2549,7 @@ def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None:
header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1 header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1
try: try:
with open(filename, 'wb', encoding='utf8') as out: with open(filename, 'wb') as out:
temporaries.append(out) temporaries.append(out)
outfd = out.fileno() outfd = out.fileno()
length=header_len+3*width*height length=header_len+3*width*height
@@ -2686,7 +2689,7 @@ def extract_all_streams(ffmpeg_path:str, ffprobe_path:str, input_file:IO[bytes],
if dump_mem_fd: if dump_mem_fd:
try: try:
with open(tmpname,'wb', encoding='utf8') as output: with open(tmpname,'wb') as output:
temporaries.append(output) temporaries.append(output)
outfd = output.fileno() outfd = output.fileno()
pos = 0 pos = 0
@@ -2804,7 +2807,7 @@ def extract_all_streams(ffmpeg_path:str, ffprobe_path:str, input_file:IO[bytes],
# Merge a list of mkv files passed as input, and produce a new MKV as output # Merge a list of mkv files passed as input, and produce a new MKV as output
@typechecked @typechecked
def merge_mkvs(mkvmerge_path:str, inputs: list[IO[bytes]], output_name:str, def merge_mkvs(mkvmerge_path:str, inputs: list[IO[bytes]], output_name:str,
concatenate: bool=True, timestamps: dict[int, IO[str]]={}) -> IO[bytes]: concatenate: bool=True, timestamps: dict[int, IO[str]]={}) -> IO[bytes]|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
fds = [] fds = []