diff --git a/removeads.py b/removeads.py
index e4d454e..e0e0ea6 100755
--- a/removeads.py
+++ b/removeads.py
@@ -19,6 +19,7 @@ from math import floor, ceil, log
 from io import BytesIO, TextIOWrapper
 import json
 from typing import IO
+from typeguard import typechecked

 # Third party libraries
 import coloredlogs
@@ -54,7 +55,8 @@ from iso639.exceptions import InvalidLanguageValue

 # Then finally, change the Private Codec Data in the final MKV.

-def check_required_tools() -> tuple[bool,list[str]]:
+@typechecked
+def check_required_tools() -> tuple[bool,dict[str,str]]:
     """Check if required external tools are installed.

     Args:
@@ -84,7 +86,8 @@ def check_required_tools() -> tuple[bool,list[str]]:

     return all_optional_tools, paths

-def get_tesseract_supported_lang(tesseract_path:str) -> dict[str,str]|None:
+@typechecked
+def get_tesseract_supported_lang(tesseract_path:str) -> dict[Lang, str]|None:
     """Returns the set of natural languages supported by Tesseract OCR tool.

     Args:
@@ -118,6 +121,7 @@ def get_tesseract_supported_lang(tesseract_path:str) -> dict[str,str]|None:

     return res

+@typechecked
 def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None:
     logger = logging.getLogger(__name__)

@@ -181,6 +185,7 @@ def get_frame_rate(ffprobe_path:str, input_file: IO[bytes]) -> float|None:

     return frame_rate2

+@typechecked
 def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None:
     logger = logging.getLogger(__name__)
     tracks={}
@@ -213,7 +218,9 @@ def get_subtitles_tracks(ffprobe_path:str, mkv_path: str) -> dict[str,str]|None:

     return tracks

-def extract_srt(mkvextract:str, filename:str, subtitles:str, langs:list[str]) -> list|None:
+@typechecked
+def extract_srt(mkvextract:str, filename:str, subtitles:dict[str, list[int]],
+                langs:dict[Lang,str]) -> list|None:
     logger = logging.getLogger(__name__)

     params = [mkvextract, filename, 'tracks']
@@ -354,6 +361,7 @@ class SupportedFormat(IntEnum):
 # -report -loglevel 0 -f null -

 # Found codec private data using mkvinfo
+@typechecked
 def get_codec_private_data_from_mkv(mkvinfo_path:str, input_file: IO[bytes]) -> tuple[int, bytes]:
     logger = logging.getLogger(__name__)

@@ -395,17 +403,19 @@ def get_codec_private_data_from_mkv(mkvinfo_path:str, input_file: IO[bytes]) ->
 # All the following code is a transposition of documents:
 # ISO/IEC H.264-201602
 # ISO/IEC 14496-15
-
+@typechecked
 def read_bit(buf:bytes, bit_position: int) -> tuple[int, int]:
     byte_position = floor(floor(bit_position/8))
     byte = buf[byte_position]
     bit = (byte >> (7-(bit_position % 8))) & 1
     return bit_position+1, bit

+@typechecked
 def read_boolean(buf:bytes, bit_position: int) -> tuple[int, bool]:
     bit_position, b = read_bit(buf, bit_position)
     return bit_position, b==1

+@typechecked
 def read_bits(buf:bytes, bit_position: int, nb_bits: int) -> tuple[int, int]:
     v = 0
     for _ in range(0, nb_bits):
@@ -413,19 +423,23 @@ def read_bits(buf:bytes, bit_position: int, nb_bits: int) -> tuple[int, int]:
         v = v*2+bit
     return bit_position, v

+@typechecked
 def read_byte(buf:bytes, bit_position: int) -> tuple[int, int]:
     bit_position, b = read_bits(buf, bit_position, 8)
     return bit_position, b

+@typechecked
 def read_word(buf:bytes, bit_position: int) -> tuple[int, int]:
     bit_position, w = read_bits(buf, bit_position, 16)
     return bit_position, w

+@typechecked
 def read_long(buf:bytes, bit_position: int) -> tuple[int, int]:
     bit_position, value = read_bits(buf, bit_position, 32)
     return bit_position, value

-def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
+@typechecked
+def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, int]:
     nb_zeroes=0
     while True:
         bit_position, b = read_bit(buf, bit_position)
@@ -438,7 +452,8 @@ def read_unsigned_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
     v = (v1<<nb_zeroes)+v2-1
     return bit_position, v

-def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
+@typechecked
+def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, int]:
     bit_position, v = read_unsigned_exp_golomb(buf, bit_position)
     match v%2:
         case 0:
