More linting with typechecking and type annotations.
removeads.py
@@ -19,6 +19,7 @@ from math import floor, ceil, log
from io import BytesIO, TextIOWrapper
import json
from typing import IO
from typeguard import typechecked

# Third party libraries
import coloredlogs
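For context on the new dependency: typeguard's @typechecked decorator validates the annotated argument and return types at call time, so the annotations added throughout this commit become runtime checks. A minimal standalone sketch (not taken from removeads.py):

    from typeguard import typechecked

    @typechecked
    def add(a: int, b: int) -> int:
        return a + b

    add(1, 2)    # passes
    add(1, "2")  # rejected at call time with a type-check error (TypeCheckError in recent typeguard releases)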
@@ -54,7 +55,8 @@ from iso639.exceptions import InvalidLanguageValue
# Then finally, change the Private Codec Data in the final MKV.


def check_required_tools() -> tuple[bool,list[str]]:
@typechecked
def check_required_tools() -> tuple[bool,dict[str,str]]:
"""Check if required external tools are installed.

Args:
@@ -84,7 +86,8 @@ def check_required_tools() -> tuple[bool,list[str]]:

return all_optional_tools, paths

def get_tesseract_supported_lang(tesseract_path:str) -> dict[str,str]|None:
@typechecked
def get_tesseract_supported_lang(tesseract_path:str) -> dict[Lang, str]|None:
"""Returns the set of natural languages supported by Tesseract OCR tool.

Args:
@@ -118,6 +121,7 @@ def get_tesseract_supported_lang(tesseract_path:str) -> dict[str,str]|None:

return res

@typechecked
def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None:
logger = logging.getLogger(__name__)

@@ -181,6 +185,7 @@ def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None:

return frame_rate2

@typechecked
def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None:
logger = logging.getLogger(__name__)
tracks={}
@@ -213,7 +218,9 @@ def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None:

return tracks

def extract_srt(mkvextract:str, filename:str, subtitles:str, langs:list[str]) -> list|None:
@typechecked
def extract_srt(mkvextract:str, filename:str, subtitles:dict[str, list[int]],
langs:dict[Lang,str]) -> list|None:
logger = logging.getLogger(__name__)

params = [mkvextract, filename, 'tracks']
@@ -354,6 +361,7 @@ class SupportedFormat(IntEnum):
# -report -loglevel 0 -f null -

# Found codec private data using mkvinfo
@typechecked
def get_codec_private_data_from_mkv(mkvinfo_path:str, input_file: IO[bytes]) -> tuple[int, bytes]:
logger = logging.getLogger(__name__)

@@ -395,17 +403,19 @@ def get_codec_private_data_from_mkv(mkvinfo_path:str, input_file: IO[bytes]) ->
# All the following code is a transposition of documents:
# ISO/IEC H.264-201602
# ISO/IEC 14496-15

@typechecked
def read_bit(buf:bytes, bit_position: int) -> tuple[int, int]:
byte_position = floor(floor(bit_position/8))
byte = buf[byte_position]
bit = (byte >> (7-(bit_position % 8))) & 1
return bit_position+1, bit

@typechecked
def read_boolean(buf:bytes, bit_position: int) -> tuple[int, bool]:
bit_position, b = read_bit(buf, bit_position)
return bit_position, b==1

@typechecked
def read_bits(buf:bytes, bit_position: int, nb_bits: int) -> tuple[int, int]:
v = 0
for _ in range(0, nb_bits):
@@ -413,19 +423,23 @@ def read_bits(buf:bytes, bit_position: int, nb_bits: int) -> tuple[int, int]:
v = v*2+bit
return bit_position, v

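The read_bit/read_bits helpers above consume a byte buffer most-significant-bit first. A self-contained sketch of the same idea, with illustrative names that are not the file's:

    def read_bits_msb_first(buf: bytes, bit_position: int, nb_bits: int) -> tuple[int, int]:
        # Accumulate nb_bits bits starting at bit_position, most significant bit first.
        value = 0
        for _ in range(nb_bits):
            byte = buf[bit_position // 8]
            bit = (byte >> (7 - (bit_position % 8))) & 1
            value = (value << 1) | bit
            bit_position += 1
        return bit_position, value

    # Example: the first 4 bits of 0xA5 (1010 0101) are 0b1010 == 10.
    assert read_bits_msb_first(bytes([0xA5]), 0, 4) == (4, 10)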
@typechecked
def read_byte(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, b = read_bits(buf, bit_position, 8)
return bit_position, b

@typechecked
def read_word(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, w = read_bits(buf, bit_position, 16)
return bit_position, w

@typechecked
def read_long(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, value = read_bits(buf, bit_position, 32)
return bit_position, value

def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
@typechecked
def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, int]:
nb_zeroes=0
while True:
bit_position, b = read_bit(buf, bit_position)
@@ -438,7 +452,8 @@ def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
v = (v1<<nb_zeroes)+v2
return bit_position, v-1

def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
@typechecked
def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, int]:
bit_position, v = read_unsigned_exp_golomb(buf, bit_position)
match v%2:
case 0:
@@ -446,6 +461,7 @@ def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
case 1:
return bit_position, (v+1)>>1

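For reference, an unsigned Exp-Golomb code ue(v) is a run of leading zeroes, a '1' marker, then as many suffix bits as there were zeroes; codeNum = 2**zeroes - 1 + suffix. The signed variant se(v) maps codeNum 0, 1, 2, 3, 4 to 0, 1, -1, 2, -2, which is what the v%2 dispatch above implements. A standalone decoding sketch (independent of the file's read_bit API):

    def decode_ue(buf: bytes, bit_position: int) -> tuple[int, int]:
        def bit_at(pos: int) -> int:
            return (buf[pos // 8] >> (7 - pos % 8)) & 1
        leading_zeroes = 0
        while bit_at(bit_position) == 0:      # count the zero prefix
            leading_zeroes += 1
            bit_position += 1
        bit_position += 1                      # skip the '1' marker
        suffix = 0
        for _ in range(leading_zeroes):        # read the suffix bits
            suffix = (suffix << 1) | bit_at(bit_position)
            bit_position += 1
        return bit_position, (1 << leading_zeroes) - 1 + suffix

    # The bit string 0 0 1 1 1 (packed as 0x38) decodes to codeNum 6.
    assert decode_ue(bytes([0x38]), 0) == (5, 6)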
@typechecked
def write_bit(buf:bytes, bit_position: int, b) -> int:
buf_length = len(buf)
byte_position = floor(bit_position/8)
@@ -459,6 +475,7 @@ def write_bit(buf:bytes, bit_position: int, b) -> int:

return bit_position

@typechecked
def write_boolean(buf:bytes, bit_position: int, b: bool) -> int:
if b:
bit_position = write_bit(buf, bit_position, 1)
@@ -466,6 +483,7 @@ def write_boolean(buf:bytes, bit_position: int, b: bool) -> int:
bit_position = write_bit(buf, bit_position, 0)
return bit_position

@typechecked
def write_bits(buf:bytes, bit_position: int, v, size) -> int:
for i in range(size-1,-1,-1):
b = (v>>i)&1
@@ -473,18 +491,22 @@ def write_bits(buf:bytes, bit_position: int, v, size) -> int:

return bit_position

@typechecked
def write_byte(buf:bytes, bit_position: int, v) -> int:
bit_position = write_bits(buf, bit_position, v, 8)
return bit_position

@typechecked
def write_word(buf:bytes, bit_position: int, v) -> int:
bit_position = write_bits(buf, bit_position, v, 16)
return bit_position

@typechecked
def write_long(buf:bytes, bit_position: int, v) -> int:
bit_position = write_bits(buf, bit_position, v, 32)
return bit_position

@typechecked
def write_unsigned_exp_golomb(buf:bytes, bit_position: int, v) -> int:
n = floor(log(v+1)/log(2))+1
# Write zeroes
@@ -494,6 +516,7 @@ def write_unsigned_exp_golomb(buf:bytes, bit_position: int, v) -> int:

return bit_position

@typechecked
def write_signed_exp_golomb(buf:bytes, bit_position: int, v) -> int:
if v <= 0:
bit_position = write_unsigned_exp_golomb(buf, bit_position, -v*2)
@@ -502,8 +525,8 @@ def write_signed_exp_golomb(buf:bytes, bit_position: int, v) -> int:

return bit_position


def parse_rbsp_trailing_bits(buf:bytes, bit_position) -> int:
@typechecked
def parse_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:
bit_position, one = read_bit(buf, bit_position)
if one==0:
raise ValueError(f'Stop bit should be equal to one. Read: {one:d}')
@@ -514,6 +537,7 @@ def parse_rbsp_trailing_bits(buf:bytes, bit_position) -> int:

return bit_position

@typechecked
def write_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:
bit_position = write_bit(buf, bit_position, 1)
while bit_position%8 != 0:
@@ -521,6 +545,7 @@ def write_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:

return bit_position

@typechecked
def more_rbsp_data(buf:bytes, bit_position: int) -> bool:
logger = logging.getLogger(__name__)
logger.debug('Is there more data in buffer of length: %d at bit position: %d',
@@ -550,7 +575,8 @@ def more_rbsp_data(buf:bytes, bit_position: int) -> bool:
return True

# Convert from RBSP (Raw Byte Sequence Payload) to SODB (String Of Data Bits)
def rbsp_to_sodb(buf:bytes):
@typechecked
def rbsp_to_sodb(buf:bytes) -> bytes:
logger = logging.getLogger(__name__)

logger.debug('RBSP: %s', hexdump.dump(buf, sep=':'))
@@ -565,7 +591,8 @@ def rbsp_to_sodb(buf:bytes):
return res

# Reverse operation SODB to RBSP.
def sodb_to_rbsp(buf:bytes):
@typechecked
def sodb_to_rbsp(buf:bytes) -> bytes:
logger = logging.getLogger(__name__)
logger.debug('SODB: %s', hexdump.dump(buf, sep=':'))

@@ -579,7 +606,8 @@ def sodb_to_rbsp(buf:bytes):
return res

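These two conversions package SPS/PPS payload bytes for the Codec Private element. In H.264, the related byte-stuffing step inserts an emulation-prevention byte 0x03 whenever two zero bytes are followed by a byte in 0x00-0x03; the hunk does not show whether rbsp_to_sodb/sodb_to_rbsp perform exactly this or only handle the trailing bits, so the sketch below only illustrates the generic escaping:

    def insert_emulation_prevention(payload: bytes) -> bytes:
        # Escape 00 00 00 / 00 00 01 / 00 00 02 / 00 00 03 sequences with an extra 0x03 byte.
        out = bytearray()
        zero_run = 0
        for b in payload:
            if zero_run >= 2 and b <= 0x03:
                out.append(0x03)
                zero_run = 0
            out.append(b)
            zero_run = zero_run + 1 if b == 0x00 else 0
        return bytes(out)

    # 00 00 01 becomes 00 00 03 01 after escaping.
    assert insert_emulation_prevention(bytes([0x00, 0x00, 0x01])) == bytes([0x00, 0x00, 0x03, 0x01])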
# Useful for SPS and PPS
def parse_scaling_list(buf:bytes, bit_position: int, size):
@typechecked
def parse_scaling_list(buf:bytes, bit_position: int, size) -> tuple[int,list[int]]:
res = []
last_scale = 8
next_scale = 8
@@ -596,7 +624,9 @@ def parse_scaling_list(buf:bytes, bit_position: int, size):
# TODO: test optimized version.
# The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list
# is full of zeroes.
def write_scaling_list(buf:bytes, bit_position: int, size, matrix, optimized: bool = False):
@typechecked
def write_scaling_list(buf:bytes, bit_position: int, size, matrix,
optimized: bool = False) -> int:
logger = logging.getLogger(__name__)
logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size)

@@ -1083,7 +1113,7 @@ class SPS:
bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_num_ref_frames)
bit_position = write_boolean(buf, bit_position, self.gaps_in_frame_num_value_allowed_flag)
bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_width_in_mbs_minus1)
bit_position = write_unsigned_exp_golomb(buf, bit_position,
bit_position = write_unsigned_exp_golomb(buf, bit_position,
self.pic_height_in_map_units_minus1)
bit_position = write_boolean(buf, bit_position, self.frame_mbs_only_flag)
if not self.frame_mbs_only_flag:
@@ -1123,7 +1153,7 @@ class PPS:
pic_size_in_map_units_minus1:int=0
slice_group_id:dict = field(default_factory=dict)
num_ref_idx_l0_default_active_minus1:int=0
num_ref_idx_l1_default_active_minus1:int=0
num_ref_idx_l2_default_active_minus1:int=0
weighted_pred_flag:bool=False
weighted_bipred_idc:int=0
pic_init_qp_minus26:int=0
@@ -1264,7 +1294,7 @@ class PPS:
v = self.bottom_right[i]
bit_position = write_unsigned_exp_golomb(buf, bit_position, v)
elif self.slice_group_map_type in [3,4,5]:
bit_position = write_boolean(buf, bit_position,
bit_position = write_boolean(buf, bit_position,
self.slice_group_change_direction_flag)
bit_position = write_unsigned_exp_golomb(buf, bit_position,
self.slice_group_change_rate_minus1)
@@ -1310,7 +1340,7 @@ class PPS:
else:
logger.info("Writing matrix: %s", matrix)
bit_position = write_scaling_list(buf, bit_position, 64, matrix)
bit_position = write_signed_exp_golomb(buf, bit_position,
bit_position = write_signed_exp_golomb(buf, bit_position,
self.second_chroma_qp_index_offset)

bit_position = write_rbsp_trailing_bits(buf, bit_position)
@@ -1434,8 +1464,7 @@ class AVCDecoderConfiguration:
bit_position = write_bits(buf, bit_position, self.length_size_minus_one, 2)
bit_position = write_bits(buf, bit_position, 0b111, 3)
bit_position = write_bits(buf, bit_position, self.num_of_sequence_parameter_sets, 5)
for spsid in self.sps:
sps = self.sps[spsid]
for spsid, sps in self.sps.items():
sodb = sps.to_bytes()
sodb_length = len(sodb)
rbsp = sodb_to_rbsp(sodb)
@@ -1450,11 +1479,10 @@ class AVCDecoderConfiguration:
logger.debug('2. Buffer: %s', hexdump.dump(buf, sep=':'))

bit_position = write_byte(buf, bit_position, self.num_of_picture_parameter_sets)
for ppsid in self.pps:
for ppsid, lpps in self.pps.items():
logger.debug('Writing PPS: %d', ppsid)
pps = self.pps[ppsid]
# TODO: does chroma_format should come from self ?
sodb = pps.to_bytes(self.chroma_format)
sodb = lpps.to_bytes(self.chroma_format)
sodb_length = len(sodb)
rbsp = sodb_to_rbsp(sodb)
rbsp_length = len(rbsp)
@@ -1531,6 +1559,7 @@ class AVCDecoderConfiguration:

# TODO: do the same with extended SPS !

@typechecked
def parse_codec_private(codec_private_data: bytes) -> AVCDecoderConfiguration:
if codec_private_data[0] != 0x63:
raise ValueError(f'Matroska header is wrong: {codec_private_data[0]:x}')
@@ -1554,7 +1583,8 @@ def parse_codec_private(codec_private_data: bytes) -> AVCDecoderConfiguration:

return avcconfig

def get_avc_config_from_h264(input_file: IO[bytes]):
@typechecked
def get_avc_config_from_h264(input_file: IO[bytes]) -> AVCDecoderConfiguration:
logger = logging.getLogger(__name__)

# TODO: improve this ...
@@ -1594,13 +1624,15 @@ def get_avc_config_from_h264(input_file: IO[bytes]):
return avcconfig

# Unused ?
def get_codec_private_data_from_h264(input_file: IO[bytes]):
@typechecked
def get_codec_private_data_from_h264(input_file: IO[bytes]) -> AVCDecoderConfiguration:
avcconfig = get_avc_config_from_h264(input_file)
res = dump_codec_private_data(avcconfig)

return res

def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]):
@typechecked
def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]) -> dict[str,tuple[int,int]]:
logger = logging.getLogger(__name__)

infd = input_file.fileno()
@@ -1680,8 +1712,8 @@ def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]):
# 0000 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx
# value 0 to 2^56-2


def get_ebml_length(length):
@typechecked
def get_ebml_length(length) -> bytes|None:
logger = logging.getLogger(__name__)

if 0 <= length <= 2**7-2:
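The comments above describe EBML variable-size integers: the position of the first set bit in the leading byte encodes the field width, and an N-byte length field holds values up to 2**(7*N)-2 (the all-ones pattern is reserved). A compact sketch of that encoding, with names of my own rather than the file's:

    def encode_ebml_length(length: int) -> bytes:
        # Pick the smallest width that can hold the value, then set the width-marker bit.
        for size in range(1, 9):
            if length <= 2 ** (7 * size) - 2:
                marker = 1 << (7 * size)
                return (marker | length).to_bytes(size, byteorder='big')
        raise ValueError('length too large for an 8-byte EBML size field')

    # 500 needs two bytes: marker 0b01 plus 14 value bits -> 0x41 0xF4.
    assert encode_ebml_length(500) == bytes([0x41, 0xF4])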
@@ -1711,7 +1743,7 @@ def get_ebml_length(length):
res = (encoded_length).to_bytes(size, byteorder='big')
return res


@typechecked
def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration) -> bytearray:
logger = logging.getLogger(__name__)
# Rebuild a Matroska Codec Private Element
@@ -1729,8 +1761,8 @@ def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration)

return res


def change_ebml_element_size(input_file: IO[bytes], position:int, addendum):
@typechecked
def change_ebml_element_size(input_file: IO[bytes], position:int, addendum) -> int:
logger = logging.getLogger(__name__)

initial_position = position
@@ -1822,7 +1854,8 @@ def change_ebml_element_size(input_file: IO[bytes], position:int, addendum):
# We return the potential increase in size of the file if the length field had to be increased.
return delta

def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):
@typechecked
def change_codec_private_data(mkvinfo_path:str, input_file: IO[bytes], codec_data) -> None:
logger = logging.getLogger(__name__)

infd = input_file.fileno()
@@ -1830,7 +1863,7 @@ def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):

current_length = fstat(infd).st_size
logger.info('Current size of file: %d', current_length)
position, current_data = get_codec_private_data_from_mkv(mkvinfo, input_file)
position, current_data = get_codec_private_data_from_mkv(mkvinfo_path, input_file)
current_data_length = len(current_data)
future_length = current_length - current_data_length + len(codec_data)
logger.info('Expected size of file: %d', future_length)
@@ -1838,11 +1871,10 @@ def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):
logger.info('Current data at position %d: %s', position, hexdump.dump(current_data, sep=":"))
logger.info('Future data: %s', hexdump.dump(codec_data, sep=":"))

elements = parse_mkv_tree(mkvinfo, input_file)
elements = parse_mkv_tree(mkvinfo_path, input_file)

found = False
for key in elements:
pos, size = elements[key]
for key, (pos,size) in elements.items():
if pos == position:
logger.info('Codec private data key: %s', key)
found = True
@@ -1893,7 +1925,8 @@ def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):
# been resized).
delta+=change_ebml_element_size(input_file, pos, delta)

def get_format(ffprobe_path:str, input_file: IO[bytes]):
@typechecked
def get_format(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
logger = logging.getLogger(__name__)

infd = input_file.fileno()
@@ -1910,7 +1943,7 @@ def get_format(ffprobe_path:str, input_file: IO[bytes]):

return None


@typechecked
def get_movie_duration(ffprobe_path:str, input_file: IO[bytes]) -> timedelta|None:
logger = logging.getLogger(__name__)

@@ -1931,6 +1964,7 @@ def get_movie_duration(ffprobe_path:str, input_file: IO[bytes]) -> timedelta|Non
return None

# ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json sample.ts
@typechecked
def get_video_dimensions(ffprobe_path:str, input_file: IO[bytes]) -> tuple[int,int]:
logger = logging.getLogger(__name__)

@@ -1950,8 +1984,8 @@ def get_video_dimensions(ffprobe_path:str, input_file: IO[bytes]) -> tuple[int,i
logger.error('Impossible to retrieve dimensions of video')
exit(-1)


def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
@typechecked
def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> list|None:
logger = logging.getLogger(__name__)

infd = input_file.fileno()
@@ -1968,6 +2002,7 @@ def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:

return None

@typechecked
def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool:
logger = logging.getLogger(__name__)

@@ -1988,6 +2023,7 @@ def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool:

return False

@typechecked
def parse_timestamp(ts:str) -> timedelta|None:
logger = logging.getLogger(__name__)

@@ -2025,10 +2061,11 @@ def parse_timestamp(ts:str) -> timedelta|None:
if us < 0 or us > 1000000:
logger.error("milliseconds must be in [0,1000000[")
return None
ts = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us)
res = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us)

return ts
return res

@typechecked
def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:
logger = logging.getLogger(__name__)

@@ -2102,6 +2139,7 @@ def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:

return (ts1, ts2)

@typechecked
def compare_time_interval(interval1: tuple[timedelta, timedelta],
interval2: tuple[timedelta, timedelta]) -> int:
ts11,ts12 = interval1
@@ -2114,6 +2152,7 @@ def compare_time_interval(interval1: tuple[timedelta, timedelta],
else:
return 0

@typechecked
def ffmpeg_convert(ffmpeg_path:str, ffprobe_path:str, input_file: IO[bytes], input_format:str,
output_file: IO[bytes], output_format:str, duration: timedelta):
logger = logging.getLogger(__name__)
@@ -2159,6 +2198,7 @@ def ffmpeg_convert(ffmpeg_path:str, ffprobe_path:str, input_file: IO[bytes], inp
if status != 0:
logger.error('Conversion failed with status code: %d', status)

@typechecked
def get_ts_frame(frame: dict) -> timedelta|None:
logger = logging.getLogger(__name__)

@@ -2173,6 +2213,7 @@ def get_ts_frame(frame: dict) -> timedelta|None:
ts = timedelta(seconds=pts_time)
return ts

@typechecked
def get_packet_duration(packet: dict) -> int:
logger = logging.getLogger(__name__)

@@ -2186,15 +2227,16 @@ def get_packet_duration(packet: dict) -> int:

return duration

def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:str, end:str,
stream_kind:str, sub_stream_id:int=0) -> list[timedelta]|None:
@typechecked
def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:timedelta, end:timedelta,
stream_kind:str, sub_stream_id:int=0) -> list[dict]|None:
logger = logging.getLogger(__name__)
infd = input_file.fileno()
set_inheritable(infd, True)

command = [ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{begin}%{end}','-show_entries',
'frame', '-select_streams', f'{stream_kind}:{sub_stream_id:d}','-of', 'json',
f'/proc/self/fd/{infd:d}']
command = [ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{begin}%{end}',
'-show_entries', 'frame', '-select_streams',
f'{stream_kind}:{sub_stream_id:d}','-of', 'json', f'/proc/self/fd/{infd:d}']
logger.debug('Executing: %s', command)

with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
@@ -2219,13 +2261,13 @@ def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:str, en
res = []
for ts in sorted(tmp):
res.append(tmp[ts])

return res
else:
logger.error('Impossible to retrieve frames inside file around [%s,%s]', begin, end)
return None

# TODO: Finish implementation of this function and use it.
@typechecked
def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp, before: bool=True,
delta: timedelta=timedelta(seconds=2)):
# pylint: disable=W0613
@@ -2269,9 +2311,10 @@ def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp, b

return None

def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes], timestamp:timedelta,
before:bool=True,
delta_max:timedelta=timedelta(seconds=15))-> tuple[int,list[dict]]:
@typechecked
def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes],
timestamp:timedelta, before:bool=True,
delta_max:timedelta=timedelta(seconds=15))-> tuple[int,dict]:
logger = logging.getLogger(__name__)

infd = input_file.fileno()
@@ -2350,8 +2393,9 @@ def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes], timestamp:timede

return(nb_frames, iframe)

@typechecked
def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[bytes],
begin:str, end:str) -> None:
begin:timedelta, end:timedelta) -> None:
logger = logging.getLogger(__name__)

logger.info('Extract video between I-frames at %s and %s', begin,end)
@@ -2392,7 +2436,8 @@ def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[byt
elif status == 2:
logger.error('Extraction returns errors')

def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:str, nb_frames:int,
@typechecked
def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:timedelta, nb_frames:int,
width:int=640, height:int=480) -> tuple[bytes,int]:
logger = logging.getLogger(__name__)

@@ -2429,7 +2474,8 @@ def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:str, nb_frames
lseek(outfd, 0, SEEK_SET)
return images, outfd

def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:str, output_filename:str,
@typechecked
def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:timedelta, output_filename:str,
packet_duration:int, sub_channel:int=0,
nb_packets:int=0, sample_rate:int=48000, nb_channels:int=2) -> tuple[bytes,int]:
logger = logging.getLogger(__name__)
@@ -2465,6 +2511,7 @@ def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:str, output_file

return sound, outfd

@typechecked
def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None:
logger = logging.getLogger(__name__)

@@ -2499,7 +2546,7 @@ def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None:

header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1
try:
with open(filename, 'w', encoding='utf8') as out:
with open(filename, 'wb', encoding='utf8') as out:
temporaries.append(out)
outfd = out.fileno()
length=header_len+3*width*height
@@ -2511,10 +2558,10 @@ def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None:
except IOError:
logger.error('Impossible to create file: %s', filename)


def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, begin, end, streams,
files_prefix, nb_frames: int, framerate: float, width: int, height:int,
temporaries, dump_mem_fd=False):
@typechecked
def extract_all_streams(ffmpeg_path:str, ffprobe_path:str, input_file:IO[bytes], begin:timedelta,
end:timedelta, streams, files_prefix, nb_frames:int, framerate:float,
width:int, height:int, temporaries, dump_mem_fd:bool=False):
logger = logging.getLogger(__name__)

# The command line for encoding only video track
@@ -2553,7 +2600,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct',
'-field_order', '1']
case 'bb':
interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct',
interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct',
'-field_order','2']
case 'tb':
interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct',
@@ -2563,7 +2610,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
'-field_order', '4']
case _:
interlaced_options = []


# ======================================= #
# TODO: adjust SAR and DAR
# https://superuser.com/questions/907933/correct-aspect-ratio-without-re-encoding-video-file
@@ -2574,8 +2621,8 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
logger.warning('Missing treatment for chroma location: %s', chroma_location)
codec = stream['codec_name']
images_bytes, memfd = extract_pictures(ffmpeg_path, input_file=input_file,
begin=begin, nb_frames=nb_frames, width=width,
height=height)
begin=begin, nb_frames=nb_frames,
width=width, height=height)
if images_bytes is None:
logger.error('Impossible to extract picture from video stream.')
exit(-1)
@@ -2639,7 +2686,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be

if dump_mem_fd:
try:
with open(tmpname,'w', encoding='utf8') as output:
with open(tmpname,'wb', encoding='utf8') as output:
temporaries.append(output)
outfd = output.fileno()
pos = 0
@@ -2664,7 +2711,7 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
generic_input_params.extend(['-i', './empty.idx'])
if 'tags' in stream:
if 'language' in stream['tags']:
generic_codec_params.extend([f'-metadata:s:s:{subtitle_id:d}',
generic_codec_params.extend([f'-metadata:s:s:{subtitle_id:d}',
f"language={stream['tags']['language']}"])
generic_codec_params.extend([f'-c:s:{subtitle_id:d}', 'copy'])
subtitle_id=subtitle_id+1
@@ -2755,13 +2802,14 @@ def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, be
return None, None

# Merge a list of mkv files passed as input, and produce a new MKV as output
def merge_mkvs(mkvmerge_path:str, inputs, output_name:str, concatenate: bool =True,
timestamps=None) -> IO[bytes]:
@typechecked
def merge_mkvs(mkvmerge_path:str, inputs: list[IO[bytes]], output_name:str,
concatenate: bool=True, timestamps: dict[int, IO[str]]={}) -> IO[bytes]:
logger = logging.getLogger(__name__)

fds = []
try:
out = open(output_name, 'w+', encoding='utf8')
out = open(output_name, 'wb+')
except IOError:
logger.error('Impossible to create file: %s', output_name)
return None
@@ -2784,7 +2832,7 @@ def merge_mkvs(mkvmerge_path:str, inputs, output_name:str, concatenate: bool =Tr
fds.append(fd)
set_inheritable(fd, True)
# If we pass a timestamps file associated with the considered track, use it.
if timestamps is not None and partnum in timestamps:
if partnum in timestamps:
tsfd = timestamps[partnum].fileno()
lseek(tsfd, 0, SEEK_SET)
fds.append(tsfd)
@@ -2840,8 +2888,9 @@ def find_subtitles_tracks(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)

command = [ffprobe_path, '-loglevel','quiet', '-i', f'/proc/self/fd/{infd:d}', '-select_streams',
's', '-show_entries', 'stream=index:stream_tags=language', '-of', 'json']
command = [ffprobe_path, '-loglevel','quiet', '-i', f'/proc/self/fd/{infd:d}',
'-select_streams', 's', '-show_entries', 'stream=index:stream_tags=language',
'-of', 'json']
logger.debug('Executing: %s', command)

with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
@@ -2855,6 +2904,7 @@ def find_subtitles_tracks(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
ffprobe.wait()
return None

@typechecked
def extract_track_from_mkv(mkvextract_path: str, input_file: IO[bytes], index,
output_file: IO[bytes], timestamps) -> None:
logger = logging.getLogger(__name__)
@@ -2899,6 +2949,7 @@ def extract_track_from_mkv(mkvextract_path: str, input_file: IO[bytes], index,
else:
logger.info('Track %d was succesfully extracted.', index)

@typechecked
def remove_video_tracks_from_mkv(mkvmerge_path:str, input_file: IO[bytes],
output_file: IO[bytes]) -> None:
logger = logging.getLogger(__name__)
@@ -2935,7 +2986,8 @@ def remove_video_tracks_from_mkv(mkvmerge_path:str, input_file: IO[bytes],
else:
logger.info('Video tracks were succesfully extracted.')

def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filename: IO[bytes],
@typechecked
def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filename: str,
subtitles) -> None:
logger = logging.getLogger(__name__)

@@ -2986,7 +3038,8 @@ def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filenam

return None

def concatenate_h264_parts(h264parts, output) -> None:
@typechecked
def concatenate_h264_parts(h264parts: list[IO[bytes]], output: IO[bytes]) -> None:
logger = logging.getLogger(__name__)

total_length = 0
@@ -3013,7 +3066,7 @@ def concatenate_h264_parts(h264parts, output) -> None:
pb.update(nb_bytes)
pos += nb_bytes

def concatenate_h264_ts_parts(h264_ts_parts, output) -> None:
def concatenate_h264_ts_parts(h264_ts_parts: list[IO[bytes]], output: IO[bytes]) -> None:
logger = logging.getLogger(__name__)
header = '# timestamp format v2\n'

@@ -3053,7 +3106,7 @@ def do_coarse_processing(ffmpeg_path:str, ffprobe_path:str, mkvmerge_path:str,
internal_mkv_name = f'{files_prefix}.mkv'

try:
internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8')
internal_mkv = open(internal_mkv_name, 'wb+')
except IOError:
logger.error('Impossible to create file: %s', internal_mkv_name)
exit(-1)
@@ -3145,7 +3198,8 @@ def main() -> None:
mkvfilename = basename+'.mkv'

try:
input_file = open(args.input_file, mode='r', encoding='utf8')
input_file = open(args.input_file, mode='rb')
logger.debug("Type of input file: %s" % type(input_file))
except IOError:
logger.error("Impossible to open %s", args.input_file)
exit(-1)
@@ -3182,13 +3236,13 @@ def main() -> None:
if format_of_file == SupportedFormat.TS:
logger.info("Converting TS to MP4 (to fix timestamps).")
try:
with open(mp4filename, 'w+', encoding='utf8') as mp4:
with open(mp4filename, 'wb+') as mp4:
ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mpegts', mp4, 'mp4',
duration)
temporaries.append(mp4)
logger.info("Converting MP4 to MKV.")
try:
mkv = open(mkvfilename, 'w+', encoding='utf8')
mkv = open(mkvfilename, 'wb+')
except IOError:
logger.error('')

@@ -3202,7 +3256,7 @@ def main() -> None:
elif format_of_file == SupportedFormat.MP4:
logger.info("Converting MP4 to MKV")
try:
mkv = open(mkvfilename, 'w+', encoding='utf8')
mkv = open(mkvfilename, 'wb+')
except IOError:
logger.error('')
ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mp4', mkv, 'matroska',
@@ -3238,7 +3292,8 @@ def main() -> None:
exit(-1)

# We retrieve the main private codec data
_, main_codec_private_data = get_codec_private_data_from_mkv(mkvinfo_path=paths['mkvinfo'], input_file=mkv)
_, main_codec_private_data = get_codec_private_data_from_mkv(mkvinfo_path=paths['mkvinfo'],
input_file=mkv)
logger.debug('Main video stream has following private data: %s',
hexdump.dump(main_codec_private_data, sep=':'))

@@ -3260,7 +3315,8 @@ def main() -> None:
# If there exists a difference between our own reconstructed AVC configuration and the
# original one, we abandon
if iso_avc_config != main_avc_config:
logger.error('AVC configurations are different: %s\n%s\n', main_avc_config, iso_avc_config)
logger.error('AVC configurations are different: %s\n%s\n', main_avc_config,
iso_avc_config)
exit(-1)

# Pour chaque portion
@@ -3373,19 +3429,19 @@ def main() -> None:
internal_novideo_mkv_name = f'part-{partnum:d}-internal-novideo.mkv'

try:
internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8')
internal_mkv = open(internal_mkv_name, 'wb+')
except IOError:
logger.error('Impossible to create file: %s', internal_mkv_name)
exit(-1)

try:
internal_novideo_mkv = open(internal_novideo_mkv_name, 'w+', encoding='utf8')
internal_novideo_mkv = open(internal_novideo_mkv_name, 'wb+')
except IOError:
logger.error('Impossible to create file: %s', internal_novideo_mkv_name)
exit(-1)

try:
internal_h264 = open(internal_h264_name, 'w+', encoding='utf8')
internal_h264 = open(internal_h264_name, 'wb+')
except IOError:
logger.error('Impossible to create file: %s', internal_h264_name)
exit(-1)
@@ -3403,8 +3459,8 @@ def main() -> None:

# Extract video stream of internal part as a raw H264 and its timestamps.
logger.info('Extract video track as raw H264 file.')
extract_track_from_mkv(mkvextract_path=paths['mkvextract'], input_file=internal_mkv, index=0,
output_file=internal_h264, timestamps=internal_h264_ts)
extract_track_from_mkv(mkvextract_path=paths['mkvextract'], input_file=internal_mkv,
index=0, output_file=internal_h264, timestamps=internal_h264_ts)

# Remove video track from internal part of MKV
logger.info('Remove video track from %s', internal_mkv_name)
@@ -3443,7 +3499,8 @@ def main() -> None:
h264_ts.append(h264_tail_ts)

logger.info('Merging MKV: %s', subparts)
part = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=subparts,

part = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=subparts,
output_name=f'part-{partnum:d}.mkv', concatenate=True)
mkvparts.append(part)
temporaries.append(part)
@@ -3461,7 +3518,7 @@ def main() -> None:
nb_mkv_parts = len(mkvparts)
if nb_mkv_parts > 0:
try:
full_h264 = open(f'{basename}-full.h264', 'w+', encoding='utf8')
full_h264 = open(f'{basename}-full.h264', 'wb+')
except IOError:
logger.error('Impossible to create file full H264 stream.')
exit(-1)
@@ -3495,7 +3552,7 @@ def main() -> None:

if nb_mkv_parts >=1 :
try:
final_novideo = open(final_novideo_name, 'r', encoding='utf8')
final_novideo = open(final_novideo_name, 'rb')
except IOError:
logger.error('Impossible to open file: %s.', final_novideo_name)
exit(-1)
@@ -3505,11 +3562,13 @@ def main() -> None:
full_h264_ts.seek(0)

logger.info('Merging final video track and all other tracks together')
final_with_video = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=[full_h264, final_novideo],
final_with_video = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=[full_h264,
final_novideo],
output_name=final_with_video_name, concatenate=False,
timestamps={0: full_h264_ts})
final_codec_private_data = dump_codec_private_data(main_avc_config)
logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data, sep=':'))
logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data,
sep=':'))
logger.info('Changing codec private data with the new one.')
change_codec_private_data(paths['mkvinfo'], final_with_video, final_codec_private_data)

@@ -3545,17 +3604,18 @@ def main() -> None:

logger.info(sts)
if len(sts) > 0:
logger.info('Supported languages: %s' % supported_langs)
list_of_subtitles = extract_srt(paths['mkvextract'], final_with_video_name, sts,
supported_langs)
logger.info(list_of_subtitles)
for idx_name, sub_name, _, _ in list_of_subtitles:
try:
idx = open(idx_name,'r', encoding='utf8')
idx = open(idx_name,'rb')
except IOError:
logger.error("Impossible to open %s.", idx_name)
exit(-1)
try:
sub = open(sub_name,'r', encoding='utf8')
sub = open(sub_name,'rb')
except IOError:
logger.error("Impossible to open %s.", sub_name)
exit(-1)
@@ -3563,7 +3623,8 @@ def main() -> None:
temporaries.append(idx)
temporaries.append(sub)

ocr = do_ocr(paths['vobsubocr'], list_of_subtitles, duration, temporaries, args.dump)
ocr = do_ocr(paths['vobsubocr'], list_of_subtitles, duration, temporaries,
args.dump)
logger.info(ocr)

# Remux SRT subtitles