Compare commits

..

3 Commits

Author SHA1 Message Date
Frédéric Tronel
f982b5b42d Removing remaining % string formatting. 2025-11-23 17:27:13 +01:00
Frédéric Tronel
ed4ed5e7df Remove remaining % formatting in logging functions. 2025-11-23 17:16:16 +01:00
Frédéric Tronel
c83bd09411 Adding doc string for more functions. 2025-11-23 17:14:02 +01:00

View File

@@ -57,12 +57,20 @@ from iso639.exceptions import InvalidLanguageValue
@typechecked
def check_required_tools() -> tuple[bool,dict[str,str]]:
"""Check if required external tools are installed.
"""
Checks if all required external tools are installed.
This function verifies the presence of required and optional external tools on the system.
It returns a tuple containing a boolean indicating whether all optional tools are installed,
along with a dictionary containing the paths to all tools.
Args:
None
Returns:
tuple[bool, list[str]] : does all optional tools are installed and the paths of all tools.
tuple[bool, dict[str, str]]:
- bool: True if all optional tools are installed, False otherwise
- dict[str, str]: dictionary containing the paths to all tools
"""
logger = logging.getLogger(__name__)
all_optional_tools = True
@@ -88,13 +96,20 @@ def check_required_tools() -> tuple[bool,dict[str,str]]:
@typechecked
def get_tesseract_supported_lang(tesseract_path:str) -> dict[Lang, str]|None:
"""Returns the set of natural languages supported by Tesseract OCR tool.
"""
Retrieves the set of natural languages supported by the Tesseract OCR tool.
This function runs the Tesseract binary with the --list-langs option and parses the output
to extract the supported languages.
Args:
tesseract_path: str: path to tesseract binary.
tesseract_path (str): The path to the Tesseract binary.
Returns:
dict[str, str] : a mapping ....
dict[Lang, str] | None:
- A dictionary mapping Lang objects to their corresponding language codes
(e.g., "eng" for English)
- None if an error occurs while running the Tesseract binary
"""
logger = logging.getLogger(__name__)
res = {}
@@ -123,6 +138,25 @@ def get_tesseract_supported_lang(tesseract_path:str) -> dict[Lang, str]|None:
@typechecked
def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None:
"""
Retrieves the frame rate of a video file using the ffprobe tool.
This function runs the ffprobe binary with the specified input file and parses the output
to extract the frame rate.
It uses two methods to calculate the frame rate: one based on the timestamp of the frames
and another based on the duration of the frames.
If the two calculated frame rates are significantly different, the function returns an error
Args:
ffprobe_path (str): The path to the ffprobe binary.
input_file (IO[bytes]): The input video file.
Returns:
float | None:
- The frame rate of the video file as a floating-point number
- None if an error occurs while running the ffprobe binary or if the calculated
frame rates are inconsistent
"""
logger = logging.getLogger(__name__)
infd = input_file.fileno()
@@ -278,7 +312,8 @@ def extract_srt(mkvextract:str, filename:str, subtitles:dict[str, list[int]],
return res
@typechecked
def do_ocr(vobsubocr:str, idxs: list[tuple[str,str,str,str]], duration:timedelta, temporaries:list[IO[bytes]], dump_mem_fd:bool=False):
def do_ocr(vobsubocr:str, idxs: list[tuple[str,str,str,str]], duration:timedelta,
temporaries:list[IO[bytes]], dump_mem_fd:bool=False):
logger = logging.getLogger(__name__)
res = []
@@ -407,6 +442,22 @@ def get_codec_private_data_from_mkv(mkvinfo_path:str,
# ISO/IEC 14496-15
@typechecked
def read_bit(buf:bytes, bit_position: int) -> tuple[int, int]:
"""
Read a single bit from a byte buffer.
This function is part of the implementation of the H.264/AVC video compression standard,
as specified in ISO/IEC H.264-201602 and ISO/IEC 14496-15.
It takes a byte buffer and a bit position as input, and returns a tuple containing
the updated bit position and the value of the bit at the specified position.
Args:
buf (bytes): The byte buffer to read from.
bit_position (int): The position of the bit to read, starting from 0.
Returns:
tuple[int, int]: A tuple containing the updated bit position (bit_position + 1) and the
value of the bit (0 or 1).
"""
byte_position = floor(floor(bit_position/8))
byte = buf[byte_position]
bit = (byte >> (7-(bit_position % 8))) & 1
@@ -414,6 +465,21 @@ def read_bit(buf:bytes, bit_position: int) -> tuple[int, int]:
@typechecked
def read_boolean(buf:bytes, bit_position: int) -> tuple[int, bool]:
"""
Read a boolean value from a byte buffer.
This function reads a single bit from the byte buffer at the specified position and interprets
it as a boolean value.
It returns a tuple containing the updated bit position and the boolean value.
Args:
buf (bytes): The byte buffer to read from.
bit_position (int): The position of the bit to read, starting from 0.
Returns:
tuple[int, bool]: A tuple containing the updated bit position and the boolean value
(True if the bit is 1, False if the bit is 0).
"""
bit_position, b = read_bit(buf, bit_position)
return bit_position, b==1
@@ -1513,29 +1579,29 @@ class AVCDecoderConfiguration:
def merge(self, config):
# Check config compatibility
if self.configuration_version != config.configuration_version:
raise ValueError('Configuration versions are different: %d vs %s' %\
(self.configuration_version, config.configuration_version))
raise ValueError(f'Configuration versions are different: {self.configuration_version:d}\
vs {config.configuration_version:d}')
if self.avc_profile_indication != config.avc_profile_indication:
raise ValueError('AVC profiles are different: %d vs %s' %\
(self.avc_profile_indication, config.avc_profile_indication))
raise ValueError(f'AVC profiles are different: {self.avc_profile_indication:d} vs \
{config.avc_profile_indication:d}')
if self.profile_compatibility != config.profile_compatibility:
raise ValueError('Profile compatilities are different: %d vs %s' %\
(self.profile_compatibility, config.profile_compatibility))
raise ValueError(f'Profile compatilities are different: {self.profile_compatibility:d} \
vs {config.profile_compatibility:d}')
if self.avc_level_indication != config.avc_level_indication:
raise ValueError('Level indications are different: %d vs %s' %\
(self.avc_level_indication, config.avc_level_indication))
raise ValueError(f'Level indications are different: {self.avc_level_indication:d} vs \
{config.avc_level_indication:d}')
if self.length_size_minus_one != config.length_size_minus_one:
raise ValueError('Length units are different: %d vs %s' %\
(self.length_size_minus_one, config.length_size_minus_one))
raise ValueError(f'Length units are different: {self.length_size_minus_one:d} vs \
{config.length_size_minus_one:d}')
if self.chroma_format != config.chroma_format:
raise ValueError('Colour format are different: %d vs %s' %\
(self.chroma_format, config.chroma_format))
raise ValueError(f'Colour format are different: {self.chroma_format:d} vs \
{config.chroma_format:d}')
if self.bit_depth_luma_minus8 != config.bit_depth_luma_minus8:
raise ValueError('Depth of luminance are different: %d vs %s' %\
(self.bit_depth_luma_minus8, config.bit_depth_luma_minus8))
raise ValueError(f'Depth of luminance are different: {self.bit_depth_luma_minus8:d} vs \
{config.bit_depth_luma_minus8:d}')
if self.bit_depth_chroma_minus8 != config.bit_depth_chroma_minus8:
raise ValueError('Depth of chromaticity are different: %d vs %s' %\
(self.bit_depth_chroma_minus8, config.bit_depth_luma_minus8))
raise ValueError(f'Depth of chromaticity are different: \
{self.bit_depth_chroma_minus8:d} vs {config.bit_depth_luma_minus8:d}')
for spsid in config.sps:
sps = config.sps[spsid]
@@ -2006,6 +2072,25 @@ def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> list|None:
@typechecked
def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool:
"""
Checks if a media file contains subtitles using the ffprobe tool.
This function runs the ffprobe binary with the specified input file and parses the output
to determine if the file contains subtitles.
It returns True if at least one subtitle stream is found, False otherwise.
Args:
ffprobe_path (str): The path to the ffprobe binary.
input_file (IO[bytes]): The input media file.
Returns:
bool:
- True if the media file contains at least one subtitle stream
- False if:
- the media file does not contain any subtitle streams
- an error occurs while running the ffprobe binary
- the streams information cannot be retrieved from the media file
"""
logger = logging.getLogger(__name__)
infd = input_file.fileno()
@@ -2027,6 +2112,23 @@ def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool:
@typechecked
def parse_timestamp(ts:str) -> timedelta|None:
"""
Parse a timestamp string into a timedelta object.
This function takes a string representing a timestamp in the format HH:MM:SS[.us] and returns
a timedelta object representing the corresponding time interval.
The timestamp string can have an optional microsecond component.
Args:
ts (str): The timestamp string to parse.
Returns:
timedelta | None:
- A timedelta object representing the parsed timestamp
- None if:
- the timestamp string is not in the correct format
- the timestamp values are out of range (e.g. hour > 23, minute > 59, etc.)
"""
logger = logging.getLogger(__name__)
ts_reg_exp = (r'^(?P<hour>[0-9]{1,2}):(?P<minute>[0-9]{1,2})'
@@ -2068,7 +2170,24 @@ def parse_timestamp(ts:str) -> timedelta|None:
return res
@typechecked
def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:
def parse_time_interval(interval: str) -> tuple[timedelta, timedelta] | None:
"""
Parse a time interval string into a tuple of two timedelta objects.
This function takes a string representing a time interval in the format HH:MM:SS[.ms]-HH:MM:SS[.ms] and returns a tuple of two timedelta objects representing the start and end times of the interval.
The time interval string can have an optional millisecond component.
Args:
interval (str): The time interval string to parse.
Returns:
tuple[timedelta, timedelta] | None:
- A tuple of two timedelta objects representing the start and end times of the interval
- None if:
- the time interval string is not in the correct format
- the time values are out of range (e.g. hour > 23, minute > 59, etc.)
- the end time is before the start time (non-monotonic interval)
"""
logger = logging.getLogger(__name__)
interval_reg_exp = (r'^(?P<hour1>[0-9]{1,2}):(?P<minute1>[0-9]{1,2}):(?P<second1>[0-9]{1,2})'
@@ -2144,6 +2263,22 @@ def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:
@typechecked
def compare_time_interval(interval1: tuple[timedelta, timedelta],
interval2: tuple[timedelta, timedelta]) -> int:
"""
Compare two time intervals.
This function compares two time intervals represented by tuples of two timedelta objects.
It returns an integer indicating the relationship between the two intervals:
- -1 if interval 1 is before interval 2
- 1 if interval 1 is after interval 2
- 0 if the two intervals overlap or are equal
Args:
interval1 (tuple[timedelta, timedelta]): The first time interval
interval2 (tuple[timedelta, timedelta]): The second time interval
Returns:
int: The relationship between the two time intervals
"""
ts11,ts12 = interval1
ts21,ts22 = interval2
@@ -2441,6 +2576,27 @@ def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[byt
@typechecked
def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:timedelta, nb_frames:int,
width:int=640, height:int=480) -> tuple[bytes,int]|tuple[None,None]:
"""
Extract pictures from a video file using FFmpeg.
This function runs the FFmpeg binary to extract a specified number of frames from a video file,
starting at a given time.
The extracted frames are stored in memory as PPM images and returned as a tuple containing
the image data and a file descriptor to the memory created by memfd_create.
Args:
ffmpeg_path (str): The path to the FFmpeg binary.
input_file (IO[bytes]): The input video file.
begin (timedelta): The start time of the extraction.
nb_frames (int): The number of frames to extract.
width (int, optional): The width of the extracted images. Defaults to 640.
height (int, optional): The height of the extracted images. Defaults to 480.
Returns:
tuple[bytes, int] | tuple[None, None]:
- A tuple containing the extracted image data as bytes and a file descriptor
- A tuple containing None, None if the extraction fails
"""
logger = logging.getLogger(__name__)
infd = input_file.fileno()
@@ -2516,6 +2672,26 @@ def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:timedelta, outpu
@typechecked
def dump_ppm(pictures: bytes, prefix: str, temporaries: list[IO[bytes]]) -> None:
"""
Dump PPM pictures from a bytes buffer to files.
This function takes a bytes buffer containing PPM pictures, a prefix for the output file names, and a list of temporary files.
It extracts each PPM picture from the buffer, checks its validity, and writes it to a file.
The output files are named according to the prefix and a zero-padded three-digit number.
Args:
pictures (bytes): The bytes buffer containing the PPM pictures.
prefix (str): The prefix for the output file names.
temporaries (list[IO[bytes]]): A list of temporary files that will be used to store the output files.
Returns:
None
Raises:
None, but logs errors if:
- the PPM picture is not valid (e.g. wrong magic number, dimensions, or color encoding)
- an I/O error occurs while creating or writing to an output file
"""
logger = logging.getLogger(__name__)
# "P6\nWIDTH HEIGHT\n255\n"
@@ -3202,7 +3378,7 @@ def main() -> None:
try:
input_file = open(args.input_file, mode='rb')
logger.debug("Type of input file: %s" % type(input_file))
logger.debug("Type of input file: %s", type(input_file))
except IOError:
logger.error("Impossible to open %s", args.input_file)
exit(-1)
@@ -3607,7 +3783,7 @@ def main() -> None:
logger.info(sts)
if len(sts) > 0:
logger.info('Supported languages: %s' % supported_langs)
logger.info('Supported languages: %s', supported_langs)
list_of_subtitles = extract_srt(paths['mkvextract'], final_with_video_name, sts,
supported_langs)
logger.info(list_of_subtitles)