@@ -446,6 +461,7 @@ def read_signed_exp_golomb(buf:bytes, bit_position: int) -> tuple[int, bool]:
         case 1:
             return bit_position, (v+1)>>1

+@typechecked
 def write_bit(buf:bytes, bit_position: int, b) -> int:
     buf_length = len(buf)
     byte_position = floor(bit_position/8)
@@ -459,6 +475,7 @@ def write_bit(buf:bytes, bit_position: int, b) -> int:

     return bit_position

+@typechecked
 def write_boolean(buf:bytes, bit_position: int, b: bool) -> int:
     if b:
         bit_position = write_bit(buf, bit_position, 1)
@@ -466,6 +483,7 @@ def write_boolean(buf:bytes, bit_position: int, b: bool) -> int:
         bit_position = write_bit(buf, bit_position, 0)
     return bit_position

+@typechecked
 def write_bits(buf:bytes, bit_position: int, v, size) -> int:
     for i in range(size-1,-1,-1):
         b = (v>>i)&1
@@ -473,18 +491,22 @@ def write_bits(buf:bytes, bit_position: int, v, size) -> int:

     return bit_position

+@typechecked
 def write_byte(buf:bytes, bit_position: int, v) -> int:
     bit_position = write_bits(buf, bit_position, v, 8)
     return bit_position

+@typechecked
 def write_word(buf:bytes, bit_position: int, v) -> int:
     bit_position = write_bits(buf, bit_position, v, 16)
     return bit_position

+@typechecked
 def write_long(buf:bytes, bit_position: int, v) -> int:
     bit_position = write_bits(buf, bit_position, v, 32)
     return bit_position

+@typechecked
 def write_unsigned_exp_golomb(buf:bytes, bit_position: int, v) -> int:
     n = floor(log(v+1)/log(2))+1
     # Write zeroes
@@ -494,6 +516,7 @@ def write_unsigned_exp_golomb(buf:bytes, bit_position: int, v) -> int:

     return bit_position

+@typechecked
 def write_signed_exp_golomb(buf:bytes, bit_position: int, v) -> int:
     if v <= 0:
         bit_position = write_unsigned_exp_golomb(buf, bit_position, -v*2)
@@ -502,8 +525,8 @@ def write_signed_exp_golomb(buf:bytes, bit_position: int, v) -> int:

     return bit_position

-
-def parse_rbsp_trailing_bits(buf:bytes, bit_position) -> int:
+@typechecked
+def parse_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:
     bit_position, one = read_bit(buf, bit_position)
     if one==0:
         raise ValueError(f'Stop bit should be equal to one. Read: {one:d}')
@@ -514,6 +537,7 @@

     return bit_position

+@typechecked
 def write_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:
     bit_position = write_bit(buf, bit_position, 1)
     while bit_position%8 != 0:
@@ -521,6 +545,7 @@ def write_rbsp_trailing_bits(buf:bytes, bit_position: int) -> int:

     return bit_position

+@typechecked
 def more_rbsp_data(buf:bytes, bit_position: int) -> bool:
     logger = logging.getLogger(__name__)
     logger.debug('Is there more data in buffer of length: %d at bit position: %d',
@@ -550,7 +575,8 @@ def more_rbsp_data(buf:bytes, bit_position: int) -> bool:
     return True

 # Convert from RBSP (Raw Byte Sequence Payload) to SODB (String Of Data Bits)
-def rbsp_to_sodb(buf:bytes):
+@typechecked
+def rbsp_to_sodb(buf:bytes) -> bytes:
     logger = logging.getLogger(__name__)

     logger.debug('RBSP: %s', hexdump.dump(buf, sep=':'))
@@ -565,7 +591,8 @@ def rbsp_to_sodb(buf:bytes):
     return res

 # Reverse operation SODB to RBSP.
-def sodb_to_rbsp(buf:bytes):
+@typechecked
+def sodb_to_rbsp(buf:bytes) -> bytes:
     logger = logging.getLogger(__name__)

     logger.debug('SODB: %s', hexdump.dump(buf, sep=':'))
@@ -579,7 +606,8 @@ def sodb_to_rbsp(buf:bytes):
     return res

 # Useful for SPS and PPS
-def parse_scaling_list(buf:bytes, bit_position: int, size):
+@typechecked
+def parse_scaling_list(buf:bytes, bit_position: int, size) -> tuple[int,list[int]]:
     res = []
     last_scale = 8
     next_scale = 8
@@ -596,7 +624,9 @@
 # TODO: test optimized version.
 # The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list
 # is full of zeroes.
-def write_scaling_list(buf:bytes, bit_position: int, size, matrix, optimized: bool = False):
+@typechecked
+def write_scaling_list(buf:bytes, bit_position: int, size, matrix,
+                       optimized: bool = False) -> int:
     logger = logging.getLogger(__name__)

     logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size)
