Compare commits

..

4 Commits

Author SHA1 Message Date
Frédéric Tronel
9a3d04af7f Better typing. 2025-11-23 15:56:09 +01:00
Frédéric Tronel
bb5206d916 Correct cleaning of subtitles. 2025-11-23 14:54:14 +01:00
Frédéric Tronel
e5b1917e78 Adding requirements for typing support. 2025-11-23 14:52:27 +01:00
Frédéric Tronel
dd94e5c773 More linting with typechecking and type annotations. 2025-10-31 09:54:53 +01:00
3 changed files with 166 additions and 100 deletions

View File

@@ -1,2 +1,2 @@
clean: clean:
rm -f *.ppm *.pcm part* *.srt *-ts.txt *-full.h264 *-novideo.mkv fre.* rm -f *.ppm *.pcm part* *.srt *-ts.txt *-full.h264 *-novideo.mkv fre-*.*

View File

@@ -19,6 +19,7 @@ from math import floor, ceil, log
from io import BytesIO, TextIOWrapper from io import BytesIO, TextIOWrapper
import json import json
from typing import IO from typing import IO
from typeguard import typechecked
# Third party libraries # Third party libraries
import coloredlogs import coloredlogs
@@ -54,7 +55,8 @@ from iso639.exceptions import InvalidLanguageValue
# Then finally, change the Private Codec Data in the final MKV. # Then finally, change the Private Codec Data in the final MKV.
def check_required_tools() -> tuple[bool,list[str]]: @typechecked
def check_required_tools() -> tuple[bool,dict[str,str]]:
"""Check if required external tools are installed. """Check if required external tools are installed.
Args: Args:
@@ -84,7 +86,8 @@ def check_required_tools() -> tuple[bool,list[str]]:
return all_optional_tools, paths return all_optional_tools, paths
def get_tesseract_supported_lang(tesseract_path:str) -> dict[str,str]|None: @typechecked
def get_tesseract_supported_lang(tesseract_path:str) -> dict[Lang, str]|None:
"""Returns the set of natural languages supported by Tesseract OCR tool. """Returns the set of natural languages supported by Tesseract OCR tool.
Args: Args:
@@ -118,6 +121,7 @@ def get_tesseract_supported_lang(tesseract_path:str) -> dict[str,str]|None:
return res return res
@typechecked
def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None: def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -181,6 +185,7 @@ def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None:
return frame_rate2 return frame_rate2
@typechecked
def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None: def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
tracks={} tracks={}
@@ -213,7 +218,9 @@ def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None:
return tracks return tracks
def extract_srt(mkvextract:str, filename:str, subtitles:str, langs:list[str]) -> list|None: @typechecked
def extract_srt(mkvextract:str, filename:str, subtitles:dict[str, list[int]],
langs:dict[Lang,str]) -> list[tuple[str,str,str,str]]|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
params = [mkvextract, filename, 'tracks'] params = [mkvextract, filename, 'tracks']
@@ -270,7 +277,8 @@ def extract_srt(mkvextract:str, filename:str, subtitles:str, langs:list[str]) ->
return res return res
def do_ocr(vobsubocr, idxs, duration, temporaries, dump_mem_fd=False): @typechecked
def do_ocr(vobsubocr:str, idxs: list[tuple[str,str,str,str]], duration:timedelta, temporaries:list[IO[bytes]], dump_mem_fd:bool=False):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
res = [] res = []
@@ -354,7 +362,9 @@ class SupportedFormat(IntEnum):
# -report -loglevel 0 -f null - # -report -loglevel 0 -f null -
# Found codec private data using mkvinfo # Found codec private data using mkvinfo
def get_codec_private_data_from_mkv(mkvinfo_path:str, input_file: IO[bytes]) -> tuple[int, bytes]: @typechecked
def get_codec_private_data_from_mkv(mkvinfo_path:str,
input_file: IO[bytes]) -> tuple[int, bytes]|tuple[None,None]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -395,17 +405,19 @@ def get_codec_private_data_from_mkv(mkvinfo_path:str, input_file: IO[bytes]) ->
# All the following code is a transposition of documents: # All the following code is a transposition of documents:
# ISO/IEC H.264-201602 # ISO/IEC H.264-201602
# ISO/IEC 14496-15 # ISO/IEC 14496-15
@typechecked
def read_bit(buf:bytes, bit_position: int) -> tuple[int, int]: def read_bit(buf:bytes, bit_position: int) -> tuple[int, int]:
byte_position = floor(floor(bit_position/8)) byte_position = floor(floor(bit_position/8))
byte = buf[byte_position] byte = buf[byte_position]
bit = (byte >> (7-(bit_position % 8))) & 1 bit = (byte >> (7-(bit_position % 8))) & 1
return bit_position+1, bit return bit_position+1, bit
@typechecked
def read_boolean(buf:bytes, bit_position: int) -> tuple[int, bool]: def read_boolean(buf:bytes, bit_position: int) -> tuple[int, bool]:
bit_position, b = read_bit(buf, bit_position) bit_position, b = read_bit(buf, bit_position)
return bit_position, b==1 return bit_position, b==1
@typechecked
def read_bits(buf:bytes, bit_position: int, nb_bits: int) -> tuple[int, int]: def read_bits(buf:bytes, bit_position: int, nb_bits: int) -> tuple[int, int]:
v = 0 v = 0
for _ in range(0, nb_bits): for _ in range(0, nb_bits):
@@ -413,19 +425,23 @@ def read_bits(buf:bytes, bit_position: int, nb_bits: int) -> tuple[int, int]:
v = v*2+bit v = v*2+bit
return bit_position, v return bit_position, v
@typechecked
def read_byte(buf:bytes, bit_position: int) -> tuple[int, int]: def read_byte(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, b = read_bits(buf, bit_position, 8) bit_position, b = read_bits(buf, bit_position, 8)
return bit_position, b return bit_position, b
@typechecked
def read_word(buf:bytes, bit_position: int) -> tuple[int, int]: def read_word(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, w = read_bits(buf, bit_position, 16) bit_position, w = read_bits(buf, bit_position, 16)
return bit_position, w return bit_position, w
@typechecked
def read_long(buf:bytes, bit_position: int) -> tuple[int, int]: def read_long(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, value = read_bits(buf, bit_position, 32) bit_position, value = read_bits(buf, bit_position, 32)
return bit_position, value return bit_position, value
def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]: @typechecked
def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, int]:
nb_zeroes=0 nb_zeroes=0
while True: while True:
bit_position, b = read_bit(buf, bit_position) bit_position, b = read_bit(buf, bit_position)
@@ -438,7 +454,8 @@ def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
v = (v1<<nb_zeroes)+v2 v = (v1<<nb_zeroes)+v2
return bit_position, v-1 return bit_position, v-1
def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]: @typechecked
def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, v = read_unsigned_exp_golomb(buf, bit_position) bit_position, v = read_unsigned_exp_golomb(buf, bit_position)
match v%2: match v%2:
case 0: case 0:
@@ -446,6 +463,7 @@ def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
case 1: case 1:
return bit_position, (v+1)>>1 return bit_position, (v+1)>>1
@typechecked
def write_bit(buf:bytes, bit_position: int, b) -> int: def write_bit(buf:bytes, bit_position: int, b) -> int:
buf_length = len(buf) buf_length = len(buf)
byte_position = floor(bit_position/8) byte_position = floor(bit_position/8)
@@ -459,6 +477,7 @@ def write_bit(buf:bytes, bit_position: int, b) -> int:
return bit_position return bit_position
@typechecked
def write_boolean(buf:bytes, bit_position: int, b: bool) -> int: def write_boolean(buf:bytes, bit_position: int, b: bool) -> int:
if b: if b:
bit_position = write_bit(buf, bit_position, 1) bit_position = write_bit(buf, bit_position, 1)
@@ -466,6 +485,7 @@ def write_boolean(buf:bytes, bit_position: int, b: bool) -> int:
bit_position = write_bit(buf, bit_position, 0) bit_position = write_bit(buf, bit_position, 0)
return bit_position return bit_position
@typechecked
def write_bits(buf:bytes, bit_position: int, v, size) -> int: def write_bits(buf:bytes, bit_position: int, v, size) -> int:
for i in range(size-1,-1,-1): for i in range(size-1,-1,-1):
b = (v>>i)&1 b = (v>>i)&1
@@ -473,18 +493,22 @@ def write_bits(buf:bytes, bit_position: int, v, size) -> int:
return bit_position return bit_position
@typechecked
def write_byte(buf:bytes, bit_position: int, v) -> int: def write_byte(buf:bytes, bit_position: int, v) -> int:
bit_position = write_bits(buf, bit_position, v, 8) bit_position = write_bits(buf, bit_position, v, 8)
return bit_position return bit_position
@typechecked
def write_word(buf:bytes, bit_position: int, v) -> int: def write_word(buf:bytes, bit_position: int, v) -> int:
bit_position = write_bits(buf, bit_position, v, 16) bit_position = write_bits(buf, bit_position, v, 16)
return bit_position return bit_position
@typechecked
def write_long(buf:bytes, bit_position: int, v) -> int: def write_long(buf:bytes, bit_position: int, v) -> int:
bit_position = write_bits(buf, bit_position, v, 32) bit_position = write_bits(buf, bit_position, v, 32)
return bit_position return bit_position
@typechecked
def write_unsigned_exp_golomb(buf:bytes, bit_position: int, v) -> int: def write_unsigned_exp_golomb(buf:bytes, bit_position: int, v) -> int:
n = floor(log(v+1)/log(2))+1 n = floor(log(v+1)/log(2))+1
# Write zeroes # Write zeroes
@@ -494,6 +518,7 @@ def write_unsigned_exp_golomb(buf:bytes, bit_position: int, v) -> int:
return bit_position return bit_position
@typechecked
def write_signed_exp_golomb(buf:bytes, bit_position: int, v) -> int: def write_signed_exp_golomb(buf:bytes, bit_position: int, v) -> int:
if v <= 0: if v <= 0:
bit_position = write_unsigned_exp_golomb(buf, bit_position, -v*2) bit_position = write_unsigned_exp_golomb(buf, bit_position, -v*2)
@@ -502,8 +527,8 @@ def write_signed_exp_golomb(buf:bytes, bit_position: int, v) -> int:
return bit_position return bit_position
@typechecked
def parse_rbsp_trailing_bits(buf:bytes, bit_position) -> int: def parse_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:
bit_position, one = read_bit(buf, bit_position) bit_position, one = read_bit(buf, bit_position)
if one==0: if one==0:
raise ValueError(f'Stop bit should be equal to one. Read: {one:d}') raise ValueError(f'Stop bit should be equal to one. Read: {one:d}')
@@ -514,6 +539,7 @@ def parse_rbsp_trailing_bits(buf:bytes, bit_position) -> int:
return bit_position return bit_position
@typechecked
def write_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int: def write_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:
bit_position = write_bit(buf, bit_position, 1) bit_position = write_bit(buf, bit_position, 1)
while bit_position%8 != 0: while bit_position%8 != 0:
@@ -521,6 +547,7 @@ def write_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:
return bit_position return bit_position
@typechecked
def more_rbsp_data(buf:bytes, bit_position: int) -> bool: def more_rbsp_data(buf:bytes, bit_position: int) -> bool:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('Is there more data in buffer of length: %d at bit position: %d', logger.debug('Is there more data in buffer of length: %d at bit position: %d',
@@ -550,7 +577,8 @@ def more_rbsp_data(buf:bytes, bit_position: int) -> bool:
return True return True
# Convert from RBSP (Raw Byte Sequence Payload) to SODB (String Of Data Bits) # Convert from RBSP (Raw Byte Sequence Payload) to SODB (String Of Data Bits)
def rbsp_to_sodb(buf:bytes): @typechecked
def rbsp_to_sodb(buf:bytes) -> bytes:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('RBSP: %s', hexdump.dump(buf, sep=':')) logger.debug('RBSP: %s', hexdump.dump(buf, sep=':'))
@@ -565,7 +593,8 @@ def rbsp_to_sodb(buf:bytes):
return res return res
# Reverse operation SODB to RBSP. # Reverse operation SODB to RBSP.
def sodb_to_rbsp(buf:bytes): @typechecked
def sodb_to_rbsp(buf:bytes) -> bytes:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('SODB: %s', hexdump.dump(buf, sep=':')) logger.debug('SODB: %s', hexdump.dump(buf, sep=':'))
@@ -579,7 +608,8 @@ def sodb_to_rbsp(buf:bytes):
return res return res
# Useful for SPS and PPS # Useful for SPS and PPS
def parse_scaling_list(buf:bytes, bit_position: int, size): @typechecked
def parse_scaling_list(buf:bytes, bit_position: int, size) -> tuple[int,list[int]]:
res = [] res = []
last_scale = 8 last_scale = 8
next_scale = 8 next_scale = 8
@@ -596,7 +626,9 @@ def parse_scaling_list(buf:bytes, bit_position: int, size):
# TODO: test optimized version. # TODO: test optimized version.
# The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list # The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list
# is full of zeroes. # is full of zeroes.
def write_scaling_list(buf:bytes, bit_position: int, size, matrix, optimized: bool = False): @typechecked
def write_scaling_list(buf:bytes, bit_position: int, size, matrix:list[int],
optimized: bool = False) -> int:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size) logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size)
@@ -1083,7 +1115,7 @@ class SPS:
bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_num_ref_frames) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_num_ref_frames)
bit_position = write_boolean(buf, bit_position, self.gaps_in_frame_num_value_allowed_flag) bit_position = write_boolean(buf, bit_position, self.gaps_in_frame_num_value_allowed_flag)
bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_width_in_mbs_minus1) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_width_in_mbs_minus1)
bit_position = write_unsigned_exp_golomb(buf, bit_position, bit_position = write_unsigned_exp_golomb(buf, bit_position,
self.pic_height_in_map_units_minus1) self.pic_height_in_map_units_minus1)
bit_position = write_boolean(buf, bit_position, self.frame_mbs_only_flag) bit_position = write_boolean(buf, bit_position, self.frame_mbs_only_flag)
if not self.frame_mbs_only_flag: if not self.frame_mbs_only_flag:
@@ -1123,7 +1155,7 @@ class PPS:
pic_size_in_map_units_minus1:int=0 pic_size_in_map_units_minus1:int=0
slice_group_id:dict = field(default_factory=dict) slice_group_id:dict = field(default_factory=dict)
num_ref_idx_l0_default_active_minus1:int=0 num_ref_idx_l0_default_active_minus1:int=0
num_ref_idx_l1_default_active_minus1:int=0 num_ref_idx_l2_default_active_minus1:int=0
weighted_pred_flag:bool=False weighted_pred_flag:bool=False
weighted_bipred_idc:int=0 weighted_bipred_idc:int=0
pic_init_qp_minus26:int=0 pic_init_qp_minus26:int=0
@@ -1264,7 +1296,7 @@ class PPS:
v = self.bottom_right[i] v = self.bottom_right[i]
bit_position = write_unsigned_exp_golomb(buf, bit_position, v) bit_position = write_unsigned_exp_golomb(buf, bit_position, v)
elif self.slice_group_map_type in [3,4,5]: elif self.slice_group_map_type in [3,4,5]:
bit_position = write_boolean(buf, bit_position, bit_position = write_boolean(buf, bit_position,
self.slice_group_change_direction_flag) self.slice_group_change_direction_flag)
bit_position = write_unsigned_exp_golomb(buf, bit_position, bit_position = write_unsigned_exp_golomb(buf, bit_position,
self.slice_group_change_rate_minus1) self.slice_group_change_rate_minus1)
@@ -1310,7 +1342,7 @@ class PPS:
else: else:
logger.info("Writing matrix: %s", matrix) logger.info("Writing matrix: %s", matrix)
bit_position = write_scaling_list(buf, bit_position, 64, matrix) bit_position = write_scaling_list(buf, bit_position, 64, matrix)
bit_position = write_signed_exp_golomb(buf, bit_position, bit_position = write_signed_exp_golomb(buf, bit_position,
self.second_chroma_qp_index_offset) self.second_chroma_qp_index_offset)
bit_position = write_rbsp_trailing_bits(buf, bit_position) bit_position = write_rbsp_trailing_bits(buf, bit_position)
@@ -1434,8 +1466,7 @@ class AVCDecoderConfiguration:
bit_position = write_bits(buf, bit_position, self.length_size_minus_one, 2) bit_position = write_bits(buf, bit_position, self.length_size_minus_one, 2)
bit_position = write_bits(buf, bit_position, 0b111, 3) bit_position = write_bits(buf, bit_position, 0b111, 3)
bit_position = write_bits(buf, bit_position, self.num_of_sequence_parameter_sets, 5) bit_position = write_bits(buf, bit_position, self.num_of_sequence_parameter_sets, 5)
for spsid in self.sps: for spsid, sps in self.sps.items():
sps = self.sps[spsid]
sodb = sps.to_bytes() sodb = sps.to_bytes()
sodb_length = len(sodb) sodb_length = len(sodb)
rbsp = sodb_to_rbsp(sodb) rbsp = sodb_to_rbsp(sodb)
@@ -1450,11 +1481,10 @@ class AVCDecoderConfiguration:
logger.debug('2. Buffer: %s', hexdump.dump(buf, sep=':')) logger.debug('2. Buffer: %s', hexdump.dump(buf, sep=':'))
bit_position = write_byte(buf, bit_position, self.num_of_picture_parameter_sets) bit_position = write_byte(buf, bit_position, self.num_of_picture_parameter_sets)
for ppsid in self.pps: for ppsid, lpps in self.pps.items():
logger.debug('Writing PPS: %d', ppsid) logger.debug('Writing PPS: %d', ppsid)
pps = self.pps[ppsid]
# TODO: does chroma_format should come from self ? # TODO: does chroma_format should come from self ?
sodb = pps.to_bytes(self.chroma_format) sodb = lpps.to_bytes(self.chroma_format)
sodb_length = len(sodb) sodb_length = len(sodb)
rbsp = sodb_to_rbsp(sodb) rbsp = sodb_to_rbsp(sodb)
rbsp_length = len(rbsp) rbsp_length = len(rbsp)
@@ -1531,6 +1561,7 @@ class AVCDecoderConfiguration:
# TODO: do the same with extended SPS ! # TODO: do the same with extended SPS !
@typechecked
def parse_codec_private(codec_private_data: bytes) -> AVCDecoderConfiguration: def parse_codec_private(codec_private_data: bytes) -> AVCDecoderConfiguration:
if codec_private_data[0] != 0x63: if codec_private_data[0] != 0x63:
raise ValueError(f'Matroska header is wrong: {codec_private_data[0]:x}') raise ValueError(f'Matroska header is wrong: {codec_private_data[0]:x}')
@@ -1554,7 +1585,8 @@ def parse_codec_private(codec_private_data: bytes) -> AVCDecoderConfiguration:
return avcconfig return avcconfig
def get_avc_config_from_h264(input_file: IO[bytes]): @typechecked
def get_avc_config_from_h264(input_file: IO[bytes]) -> AVCDecoderConfiguration:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# TODO: improve this ... # TODO: improve this ...
@@ -1594,13 +1626,15 @@ def get_avc_config_from_h264(input_file: IO[bytes]):
return avcconfig return avcconfig
# Unused ? # Unused ?
def get_codec_private_data_from_h264(input_file: IO[bytes]): @typechecked
def get_codec_private_data_from_h264(input_file: IO[bytes]) -> AVCDecoderConfiguration:
avcconfig = get_avc_config_from_h264(input_file) avcconfig = get_avc_config_from_h264(input_file)
res = dump_codec_private_data(avcconfig) res = dump_codec_private_data(avcconfig)
return res return res
def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]): @typechecked
def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]) -> dict[str,tuple[int,int]]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -1680,8 +1714,8 @@ def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]):
# 0000 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx # 0000 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx
# value 0 to 2^56-2 # value 0 to 2^56-2
@typechecked
def get_ebml_length(length): def get_ebml_length(length:int) -> bytes|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if 0 <= length <= 2**7-2: if 0 <= length <= 2**7-2:
@@ -1711,7 +1745,7 @@ def get_ebml_length(length):
res = (encoded_length).to_bytes(size, byteorder='big') res = (encoded_length).to_bytes(size, byteorder='big')
return res return res
@typechecked
def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration) -> bytearray: def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration) -> bytearray:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Rebuild a Matroska Codec Private Element # Rebuild a Matroska Codec Private Element
@@ -1729,8 +1763,8 @@ def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration)
return res return res
@typechecked
def change_ebml_element_size(input_file: IO[bytes], position:int, addendum): def change_ebml_element_size(input_file: IO[bytes], position:int, addendum:int) -> int:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
initial_position = position initial_position = position
@@ -1822,7 +1856,8 @@ def change_ebml_element_size(input_file: IO[bytes], position:int, addendum):
# We return the potential increase in size of the file if the length field had to be increased. # We return the potential increase in size of the file if the length field had to be increased.
return delta return delta
def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data): @typechecked
def change_codec_private_data(mkvinfo_path:str, input_file: IO[bytes], codec_data:bytes) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -1830,7 +1865,7 @@ def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):
current_length = fstat(infd).st_size current_length = fstat(infd).st_size
logger.info('Current size of file: %d', current_length) logger.info('Current size of file: %d', current_length)
position, current_data = get_codec_private_data_from_mkv(mkvinfo, input_file) position, current_data = get_codec_private_data_from_mkv(mkvinfo_path, input_file)
current_data_length = len(current_data) current_data_length = len(current_data)
future_length = current_length - current_data_length + len(codec_data) future_length = current_length - current_data_length + len(codec_data)
logger.info('Expected size of file: %d', future_length) logger.info('Expected size of file: %d', future_length)
@@ -1838,11 +1873,10 @@ def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):
logger.info('Current data at position %d: %s', position, hexdump.dump(current_data, sep=":")) logger.info('Current data at position %d: %s', position, hexdump.dump(current_data, sep=":"))
logger.info('Future data: %s', hexdump.dump(codec_data, sep=":")) logger.info('Future data: %s', hexdump.dump(codec_data, sep=":"))
elements = parse_mkv_tree(mkvinfo, input_file) elements = parse_mkv_tree(mkvinfo_path, input_file)
found = False found = False
for key in elements: for key, (pos,size) in elements.items():
pos, size = elements[key]
if pos == position: if pos == position:
logger.info('Codec private data key: %s', key) logger.info('Codec private data key: %s', key)
found = True found = True
@@ -1893,7 +1927,8 @@ def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):
# been resized). # been resized).
delta+=change_ebml_element_size(input_file, pos, delta) delta+=change_ebml_element_size(input_file, pos, delta)
def get_format(ffprobe_path:str, input_file: IO[bytes]): @typechecked
def get_format(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -1910,7 +1945,7 @@ def get_format(ffprobe_path:str, input_file: IO[bytes]):
return None return None
@typechecked
def get_movie_duration(ffprobe_path:str, input_file: IO[bytes]) -> timedelta|None: def get_movie_duration(ffprobe_path:str, input_file: IO[bytes]) -> timedelta|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -1931,6 +1966,7 @@ def get_movie_duration(ffprobe_path:str, input_file: IO[bytes]) -> timedelta|Non
return None return None
# ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json sample.ts # ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json sample.ts
@typechecked
def get_video_dimensions(ffprobe_path:str, input_file: IO[bytes]) -> tuple[int,int]: def get_video_dimensions(ffprobe_path:str, input_file: IO[bytes]) -> tuple[int,int]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -1950,8 +1986,8 @@ def get_video_dimensions(ffprobe_path:str, input_file: IO[bytes]) -> tuple[int,i
logger.error('Impossible to retrieve dimensions of video') logger.error('Impossible to retrieve dimensions of video')
exit(-1) exit(-1)
@typechecked
def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> dict|None: def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> list|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -1968,6 +2004,7 @@ def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
return None return None
@typechecked
def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool: def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -1988,6 +2025,7 @@ def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool:
return False return False
@typechecked
def parse_timestamp(ts:str) -> timedelta|None: def parse_timestamp(ts:str) -> timedelta|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2025,10 +2063,11 @@ def parse_timestamp(ts:str) -> timedelta|None:
if us < 0 or us > 1000000: if us < 0 or us > 1000000:
logger.error("milliseconds must be in [0,1000000[") logger.error("milliseconds must be in [0,1000000[")
return None return None
ts = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us) res = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us)
return ts return res
@typechecked
def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]: def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2102,6 +2141,7 @@ def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:
return (ts1, ts2) return (ts1, ts2)
@typechecked
def compare_time_interval(interval1: tuple[timedelta, timedelta], def compare_time_interval(interval1: tuple[timedelta, timedelta],
interval2: tuple[timedelta, timedelta]) -> int: interval2: tuple[timedelta, timedelta]) -> int:
ts11,ts12 = interval1 ts11,ts12 = interval1
@@ -2114,6 +2154,7 @@ def compare_time_interval(interval1: tuple[timedelta, timedelta],
else: else:
return 0 return 0
@typechecked
def ffmpeg_convert(ffmpeg_path:str, ffprobe_path:str, input_file: IO[bytes], input_format:str, def ffmpeg_convert(ffmpeg_path:str, ffprobe_path:str, input_file: IO[bytes], input_format:str,
output_file: IO[bytes], output_format:str, duration: timedelta): output_file: IO[bytes], output_format:str, duration: timedelta):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2159,6 +2200,7 @@ def ffmpeg_convert(ffmpeg_path:str, ffprobe_path:str, input_file: IO[bytes], inp
if status != 0: if status != 0:
logger.error('Conversion failed with status code: %d', status) logger.error('Conversion failed with status code: %d', status)
@typechecked
def get_ts_frame(frame: dict) -> timedelta|None: def get_ts_frame(frame: dict) -> timedelta|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2173,6 +2215,7 @@ def get_ts_frame(frame: dict) -> timedelta|None:
ts = timedelta(seconds=pts_time) ts = timedelta(seconds=pts_time)
return ts return ts
@typechecked
def get_packet_duration(packet: dict) -> int: def get_packet_duration(packet: dict) -> int:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2186,15 +2229,16 @@ def get_packet_duration(packet: dict) -> int:
return duration return duration
def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:str, end:str, @typechecked
stream_kind:str, sub_stream_id:int=0) -> list[timedelta]|None: def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:timedelta, end:timedelta,
stream_kind:str, sub_stream_id:int=0) -> list[dict]|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
set_inheritable(infd, True) set_inheritable(infd, True)
command = [ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{begin}%{end}','-show_entries', command = [ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{begin}%{end}',
'frame', '-select_streams', f'{stream_kind}:{sub_stream_id:d}','-of', 'json', '-show_entries', 'frame', '-select_streams',
f'/proc/self/fd/{infd:d}'] f'{stream_kind}:{sub_stream_id:d}','-of', 'json', f'/proc/self/fd/{infd:d}']
logger.debug('Executing: %s', command) logger.debug('Executing: %s', command)
with Popen(command, stdout=PIPE, close_fds=False) as ffprobe: with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
@@ -2219,15 +2263,15 @@ def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:str, en
res = [] res = []
for ts in sorted(tmp): for ts in sorted(tmp):
res.append(tmp[ts]) res.append(tmp[ts])
return res return res
else: else:
logger.error('Impossible to retrieve frames inside file around [%s,%s]', begin, end) logger.error('Impossible to retrieve frames inside file around [%s,%s]', begin, end)
return None return None
# TODO: Finish implementation of this function and use it. # TODO: Finish implementation of this function and use it.
def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp, before: bool=True, @typechecked
delta: timedelta=timedelta(seconds=2)): def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp:timedelta,
before: bool=True, delta: timedelta=timedelta(seconds=2)):
# pylint: disable=W0613 # pylint: disable=W0613
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2269,9 +2313,10 @@ def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp, b
return None return None
def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes], timestamp:timedelta, @typechecked
before:bool=True, def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes],
delta_max:timedelta=timedelta(seconds=15))-> tuple[int,list[dict]]: timestamp:timedelta, before:bool=True,
delta_max:timedelta=timedelta(seconds=15))-> tuple[int,dict]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -2350,8 +2395,9 @@ def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes], timestamp:timede
return(nb_frames, iframe) return(nb_frames, iframe)
@typechecked
def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[bytes], def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[bytes],
begin:str, end:str) -> None: begin:timedelta, end:timedelta) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.info('Extract video between I-frames at %s and %s', begin,end) logger.info('Extract video between I-frames at %s and %s', begin,end)
@@ -2392,8 +2438,9 @@ def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[byt
elif status == 2: elif status == 2:
logger.error('Extraction returns errors') logger.error('Extraction returns errors')
def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:str, nb_frames:int, @typechecked
width:int=640, height:int=480) -> tuple[bytes,int]: def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:timedelta, nb_frames:int,
width:int=640, height:int=480) -> tuple[bytes,int]|tuple[None,None]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
infd = input_file.fileno() infd = input_file.fileno()
@@ -2429,9 +2476,11 @@ def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:str, nb_frames
lseek(outfd, 0, SEEK_SET) lseek(outfd, 0, SEEK_SET)
return images, outfd return images, outfd
def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:str, output_filename:str, @typechecked
def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:timedelta, output_filename:str,
packet_duration:int, sub_channel:int=0, packet_duration:int, sub_channel:int=0,
nb_packets:int=0, sample_rate:int=48000, nb_channels:int=2) -> tuple[bytes,int]: nb_packets:int=0, sample_rate:int=48000,
nb_channels:int=2) -> tuple[bytes,int]|tuple[None,None]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
outfd = memfd_create(output_filename, flags=0) outfd = memfd_create(output_filename, flags=0)
@@ -2465,7 +2514,8 @@ def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:str, output_file
return sound, outfd return sound, outfd
def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None: @typechecked
def dump_ppm(pictures: bytes, prefix: str, temporaries: list[IO[bytes]]) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# "P6\nWIDTH HEIGHT\n255\n" # "P6\nWIDTH HEIGHT\n255\n"
@@ -2499,7 +2549,7 @@ def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None:
header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1 header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1
try: try:
with open(filename, 'w', encoding='utf8') as out: with open(filename, 'wb') as out:
temporaries.append(out) temporaries.append(out)
outfd = out.fileno() outfd = out.fileno()
length=header_len+3*width*height length=header_len+3*width*height
@@ -2511,10 +2561,10 @@ def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None:
except IOError: except IOError:
logger.error('Impossible to create file: %s', filename) logger.error('Impossible to create file: %s', filename)
@typechecked
def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, begin, end, streams, def extract_all_streams(ffmpeg_path:str, ffprobe_path:str, input_file:IO[bytes], begin:timedelta,
files_prefix, nb_frames: int, framerate: float, width: int, height:int, end:timedelta, streams, files_prefix, nb_frames:int, framerate:float,
temporaries, dump_mem_fd=False): width:int, height:int, temporaries, dump_mem_fd:bool=False):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# The command line for encoding only video track # The command line for encoding only video track
@@ -2553,7 +2603,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct', interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct',
'-field_order', '1'] '-field_order', '1']
case 'bb': case 'bb':
interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct', interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct',
'-field_order','2'] '-field_order','2']
case 'tb': case 'tb':
interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct', interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct',
@@ -2563,7 +2613,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
'-field_order', '4'] '-field_order', '4']
case _: case _:
interlaced_options = [] interlaced_options = []
# ======================================= # # ======================================= #
# TODO: adjust SAR and DAR # TODO: adjust SAR and DAR
# https://superuser.com/questions/907933/correct-aspect-ratio-without-re-encoding-video-file # https://superuser.com/questions/907933/correct-aspect-ratio-without-re-encoding-video-file
@@ -2574,8 +2624,8 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
logger.warning('Missing treatment for chroma location: %s', chroma_location) logger.warning('Missing treatment for chroma location: %s', chroma_location)
codec = stream['codec_name'] codec = stream['codec_name']
images_bytes, memfd = extract_pictures(ffmpeg_path, input_file=input_file, images_bytes, memfd = extract_pictures(ffmpeg_path, input_file=input_file,
begin=begin, nb_frames=nb_frames, width=width, begin=begin, nb_frames=nb_frames,
height=height) width=width, height=height)
if images_bytes is None: if images_bytes is None:
logger.error('Impossible to extract picture from video stream.') logger.error('Impossible to extract picture from video stream.')
exit(-1) exit(-1)
@@ -2639,7 +2689,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
if dump_mem_fd: if dump_mem_fd:
try: try:
with open(tmpname,'w', encoding='utf8') as output: with open(tmpname,'wb') as output:
temporaries.append(output) temporaries.append(output)
outfd = output.fileno() outfd = output.fileno()
pos = 0 pos = 0
@@ -2664,7 +2714,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
generic_input_params.extend(['-i', './empty.idx']) generic_input_params.extend(['-i', './empty.idx'])
if 'tags' in stream: if 'tags' in stream:
if 'language' in stream['tags']: if 'language' in stream['tags']:
generic_codec_params.extend([f'-metadata:s:s:{subtitle_id:d}', generic_codec_params.extend([f'-metadata:s:s:{subtitle_id:d}',
f"language={stream['tags']['language']}"]) f"language={stream['tags']['language']}"])
generic_codec_params.extend([f'-c:s:{subtitle_id:d}', 'copy']) generic_codec_params.extend([f'-c:s:{subtitle_id:d}', 'copy'])
subtitle_id=subtitle_id+1 subtitle_id=subtitle_id+1
@@ -2755,13 +2805,14 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
return None, None return None, None
# Merge a list of mkv files passed as input, and produce a new MKV as output # Merge a list of mkv files passed as input, and produce a new MKV as output
def merge_mkvs(mkvmerge_path:str, inputs, output_name:str, concatenate: bool =True, @typechecked
timestamps=None) -> IO[bytes]: def merge_mkvs(mkvmerge_path:str, inputs: list[IO[bytes]], output_name:str,
concatenate: bool=True, timestamps: dict[int, IO[str]]={}) -> IO[bytes]|None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
fds = [] fds = []
try: try:
out = open(output_name, 'w+', encoding='utf8') out = open(output_name, 'wb+')
except IOError: except IOError:
logger.error('Impossible to create file: %s', output_name) logger.error('Impossible to create file: %s', output_name)
return None return None
@@ -2784,7 +2835,7 @@ def merge_mkvs(mkvmerge_path:str, inputs, output_name:str, concatenate: bool =Tr
fds.append(fd) fds.append(fd)
set_inheritable(fd, True) set_inheritable(fd, True)
# If we pass a timestamps file associated with the considered track, use it. # If we pass a timestamps file associated with the considered track, use it.
if timestamps is not None and partnum in timestamps: if partnum in timestamps:
tsfd = timestamps[partnum].fileno() tsfd = timestamps[partnum].fileno()
lseek(tsfd, 0, SEEK_SET) lseek(tsfd, 0, SEEK_SET)
fds.append(tsfd) fds.append(tsfd)
@@ -2840,8 +2891,9 @@ def find_subtitles_tracks(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
lseek(infd, 0, SEEK_SET) lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True) set_inheritable(infd, True)
command = [ffprobe_path, '-loglevel','quiet', '-i', f'/proc/self/fd/{infd:d}', '-select_streams', command = [ffprobe_path, '-loglevel','quiet', '-i', f'/proc/self/fd/{infd:d}',
's', '-show_entries', 'stream=index:stream_tags=language', '-of', 'json'] '-select_streams', 's', '-show_entries', 'stream=index:stream_tags=language',
'-of', 'json']
logger.debug('Executing: %s', command) logger.debug('Executing: %s', command)
with Popen(command, stdout=PIPE, close_fds=False) as ffprobe: with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
@@ -2855,6 +2907,7 @@ def find_subtitles_tracks(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
ffprobe.wait() ffprobe.wait()
return None return None
@typechecked
def extract_track_from_mkv(mkvextract_path: str, input_file: IO[bytes], index, def extract_track_from_mkv(mkvextract_path: str, input_file: IO[bytes], index,
output_file: IO[bytes], timestamps) -> None: output_file: IO[bytes], timestamps) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2899,6 +2952,7 @@ def extract_track_from_mkv(mkvextract_path: str, input_file: IO[bytes], index,
else: else:
logger.info('Track %d was succesfully extracted.', index) logger.info('Track %d was succesfully extracted.', index)
@typechecked
def remove_video_tracks_from_mkv(mkvmerge_path:str, input_file: IO[bytes], def remove_video_tracks_from_mkv(mkvmerge_path:str, input_file: IO[bytes],
output_file: IO[bytes]) -> None: output_file: IO[bytes]) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2935,7 +2989,8 @@ def remove_video_tracks_from_mkv(mkvmerge_path:str, input_file: IO[bytes],
else: else:
logger.info('Video tracks were succesfully extracted.') logger.info('Video tracks were succesfully extracted.')
def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filename: IO[bytes], @typechecked
def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filename: str,
subtitles) -> None: subtitles) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -2986,7 +3041,8 @@ def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filenam
return None return None
def concatenate_h264_parts(h264parts, output) -> None: @typechecked
def concatenate_h264_parts(h264parts: list[IO[bytes]], output: IO[bytes]) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
total_length = 0 total_length = 0
@@ -3013,7 +3069,7 @@ def concatenate_h264_parts(h264parts, output) -> None:
pb.update(nb_bytes) pb.update(nb_bytes)
pos += nb_bytes pos += nb_bytes
def concatenate_h264_ts_parts(h264_ts_parts, output) -> None: def concatenate_h264_ts_parts(h264_ts_parts: list[IO[bytes]], output: IO[bytes]) -> None:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
header = '# timestamp format v2\n' header = '# timestamp format v2\n'
@@ -3053,7 +3109,7 @@ def do_coarse_processing(ffmpeg_path:str, ffprobe_path:str, mkvmerge_path:str,
internal_mkv_name = f'{files_prefix}.mkv' internal_mkv_name = f'{files_prefix}.mkv'
try: try:
internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8') internal_mkv = open(internal_mkv_name, 'wb+')
except IOError: except IOError:
logger.error('Impossible to create file: %s', internal_mkv_name) logger.error('Impossible to create file: %s', internal_mkv_name)
exit(-1) exit(-1)
@@ -3145,7 +3201,8 @@ def main() -> None:
mkvfilename = basename+'.mkv' mkvfilename = basename+'.mkv'
try: try:
input_file = open(args.input_file, mode='r', encoding='utf8') input_file = open(args.input_file, mode='rb')
logger.debug("Type of input file: %s" % type(input_file))
except IOError: except IOError:
logger.error("Impossible to open %s", args.input_file) logger.error("Impossible to open %s", args.input_file)
exit(-1) exit(-1)
@@ -3182,13 +3239,13 @@ def main() -> None:
if format_of_file == SupportedFormat.TS: if format_of_file == SupportedFormat.TS:
logger.info("Converting TS to MP4 (to fix timestamps).") logger.info("Converting TS to MP4 (to fix timestamps).")
try: try:
with open(mp4filename, 'w+', encoding='utf8') as mp4: with open(mp4filename, 'wb+') as mp4:
ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mpegts', mp4, 'mp4', ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mpegts', mp4, 'mp4',
duration) duration)
temporaries.append(mp4) temporaries.append(mp4)
logger.info("Converting MP4 to MKV.") logger.info("Converting MP4 to MKV.")
try: try:
mkv = open(mkvfilename, 'w+', encoding='utf8') mkv = open(mkvfilename, 'wb+')
except IOError: except IOError:
logger.error('') logger.error('')
@@ -3202,7 +3259,7 @@ def main() -> None:
elif format_of_file == SupportedFormat.MP4: elif format_of_file == SupportedFormat.MP4:
logger.info("Converting MP4 to MKV") logger.info("Converting MP4 to MKV")
try: try:
mkv = open(mkvfilename, 'w+', encoding='utf8') mkv = open(mkvfilename, 'wb+')
except IOError: except IOError:
logger.error('') logger.error('')
ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mp4', mkv, 'matroska', ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mp4', mkv, 'matroska',
@@ -3238,7 +3295,8 @@ def main() -> None:
exit(-1) exit(-1)
# We retrieve the main private codec data # We retrieve the main private codec data
_, main_codec_private_data = get_codec_private_data_from_mkv(mkvinfo_path=paths['mkvinfo'], input_file=mkv) _, main_codec_private_data = get_codec_private_data_from_mkv(mkvinfo_path=paths['mkvinfo'],
input_file=mkv)
logger.debug('Main video stream has following private data: %s', logger.debug('Main video stream has following private data: %s',
hexdump.dump(main_codec_private_data, sep=':')) hexdump.dump(main_codec_private_data, sep=':'))
@@ -3260,7 +3318,8 @@ def main() -> None:
# If there exists a difference between our own reconstructed AVC configuration and the # If there exists a difference between our own reconstructed AVC configuration and the
# original one, we abandon # original one, we abandon
if iso_avc_config != main_avc_config: if iso_avc_config != main_avc_config:
logger.error('AVC configurations are different: %s\n%s\n', main_avc_config, iso_avc_config) logger.error('AVC configurations are different: %s\n%s\n', main_avc_config,
iso_avc_config)
exit(-1) exit(-1)
# Pour chaque portion # Pour chaque portion
@@ -3373,19 +3432,19 @@ def main() -> None:
internal_novideo_mkv_name = f'part-{partnum:d}-internal-novideo.mkv' internal_novideo_mkv_name = f'part-{partnum:d}-internal-novideo.mkv'
try: try:
internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8') internal_mkv = open(internal_mkv_name, 'wb+')
except IOError: except IOError:
logger.error('Impossible to create file: %s', internal_mkv_name) logger.error('Impossible to create file: %s', internal_mkv_name)
exit(-1) exit(-1)
try: try:
internal_novideo_mkv = open(internal_novideo_mkv_name, 'w+', encoding='utf8') internal_novideo_mkv = open(internal_novideo_mkv_name, 'wb+')
except IOError: except IOError:
logger.error('Impossible to create file: %s', internal_novideo_mkv_name) logger.error('Impossible to create file: %s', internal_novideo_mkv_name)
exit(-1) exit(-1)
try: try:
internal_h264 = open(internal_h264_name, 'w+', encoding='utf8') internal_h264 = open(internal_h264_name, 'wb+')
except IOError: except IOError:
logger.error('Impossible to create file: %s', internal_h264_name) logger.error('Impossible to create file: %s', internal_h264_name)
exit(-1) exit(-1)
@@ -3403,8 +3462,8 @@ def main() -> None:
# Extract video stream of internal part as a raw H264 and its timestamps. # Extract video stream of internal part as a raw H264 and its timestamps.
logger.info('Extract video track as raw H264 file.') logger.info('Extract video track as raw H264 file.')
extract_track_from_mkv(mkvextract_path=paths['mkvextract'], input_file=internal_mkv, index=0, extract_track_from_mkv(mkvextract_path=paths['mkvextract'], input_file=internal_mkv,
output_file=internal_h264, timestamps=internal_h264_ts) index=0, output_file=internal_h264, timestamps=internal_h264_ts)
# Remove video track from internal part of MKV # Remove video track from internal part of MKV
logger.info('Remove video track from %s', internal_mkv_name) logger.info('Remove video track from %s', internal_mkv_name)
@@ -3443,7 +3502,8 @@ def main() -> None:
h264_ts.append(h264_tail_ts) h264_ts.append(h264_tail_ts)
logger.info('Merging MKV: %s', subparts) logger.info('Merging MKV: %s', subparts)
part = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=subparts,
part = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=subparts,
output_name=f'part-{partnum:d}.mkv', concatenate=True) output_name=f'part-{partnum:d}.mkv', concatenate=True)
mkvparts.append(part) mkvparts.append(part)
temporaries.append(part) temporaries.append(part)
@@ -3461,7 +3521,7 @@ def main() -> None:
nb_mkv_parts = len(mkvparts) nb_mkv_parts = len(mkvparts)
if nb_mkv_parts > 0: if nb_mkv_parts > 0:
try: try:
full_h264 = open(f'{basename}-full.h264', 'w+', encoding='utf8') full_h264 = open(f'{basename}-full.h264', 'wb+')
except IOError: except IOError:
logger.error('Impossible to create file full H264 stream.') logger.error('Impossible to create file full H264 stream.')
exit(-1) exit(-1)
@@ -3495,7 +3555,7 @@ def main() -> None:
if nb_mkv_parts >=1 : if nb_mkv_parts >=1 :
try: try:
final_novideo = open(final_novideo_name, 'r', encoding='utf8') final_novideo = open(final_novideo_name, 'rb')
except IOError: except IOError:
logger.error('Impossible to open file: %s.', final_novideo_name) logger.error('Impossible to open file: %s.', final_novideo_name)
exit(-1) exit(-1)
@@ -3505,11 +3565,13 @@ def main() -> None:
full_h264_ts.seek(0) full_h264_ts.seek(0)
logger.info('Merging final video track and all other tracks together') logger.info('Merging final video track and all other tracks together')
final_with_video = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=[full_h264, final_novideo], final_with_video = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=[full_h264,
final_novideo],
output_name=final_with_video_name, concatenate=False, output_name=final_with_video_name, concatenate=False,
timestamps={0: full_h264_ts}) timestamps={0: full_h264_ts})
final_codec_private_data = dump_codec_private_data(main_avc_config) final_codec_private_data = dump_codec_private_data(main_avc_config)
logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data, sep=':')) logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data,
sep=':'))
logger.info('Changing codec private data with the new one.') logger.info('Changing codec private data with the new one.')
change_codec_private_data(paths['mkvinfo'], final_with_video, final_codec_private_data) change_codec_private_data(paths['mkvinfo'], final_with_video, final_codec_private_data)
@@ -3545,17 +3607,18 @@ def main() -> None:
logger.info(sts) logger.info(sts)
if len(sts) > 0: if len(sts) > 0:
logger.info('Supported languages: %s' % supported_langs)
list_of_subtitles = extract_srt(paths['mkvextract'], final_with_video_name, sts, list_of_subtitles = extract_srt(paths['mkvextract'], final_with_video_name, sts,
supported_langs) supported_langs)
logger.info(list_of_subtitles) logger.info(list_of_subtitles)
for idx_name, sub_name, _, _ in list_of_subtitles: for idx_name, sub_name, _, _ in list_of_subtitles:
try: try:
idx = open(idx_name,'r', encoding='utf8') idx = open(idx_name,'rb')
except IOError: except IOError:
logger.error("Impossible to open %s.", idx_name) logger.error("Impossible to open %s.", idx_name)
exit(-1) exit(-1)
try: try:
sub = open(sub_name,'r', encoding='utf8') sub = open(sub_name,'rb')
except IOError: except IOError:
logger.error("Impossible to open %s.", sub_name) logger.error("Impossible to open %s.", sub_name)
exit(-1) exit(-1)
@@ -3563,7 +3626,8 @@ def main() -> None:
temporaries.append(idx) temporaries.append(idx)
temporaries.append(sub) temporaries.append(sub)
ocr = do_ocr(paths['vobsubocr'], list_of_subtitles, duration, temporaries, args.dump) ocr = do_ocr(paths['vobsubocr'], list_of_subtitles, duration, temporaries,
args.dump)
logger.info(ocr) logger.info(ocr)
# Remux SRT subtitles # Remux SRT subtitles

View File

@@ -4,3 +4,5 @@ coloredlogs
tqdm tqdm
iso639-lang iso639-lang
hexdump hexdump
typing
typeguard