@@ -1083,7 +1113,7 @@ class SPS:
         bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_num_ref_frames)
         bit_position = write_boolean(buf, bit_position, self.gaps_in_frame_num_value_allowed_flag)
         bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_width_in_mbs_minus1)
-        bit_position = write_unsigned_exp_golomb(buf, bit_position, 
+        bit_position = write_unsigned_exp_golomb(buf, bit_position,
                                                  self.pic_height_in_map_units_minus1)
         bit_position = write_boolean(buf, bit_position, self.frame_mbs_only_flag)
         if not self.frame_mbs_only_flag:
@@ -1264,7 +1294,7 @@ class PPS:
                 v = self.bottom_right[i]
                 bit_position = write_unsigned_exp_golomb(buf, bit_position, v)
         elif self.slice_group_map_type in [3,4,5]:
-            bit_position = write_boolean(buf, bit_position, 
+            bit_position = write_boolean(buf, bit_position,
                                          self.slice_group_change_direction_flag)
             bit_position = write_unsigned_exp_golomb(buf, bit_position,
                                                      self.slice_group_change_rate_minus1)
@@ -1310,7 +1340,7 @@ class PPS:
             else:
                 logger.info("Writing matrix: %s", matrix)
                 bit_position = write_scaling_list(buf, bit_position, 64, matrix)
-        bit_position = write_signed_exp_golomb(buf, bit_position, 
+        bit_position = write_signed_exp_golomb(buf, bit_position,
                                                self.second_chroma_qp_index_offset)

         bit_position = write_rbsp_trailing_bits(buf, bit_position)
@@ -1434,8 +1464,7 @@ class AVCDecoderConfiguration:
         bit_position = write_bits(buf, bit_position, self.length_size_minus_one, 2)
         bit_position = write_bits(buf, bit_position, 0b111, 3)
         bit_position = write_bits(buf, bit_position, self.num_of_sequence_parameter_sets, 5)
-        for spsid in self.sps:
-            sps = self.sps[spsid]
+        for spsid, sps in self.sps.items():
             sodb = sps.to_bytes()
             sodb_length = len(sodb)
             rbsp = sodb_to_rbsp(sodb)
@@ -1450,11 +1479,10 @@ class AVCDecoderConfiguration:
             logger.debug('2. Buffer: %s', hexdump.dump(buf, sep=':'))

         bit_position = write_byte(buf, bit_position, self.num_of_picture_parameter_sets)
-        for ppsid in self.pps:
+        for ppsid, lpps in self.pps.items():
             logger.debug('Writing PPS: %d', ppsid)
-            pps = self.pps[ppsid]
             # TODO: does chroma_format should come from self ?
-            sodb = pps.to_bytes(self.chroma_format)
+            sodb = lpps.to_bytes(self.chroma_format)
             sodb_length = len(sodb)
             rbsp = sodb_to_rbsp(sodb)
             rbsp_length = len(rbsp)
@@ -1531,6 +1559,7 @@ class AVCDecoderConfiguration:


 # TODO: do the same with extended SPS !
+@typechecked
 def parse_codec_private(codec_private_data: bytes) -> AVCDecoderConfiguration:
     if codec_private_data[0] != 0x63:
         raise ValueError(f'Matroska header is wrong: {codec_private_data[0]:x}')
@@ -1554,7 +1583,8 @@ def parse_codec_private(codec_private_data: bytes) -> AVCDecoderConfiguration:

     return avcconfig

-def get_avc_config_from_h264(input_file: IO[bytes]):
+@typechecked
+def get_avc_config_from_h264(input_file: IO[bytes]) -> AVCDecoderConfiguration:
     logger = logging.getLogger(__name__)

     # TODO: improve this ...
@@ -1594,13 +1624,15 @@ def get_avc_config_from_h264(input_file: IO[bytes]):
     return avcconfig

 # Unused ?
-def get_codec_private_data_from_h264(input_file: IO[bytes]):
+@typechecked
+def get_codec_private_data_from_h264(input_file: IO[bytes]) -> bytearray:
     avcconfig = get_avc_config_from_h264(input_file)
     res = dump_codec_private_data(avcconfig)

     return res

-def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]):
+@typechecked
+def parse_mkv_tree(mkvinfo_path:str, input_file: IO[bytes]) -> dict[str,tuple[int,int]]:
     logger = logging.getLogger(__name__)

     infd = input_file.fileno()
@@ -1680,8 +1712,8 @@
 # 0000 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx
 # value 0 to 2^56-2

-
-def get_ebml_length(length):
+@typechecked
+def get_ebml_length(length) -> bytes|None:
     logger = logging.getLogger(__name__)

     if 0 <= length <= 2**7-2:
@@ -1711,7 +1743,7 @@ def get_ebml_length(length):
         res = (encoded_length).to_bytes(size, byteorder='big')

     return res
-
+@typechecked
 def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration) -> bytearray:
     logger = logging.getLogger(__name__)
     # Rebuild a Matroska Codec Private Element
@@ -1729,8 +1761,8 @@ def dump_codec_private_data(avc_decoder_configuration: AVCDecoderConfiguration)

     return res

-
-def change_ebml_element_size(input_file: IO[bytes], position:int, addendum):
+@typechecked
+def change_ebml_element_size(input_file: IO[bytes], position:int, addendum) -> int:
     logger = logging.getLogger(__name__)

     initial_position = position
@@ -1822,7 +1854,8 @@ def change_ebml_element_size(input_file: IO[bytes], position:int, addendum):
     # We return the potential increase in size of the file if the length field had to be increased.
     return delta

-def change_codec_private_data(mkvinfo, input_file: IO[bytes], codec_data):
+@typechecked
+def change_codec_private_data(mkvinfo_path:str, input_file: IO[bytes], codec_data) -> None:
     logger = logging.getLogger(__name__)

     infd = input_file.fileno()
@@ -1830,7 +1863,7 @@
     current_length = fstat(infd).st_size
     logger.info('Current size of file: %d', current_length)

-    position, current_data = get_codec_private_data_from_mkv(mkvinfo, input_file)
+    position, current_data = get_codec_private_data_from_mkv(mkvinfo_path, input_file)
     current_data_length = len(current_data)
     future_length = current_length - current_data_length + len(codec_data)
     logger.info('Expected size of file: %d', future_length)
@@ -1838,11 +1871,10 @@
     logger.info('Current data at position %d: %s', position, hexdump.dump(current_data, sep=":"))
     logger.info('Future data: %s', hexdump.dump(codec_data, sep=":"))

-    elements = parse_mkv_tree(mkvinfo, input_file)
+    elements = parse_mkv_tree(mkvinfo_path, input_file)
     found = False
-    for key in elements:
-        pos, size = elements[key]
+    for key, (pos,size) in elements.items():
         if pos == position:
             logger.info('Codec private data key: %s', key)
             found = True
@@ -1893,7 +1925,8 @@
     # been resized).
     delta+=change_ebml_element_size(input_file, pos, delta)

-def get_format(ffprobe_path:str, input_file: IO[bytes]):
+@typechecked
+def get_format(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
     logger = logging.getLogger(__name__)

     infd = input_file.fileno()
@@ -1910,7 +1943,7 @@

     return None

-
+@typechecked
 def get_movie_duration(ffprobe_path:str, input_file: IO[bytes]) -> timedelta|None:
     logger = logging.getLogger(__name__)

@@ -1931,6 +1964,7 @@ def get_movie_duration(ffprobe_path:str, input_file: IO[bytes]) -> timedelta|Non
     return None

 # ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json sample.ts
+@typechecked
 def get_video_dimensions(ffprobe_path:str, input_file: IO[bytes]) -> tuple[int,int]:
     logger = logging.getLogger(__name__)

@@ -1950,8 +1984,8 @@ def get_video_dimensions(ffprobe_path:str, input_file: IO[bytes]) -> tuple[int,i
     logger.error('Impossible to retrieve dimensions of video')
     exit(-1)

-
-def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
+@typechecked
+def get_streams(ffprobe_path:str, input_file: IO[bytes]) -> list|None:
     logger = logging.getLogger(__name__)

     infd = input_file.fileno()
@@ -1968,6 +2002,7 @@

     return None

+@typechecked
 def with_subtitles(ffprobe_path:str, input_file: IO[bytes]) -> bool:
     logger = logging.getLogger(__name__)

@@ -1988,6 +2023,7 @@

     return False

+@typechecked
 def parse_timestamp(ts:str) -> timedelta|None:
     logger = logging.getLogger(__name__)

@@ -2025,10 +2061,11 @@ def parse_timestamp(ts:str) -> timedelta|None:

     if us < 0 or us > 1000000:
         logger.error("milliseconds must be in [0,1000000[")
         return None
-    ts = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us)
+    res = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us)

-    return ts
+    return res

+@typechecked
 def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:
     logger = logging.getLogger(__name__)
@@ -2102,6 +2139,7 @@ def parse_time_interval(interval:str) -> tuple[timedelta,timedelta]:

     return (ts1, ts2)

+@typechecked
 def compare_time_interval(interval1: tuple[timedelta, timedelta],
                           interval2: tuple[timedelta, timedelta]) -> int:
     ts11,ts12 = interval1
@@ -2114,6 +2152,7 @@
     else:
         return 0

+@typechecked
 def ffmpeg_convert(ffmpeg_path:str, ffprobe_path:str, input_file: IO[bytes], input_format:str,
                    output_file: IO[bytes], output_format:str, duration: timedelta):
     logger = logging.getLogger(__name__)
@@ -2159,6 +2198,7 @@
     if status != 0:
         logger.error('Conversion failed with status code: %d', status)

+@typechecked
 def get_ts_frame(frame: dict) -> timedelta|None:
     logger = logging.getLogger(__name__)

@@ -2173,6 +2213,7 @@ def get_ts_frame(frame: dict) -> timedelta|None:
     ts = timedelta(seconds=pts_time)
     return ts

+@typechecked
 def get_packet_duration(packet: dict) -> int:
     logger = logging.getLogger(__name__)

@@ -2186,15 +2227,16 @@
     return duration

-def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:str, end:str,
-                         stream_kind:str, sub_stream_id:int=0) -> list[timedelta]|None:
+@typechecked
+def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:timedelta, end:timedelta,
+                         stream_kind:str, sub_stream_id:int=0) -> list[dict]|None:
     logger = logging.getLogger(__name__)

     infd = input_file.fileno()
     set_inheritable(infd, True)
-    command = [ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{begin}%{end}','-show_entries',
-               'frame', '-select_streams', f'{stream_kind}:{sub_stream_id:d}','-of', 'json',
-               f'/proc/self/fd/{infd:d}']
+    command = [ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{begin}%{end}',
+               '-show_entries', 'frame', '-select_streams',
+               f'{stream_kind}:{sub_stream_id:d}','-of', 'json', f'/proc/self/fd/{infd:d}']
     logger.debug('Executing: %s', command)

     with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
@@ -2219,13 +2261,14 @@ def get_frames_in_stream(ffprobe_path: str, input_file: IO[bytes], begin:str, en

             res = []
             for ts in sorted(tmp):
                 res.append(tmp[ts])
             return res
     else:
         logger.error('Impossible to retrieve frames inside file around [%s,%s]', begin, end)
         return None

 # TODO: Finish implementation of this function and use it.
+@typechecked
 def get_nearest_idr_frame(ffprobe_path: str, input_file: IO[bytes], timestamp, before: bool=True,
                           delta: timedelta=timedelta(seconds=2)):
     # pylint: disable=W0613
@@ -2269,9 +2311,10 @@

     return None

-def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes], timestamp:timedelta,
-                       before:bool=True,
-                       delta_max:timedelta=timedelta(seconds=15))-> tuple[int,list[dict]]:
+@typechecked
+def get_nearest_iframe(ffprobe_path:str, input_file: IO[bytes],
+                       timestamp:timedelta, before:bool=True,
+                       delta_max:timedelta=timedelta(seconds=15))-> tuple[int,dict]:
     logger = logging.getLogger(__name__)

     infd = input_file.fileno()
@@ -2350,8 +2393,9 @@

     return(nb_frames, iframe)

+@typechecked
 def extract_mkv_part(mkvmerge_path:str, input_file:IO[bytes], output_file:IO[bytes],
-                     begin:str, end:str) -> None:
+                     begin:timedelta, end:timedelta) -> None:
     logger = logging.getLogger(__name__)

     logger.info('Extract video between I-frames at %s and %s', begin,end)
@@ -2392,7 +2436,8 @@
     elif status == 2:
         logger.error('Extraction returns errors')

-def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:str, nb_frames:int,
+@typechecked
+def extract_pictures(ffmpeg_path:str, input_file:IO[bytes], begin:timedelta, nb_frames:int,
                      width:int=640, height:int=480) -> tuple[bytes,int]:
     logger = logging.getLogger(__name__)

@@ -2429,7 +2474,8 @@
     lseek(outfd, 0, SEEK_SET)
     return images, outfd

-def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:str, output_filename:str,
+@typechecked
+def extract_sound(ffmpeg_path:str, input_file: IO[bytes], begin:timedelta, output_filename:str,
                   packet_duration:int, sub_channel:int=0, nb_packets:int=0,
                   sample_rate:int=48000, nb_channels:int=2) -> tuple[bytes,int]:
     logger = logging.getLogger(__name__)
@@ -2465,6 +2511,7 @@

     return sound, outfd

+@typechecked
 def dump_ppm(pictures: list, prefix: str, temporaries: list[IO[bytes]]) -> None:
     logger = logging.getLogger(__name__)

@@ -2499,7 +2546,7 @@
     header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1

     try:
-        with open(filename, 'w', encoding='utf8') as out:
+        with open(filename, 'wb') as out:
             temporaries.append(out)
             outfd = out.fileno()
             length=header_len+3*width*height
@@ -2511,10 +2558,10 @@
     except IOError:
         logger.error('Impossible to create file: %s', filename)

-
-def extract_all_streams(ffmpeg_path: str, ffprobe_path: str, input_file: str, begin, end, streams,
-                        files_prefix, nb_frames: int, framerate: float, width: int, height:int,
-                        temporaries, dump_mem_fd=False):
+@typechecked
+def extract_all_streams(ffmpeg_path:str, ffprobe_path:str, input_file:IO[bytes], begin:timedelta,
+                        end:timedelta, streams, files_prefix, nb_frames:int, framerate:float,
+                        width:int, height:int, temporaries, dump_mem_fd:bool=False):
     logger = logging.getLogger(__name__)

     # The command line for encoding only video track
@@ -2553,7 +2600,7 @@
                     interlaced_options = ['-top', '1',
                                           f'-flags:v:{video_id:d}', '+ilme+ildct',
                                           '-field_order', '1']
                 case 'bb':
-                    interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct', 
+                    interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct',
                                           '-field_order','2']
                 case 'tb':
                     interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct',
@@ -2563,7 +2610,7 @@
                                           '-field_order', '4']
                 case _:
                     interlaced_options = []
-            
+
             # ======================================= #
             # TODO: adjust SAR and DAR
             # https://superuser.com/questions/907933/correct-aspect-ratio-without-re-encoding-video-file
@@ -2574,8 +2621,8 @@
                 logger.warning('Missing treatment for chroma location: %s', chroma_location)
             codec = stream['codec_name']
             images_bytes, memfd = extract_pictures(ffmpeg_path, input_file=input_file,
-                                                   begin=begin, nb_frames=nb_frames, width=width,
-                                                   height=height)
+                                                   begin=begin, nb_frames=nb_frames,
+                                                   width=width, height=height)
             if images_bytes is None:
                 logger.error('Impossible to extract picture from video stream.')
                 exit(-1)
@@ -2639,7 +2686,7 @@

             if dump_mem_fd:
                 try:
-                    with open(tmpname,'w', encoding='utf8') as output:
+                    with open(tmpname,'wb') as output:
                         temporaries.append(output)
                         outfd = output.fileno()
                         pos = 0
@@ -2664,7 +2711,7 @@
             generic_input_params.extend(['-i', './empty.idx'])
             if 'tags' in stream:
                 if 'language' in stream['tags']:
-                    generic_codec_params.extend([f'-metadata:s:s:{subtitle_id:d}', 
+                    generic_codec_params.extend([f'-metadata:s:s:{subtitle_id:d}',
                                                  f"language={stream['tags']['language']}"])
             generic_codec_params.extend([f'-c:s:{subtitle_id:d}', 'copy'])
             subtitle_id=subtitle_id+1
@@ -2755,13 +2802,14 @@

     return None, None

 # Merge a list of mkv files passed as input, and produce a new MKV as output
-def merge_mkvs(mkvmerge_path:str, inputs, output_name:str, concatenate: bool =True,
-               timestamps=None) -> IO[bytes]:
+@typechecked
+def merge_mkvs(mkvmerge_path:str, inputs: list[IO[bytes]], output_name:str,
+               concatenate: bool=True, timestamps: dict[int, IO[str]]={}) -> IO[bytes]|None:
     logger = logging.getLogger(__name__)

     fds = []
     try:
-        out = open(output_name, 'w+', encoding='utf8')
+        out = open(output_name, 'wb+')
     except IOError:
         logger.error('Impossible to create file: %s', output_name)
         return None
@@ -2784,7 +2832,7 @@
         fds.append(fd)
         set_inheritable(fd, True)
         # If we pass a timestamps file associated with the considered track, use it.
-        if timestamps is not None and partnum in timestamps:
+        if partnum in timestamps:
             tsfd = timestamps[partnum].fileno()
             lseek(tsfd, 0, SEEK_SET)
             fds.append(tsfd)
@@ -2840,8 +2888,9 @@ def find_subtitles_tracks(ffprobe_path:str, input_file: IO[bytes]) -> dict|None:
     lseek(infd, 0, SEEK_SET)
     set_inheritable(infd, True)

-    command = [ffprobe_path, '-loglevel','quiet', '-i', f'/proc/self/fd/{infd:d}', '-select_streams',
-               's', '-show_entries', 'stream=index:stream_tags=language', '-of', 'json']
+    command = [ffprobe_path, '-loglevel','quiet', '-i', f'/proc/self/fd/{infd:d}',
+               '-select_streams', 's', '-show_entries', 'stream=index:stream_tags=language',
+               '-of', 'json']
     logger.debug('Executing: %s', command)

     with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
@@ -2855,6 +2904,7 @@
         ffprobe.wait()
         return None

+@typechecked
 def extract_track_from_mkv(mkvextract_path: str, input_file: IO[bytes], index,
                            output_file: IO[bytes], timestamps) -> None:
     logger = logging.getLogger(__name__)
@@ -2899,6 +2949,7 @@
     else:
         logger.info('Track %d was succesfully extracted.', index)

+@typechecked
 def remove_video_tracks_from_mkv(mkvmerge_path:str, input_file: IO[bytes],
                                  output_file: IO[bytes]) -> None:
     logger = logging.getLogger(__name__)
@@ -2935,7 +2986,8 @@
     else:
         logger.info('Video tracks were succesfully extracted.')

-def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filename: IO[bytes],
+@typechecked
+def remux_srt_subtitles(mkvmerge_path:str, input_file: IO[bytes], output_filename: str,
                         subtitles) -> None:
     logger = logging.getLogger(__name__)

@@ -2986,7 +3038,8 @@

     return None

-def concatenate_h264_parts(h264parts, output) -> None:
+@typechecked
+def concatenate_h264_parts(h264parts: list[IO[bytes]], output: IO[bytes]) -> None:
     logger = logging.getLogger(__name__)

     total_length = 0
@@ -3013,7 +3066,7 @@
             pb.update(nb_bytes)
             pos += nb_bytes

-def concatenate_h264_ts_parts(h264_ts_parts, output) -> None:
+def concatenate_h264_ts_parts(h264_ts_parts: list[IO[bytes]], output: IO[bytes]) -> None:
     logger = logging.getLogger(__name__)

     header = '# timestamp format v2\n'
@@ -3053,7 +3106,7 @@
     internal_mkv_name = f'{files_prefix}.mkv'
     try:
-        internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8')
+        internal_mkv = open(internal_mkv_name, 'wb+')
     except IOError:
         logger.error('Impossible to create file: %s', internal_mkv_name)
         exit(-1)
@@ -3145,7 +3198,8 @@ def main() -> None:
     mkvfilename = basename+'.mkv'

     try:
-        input_file = open(args.input_file, mode='r', encoding='utf8')
+        input_file = open(args.input_file, mode='rb')
+        logger.debug("Type of input file: %s", type(input_file))
     except IOError:
         logger.error("Impossible to open %s", args.input_file)
         exit(-1)
@@ -3182,13 +3236,13 @@
     if format_of_file == SupportedFormat.TS:
         logger.info("Converting TS to MP4 (to fix timestamps).")
         try:
-            with open(mp4filename, 'w+', encoding='utf8') as mp4:
+            with open(mp4filename, 'wb+') as mp4:
                 ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mpegts', mp4,
                                'mp4', duration)
                 temporaries.append(mp4)

                 logger.info("Converting MP4 to MKV.")
                try:
-                    mkv = open(mkvfilename, 'w+', encoding='utf8')
+                    mkv = open(mkvfilename, 'wb+')
                except IOError:
                    logger.error('')
@@ -3202,7 +3256,7 @@
     elif format_of_file == SupportedFormat.MP4:
         logger.info("Converting MP4 to MKV")
         try:
-            mkv = open(mkvfilename, 'w+', encoding='utf8')
+            mkv = open(mkvfilename, 'wb+')
         except IOError:
             logger.error('')
         ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mp4', mkv, 'matroska',
@@ -3238,7 +3292,8 @@
         exit(-1)

     # We retrieve the main private codec data
-    _, main_codec_private_data = get_codec_private_data_from_mkv(mkvinfo_path=paths['mkvinfo'], input_file=mkv)
+    _, main_codec_private_data = get_codec_private_data_from_mkv(mkvinfo_path=paths['mkvinfo'],
+                                                                 input_file=mkv)
     logger.debug('Main video stream has following private data: %s',
                  hexdump.dump(main_codec_private_data, sep=':'))

@@ -3260,7 +3315,8 @@
     # If there exists a difference between our own reconstructed AVC configuration and the
     # original one, we abandon
     if iso_avc_config != main_avc_config:
-        logger.error('AVC configurations are different: %s\n%s\n', main_avc_config, iso_avc_config)
+        logger.error('AVC configurations are different: %s\n%s\n', main_avc_config,
+                     iso_avc_config)
         exit(-1)

     # Pour chaque portion
@@ -3373,19 +3429,19 @@
         internal_novideo_mkv_name = f'part-{partnum:d}-internal-novideo.mkv'

         try:
-            internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8')
+            internal_mkv = open(internal_mkv_name, 'wb+')
         except IOError:
             logger.error('Impossible to create file: %s', internal_mkv_name)
             exit(-1)

         try:
-            internal_novideo_mkv = open(internal_novideo_mkv_name, 'w+', encoding='utf8')
+            internal_novideo_mkv = open(internal_novideo_mkv_name, 'wb+')
         except IOError:
             logger.error('Impossible to create file: %s', internal_novideo_mkv_name)
             exit(-1)

         try:
-            internal_h264 = open(internal_h264_name, 'w+', encoding='utf8')
+            internal_h264 = open(internal_h264_name, 'wb+')
         except IOError:
             logger.error('Impossible to create file: %s', internal_h264_name)
             exit(-1)
@@ -3403,8 +3459,8 @@

         # Extract video stream of internal part as a raw H264 and its timestamps.
         logger.info('Extract video track as raw H264 file.')
-        extract_track_from_mkv(mkvextract_path=paths['mkvextract'], input_file=internal_mkv, index=0,
-                               output_file=internal_h264, timestamps=internal_h264_ts)
+        extract_track_from_mkv(mkvextract_path=paths['mkvextract'], input_file=internal_mkv,
+                               index=0, output_file=internal_h264, timestamps=internal_h264_ts)

         # Remove video track from internal part of MKV
         logger.info('Remove video track from %s', internal_mkv_name)
@@ -3443,7 +3499,8 @@
             h264_ts.append(h264_tail_ts)

         logger.info('Merging MKV: %s', subparts)
-        part = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=subparts, 
+
+        part = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=subparts,
                           output_name=f'part-{partnum:d}.mkv', concatenate=True)
         mkvparts.append(part)
         temporaries.append(part)
@@ -3461,7 +3518,7 @@
     nb_mkv_parts = len(mkvparts)
     if nb_mkv_parts > 0:
         try:
-            full_h264 = open(f'{basename}-full.h264', 'w+', encoding='utf8')
+            full_h264 = open(f'{basename}-full.h264', 'wb+')
         except IOError:
             logger.error('Impossible to create file full H264 stream.')
             exit(-1)
@@ -3495,7 +3552,7 @@

    if nb_mkv_parts >=1 :
        try:
-            final_novideo = open(final_novideo_name, 'r', encoding='utf8')
+            final_novideo = open(final_novideo_name, 'rb')
        except IOError:
            logger.error('Impossible to open file: %s.', final_novideo_name)
            exit(-1)
@@ -3505,11 +3562,13 @@
        full_h264_ts.seek(0)

        logger.info('Merging final video track and all other tracks together')
-        final_with_video = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=[full_h264, final_novideo],
+        final_with_video = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=[full_h264,
                                                                                final_novideo],
                                      output_name=final_with_video_name, concatenate=False,
                                      timestamps={0: full_h264_ts})
        final_codec_private_data = dump_codec_private_data(main_avc_config)
-        logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data, sep=':'))
+        logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data,
+                                                                  sep=':'))

        logger.info('Changing codec private data with the new one.')
        change_codec_private_data(paths['mkvinfo'], final_with_video, final_codec_private_data)
@@ -3545,17 +3604,18 @@
        logger.info(sts)

        if len(sts) > 0:
+            logger.info('Supported languages: %s', supported_langs)
            list_of_subtitles = extract_srt(paths['mkvextract'], final_with_video_name, sts,
                                            supported_langs)
            logger.info(list_of_subtitles)
            for idx_name, sub_name, _, _ in list_of_subtitles:
                try:
-                    idx = open(idx_name,'r', encoding='utf8')
+                    idx = open(idx_name,'rb')
                except IOError:
                    logger.error("Impossible to open %s.", idx_name)
                    exit(-1)
                try:
-                    sub = open(sub_name,'r', encoding='utf8')
+                    sub = open(sub_name,'rb')
                except IOError:
                    logger.error("Impossible to open %s.", sub_name)
                    exit(-1)
@@ -3563,7 +3623,8 @@
                temporaries.append(idx)
                temporaries.append(sub)

-            ocr = do_ocr(paths['vobsubocr'], list_of_subtitles, duration, temporaries, args.dump)
+            ocr = do_ocr(paths['vobsubocr'], list_of_subtitles, duration, temporaries,
+                         args.dump)
            logger.info(ocr)

            # Remux SRT subtitles