#!/usr/bin/env python3 '''A module to remove parts of video (.e.g advertisements) with single frame precision.''' # Standard modules import argparse import re from sys import exit from datetime import datetime,timedelta import logging from functools import cmp_to_key from subprocess import Popen, PIPE from os import read, write, lseek, set_inheritable, memfd_create, SEEK_SET, close, unlink,\ fstat, ftruncate import os.path from enum import IntEnum, unique from shutil import copyfile, which, move from dataclasses import dataclass, field from math import floor, ceil, log from io import BytesIO, TextIOWrapper import json from typing import IO # Third party libraries import coloredlogs from tqdm import tqdm import hexdump from iso639 import Lang from iso639.exceptions import InvalidLanguageValue # Local modules # TODO: create local modules for MP4, MKV # Useful SPS/PPS discussion. # https://copyprogramming.com/howto/including-sps-and-pps-in-a-raw-h264-track # https://gitlab.com/mbunkus/mkvtoolnix/-/issues/2390 # New strategy: a possible way of handling multiple SPS/PPS gracefully. # Encode each head and trailer with FFMPEG using only I-frame (to be sure the NAL unit will never # refer to another image). # Encode using an different SPS-ID all of them (using sps-id parameter of libx264 library, e.g # 1 instead of 0). # For the video track produce only a raw H264 file and a file containing timestamps of the # different frames. # For the rest of the tracks (audio, subtitles) produce directly a MKV (this is already done). # Concatenate all raw H264 in a giant one (like cat), and the same for timestamps of video frames # (to keep sound and video synchronized). # Then use mkvmerge to remux the H264 track and the rest of tracks. # MKVmerge "concatenate" subcommand is able to concatenate different SPS/PPS data into a bigger # Private Codec Data. # However, this is proved to be not reliable. Sometimes it results in a AVC context containing # a single SPS/PPS. # So we have to rely on a manual parsing of the H264 AVC context of original movie # and the ones produced for headers and trailers, and then merging them into a bigger AVC context. # Then finally, change the Private Codec Data in the final MKV. def check_required_tools() -> tuple[bool,list[str]]: """Check if required external tools are installed. Args: Returns: tuple[bool, list[str]] : does all optional tools are installed and the paths of all tools. """ logger = logging.getLogger(__name__) all_optional_tools = True paths = {} required = ['ffmpeg', 'ffprobe', 'mkvmerge', 'mkvinfo'] optional = ['mkvextract', 'vobsubocr','tesseract'] for tool in required: path = which(tool) if path is None: logger.error('Required tool: %s is missing.',tool) exit(-1) else: paths[tool] = path for tool in optional: path = which(tool) if path is None: logger.info('Optional tool: %s is missing.',tool) all_optional_tools = False else: paths[tool] = path return all_optional_tools, paths def get_tesseract_supported_lang(tesseract_path:str) -> dict[str,str]|None: """Returns the set of natural languages supported by Tesseract OCR tool. Args: tesseract_path: str: path to tesseract binary. Returns: dict[str, str] : a mapping .... """ logger = logging.getLogger(__name__) res = {} with Popen([tesseract_path, '--list-langs'], stdout=PIPE) as tesseract: for line in tesseract.stdout: line = line.decode('utf8') p = re.compile('(?P[a-z]{3})\n') m = re.match(p,line) if m is not None: try: lang = m.group('lang') key = Lang(lang) res[key] = lang except InvalidLanguageValue as e: logger.warning('Invalid language: %s', e) pass tesseract.wait() if tesseract.returncode != 0: logger.error("Tesseract returns an error code: %d",tesseract.returncode) return None return res def get_frame_rate(ffprobe_path:str, inputFile) -> float|None: logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) mean_duration = 0. nb_frames1 = 0 nb_frames2 = 0 min_ts = None max_ts = None interlaced = False params = [ffprobe_path, '-loglevel', 'quiet', '-select_streams', 'v', '-show_frames', '-read_intervals', '00%+30', '-of', 'json', f'/proc/self/fd/{infd:d}'] env = {**os.environ, 'LANG': 'C'} with Popen(params, stdout=PIPE, close_fds=False, env=env) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'frames' in out: for frame in out['frames']: if 'interlaced_frame' in frame: if frame['interlaced_frame'] == 1: interlaced = True if 'pts_time' in frame: ts = float(frame['pts_time']) if min_ts is None: min_ts = ts if max_ts is None: max_ts = ts min_ts = min(min_ts, ts) max_ts = max(max_ts, ts) nb_frames1+=1 if 'duration_time' in frame: mean_duration+=float(frame['duration_time']) nb_frames2+=1 else: return None ffprobe.wait() if ffprobe.returncode != 0: logger.error("ffprobe returns an error code: %d", ffprobe.returncode) return None frame_rate1 = nb_frames1/(max_ts-min_ts) frame_rate2 = nb_frames2 / mean_duration if abs(frame_rate1 - frame_rate2) > 0.2: if not interlaced: logger.error('Video is not interlaced and the disperancy between frame rates is too \ big: %f / %f', frame_rate1, frame_rate2) return None if abs(frame_rate1*2 - frame_rate2) < 0.2: return frame_rate2/2 else: logger.error('Video is interlaced and the disperancy between frame rates is too big:\ %f / %f', frame_rate1, frame_rate2) return None else: return frame_rate2 return None def get_subtitles_tracks(ffprobe_path:str, mkvPath: str) -> dict[str,str]|None: logger = logging.getLogger(__name__) tracks={} with Popen([ffprobe_path, '-loglevel', 'quiet', '-select_streams', 's', '-show_entries', 'stream=index,codec_name:stream_tags=language', '-of', 'json', mkvPath], stdout=PIPE) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'streams' in out: for stream in out['streams']: index = stream['index'] codec = stream['codec'] lang = stream['tags']['language'] if codec == 'dvd_subtitle': if lang not in tracks: tracks[lang] = [index] else: current_langs = tracks[lang] current_langs.append(index) tracks[lang] = current_langs else: return None ffprobe.wait() if ffprobe.returncode != 0: logger.error("ffprobe returns an error code: %d", ffprobe.returncode) return None return tracks def extract_srt(mkvextract:str, fileName:str, subtitles:str, langs:list[str]) -> list|None: logger = logging.getLogger(__name__) params = [mkvextract, fileName, 'tracks'] res = [] for lang in subtitles: iso = Lang(lang) if iso in langs: ocrlang = langs[iso] else: logger.warning("Language not supported by Tesseract: %s", iso.name) ocrlang ='osd' if len(subtitles[lang]) == 1: params.append(f'{subtitles[lang][0]:d}:{lang}') res.append((f'{lang}.idx', f'{lang}.sub', lang, ocrlang)) else: count = 1 for track in subtitles[lang]: params.append(f'{track:d}:{lang}-{count:d}') res.append((f'{lang}-{count:d}.idx', f'{lang}-{count:d}.sub', lang, ocrlang)) count = count+1 logger.debug('Executing %s', params) env = {**os.environ, 'LANG': 'C'} with Popen(params, stdout=PIPE, close_fds=False, env=env) as extract: pb = tqdm(TextIOWrapper(extract.stdout, encoding="utf-8"), total=100, unit='%', desc='Extraction:') for line in pb: if line.startswith('Progress :'): p = re.compile('^Progress : (?P[0-9]{1,3})%$') m = p.match(line) if m is None: logger.error('Impossible to parse progress') pb.update(int(m['progress'])-pb.n) pb.update(100-pb.n) pb.refresh() pb.close() extract.wait() # mkvextract returns 0, 1 or 2 as error code. if extract.returncode == 0: logger.info('Subtitle tracks were succesfully extracted.') return res elif extract.returncode == 1: logger.warning('Mkvextract returns warning') return res else: logger.error('Mkvextract returns an error code: %d', extract.returncode) return None def do_ocr(vobsubocr, idxs, duration, temporaries, dumpMemFD=False): logger = logging.getLogger(__name__) res = [] for idx_name, _, lang, iso in idxs: srtname = f'{os.path.splitext(idx_name)[0]}.srt' # Tesseract seems to recognize the three dots ... as "su" ldots = re.compile('^su\n$') # Timestamps produced by vobsubocr: 01:52:19,861 --> 01:52:21,641 timestamps = re.compile((r'^[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} \-\-> (?P[0-9]{2}):' r'(?P[0-9]{2}):(?P[0-9]{2}),[0-9]{3}$')) srtfd = memfd_create(srtname, flags=0) with Popen([vobsubocr, '--lang', iso, idx_name], stdout=PIPE) as ocr: pb = tqdm(TextIOWrapper(ocr.stdout, encoding="utf-8"), total= int(duration/timedelta(seconds=1)), unit='s', desc='OCR') for line in pb: m = re.match(ldots,line) if m is not None: write(srtfd, '...'.encode(encoding='UTF-8')) else: write(srtfd, line.encode(encoding='UTF-8')) m = re.match(timestamps, line) if m is not None: hours = int(m.group('hours')) minutes = int(m.group('hours')) seconds = int(m.group('seconds')) ts = timedelta(hours=hours, minutes=minutes, seconds=seconds) pb.n = int(ts/timedelta(seconds=1)) pb.update() status = ocr.wait() if status != 0: logger.error('OCR failed with status code: %d', status) if dumpMemFD: try: with open(srtname,'w', encoding='utf8') as dump_srt: lseek(srtfd, 0, SEEK_SET) srt_length = fstat(srtfd).st_size buf = read(srtfd, srt_length) outfd = dump_srt.fileno() pos = 0 while pos < srt_length: pos+=write(outfd, buf[pos:]) temporaries.append(dump_srt) except IOError: logger.error('Impossible to create file: %s', srtname) return None srt_length = fstat(srtfd).st_size if srt_length > 0: res.append((srtfd, lang)) return res @unique class SupportedFormat(IntEnum): TS = 1 MP4 = 2 Matroska = 3 def __str__(self): if self is SupportedFormat.TS: return 'mpegts' elif self is SupportedFormat.MP4: return 'mov,mp4,m4a,3gp,3g2,mj2' elif self is SupportedFormat.Matroska: return 'matroska,webm' else: return 'Unsupported format' # Extract SPS/PPS # https://gitlab.com/mbunkus/mkvtoolnix/-/issues/2390 # ffmpeg -i -c:v copy -an -sn -bsf:v trace_headers -t 0.01\ # -report -loglevel 0 -f null - # Found codec private data using mkvinfo def get_codec_private_data_from_mkv(mkvinfo_path, inputFile: IO[bytes]): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) found = False env = {**os.environ, 'LANG': 'C'} # Output example # Codec's private data: size 48 (H.264 profile: High @L4.0) hexdump 01 64 00 28 ff e1 00 1b 67\ # 64 00 28 ac d9 40 78 04 4f dc d4 04 04 05 00 00 92 ef 00 1d ad a6 1f 16 2d 96 01 00 06 68 fb\ # a3 cb 22 c0 fd f8 f8 00 at 406 size 51 data size 48 with Popen([mkvinfo_path, '-z', '-X', '-P', f'/proc/self/fd/{infd:d}'], stdout=PIPE, close_fds=False, env=env) as mkvinfo: out, _ = mkvinfo.communicate() out = out.decode('utf8') reg_exp = (r"^.*Codec's private data: size ([0-9]+) \(H.264.*\) hexdump " r"(?P([0-9a-f]{2} )+)at (?P[0-9]+) size (?P[0-9]+).*$") p = re.compile(reg_exp) for line in out.splitlines(): m = p.match(line) if m is not None: size = int(m.group('size')) position = int(m.group('position')) logger.debug("Found codec private data at position: %s, size: %d", position, size) found = True mkvinfo.wait() break if found: lseek(infd, position, SEEK_SET) data = read(infd, size) return position, data else: return None, None # All the following code is a transposition of documents: # ISO/IEC H.264-201602 # ISO/IEC 14496-15 def read_bit(buf, bit_position): byte_position = floor(floor(bit_position/8)) byte = buf[byte_position] bit = (byte >> (7-(bit_position % 8))) & 1 return bit_position+1, bit def read_boolean(buf, bit_position): bit_position, b = read_bit(buf, bit_position) return bit_position, b==1 def read_bits(buf, bit_position, nbBits): v = 0 for _ in range(0, nbBits): bit_position, bit = read_bit(buf, bit_position) v = v*2+bit return bit_position, v def read_byte(buf, bit_position): bit_position, b = read_bits(buf, bit_position, 8) return bit_position, b def read_word(buf, bit_position): bit_position, w = read_bits(buf, bit_position, 16) return bit_position, w def read_long(buf, bit_position): bit_position, value = read_bits(buf, bit_position, 32) return bit_position, value def read_unsigned_exp_golomb(buf, bit_position): nb_zeroes=0 while True: bit_position, b = read_bit(buf, bit_position) if b!=0: break nb_zeroes+=1 v1 = 1 bit_position, v2 = read_bits(buf, bit_position, nb_zeroes) v = (v1<>1) else: return bit_position, (v+1)>>1 def write_bit(buf, bit_position, b): buf_length = len(buf) byte_position = floor(bit_position/8) if byte_position >= buf_length: extension = bytearray(byte_position+1-buf_length) buf.extend(extension) buf[byte_position] |= (b<<(7-(bit_position % 8))) bit_position+=1 return bit_position def write_boolean(buf, bit_position, b): if b: bit_position = write_bit(buf, bit_position, 1) else: bit_position = write_bit(buf, bit_position, 0) return bit_position def write_bits(buf, bit_position, v, size): for i in range(size-1,-1,-1): b = (v>>i)&1 bit_position = write_bit(buf, bit_position, b) return bit_position def write_byte(buf, bit_position, v): bit_position = write_bits(buf, bit_position, v, 8) return bit_position def write_word(buf, bit_position, v): bit_position = write_bits(buf, bit_position, v, 16) return bit_position def write_long(buf, bit_position, v): bit_position = write_bits(buf, bit_position, v, 32) return bit_position def write_unsigned_exp_golomb(buf, bit_position, v): n = floor(log(v+1)/log(2))+1 # Write zeroes bit_position = write_bits(buf, bit_position, 0, n-1) bit_position = write_bit(buf, bit_position, 1) bit_position = write_bits(buf, bit_position, v+1, n-1) return bit_position def write_signed_exp_golomb(buf, bit_position, v): if v <= 0: bit_position = write_unsigned_exp_golomb(buf, bit_position, -v*2) else: bit_position = write_unsigned_exp_golomb(buf, bit_position, v*2-1) return bit_position def parse_rbsp_trailing_bits(buf, bit_position): bit_position, one = read_bit(buf, bit_position) if one==0: raise ValueError(f'Stop bit should be equal to one. Read: {one:d}') while bit_position%8 != 0: bit_position, zero = read_bit(buf, bit_position) if zero==1: raise ValueError('Trailing bit should be equal to zero') return bit_position def write_rbsp_trailing_bits(buf, bit_position): bit_position = write_bit(buf, bit_position, 1) while bit_position%8 != 0: bit_position = write_bit(buf, bit_position, 0) return bit_position def more_rbsp_data(buf, bit_position): logger = logging.getLogger(__name__) logger.debug('Is there more data in buffer of length: %d at bit position: %d', len(buf), bit_position) byte_length = len(buf) bit_length = byte_length*8 # We are at the end of buffer if bit_position == bit_length: return False else: found = False for i in range(bit_length-1,-1,-1): pos, b = read_bit(buf, i) if b == 1: found = True break if not found: raise ValueError('Impossible to find trailing stop bit !') # No more data if bit_position == pos: return False return True # Convert from RBSP (Raw Byte Sequence Payload) to SODB (String Of Data Bits) def rbsp_to_sodb(buf): logger = logging.getLogger(__name__) logger.debug('RBSP: %s', hexdump.dump(buf, sep=':')) res = buf for b in [ b'\x00', b'\x01', b'\x02', b'\x03']: pattern = b'\x00\x00\x03'+b replacement = b'\x00\x00' + b res = res.replace(pattern, replacement) logger.debug('SODB: %s', hexdump.dump(res, sep=':')) return res # Reverse operation SODB to RBSP. def sodb_to_rbsp(buf): logger = logging.getLogger(__name__) logger.debug('SODB: %s', hexdump.dump(buf, sep=':')) res = buf for b in [ b'\x03', b'\x00', b'\x01', b'\x02']: pattern = b'\x00\x00'+b replacement = b'\x00\x00\x03' + b res = res.replace(pattern, replacement) logger.debug('RBSP: %s', hexdump.dump(res, sep=':')) return res # Useful for SPS and PPS def parse_scaling_list(buf, bit_position, size): res = [] last_scale = 8 next_scale = 8 for _ in range(0, size): if next_scale != 0: bit_position, delta_scale = read_signed_exp_golomb(buf, bit_position) next_scale = (last_scale+delta_scale+256) % 256 v = last_scale if next_scale==0 else next_scale res.append(v) last_scale = v return bit_position,res # TODO: test optimized version. # The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list # is full of zeroes. def write_scaling_list(buf, bit_position, size, matrix, optimized=False): logger = logging.getLogger(__name__) logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size) prev = 8 deltas = [] for i in range(0, size): v = matrix[i] delta = v - prev deltas.append(delta) prev = v if not optimized: for delta in deltas: bit_position = write_signed_exp_golomb(buf, bit_position, delta) else: logger.error('Not yet implemented') exit(-1) # reverse = deltas.reverse() # compressed = False # while len(reverse)>0: # if reverse[0] == 0: # compressed = True # reverse.pop() # else: # break # deltas = reverse.reverse() # if compressed: # deltas.append(0) # for delta in deltas: # bit_position = write_signed_exp_golomb(buf, bit_position, delta) return bit_position @dataclass class HRD: cpb_cnt_minus1: int=0 bit_rate_scale: int=0 cpb_size_scale: int=0 bit_rate_value_minus1: dict = field(default_factory=dict) cpb_size_value_minus1: dict = field(default_factory=dict) cbr_flag: dict = field(default_factory=dict) initial_cpb_removal_delay_length_minus1: int=0 cpb_removal_delay_length_minus1: int=0 dpb_output_delay_length_minus1: int=0 time_offset_length: int=0 def __init__(self): self.bit_rate_value_minus1 = {} self.cpb_size_value_minus1 = {} self.cbr_flag = {} def fromBytes(self, buf, bit_position): bit_position, self.cpb_cnt_minus1 = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.bit_rate_scale = read_bits(buf, bit_position, 4) bit_position, self.cpb_size_scale = read_bits(buf, bit_position, 4) for i in range(0, self.cpb_cnt_minus1+1): bit_position, v = read_unsigned_exp_golomb(buf, bit_position) self.bit_rate_value_minus1[i] = v bit_position, v = read_unsigned_exp_golomb(buf, bit_position) self.cpb_size_value_minus1[i] = v bit_position, b = read_boolean(buf, bit_position) self.cbr_flag[i] = b bit_position, self.initial_cpb_removal_delay_length_minus1 = read_bits(buf, bit_position, 5) bit_position, self.cpb_removal_delay_length_minus1 = read_bits(buf, bit_position, 5) bit_position, self.dpb_output_delay_length_minus1 = read_bits(buf, bit_position, 5) bit_position, self.time_offset_length = read_bits(buf, bit_position, 5) return bit_position def toBytes(self, buf, bit_position): bit_position = write_unsigned_exp_golomb(buf, bit_position, self.cpb_cnt_minus1) bit_position = write_bits(buf, bit_position, self.bit_rate_scale, 4) bit_position = write_bits(buf, bit_position, self.cpb_size_scale, 4) for i in range(0, self.cpb_cnt_minus1+1): v = self.bit_rate_value_minus1[i] bit_position = write_unsigned_exp_golomb(buf, bit_position, v) v = self.cpb_size_value_minus1[i] bit_position = write_unsigned_exp_golomb(buf, bit_position, v) b = self.cbr_flag[i] bit_position = write_boolean(buf, bit_position, b) bit_position = write_bits(buf, bit_position, self.initial_cpb_removal_delay_length_minus1, 5) bit_position = write_bits(buf, bit_position, self.cpb_removal_delay_length_minus1, 5) bit_position = write_bits(buf, bit_position, self.dpb_output_delay_length_minus1, 5) bit_position = write_bits(buf, bit_position, self.time_offset_length, 5) return bit_position @dataclass class VUI: aspect_ratio_info_present_flag:bool=False aspect_ratio_idc:int=0 sar_width:int=0 sar_height:int=0 overscan_info_present_flag:bool=False overscan_appropriate_flag:bool=False video_signal_type_present_flag:bool=False video_format:int=0 video_full_range_flag:bool=False colour_description_present_flag:bool=False colour_primaries:int=0 transfer_characteristics:int=0 matrix_coefficients:int=0 chroma_loc_info_present_flag:bool=False chroma_sample_loc_type_top_field:int=0 chroma_sample_loc_type_bottom_field:int=0 timing_info_present_flag:bool=False num_units_in_tick:int=0 time_scale:int=0 fixed_frame_rate_flag:bool=False nal_hrd_parameters_present_flag:bool=False hrd_parameters:HRD=None vcl_hrd_parameters_present_flag:bool=False vcl_hrd_parameters:HRD=None low_delay_hrd_flag:bool=False pic_struct_present_flag:bool=False bitstream_restriction_flag:bool=False motion_vectors_over_pic_boundaries_flag:bool=False max_bytes_per_pic_denom:int=0 max_bits_per_mb_denom:int=0 log2_max_mv_length_horizontal:int=0 log2_max_mv_length_vertical:int=0 max_num_reorder_frames:int=0 max_dec_frame_buffering:int=0 # This structure is not guaranteed to be located at a byte boundary. # We must explicitely indicate bit offset. def fromBytes(self, buf, bit_position): bit_position, self.aspect_ratio_info_present_flag = read_boolean(buf, bit_position) if self.aspect_ratio_info_present_flag: bit_position, self.aspect_ratio_idc = read_byte(buf, bit_position) if self.aspect_ratio_idc == 255: # Extended_SAR bit_position, self.sar_width = read_word(buf, bit_position) bit_position, self.sar_height = read_word(buf, bit_position) bit_position, self.overscan_info_present_flag = read_boolean(buf, bit_position) if self.overscan_info_present_flag: bit_position, self.overscan_appropriate_flag = read_boolean(buf, bit_position) bit_position, self.video_signal_type_present_flag = read_boolean(buf, bit_position) if self.video_signal_type_present_flag: bit_position, self.video_format = read_bits(buf, bit_position, 3) bit_position, self.video_full_range_flag = read_boolean(buf, bit_position) bit_position, self.colour_description_present_flag = read_boolean(buf, bit_position) if self.colour_description_present_flag: bit_position, self.colour_primaries = read_byte(buf, bit_position) bit_position, self.transfer_characteristics = read_byte(buf, bit_position) bit_position, self.matrix_coefficients = read_byte(buf, bit_position) bit_position, self.chroma_loc_info_present_flag = read_boolean(buf, bit_position) if self.chroma_loc_info_present_flag: bit_position, self.chroma_sample_loc_type_top_field =\ read_unsigned_exp_golomb(buf, bit_position) bit_position, self.chroma_sample_loc_type_bottom_field =\ read_unsigned_exp_golomb(buf,bit_position) bit_position, self.timing_info_present_flag = read_boolean(buf, bit_position) if self.timing_info_present_flag: bit_position, self.num_units_in_tick = read_long(buf, bit_position) bit_position, self.time_scale = read_long(buf, bit_position) bit_position, self.fixed_frame_rate_flag = read_boolean(buf, bit_position) bit_position, self.nal_hrd_parameters_present_flag = read_boolean(buf, bit_position) if self.nal_hrd_parameters_present_flag: hrd = HRD() bit_position = hrd.fromBytes(buf, bit_position) self.hrd_parameters = hrd bit_position, self.vcl_hrd_parameters_present_flag = read_boolean(buf, bit_position) if self.vcl_hrd_parameters_present_flag: hrd = HRD() bit_position = hrd.fromBytes(buf, bit_position) self.vcl_hrd_parameters = hrd if self.nal_hrd_parameters_present_flag or self.vcl_hrd_parameters_present_flag: bit_position, self.low_delay_hrd_flag = read_boolean(buf, bit_position) bit_position, self.pic_struct_present_flag = read_boolean(buf, bit_position) bit_position, self.bitstream_restriction_flag = read_boolean(buf, bit_position) if self.bitstream_restriction_flag: bit_position, self.motion_vectors_over_pic_boundaries_flag =\ read_boolean(buf, bit_position) bit_position, self.max_bytes_per_pic_denom = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.max_bits_per_mb_denom = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.log2_max_mv_length_horizontal = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.log2_max_mv_length_vertical = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.max_num_reorder_frames = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.max_dec_frame_buffering = read_unsigned_exp_golomb(buf, bit_position) return bit_position def toBytes(self, buf, bit_position): bit_position = write_boolean(buf, bit_position, self.aspect_ratio_info_present_flag) if self.aspect_ratio_info_present_flag: bit_position = write_byte(buf, bit_position, self.aspect_ratio_idc) if self.aspect_ratio_idc == 255: # Extended_SAR bit_position = write_word(buf, bit_position, self.sar_width) bit_position = write_word(buf, bit_position, self.sar_height) bit_position = write_boolean(buf, bit_position, self.overscan_info_present_flag) if self.overscan_info_present_flag: bit_position = write_boolean(buf, bit_position, self.overscan_appropriate_flag) bit_position = write_boolean(buf, bit_position, self.video_signal_type_present_flag) if self.video_signal_type_present_flag: bit_position = write_bits(buf, bit_position, self.video_format, 3) bit_position = write_boolean(buf, bit_position, self.video_full_range_flag) bit_position = write_boolean(buf, bit_position, self.colour_description_present_flag) if self.colour_description_present_flag: bit_position = write_byte(buf, bit_position, self.colour_primaries) bit_position = write_byte(buf, bit_position, self.transfer_characteristics) bit_position = write_byte(buf, bit_position, self.matrix_coefficients) bit_position = write_boolean(buf, bit_position, self.chroma_loc_info_present_flag) if self.chroma_loc_info_present_flag: bit_position = write_unsigned_exp_golomb(buf, bit_position, self.chroma_sample_loc_type_top_field) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.chroma_sample_loc_type_bottom_field) bit_position = write_boolean(buf, bit_position, self.timing_info_present_flag ) if self.timing_info_present_flag: bit_position = write_long(buf, bit_position, self.num_units_in_tick ) bit_position = write_long(buf, bit_position, self.time_scale) bit_position = write_boolean(buf, bit_position, self.fixed_frame_rate_flag) bit_position = write_boolean(buf, bit_position, self.nal_hrd_parameters_present_flag) if self.nal_hrd_parameters_present_flag: bit_position = self.hrd_parameters.toBytes(buf, bit_position) bit_position = write_boolean(buf, bit_position, self.vcl_hrd_parameters_present_flag) if self.vcl_hrd_parameters_present_flag: bit_position = self.vcl_hrd_parameters.toBytes(buf, bit_position) if self.nal_hrd_parameters_present_flag or self.vcl_hrd_parameters_present_flag: bit_position = write_boolean(buf, bit_position, self.low_delay_hrd_flag) bit_position = write_boolean(buf, bit_position, self.pic_struct_present_flag) bit_position = write_boolean(buf, bit_position, self.bitstream_restriction_flag) if self.bitstream_restriction_flag: bit_position = write_boolean(buf, bit_position, self.motion_vectors_over_pic_boundaries_flag) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_bytes_per_pic_denom) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_bits_per_mb_denom) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.log2_max_mv_length_horizontal) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.log2_max_mv_length_vertical) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_num_reorder_frames) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_dec_frame_buffering) return bit_position @dataclass class SPS: profile_idc:int=0 # u(8) constraint_set0_flag:bool=False # u(1) constraint_set1_flag:bool=False # u(1) constraint_set2_flag:bool=False # u(1) constraint_set3_flag:bool=False # u(1) constraint_set4_flag:bool=False # u(1) constraint_set5_flag:bool=False # u(1) level_idc:int=0 # u(8) seq_parameter_set_id:int=0 # ue(v) chroma_format_idc:int=0 # ue(v) separate_colour_plane_flag:bool=False # u(1) bit_depth_luma_minus8:int=0 # ue(v) bit_depth_chroma_minus8:int=0 # ue(v) qpprime_y_zero_transform_bypass_flag:bool=False # u(1) seq_scaling_matrix_present_flag:bool=False # u(1) scaling_list: dict = field(default_factory=dict) log2_max_frame_num_minus4:int=0 # ue(v) pic_order_cnt_type:int=0 # ue(v) log2_max_pic_order_cnt_lsb_minus4:int=0 # ue(v) delta_pic_order_always_zero_flag:bool=False # ue(1) offset_for_non_ref_pic:int=0 # se(v) offset_for_top_to_bottom_field:int=0 # se(v) num_ref_frames_in_pic_order_cnt_cycle:int=0 # ue(v) offset_for_ref_frame:dict[int] = field(default_factory=dict) max_num_ref_frames:int=9 # ue(v) gaps_in_frame_num_value_allowed_flag:bool=False # u(1) pic_width_in_mbs_minus1:int=0 # ue(v) pic_height_in_map_units_minus1:int=0 # ue(v) frame_mbs_only_flag:bool=False # u(1) mb_adaptive_frame_field_flag:bool=False # u(1) direct_8x8_inference_flag:bool=False # u(1) frame_cropping_flag:bool=False # u(1) frame_crop_left_offset:int=0 # ue(v) frame_crop_right_offset:int=0 # ue(v) frame_crop_top_offset:int=0 # ue(v) frame_crop_bottom_offset:int=0 # ue(v) vui_parameters_present_flag:bool=False # u(1) vui:VUI=None # VUI object def __init__(self): self.scaling_list={} self.offset_for_ref_frame={} # TODO: ... # Compute options to pass to ffmpeg so as to reproduce the same SPS. # Very complex since some codec configuration are not provided by ffmpeg and/or libx264. # This is only an attempt for now and it is almost impossible to mimic any profile without # patching ffmpeg and/or libx264 to add the support for corner cases. def ffmpegOptions(self, videoID=0): logger = logging.getLogger(__name__) x264opts = [] try: profile = {0x42:'baseline', 0x4D:'main', 0x64:'high', 0x6E:'high10', 0x7A:'high422', 0xF4:'high444'}[self.profile_idc] except KeyError: logger.error('Unknow profile: %x', self.profile_idc) return [] level = f'{floor(self.level_idc/10):d}.{self.level_idc % 10:d}' x264opts.extend([f'sps-id={self.seq_parameter_set_id:d}'] ) if self.bit_depth_chroma_minus8 not in [0,1,2,4,6,8]: logger.error('Bit depth of chrominance is not supported: %d', self.bit_depth_chroma_minus8+8) return [] if self.chroma_format_idc in range(0,4): if self.chroma_format_idc == 0: # Monochrome pass elif self.chroma_format_idc == 1: # YUV:4:2:0 pass elif self.chroma_format_idc == 2: # YUV:4:2:2 pass elif self.chroma_format_idc == 3: # YUV:4:4:4 pass else: logger.error('Unknow chrominance format: %x', self.chroma_format_idc) return [] res = [f'-profile:v:{videoID:d}', self.profile_idc, f'-level:v:{videoID:d}', level] return res def fromBytes(self, buf): logger = logging.getLogger(__name__) logger.debug('Parsing: %s', hexdump.dump(buf,sep=':')) bit_position=0 # NAL Unit SPS bit_position, zero = read_bit(buf, bit_position) if zero != 0: raise ValueError(f'Reserved bit is not equal to 0: {zero:d}') bit_position, nal_ref_idc = read_bits(buf, bit_position,2) if nal_ref_idc != 3: raise ValueError(f'NAL ref idc is not equal to 3: {nal_ref_idc:d}') bit_position, nal_unit_type = read_bits(buf, bit_position,5) if nal_unit_type != 7: raise ValueError(f'NAL unit type is not a SPS: {nal_unit_type:d}') bit_position, self.profile_idc = read_byte(buf, bit_position) bit_position, self.constraint_set0_flag = read_bit(buf,bit_position) bit_position, self.constraint_set1_flag = read_bit(buf,bit_position) bit_position, self.constraint_set2_flag = read_bit(buf,bit_position) bit_position, self.constraint_set3_flag = read_bit(buf,bit_position) bit_position, self.constraint_set4_flag = read_bit(buf,bit_position) bit_position, self.constraint_set5_flag = read_bit(buf,bit_position) bit_position, v = read_bits(buf, bit_position, 2) if v!=0: raise ValueError(f'Reserved bits different from 0b00: {v:x}') bit_position, self.level_idc = read_byte(buf, bit_position) bit_position, self.seq_parameter_set_id = read_unsigned_exp_golomb(buf, bit_position) if self.profile_idc in [44, 83, 86, 100, 110, 118, 122, 128, 134, 135, 138, 139, 244]: bit_position, self.chroma_format_idc = read_unsigned_exp_golomb(buf, bit_position) if self.chroma_format_idc==3: bit_position, self.separate_colour_plane_flag=read_bit(buf, bit_position) bit_position, self.bit_depth_luma_minus8 = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.bit_depth_chroma_minus8 = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.qpprime_y_zero_transform_bypass_flag = read_boolean(buf, bit_position) bit_position, self.seq_scaling_matrix_present_flag = read_boolean(buf, bit_position) if self.seq_scaling_matrix_present_flag: nb_matrices = 12 if self.chroma_format_idc == 3 else 8 for i in range(0, nb_matrices): bit_position, present = read_boolean(buf, bit_position) if present: if i<6: bit_position, matrix = parse_scaling_list(buf, bit_position, 16) self.scaling_list[i] = matrix else: bit_position, matrix = parse_scaling_list(buf, bit_position, 64) self.scaling_list[i] = matrix else: self.scaling_list[i] = [] bit_position, self.log2_max_frame_num_minus4 = read_unsigned_exp_golomb(buf, bit_position) bit_position , self.pic_order_cnt_type = read_unsigned_exp_golomb(buf, bit_position) if self.pic_order_cnt_type == 0: bit_position, self.log2_max_pic_order_cnt_lsb_minus4 =\ read_unsigned_exp_golomb(buf, bit_position) elif self.pic_order_cnt_type == 1: bit_position, self.delta_pic_order_always_zero_flag = read_boolean(buf, bit_position) bit_position, self.offset_for_non_ref_pic = read_signed_exp_golomb(buf, bit_position) bit_position, self.offset_for_top_to_bottom_field = read_signed_exp_golomb(buf, bit_position) bit_position, self.num_ref_frames_in_pic_order_cnt_cycle =\ read_unsigned_exp_golomb(buf, bit_position) for i in range(0, self.num_ref_frames_in_pic_order_cnt_cycle): bit_position, v = read_unsigned_exp_golomb(buf, bit_position) self.offset_for_ref_frame[i]=v bit_position, self.max_num_ref_frames = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.gaps_in_frame_num_value_allowed_flag = read_boolean(buf, bit_position) bit_position, self.pic_width_in_mbs_minus1 = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.pic_height_in_map_units_minus1 = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.frame_mbs_only_flag = read_boolean(buf, bit_position) if not self.frame_mbs_only_flag: bit_position, self.mb_adaptive_frame_field_flag = read_boolean(buf, bit_position) bit_position, self.direct_8x8_inference_flag = read_boolean(buf, bit_position) bit_position, self.frame_cropping_flag = read_boolean(buf, bit_position) if self.frame_cropping_flag: bit_position, self.frame_crop_left_offset = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.frame_crop_right_offset = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.frame_crop_top_offset = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.frame_crop_bottom_offset = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.vui_parameters_present_flag = read_boolean(buf, bit_position) if self.vui_parameters_present_flag: self.vui = VUI() bit_position = self.vui.fromBytes(buf,bit_position) logger.debug('VUI present: %s', self.vui) logger.debug('Parse end of SPS. Bit position: %d. Remaining bytes: %s.', bit_position, hexdump.dump(buf[floor(bit_position/8):], sep=':')) bit_position = parse_rbsp_trailing_bits(buf, bit_position) logger.debug('End of SPS: %d. Remaining bytes: %s', bit_position, hexdump.dump(buf[floor(bit_position/8):], sep=':')) return bit_position def toBytes(self): logger = logging.getLogger(__name__) buf = bytearray() bit_position = 0 bit_position = write_bit(buf, bit_position,0) bit_position = write_bits(buf, bit_position, 3, 2) bit_position = write_bits(buf, bit_position, 7, 5) bit_position = write_byte(buf, bit_position, self.profile_idc) bit_position = write_bit(buf, bit_position, self.constraint_set0_flag) bit_position = write_bit(buf, bit_position, self.constraint_set1_flag) bit_position = write_bit(buf, bit_position, self.constraint_set2_flag) bit_position = write_bit(buf, bit_position, self.constraint_set3_flag) bit_position = write_bit(buf, bit_position, self.constraint_set4_flag) bit_position = write_bit(buf, bit_position, self.constraint_set5_flag) bit_position = write_bits(buf, bit_position, 0, 2) bit_position = write_byte(buf, bit_position, self.level_idc) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.seq_parameter_set_id) if self.profile_idc in [44, 83, 86, 100, 110, 118, 122, 128, 134, 135, 138, 139, 244]: bit_position = write_unsigned_exp_golomb(buf, bit_position, self.chroma_format_idc) if self.chroma_format_idc==3: bit_position = write_bit(buf, bit_position, self.separate_colour_plane_flag) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.bit_depth_luma_minus8) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.bit_depth_chroma_minus8) bit_position = write_boolean(buf, bit_position, self.qpprime_y_zero_transform_bypass_flag) bit_position = write_boolean(buf, bit_position, self.seq_scaling_matrix_present_flag) if self.seq_scaling_matrix_present_flag: nb_matrices = 12 if self.chroma_format_idc == 3 else 8 for i in range(0, nb_matrices): matrix = self.scaling_list[i] present = (len(matrix))!=0 bit_position = write_boolean(buf, bit_position, present) if present: if i<6: bit_position = write_scaling_list(buf, bit_position, 16, matrix) else: bit_position = write_scaling_list(buf, bit_position, 64, matrix) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.log2_max_frame_num_minus4) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_order_cnt_type) if self.pic_order_cnt_type == 0: bit_position = write_unsigned_exp_golomb(buf, bit_position, self.log2_max_pic_order_cnt_lsb_minus4) elif self.pic_order_cnt_type == 1: bit_position = write_boolean(buf, bit_position, self.delta_pic_order_always_zero_flag) bit_position = write_signed_exp_golomb(buf, bit_position, self.offset_for_non_ref_pic) bit_position = write_signed_exp_golomb(buf, bit_position, self.offset_for_top_to_bottom_field) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.num_ref_frames_in_pic_order_cnt_cycle) for i in range(0, self.num_ref_frames_in_pic_order_cnt_cycle): v = self.offset_for_ref_frame[i] bit_position = write_unsigned_exp_golomb(buf, bit_position, v) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.max_num_ref_frames) bit_position = write_boolean(buf, bit_position, self.gaps_in_frame_num_value_allowed_flag) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_width_in_mbs_minus1) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_height_in_map_units_minus1) bit_position = write_boolean(buf, bit_position, self.frame_mbs_only_flag) if not self.frame_mbs_only_flag: bit_position = write_boolean(buf, bit_position, self.mb_adaptive_frame_field_flag) bit_position = write_boolean(buf, bit_position, self.direct_8x8_inference_flag) bit_position = write_boolean(buf, bit_position, self.frame_cropping_flag) if self.frame_cropping_flag: bit_position = write_unsigned_exp_golomb(buf, bit_position, self.frame_crop_left_offset) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.frame_crop_right_offset) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.frame_crop_top_offset) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.frame_crop_bottom_offset) bit_position = write_boolean(buf, bit_position, self.vui_parameters_present_flag) if self.vui_parameters_present_flag: logger.debug('SPS has VUI. Writing VUI at position: %d', bit_position) bit_position = self.vui.toBytes(buf, bit_position) logger.debug('VUI written. New bit position: %d', bit_position) bit_position = write_rbsp_trailing_bits(buf, bit_position) return buf @dataclass class PPS: pic_parameter_set_id:int=0 seq_parameter_set_id:int=0 entropy_coding_mode_flag:bool=False bottom_field_pic_order_in_frame_present_flag:bool=False num_slice_groups_minus1:int=0 slice_group_map_type:int=0 run_length_minus1:dict = field(default_factory=dict) top_left:dict = field(default_factory=dict) bottom_right:dict = field(default_factory=dict) slice_group_change_direction_flag:bool=False slice_group_change_rate_minus1:int=0 pic_size_in_map_units_minus1:int=0 slice_group_id:dict = field(default_factory=dict) num_ref_idx_l0_default_active_minus1:int=0 num_ref_idx_l1_default_active_minus1:int=0 weighted_pred_flag:bool=False weighted_bipred_idc:int=0 pic_init_qp_minus26:int=0 pic_init_qs_minus26:int=0 chroma_qp_index_offset:int=0 deblocking_filter_control_present_flag:bool=False constrained_intra_pred_flag:bool=False redundant_pic_cnt_present_flag:bool=False transform_8x8_mode_flag:bool=False pic_scaling_matrix_present_flag:bool=False pic_scaling_list:list[list[int]] = field(default_factory=list) second_chroma_qp_index_offset:int=0 def __init__(self): self.run_length_minus1={} self.top_left={} self.bottom_right={} self.slice_group_id={} self.pic_scaling_list=[] # PPS are located at byte boundary def fromBytes(self, buf, chroma_format_idc): logger = logging.getLogger(__name__) logger.debug('Parsing: %s', (hexdump.dump(buf,sep=':'))) bit_position=0 # NAL Unit PPS bit_position, zero = read_bit(buf, bit_position) if zero != 0: raise ValueError(f'Reserved bit is not equal to 0: {zero:d}') bit_position, nal_ref_idc = read_bits(buf, bit_position,2) if nal_ref_idc != 3: raise ValueError(f'NAL ref idc is not equal to 3: {nal_ref_idc:d}') bit_position, nal_unit_type = read_bits(buf, bit_position,5) if nal_unit_type != 8: raise ValueError(f'NAL unit type is not a PPS: {nal_unit_type:d}') bit_position, self.pic_parameter_set_id = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.seq_parameter_set_id = read_unsigned_exp_golomb(buf, bit_position) bit_position, self.entropy_coding_mode_flag = read_boolean(buf, bit_position) bit_position, self.bottom_field_pic_order_in_frame_present_flag =\ read_boolean(buf, bit_position) bit_position, self.num_slice_groups_minus1 = read_unsigned_exp_golomb(buf, bit_position) if self.num_slice_groups_minus1>0: bit_position, self.slice_group_map_type = read_unsigned_exp_golomb(buf, bit_position) if self.slice_group_map_type == 0: for i in range(0, self.num_slice_groups_minus1): bit_position, v = read_unsigned_exp_golomb(buf, bit_position) self.run_length_minus1[i]=v elif self.slice_group_map_type == 2: for i in range(0, self.num_slice_groups_minus1): bit_position, v = read_unsigned_exp_golomb(buf, bit_position) self.top_left[i] = v bit_position, v = read_unsigned_exp_golomb(buf, bit_position) self.bottom_right[i] = v elif self.slice_group_map_type in [3,4,5]: bit_position, self.slice_group_change_direction_flag = read_boolean(buf, bit_position) bit_position, self.slice_group_change_rate_minus1 =\ read_unsigned_exp_golomb(buf, bit_position) elif self.slice_group_map_type == 6: bit_position, self.pic_size_in_map_units_minus1 =\ read_unsigned_exp_golomb(buf, bit_position) nb_bits = ceil(log(self.num_slice_groups_minus1+1)) for i in range(0, self.pic_size_in_map_units_minus1): bit_position, v = read_bits(buf, bit_position, nb_bits) self.slice_group_id[i]=v bit_position, self.num_ref_idx_l0_default_active_minus1 =\ read_unsigned_exp_golomb(buf, bit_position) bit_position, self.num_ref_idx_l2_default_active_minus1 =\ read_unsigned_exp_golomb(buf, bit_position) bit_position, self.weighted_pred_flag = read_boolean(buf, bit_position) bit_position, self.weighted_bipred_idc = read_bits(buf, bit_position, 2) bit_position, self.pic_init_qp_minus26 = read_signed_exp_golomb(buf, bit_position) bit_position, self.pic_init_qs_minus26 = read_signed_exp_golomb(buf, bit_position) bit_position, self.chroma_qp_index_offset = read_signed_exp_golomb(buf, bit_position) bit_position, self.deblocking_filter_control_present_flag = read_boolean(buf, bit_position) bit_position, self.constrained_intra_pred_flag = read_boolean(buf, bit_position) bit_position, self.redundant_pic_cnt_present_flag = read_boolean(buf, bit_position) if more_rbsp_data(buf, bit_position): bit_position, self.transform_8x8_mode_flag = read_boolean(buf, bit_position) bit_position, self.pic_scaling_matrix_present_flag = read_boolean(buf, bit_position) if self.pic_scaling_matrix_present_flag: nb_matrices = 6 if chroma_format_idc == 3 else 2 if self.transform_8x8_mode_flag: nb_matrices+=6 else: nb_matrices = 6 for i in range(0, nb_matrices): bit_position, present = read_boolean(buf, bit_position) if present: if i<6: bit_position, matrix = parse_scaling_list(buf, bit_position, 16) self.pic_scaling_list.append(matrix) else: bit_position, matrix = parse_scaling_list(buf, bit_position, 64) self.pic_scaling_list.append(matrix) else: self.pic_scaling_list.append([]) bit_position, self.second_chroma_qp_index_offset = read_signed_exp_golomb(buf, bit_position) logger.info("parse RBSP") bit_position = parse_rbsp_trailing_bits(buf, bit_position) return bit_position def toBytes(self, chroma_format_idc): logger = logging.getLogger(__name__) buf = bytearray() bit_position = 0 # NAL Unit PPS bit_position = write_bit(buf, bit_position, 0) bit_position = write_bits(buf, bit_position, 3, 2) bit_position = write_bits(buf, bit_position, 8, 5) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_parameter_set_id) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.seq_parameter_set_id) bit_position = write_boolean(buf, bit_position, self.entropy_coding_mode_flag) bit_position = write_boolean(buf, bit_position,\ self.bottom_field_pic_order_in_frame_present_flag) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.num_slice_groups_minus1) if self.num_slice_groups_minus1>0: bit_position = write_unsigned_exp_golomb(buf, bit_position, self.slice_group_map_type) if self.slice_group_map_type == 0: for i in range(0, self.num_slice_groups_minus1): v = self.run_length_minus1[i] bit_position = write_unsigned_exp_golomb(buf, bit_position, v) elif self.slice_group_map_type == 2: for i in range(0, self.num_slice_groups_minus1): v = self.top_left[i] bit_position = write_unsigned_exp_golomb(buf, bit_position, v) v = self.bottom_right[i] bit_position = write_unsigned_exp_golomb(buf, bit_position, v) elif self.slice_group_map_type in [3,4,5]: bit_position = write_boolean(buf, bit_position, self.slice_group_change_direction_flag) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.slice_group_change_rate_minus1) elif self.slice_group_map_type == 6: bit_position = write_unsigned_exp_golomb(buf, bit_position, self.pic_size_in_map_units_minus1) nb_bits = ceil(log(self.num_slice_groups_minus1+1)) for i in range(0, self.pic_size_in_map_units_minus1): v = self.slice_group_id[i] bit_position, v = write_bits(buf, bit_position, v, nb_bits) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.num_ref_idx_l0_default_active_minus1) bit_position = write_unsigned_exp_golomb(buf, bit_position, self.num_ref_idx_l2_default_active_minus1) bit_position = write_boolean(buf, bit_position, self.weighted_pred_flag) bit_position = write_bits(buf, bit_position, self.weighted_bipred_idc, 2) bit_position = write_signed_exp_golomb(buf, bit_position, self.pic_init_qp_minus26) bit_position = write_signed_exp_golomb(buf, bit_position, self.pic_init_qs_minus26) bit_position = write_signed_exp_golomb(buf, bit_position, self.chroma_qp_index_offset) bit_position = write_boolean(buf, bit_position, self.deblocking_filter_control_present_flag) bit_position = write_boolean(buf, bit_position, self.constrained_intra_pred_flag) bit_position = write_boolean(buf, bit_position, self.redundant_pic_cnt_present_flag) bit_position = write_boolean(buf, bit_position, self.transform_8x8_mode_flag) bit_position = write_boolean(buf, bit_position, self.pic_scaling_matrix_present_flag) if self.pic_scaling_matrix_present_flag: nb_matrices = 6 if chroma_format_idc == 3 else 2 if self.transform_8x8_mode_flag: nb_matrices+=6 else: nb_matrices = 6 for i in range(0, nb_matrices): matrix = self.pic_scaling_list[i] logger.info("Retrieved pic scaling matrix: %s %d", matrix, len(matrix)) present = len(matrix)!=0 logger.info("Matrix is present: %s", present) bit_position = write_boolean(buf, bit_position, present) if present: if i<6: logger.info("Writing matrix: %s", matrix) bit_position = write_scaling_list(buf, bit_position, 16, matrix) else: logger.info("Writing matrix: %s", matrix) bit_position = write_scaling_list(buf, bit_position, 64, matrix) bit_position = write_signed_exp_golomb(buf, bit_position, self.second_chroma_qp_index_offset) bit_position = write_rbsp_trailing_bits(buf, bit_position) return buf @dataclass class AVCDecoderConfiguration: configurationVersion:int=1 # u(8) AVCProfileIndication:int=0 # u(8) profile_compatibility:int=0 # u(8) AVCLevelIndication:int=0 # u(8) lengthSizeMinusOne:int=0 # u(2) (0,1 or 3) numOfSequenceParameterSets:int=0 # u(5) sps:dict = field(default_factory=dict) numOfPictureParameterSets:int=0 #u(8) pps:dict = field(default_factory=dict) chroma_format:int=0 # u(2) bit_depth_luma_minus8:int=0 # u(3) bit_depth_chroma_minus8:int=0 # u(3) numOfSequenceParameterSetExt:int=0 # u(8) spsext:dict = field(default_factory=dict) def __init__(self): self.sps = {} self.spsext = {} self.pps = {} def fromBytes(self, buf): logger = logging.getLogger(__name__) logger.debug('Parsing: %s', (hexdump.dump(buf,sep=':'))) bit_position = 0 bit_position, self.configurationVersion = read_byte(buf, bit_position) bit_position, self.AVCProfileIndication = read_byte(buf, bit_position) bit_position, self.profile_compatibility = read_byte(buf, bit_position) bit_position, self.AVCLevelIndication = read_byte(buf, bit_position) bit_position, v = read_bits(buf, bit_position, 6) if v != 0b111111: raise ValueError(f'Reserved bits are not equal to 0b111111: {v:x}') bit_position, self.lengthSizeMinusOne = read_bits(buf, bit_position, 2) bit_position, v = read_bits(buf, bit_position, 3) if v != 0b111: raise ValueError(f'Reserved bits are not equal to 0b111: {v:x}') bit_position, self.numOfSequenceParameterSets= read_bits(buf, bit_position, 5) logger.debug('Number of SPS: %d', self.numOfSequenceParameterSets) for _ in range(0,self.numOfSequenceParameterSets): bit_position, length = read_word(buf, bit_position) if bit_position % 8 != 0: raise ValueError(f'SPS is not located at a byte boundary: {bit_position:d}') sps = SPS() sodb = rbsp_to_sodb(buf[floor(bit_position/8):]) bit_length = sps.fromBytes(sodb) spsid = sps.seq_parameter_set_id self.sps[spsid] = sps parsed_length = floor(bit_length/8) logger.debug('Expected length of SPS: %d bytes. Parsed: %d bytes', length, parsed_length) # Parse length can be shorter than length because of rewriting from RBSP to SODB # (that is shorter). # So we advance of indicated length. bit_position+=length*8 logger.debug('Bit position:%d. Reading one byte of: %s', bit_position, hexdump.dump(buf[floor(bit_position/8):], sep=':')) bit_position, self.numOfPictureParameterSets = read_byte(buf, bit_position) logger.debug('Number of PPS: %d', self.numOfPictureParameterSets) for _ in range(0,self.numOfPictureParameterSets): bit_position, length = read_word(buf, bit_position) if bit_position % 8 != 0: raise ValueError('PPS is not located at a byte boundary: {bit_position:d}') pps = PPS() sodb = rbsp_to_sodb(buf[floor(bit_position/8):]) bit_length = pps.fromBytes(sodb, self.chroma_format) ppsid = pps.pic_parameter_set_id self.pps[ppsid] = pps parsed_length = floor(bit_length/8) logger.debug('Expected length of PPS: %d bytes. Parsed: %d bytes', length, parsed_length) # Parse length can be shorter than length because of rewriting from RBSP to SODB # (that is shorter). # So we advance of indicated length. bit_position+=length*8 logger.debug('Remaining bits: %s', hexdump.dump(buf[floor(bit_position/8):])) if self.AVCProfileIndication in [100, 110, 122, 144]: bit_position, reserved = read_bits(buf, bit_position, 6) if reserved != 0b111111: raise ValueError(f'Reserved bits are different from 111111: {reserved:x}') bit_position, self.chroma_format = read_bits(buf, bit_position, 2) bit_position, reserved = read_bits(buf, bit_position, 5) if reserved != 0b11111: raise ValueError(f'Reserved bits are different from 11111: {reserved:x}') bit_position, self.bit_depth_luma_minus8 = read_bits(buf, bit_position, 3) bit_position, reserved = read_bits(buf, bit_position, 5) if reserved != 0b11111: raise ValueError(f'Reserved bits are different from 11111: {reserved:x}') bit_position, self.bit_depth_chroma_minus8 = read_bits(buf, bit_position, 3) bit_position, self.numOfSequenceParameterSetExt = read_byte(buf, bit_position) for _ in range(0, self.numOfSequenceParameterSetExt): # TODO: parse SPSextended logger.error('Parsing of SPS extended not yet implemented !') pass def toBytes(self): logger = logging.getLogger(__name__) buf = bytearray() bit_position = 0 bit_position = write_byte(buf, bit_position, self.configurationVersion) bit_position = write_byte(buf, bit_position, self.AVCProfileIndication) bit_position = write_byte(buf, bit_position, self.profile_compatibility) bit_position = write_byte(buf, bit_position, self.AVCLevelIndication) bit_position = write_bits(buf, bit_position, 0b111111, 6) bit_position = write_bits(buf, bit_position, self.lengthSizeMinusOne, 2) bit_position = write_bits(buf, bit_position, 0b111, 3) bit_position = write_bits(buf, bit_position, self.numOfSequenceParameterSets, 5) for spsid in self.sps: sps = self.sps[spsid] sodb = sps.toBytes() sodb_length = len(sodb) rbsp = sodb_to_rbsp(sodb) rbsp_length = len(rbsp) logger.debug('SODB length: %d RBSP length:%d', sodb_length, rbsp_length) bit_position = write_word(buf, bit_position, rbsp_length) buf.extend(rbsp) bit_position+=rbsp_length*8 logger.debug('2. Buffer: %s', hexdump.dump(buf, sep=':')) bit_position = write_byte(buf, bit_position, self.numOfPictureParameterSets) for ppsid in self.pps: logger.debug('Writing PPS: %d', ppsid) pps = self.pps[ppsid] # TODO: does chroma_format should come from self ? sodb = pps.toBytes(self.chroma_format) sodb_length = len(sodb) rbsp = sodb_to_rbsp(sodb) rbsp_length = len(rbsp) logger.debug('SODB length: %d RBSP length:%d', sodb_length, rbsp_length) bit_position = write_word(buf, bit_position, rbsp_length) buf.extend(rbsp) bit_position+=rbsp_length*8 if self.AVCProfileIndication in [ 100, 110, 122, 144]: bit_position = write_bits(buf, bit_position, 0b111111, 6) bit_position = write_bits(buf, bit_position, self.chroma_format, 2) bit_position = write_bits(buf, bit_position, 0b11111, 5) bit_position = write_bits(buf, bit_position, self.bit_depth_luma_minus8, 3) bit_position = write_bits(buf, bit_position, 0b11111, 5) bit_position = write_bits(buf, bit_position, self.bit_depth_chroma_minus8, 3) bit_position = write_byte(buf, bit_position, self.numOfSequenceParameterSetExt) for _ in range(0, self.numOfSequenceParameterSetExt): # TODO: dump SPSextended logger.error('Dumping SPS extended not yet implemented') pass return buf def merge(self, config): # Check config compatibility if self.configurationVersion != config.configurationVersion: raise ValueError('Configuration versions are different: %d vs %s' %\ (self.configurationVersion, config.configurationVersion)) if self.AVCProfileIndication != config.AVCProfileIndication: raise ValueError('AVC profiles are different: %d vs %s' %\ (self.AVCProfileIndication, config.AVCProfileIndication)) if self.profile_compatibility != config.profile_compatibility: raise ValueError('Profile compatilities are different: %d vs %s' %\ (self.profile_compatibility, config.profile_compatibility)) if self.AVCLevelIndication != config.AVCLevelIndication: raise ValueError('Level indications are different: %d vs %s' %\ (self.AVCLevelIndication, config.AVCLevelIndication)) if self.lengthSizeMinusOne != config.lengthSizeMinusOne: raise ValueError('Length units are different: %d vs %s' %\ (self.lengthSizeMinusOne, config.lengthSizeMinusOne)) if self.chroma_format != config.chroma_format: raise ValueError('Colour format are different: %d vs %s' %\ (self.chroma_format, config.chroma_format)) if self.bit_depth_luma_minus8 != config.bit_depth_luma_minus8: raise ValueError('Depth of luminance are different: %d vs %s' %\ (self.bit_depth_luma_minus8, config.bit_depth_luma_minus8)) if self.bit_depth_chroma_minus8 != config.bit_depth_chroma_minus8: raise ValueError('Depth of chromaticity are different: %d vs %s' %\ (self.bit_depth_chroma_minus8, config.bit_depth_luma_minus8)) for spsid in config.sps: sps = config.sps[spsid] if spsid in self.sps: localsps = self.sps[spsid] if sps!=localsps: raise ValueError(f'Profile are not compatible. They contain two different SPS\ with the same identifier ({spsid:d}): {localsps}\n{sps}\n') self.sps[spsid] = sps self.numOfSequenceParameterSets = len(self.sps) for ppsid in config.pps: pps = config.pps[ppsid] if ppsid in self.pps: localpps = self.pps[ppsid] if pps!=localpps: raise ValueError(f'Profile are not compatible. They contain two different PPS\ with the same identifier ({ppsid:d}): {localpps}\n{pps}\n') self.pps[ppsid] = pps self.numOfPictureParameterSets = len(self.pps) # TODO: do the same with extended SPS ! def parse_codec_private(codecPrivateData): if codecPrivateData[0] != 0x63: raise ValueError(f'Matroska header is wrong: {codecPrivateData[0]:x}') if codecPrivateData[1] != 0xA2: raise ValueError(f'Matroska header is wrong: {codecPrivateData[1]:x}') length = codecPrivateData[2] if length == 0: raise ValueError('Matroska length cannot start with zero byte.') for nb_zeroes in range(0,8): b = read_bit(codecPrivateData[2:], nb_zeroes) if b != 0: break mask = 2^(7-nb_zeroes)-1 length = codecPrivateData[2] and mask for i in range(0, nb_zeroes): length*=256 length+=(codecPrivateData[3+i]) byte_position = 3+nb_zeroes avcconfig = AVCDecoderConfiguration() avcconfig.fromBytes(codecPrivateData[byte_position:]) return avcconfig def get_avc_config_from_h264(inputFile): logger = logging.getLogger(__name__) # TODO: improve this ... rbsp = inputFile.read(1000) sodb = rbsp_to_sodb(rbsp) bit_position = 0 bit_position, start_code = read_long(sodb, bit_position) if start_code != 1: raise ValueError(f'Starting code not detected: {start_code:x}') sps = SPS() bit_length = sps.fromBytes(sodb[4:]) bit_position+=bit_length bit_position, start_code = read_long(sodb, bit_position) if start_code != 1: raise ValueError(f'Starting code not detected: {start_code:x}') pps = PPS() bit_length = pps.fromBytes(sodb[floor(bit_position/8):], sps.chroma_format_idc) logger.debug(pps) avcconfig = AVCDecoderConfiguration() avcconfig.configurationVersion = 1 avcconfig.AVCProfileIndication = sps.profile_idc avcconfig.profile_compatibility = 0 avcconfig.AVCLevelIndication = sps.level_idc avcconfig.lengthSizeMinusOne = 3 avcconfig.numOfSequenceParameterSets = 1 avcconfig.numOfPictureParameterSets = 1 avcconfig.numOfSequenceParameterSetExt = 0 avcconfig.chroma_format = sps.chroma_format_idc avcconfig.bit_depth_chroma_minus8 = sps.bit_depth_chroma_minus8 avcconfig.bit_depth_luma_minus8 = sps.bit_depth_luma_minus8 avcconfig.sps[sps.seq_parameter_set_id] = sps avcconfig.pps[pps.pic_parameter_set_id] = pps return avcconfig # Unused ? def get_codec_private_data_from_h264(inputFile): avcconfig = get_avc_config_from_h264(inputFile) res = dump_codec_private_data(avcconfig) return res def parse_mkv_tree(mkvinfo_path, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) env = {**os.environ, 'LANG': 'C'} elements = {} with Popen([mkvinfo_path, '-z', '-X', '-P', f'/proc/self/fd/{infd:d}'], stdout=PIPE, close_fds=False, env=env) as mkvinfo: out, _ = mkvinfo.communicate() out = out.decode('utf8') prefix = [] reg_exp = (r"(^(?P\+)|(\|(?P[ ]*\+))).*at (?P[0-9]+)" r" size (?P[0-9]+).*$") p = re.compile(reg_exp) prev_depth = -1 for line in out.splitlines(): m = p.match(line) if m is None: logger.error("Impossible to match line: %s", line) else: position = int(m.group('position')) size = int(m.group('size')) root = m.group('root') is not None if root: depth = 0 else: depth = len(m.group('depth')) if depth > prev_depth: for _ in range(depth-prev_depth): prefix.append(1) elif depth == prev_depth: subid = prefix[-1] subid+=1 prefix.pop() prefix.append(subid) else: for _ in range(prev_depth-depth): prefix.pop() subid = prefix[-1] subid+=1 prefix.pop() prefix.append(subid) prev_depth = depth key=".".join(map(str, prefix)) elements[key] = (position, size) mkvinfo.wait() return elements # MKV is formatted as an EBML file (Extended Binary Markup Langage). # cf http://matroska-org.github.io/libebml/specs.html # It is a Type, Length, Value (TLV) kind of binary file. # Types are encoded as follows: # 1xxx xxxx - Class A IDs (2^7 -1 possible values) # 01xx xxxx xxxx xxxx - Class B IDs (2^14-1 possible values) # 001x xxxx xxxx xxxx xxxx xxxx - Class C IDs (2^21-1 possible values) # 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx - Class D IDs (2^28-1 possible values) # Lengths are encoded as follows: # 1xxx xxxx # value 0 to 2^7-2 # 01xx xxxx xxxx xxxx # value 0 to 2^14-2 # 001x xxxx xxxx xxxx xxxx xxxx # value 0 to 2^21-2 # 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx # value 0 to 2^28-2 # 0000 1xxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx # value 0 to 2^35-2 # 0000 01xx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx # value 0 to 2^42-2 # 0000 001x xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx # value 0 to 2^49-2 # 0000 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx # value 0 to 2^56-2 def get_ebml_length(length): logger = logging.getLogger(__name__) if 0 <= length <= 2**7-2: size = 1 elif length <= 2**14-2: size = 2 elif length <= 2**21-2: size = 3 elif length <= 2**28-2: size = 4 elif length <= 2**35-2: size = 5 elif length <= 2**42-2: size = 6 elif length <= 2**49-2: size = 7 elif length <= 2**56-2: size = 8 elif length < 0: logger.error('Impossible to encode a negative length with EBML.') return None else: logger.error('Impossible to encode a length larger than 2^56-2 with EBML.') return None encoded_length = length + ((128>>(size-1))<<((size-1)*8)) res = (encoded_length).to_bytes(size, byteorder='big') return res def dump_codec_private_data(AVCDecoderConfiguration): logger = logging.getLogger(__name__) # Rebuild a Matroska Codec Private Element res = bytearray() # Code private element res.extend(b'\x63\xA2') buf = AVCDecoderConfiguration.toBytes() logger.debug('AVC configuration bitstream: %s (length: %d))', hexdump.dump(buf, sep=':'), len(buf)) embl_length = get_ebml_length(len(buf)) logger.debug('EMBL encoded length: %s', hexdump.dump(embl_length, sep=':')) res.extend(embl_length) res.extend(buf) return res def change_ebml_element_size(inputFile, position, addendum): logger = logging.getLogger(__name__) initial_position = position infd = inputFile.fileno() lseek(infd, position, SEEK_SET) buf = read(infd, 1) element_type = int.from_bytes(buf, byteorder='big') mask=128 found = False for i in range(1,5): if element_type&mask: type_size = i found = True break else: mask = mask>>1 if not found: logger.error('Size of element type cannot be determined: %d', element_type) exit(-1) # We seek to size position+=type_size lseek(infd, position, SEEK_SET) buf = read(infd, 1) size_head = int.from_bytes(buf, byteorder='big') logger.info('First byte of size: %x', size_head) mask=128 found = False for i in range(1,9): if size_head&mask: size_of_data_size = i found = True break else: mask = mask>>1 if not found: logger.error('Size of data size cannot be determined: %d', size_head) exit(-1) else: logger.info('Size of data size: %d.', size_of_data_size) lseek(infd, position, SEEK_SET) old_size_buf = read(infd, size_of_data_size) max_size = 2**(size_of_data_size*7)-2 size_of_data = int.from_bytes(old_size_buf, byteorder='big') logger.info('Size of data with mask: %x mask: %d.', size_of_data, mask) size_of_data-= (mask<<((size_of_data_size-1)*8)) logger.info('Found element at position: %d, size of type: %d size of data: %d \ maximal size: %d.', initial_position, type_size, size_of_data, max_size) new_size = size_of_data+addendum delta = 0 if new_size > max_size: # TODO: Test this code ... new_encoded_size = get_ebml_length(new_size) size_of_new_encoded_size = len(new_encoded_size) if size_of_new_encoded_size <= size_of_data_size: logger.error('New encoded size is smaller (%d) or equal than previous size (%d).\ This should not happen.', size_of_new_encoded_size, size_of_data_size) exit(-1) # The difference of length between old size field and new one. delta = size_of_new_encoded_size - size_of_data_size file_length = fstat(infd).st_size # We seek after actual length field lseek(infd, position+size_of_data_size, SEEK_SET) # We read the rest of file tail = read(infd, file_length-(position+size_of_data_size)) # We increase file length ftruncate(infd, file_length+delta) # We go to the beginning of length field lseek(infd, position, SEEK_SET) # We write the new length field write(infd, new_encoded_size) # We overwrite the rest of file with its previous content that has been offset. write(infd, tail) else: size = new_size + ((128>>(size_of_data_size-1))<<((size_of_data_size-1)*8)) new_size_buf = (size).to_bytes(size_of_data_size, byteorder='big') logger.info('Old encoded size: %s New encoded size: %s', hexdump.dump(old_size_buf,sep=':'), hexdump.dump(new_size_buf, sep=':')) lseek(infd, position, SEEK_SET) write(infd, new_size_buf) # We return the potential increase in size of the file if the length field had to be increased. return delta def change_codec_private_data(mkvinfo, inputFile, codecData): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) current_length = fstat(infd).st_size logger.info('Current size of file: %d', current_length) position, current_data = get_codec_private_data_from_mkv(mkvinfo, inputFile) current_data_length = len(current_data) future_length = current_length - current_data_length + len(codecData) logger.info('Expected size of file: %d', future_length) logger.info('Current data at position %d: %s', position, hexdump.dump(current_data, sep=":")) logger.info('Future data: %s', hexdump.dump(codecData, sep=":")) elements = parse_mkv_tree(mkvinfo, inputFile) found = False for key in elements: pos, size = elements[key] if pos == position: logger.info('Codec private data key: %s', key) found = True break if not found: logger.error('Impossible to retrieve the key of codec private data') exit(-1) if current_length < future_length: lseek(infd, position+current_data_length, SEEK_SET) tail = read(infd, current_length-(position+current_data_length)) # We extend the file at the end with zeroes ftruncate(infd, future_length) lseek(infd, position+len(codecData), SEEK_SET) write(infd, tail) lseek(infd, position, SEEK_SET) write(infd, codecData) elif current_length == future_length: # Almost nothing to do except overwriting old private codec data with new ones. lseek(infd, position, SEEK_SET) write(infd, codecData) else: lseek(infd, position+current_data_length, SEEK_SET) tail = read(infd, current_length-(position+current_data_length)) lseek(infd, position+len(codecData), SEEK_SET) write(infd, tail) lseek(infd, position, SEEK_SET) write(infd, codecData) # We reduce the length of file. ftruncate(infd, future_length) # We have to modify the tree elements up to the root that contains the codec private data. keys = key.split('.') logger.info(keys) delta = future_length-current_length # if there is no modification of the private codec data, no need to change anything. if delta != 0: for _ in range(0, len(keys)-1): keys.pop() key=".".join(map(str, keys)) pos, size = elements[key] logger.info('Trying to fix element with key: %s at position: %d with actual size: %d.', key, pos, size) # Changing an element can increase its size (in very rare case). # In that case, we update the new delta that will be larger (because the element has # been resized). delta+=change_ebml_element_size(inputFile, pos, delta) def get_format(ffprobe_path:str, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) with Popen([ffprobe_path, '-loglevel', 'quiet', '-show_format', '-of', 'json', '-i', f'/proc/self/fd/{infd:d}'], stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'format' in out: return out['format'] else: logger.error('Impossible to retrieve format of file') return None def get_movie_duration(ffprobe_path:str, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) with Popen([ffprobe_path, '-loglevel', 'quiet', '-show_format', '-of', 'json', '-i', f'/proc/self/fd/{infd:d}'], stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'format' in out and 'duration' in out['format']: duration = floor(float(out['format']['duration'])) ts = timedelta(seconds=duration) return ts else: logger.error('Impossible to retrieve duration of movie') return None # ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json sample.ts def get_video_dimensions(ffprobe_path, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) with Popen([ffprobe_path, '-loglevel', 'quiet', '-select_streams', 'v:0', '-show_entries',\ 'stream=width,height', '-of', 'json', '-i', f'/proc/self/fd/{infd:d}'],\ stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'streams' in out: video = out['streams'][0] if ('width' in video) and ('height' in video): return int(video['width']), int(video['height']) logger.error('Impossible to retrieve dimensions of video') exit(-1) def get_streams(ffprobe_path, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) with Popen([ffprobe_path, '-loglevel', 'quiet', '-show_streams', '-of', 'json', '-i', f'/proc/self/fd/{infd:d}'], stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'streams' in out: return out['streams'] else: logger.error('Impossible to retrieve streams inside file') return None def with_subtitles(ffprobe_path, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) with Popen([ffprobe_path, '-loglevel', 'quiet', '-show_streams', '-of', 'json', '-i', f'/proc/self/fd/{infd:d}'], stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'streams' in out: streams = out['streams'] for stream in streams: if 'codec_type' in stream and stream['codec_type'] == 'subtitle': return True else: logger.error('Impossible to retrieve streams inside file') return False def parse_timestamp(ts): logger = logging.getLogger(__name__) ts_reg_exp = (r'^(?P[0-9]{1,2}):(?P[0-9]{1,2})' r':(?P[0-9]{1,2})(\.(?P[0-9]{1,6}))?$') p = re.compile(ts_reg_exp) m = p.match(ts) if m is None: logger.warning("Impossible to parse timestamp: %s", ts) return None values = m.groupdict() hour = 0 minute = 0 second = 0 us = 0 if values['hour'] is not None: hour = int(values['hour']) if values['minute'] is not None: minute = int(values['minute']) if values['second'] is not None: second = int(values['second']) if values['us'] is not None: us = int(values['us']) if hour < 0 or hour > 23: logger.error("hour must be in [0,24[") return None if minute < 0 or minute > 59: logger.error("minute must be in [0,60[") return None if second < 0 or second > 59: logger.error("second must be in [0,60[") return None if us < 0 or us > 1000000: logger.error("milliseconds must be in [0,1000000[") return None ts = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us) return ts def parse_time_interval(interval): logger = logging.getLogger(__name__) interval_reg_exp = (r'^(?P[0-9]{1,2}):(?P[0-9]{1,2}):(?P[0-9]{1,2})' r'(\.(?P[0-9]{1,3}))?-(?P[0-9]{1,2}):(?P[0-9]{1,2})' r':(?P[0-9]{1,2})(\.(?P[0-9]{1,3}))?$') p = re.compile(interval_reg_exp) m = p.match(interval) if m is None: logger.error("Impossible to parse time interval") return None values = m.groupdict() hour1 = 0 minute1 = 0 second1 = 0 ms1 = 0 hour2 = 0 minute2 = 0 second2 = 0 ms2 = 0 if values['hour1'] is not None: hour1 = int(values['hour1']) if values['minute1'] is not None: minute1 = int(values['minute1']) if values['second1'] is not None: second1 = int(values['second1']) if values['ms1'] is not None: ms1 = int(values['ms1']) if values['hour2'] is not None: hour2 = int(values['hour2']) if values['minute2'] is not None: minute2 = int(values['minute2']) if values['second2'] is not None: second2 = int(values['second2']) if values['ms2'] is not None: ms2 = int(values['ms2']) if hour1 < 0 or hour1 > 23: logger.error("hour must be in [0,24[") return None, None if minute1 < 0 or minute1 > 59: logger.error("minute must be in [0,60[") return None, None if second1 < 0 or second1 > 59: logger.error("second must be in [0,60[") return None, None if ms1 < 0 or ms1 > 1000: logger.error("milliseconds must be in [0,1000[") return None, None if hour2 < 0 or hour2 > 23: logger.error("hour must be in [0,24[") return None, None if minute2 < 0 or minute2 > 59: logger.error("minute must be in [0,60[") return None, None if second2 < 0 or second2 > 59: logger.error("second must be in [0,60[") return None, None if ms2 < 0 or ms2 > 1000: logger.error("milliseconds must be in [0,1000[") return None, None ts1 = timedelta(hours=hour1, minutes=minute1, seconds=second1, microseconds=ms1*1000) ts2 = timedelta(hours=hour2, minutes=minute2, seconds=second2, microseconds=ms2*1000) if ts2 < ts1: logger.error("Non monotonic interval") return None,None return (ts1, ts2) def compare_time_interval(interval1, interval2): ts11,ts12 = interval1 ts21,ts22 = interval2 if ts12 < ts21: return -1 elif ts22 < ts11: return 1 else: return 0 def ffmpeg_convert(ffmpeg_path, ffprobe_path, inputFile, inputFormat, outputFile, outputFormat, duration): logger = logging.getLogger(__name__) width, height = get_video_dimensions(ffprobe_path, inputFile) subtitles = with_subtitles(ffprobe_path, inputFile) infd = inputFile.fileno() outfd = outputFile.fileno() set_inheritable(infd, True) set_inheritable(outfd, True) if logger.getEffectiveLevel() == logging.DEBUG: log = [] else: log = [ '-loglevel', 'quiet' ] params = [ffmpeg_path, '-y',]+log+['-progress', '/dev/stdout', '-canvas_size', f'{width:d}x{height:d}', '-f', inputFormat, '-i', f'/proc/self/fd/{infd:d}', '-map', '0:v', '-map', '0:a'] if subtitles: params.extend(['-map', '0:s']) params.extend(['-bsf:v', 'h264_mp4toannexb,dump_extra=freq=keyframe', '-vcodec', 'copy', '-acodec', 'copy']) if subtitles: params.extend(['-scodec', 'dvdsub']) params.extend(['-r:0', '25', '-f', outputFormat, f'/proc/self/fd/{outfd:d}']) logger.debug('Executing %s', params) with Popen(params, stdout=PIPE, close_fds=False) as ffmpeg: pb = tqdm(TextIOWrapper(ffmpeg.stdout, encoding="utf-8"), total=int(duration/timedelta(seconds=1)), unit='s', desc='Conversion') for line in pb: if line.startswith('out_time='): ts = line.split('=')[1].strip() ts = parse_timestamp(ts) if ts is not None: pb.n = int(ts/timedelta(seconds=1)) pb.update() status = ffmpeg.wait() if status != 0: logger.error('Conversion failed with status code: %d', status) def get_ts_frame(frame): logger = logging.getLogger(__name__) if 'pts_time' in frame: pts_time = float(frame['pts_time']) elif 'pkt_pts_time' in frame: pts_time = float(frame['pkt_pts_time']) else: logger.error('Impossible to find timestamp of frame %s', frame) return None ts = timedelta(seconds=pts_time) return ts def get_packet_duration(packet): logger = logging.getLogger(__name__) if 'duration' in packet: duration = int(packet['duration']) elif 'pkt_duration' in packet: duration = int(packet['pkt_duration']) else: logger.error('Impossible to find duration of packet %s', packet) return None return duration def get_frames_in_stream(ffprobe_path, inputFile, begin, end, streamKind, subStreamId=0): logger = logging.getLogger(__name__) infd = inputFile.fileno() set_inheritable(infd, True) command = [ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{begin}%{end}','-show_entries', 'frame', '-select_streams', f'{streamKind}:{subStreamId:d}','-of', 'json', f'/proc/self/fd/{infd:d}'] logger.debug('Executing: %s', command) with Popen(command, stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() frames = json.load(BytesIO(out)) status = ffprobe.wait() if status != 0: logger.error('ffprobe failed with status code: %d', status) return None # Sort frames by timestamp tmp = {} if 'frames' in frames: frames = frames['frames'] for frame in frames: ts = get_ts_frame(frame) if ts is None: return None if begin <= ts <= end: tmp[ts]=frame res = [] for ts in sorted(tmp): res.append(tmp[ts]) return res else: logger.error('Impossible to retrieve frames inside file around [%s,%s]', begin, end) return None # TODO: Finish implementation of this function and use it. def get_nearest_idr_frame(ffprobe_path, inputFile, timestamp, before=True, delta=timedelta(seconds=2)): # pylint: disable=W0613 logger = logging.getLogger(__name__) zero = timedelta() tbegin = timestamp-delta tend = timestamp+delta if tbegin < zero: tbegin = zero infd = inputFile.fileno() set_inheritable(infd, True) logger.debug('Looking for IDR frame in [%s, %s]', tbegin, tend) idrs = [] # Retains only IDR frame with Popen([ffprobe_path, '-loglevel', 'quiet', '-read_intervals', f'{tbegin}%{tend}','-skip_frame', 'nokey', '-show_entries', 'frame', '-select_streams', 'v:0', '-of', 'json', f'/proc/self/fd/{infd:d}'], stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() frames = json.load(BytesIO(out)) status = ffprobe.wait() if status != 0: logger.error('ffprobe failed with status code: %d', status) return None if 'frames' in frames: frames = frames['frames'] for frame in frames: ts = get_ts_frame(frame) if ts is None: return None if tbegin <= ts <= tend: idrs.append(frame) else: logger.error('Impossible to retrieve IDR frames inside file around [%s,%s]', tbegin, tend) return None def get_nearest_iframe(ffprobe_path, inputFile, timestamp, before=True, deltaMax=timedelta(seconds=15)): logger = logging.getLogger(__name__) infd = inputFile.fileno() set_inheritable(infd, True) delta = timedelta(seconds=1) iframe = None while delta < deltaMax: zero = timedelta() if before: tbegin = timestamp-delta else: tbegin = timestamp if not before: tend = timestamp+delta else: tend = timestamp if tbegin < zero: tbegin = zero logger.debug('Looking for an iframe in [%s, %s]', tbegin, tend) frames = get_frames_in_stream(ffprobe_path, inputFile=inputFile, begin=tbegin, end=tend, streamKind='v') if frames is None: logger.debug('Found no frame in [%s, %s]', tbegin, tend) delta+=timedelta(seconds=1) continue iframes = [] for frame in frames: if frame['pict_type'] == 'I': iframes.append(frame) found = False for frame in iframes: ts = get_ts_frame(frame) if ts is None: logger.warning('I-frame with no timestamp: %s', frame) continue if before and ts <= timestamp: found = True iframe = frame if not before and ts >= timestamp: found = True iframe = frame break if found: logger.info("Found i-frame at: %s", iframe) break else: delta+=timedelta(seconds=1) continue if iframe is not None: its = get_ts_frame(iframe) nb_frames = 0 for frame in frames: ts = get_ts_frame(frame) if ts is None: logger.warning('Frame without timestamp: %s', frame) continue if before: if its <= ts <= timestamp: logger.info("Retrieve a frame between %s and %s at %s", its, timestamp, ts) nb_frames = nb_frames+1 else: if timestamp <= ts <= its: logger.info("Retrieve a frame between %s and %s at %s", timestamp, ts, its) nb_frames = nb_frames+1 else: logger.error("Impossible to find I-frame between: %s and %s", tbegin, tend) return 0, None return(nb_frames, iframe) def extract_mkv_part(mkvmerge_path, inputFile, outputFile, begin, end): logger = logging.getLogger(__name__) logger.info('Extract video between I-frames at %s and %s', begin,end) infd = inputFile.fileno() outfd = outputFile.fileno() lseek(infd, 0, SEEK_SET) lseek(outfd, 0, SEEK_SET) set_inheritable(infd, True) set_inheritable(outfd, True) env = {**os.environ, 'LANG': 'C'} warnings = [] command = [mkvmerge_path, '-o', f'/proc/self/fd/{outfd:d}', '--split', f'parts:{begin}-{end}', f'/proc/self/fd/{infd:d}'] logger.debug('Executing: %s', command) with Popen(command, stdout=PIPE, close_fds=False, env=env) as mkvmerge: pb = tqdm(TextIOWrapper(mkvmerge.stdout, encoding="utf-8"), total=100, unit='%', desc='Extraction') for line in pb: if line.startswith('Progress :'): p = re.compile('^Progress : (?P[0-9]{1,3})%$') m = p.match(line) if m is None: logger.error('Impossible to parse progress') pb.update(int(m['progress'])-pb.n) elif line.startswith('Warning'): warnings.append(line) pb.update(100-pb.n) pb.refresh() pb.close() status = mkvmerge.wait() if status == 1: logger.warning('Extraction returns warning') for w in warnings: logger.warning(w) elif status == 2: logger.error('Extraction returns errors') def extract_pictures(ffmpeg_path, inputFile, begin, nbFrames, width=640, height=480): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) outfd = memfd_create('pictures', flags=0) set_inheritable(outfd, True) # PPM header # "P6\nWIDTH HEIGHT\n255\n" header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1 logger.debug('Header length: %d', header_len) image_length = width*height*3+header_len length = image_length*nbFrames logger.debug("Estimated length: %d", length) command = [ffmpeg_path, '-loglevel', 'quiet' ,'-y', '-ss', f'{begin}', '-i', f'/proc/self/fd/{infd}', '-s', f'{width:d}x{height:d}', '-vframes', f'{nbFrames:d}', '-c:v', 'ppm','-f', 'image2pipe', f'/proc/self/fd/{outfd:d}'] logger.debug('Executing: %s', command) images = bytes() with Popen(command, stdout=PIPE, close_fds=False) as ffmpeg: status = ffmpeg.wait() if status != 0: logger.error('Conversion failed with status code: %d', status) return None, None lseek(outfd, 0, SEEK_SET) images = read(outfd,length) if len(images) != length: logger.error("Received %d bytes but %d were expected.", len(images), length) return None, None lseek(outfd, 0, SEEK_SET) return images, outfd def extract_sound(ffmpeg_path, inputFile, begin, outputFileName, packet_duration, subChannel=0, nb_packets=0, sample_rate=48000, nb_channels=2): logger = logging.getLogger(__name__) outfd = memfd_create(outputFileName, flags=0) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) set_inheritable(outfd, True) sound = bytes() length = int(nb_channels*sample_rate*4*nb_packets*packet_duration/1000) command = [ffmpeg_path, '-y', '-loglevel', 'quiet', '-ss', f'{begin}', '-i', f'/proc/self/fd/{infd}', f'-frames:a:{subChannel:d}', f'{nb_packets+1:d}', '-c:a', 'pcm_s32le', '-sample_rate', f'{sample_rate:d}', '-channels', f'{nb_channels:d}', '-f', 's32le', f'/proc/self/fd/{outfd:d}'] logger.debug('Executing: %s', command) with Popen(command, stdout=PIPE, close_fds=False) as ffmpeg: status = ffmpeg.wait() if status != 0: logger.error('Sound extraction returns error code: %d', status) return None, None lseek(outfd, 0, SEEK_SET) sound = read(outfd, length) if len(sound) != length: logger.info("Received %d bytes but %d were expected (channels=%d, freq=%d, packets=%d,\ duration=%d ms).", len(sound), length, nb_channels, sample_rate, nb_packets, packet_duration) return None, None return sound, outfd def dump_ppm(pictures, prefix, temporaries): logger = logging.getLogger(__name__) # "P6\nWIDTH HEIGHT\n255\n" pos = 0 picture = 0 logger.debug('Dumping %d pictures: %s', len(pictures),prefix) while pos[0-9]+) (?P[0-9]+)\n$') m = pattern.match(dimensions) if m is not None: width = int(m['width']) height = int(m['height']) else: logger.error('Impossible to parse dimensions of picture') return else: logger.error('Not a PPM picture') return if max_value != 255: logger.error('Not a valid PPM picture. Color are not encoded on byte. Max value: %d', max_value) header_len=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1 try: with open(filename, 'w', encoding='utf8') as out: temporaries.append(out) outfd = out.fileno() length=header_len+3*width*height nb_bytes = 0 while nb_bytes < length: nb_bytes+=write(outfd, pictures[pos+nb_bytes:pos+length]) pos+=length picture+=1 except IOError: logger.error('Impossible to create file: %s', filename) def extract_all_streams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, filesPrefix, nbFrames, framerate, width, height, temporaries, dumpMemFD=False): logger = logging.getLogger(__name__) # The command line for encoding only video track video_encoder_params = [ ffmpeg_path, '-y', '-loglevel', 'quiet'] video_input_params = [] video_codec_params = [] # The command line to create a MKV file with the rest of tracks generic_encoder_params = [ ffmpeg_path, '-y', '-loglevel', 'quiet' ] generic_input_params = [] generic_codec_params = [] if begin < end: video_id=0 audio_id=0 subtitle_id=0 memfds = [] for stream in streams: if stream['codec_type'] == 'video': logger.info("Extracting %d frames of video stream v:%d", nbFrames, video_id) sar = stream['sample_aspect_ratio'] dar = stream['display_aspect_ratio'] pixel_format = stream['pix_fmt'] color_range = stream['color_range'] color_space =stream['color_space'] color_transfer = stream['color_transfer'] color_primaries = stream['color_primaries'] level = int(stream['level']) level = f'{floor(level/10):d}.{level%10:d}' chroma_location = stream['chroma_location'] field_order = stream match field_order: case 'progressive': interlaced_options = ['-field_order', '0'] case 'tt': interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct', '-field_order', '1'] case 'bb': interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct', '-field_order','2'] case 'tb': interlaced_options = ['-top', '1', f'-flags:v:{video_id:d}', '+ilme+ildct', '-field_order', '3'] case 'bt': interlaced_options = ['-top', '0', f'-flags:v:{video_id:d}', '+ilme+ildct', '-field_order', '4'] case _: interlaced_options = [] # ======================================= # # TODO: adjust SAR and DAR # https://superuser.com/questions/907933/correct-aspect-ratio-without-re-encoding-video-file # SAR: -aspect width:height # DAR: -bsf:v sample_aspect_ratio=1:video_format logger.warning('Missing SAR adjustment for: %s', sar) logger.warning('Missing DAR adjustment for: %s', dar) logger.warning('Missing treatment for chroma location: %s', chroma_location) codec = stream['codec_name'] images_bytes, memfd = extract_pictures(ffmpeg_path, inputFile=inputFile, begin=begin, nbFrames=nbFrames, width=width, height=height) if images_bytes is None: logger.error('Impossible to extract picture from video stream.') exit(-1) memfds.append(memfd) if dumpMemFD: dump_ppm(images_bytes, f'{filesPrefix}-{video_id:d}', temporaries) # We rewind to zero the memory file descriptor lseek(memfd, 0, SEEK_SET) set_inheritable(memfd, True) video_input_params.extend(['-framerate', f'{framerate:f}', '-f', 'image2pipe', '-i', f'/proc/self/fd/{memfd:d}']) video_codec_params.extend([f'-c:v:{video_id:d}', codec, f'-level:v:{video_id:d}', level, '-pix_fmt', pixel_format]) video_codec_params.extend(interlaced_options) video_codec_params.extend([f'-colorspace:v:{video_id}', color_space, f'-color_primaries:v:{video_id:d}', color_primaries, f'-color_trc:v:{video_id:d}', color_transfer, f'-color_range:v:{video_id:d}', color_range]) video_id=video_id+1 elif stream['codec_type'] == 'audio': logger.debug('Audio stream: %s', stream) sample_rate = int(stream['sample_rate']) nb_channels = int(stream['channels']) if 'bit_rate' in stream: bit_rate = int(stream['bit_rate']) else: bit_rate = 128000 codec = stream['codec_name'] if 'tags' in stream: if 'language' in stream['tags']: generic_codec_params.extend([f'-metadata:s:a:{audio_id:d}', f"language={stream['tags']['language']}"]) packets = get_frames_in_stream(ffprobe_path, inputFile=inputFile, begin=begin, end=end, streamKind='a', subStreamId=audio_id) nb_packets = len(packets) logger.debug("Found %d packets to be extracted from audio track.", nb_packets) if nb_packets > 0: packet_duration = get_packet_duration(packets[0]) if packet_duration is None: return None else: packet_duration = 0 logger.info("Extracting %d packets of audio stream: a:%d" , nb_packets, audio_id) tmpname = f'{filesPrefix}-{audio_id:d}.pcm' sound_bytes, memfd = extract_sound(ffmpeg_path=ffmpeg_path, inputFile=inputFile, begin=begin, nb_packets=nb_packets, packet_duration=packet_duration, outputFileName=tmpname, sample_rate=sample_rate, nb_channels=nb_channels) if sound_bytes is None: logger.error('Impossible to extract sound track') exit(-1) memfds.append(memfd) if dumpMemFD: try: with open(tmpname,'w', encoding='utf8') as output: temporaries.append(output) outfd = output.fileno() pos = 0 while pos < len(sound_bytes): pos+=write(outfd, sound_bytes[pos:]) except IOError: logger.error('Impossible to create file: %s', tmpname) return None # We rewind to zero the memory file descriptor lseek(memfd, 0, SEEK_SET) set_inheritable(memfd, True) generic_input_params.extend(['-f', 's32le', '-ar', f'{sample_rate:d}', '-ac', f'{nb_channels:d}', '-i', f'/proc/self/fd/{memfd:d}']) generic_codec_params.extend([f'-c:a:{audio_id:d}', codec, f'-b:a:{audio_id:d}', f'{bit_rate:d}']) audio_id=audio_id+1 elif stream['codec_type'] == 'subtitle': logger.info("Extracting a subtitle stream: s:%d", subtitle_id) codec = stream['codec_name'] generic_input_params.extend(['-i', './empty.idx']) if 'tags' in stream: if 'language' in stream['tags']: generic_codec_params.extend([f'-metadata:s:s:{subtitle_id:d}', f"language={stream['tags']['language']}"]) generic_codec_params.extend([f'-c:s:{subtitle_id:d}', 'copy']) subtitle_id=subtitle_id+1 else: logger.error("Unknown stream type: %s", stream['codec_type']) # Create a new MKV movie with all streams (except videos) that have been extracted. generic_encoder_params.extend(generic_input_params) for index in range(0,audio_id+subtitle_id): generic_encoder_params.extend(['-map', f'{index:d}']) generic_encoder_params.extend(generic_codec_params) mkv_filename = f'{filesPrefix}.mkv' try: mkv_output = open(mkv_filename,'wb+') except IOError: logger.error('Impossible to create file: %s', mkv_filename) return None mkvoutfd = mkv_output.fileno() set_inheritable(mkvoutfd, True) generic_encoder_params.extend(['-f', 'matroska', f'/proc/self/fd/{mkvoutfd:d}']) logger.info('Encoding all streams (except video) into a MKV file: %s', mkv_filename) logger.debug('Executing: %s', generic_encoder_params) with Popen(generic_encoder_params, stdout=PIPE, close_fds=False) as ffmpeg: status = ffmpeg.wait() if status != 0: logger.error('Encoding failed with status code: %d', status) return None temporaries.append(mkv_output) h264_filename = f'{filesPrefix}.h264' try: h264_output = open(h264_filename,'wb+') except IOError: logger.error('Impossible to create file: %s', h264_filename) return None h264outfd = h264_output.fileno() set_inheritable(h264outfd, True) video_encoder_params.extend(video_input_params) video_encoder_params.extend(video_codec_params) video_encoder_params.extend([ '-x264opts', f'keyint=1:sps-id={1:d}','-bsf:v', 'h264_mp4toannexb,dump_extra=freq=keyframe,h264_metadata=\ overscan_appropriate_flag=1:sample_aspect_ratio=1:video_format=\ 0:chroma_sample_loc_type=0','-f', 'h264', f'/proc/self/fd/{h264outfd:d}']) logger.info('Encoding video into a H264 file: %s', h264_filename) logger.debug('Executing: %s', video_encoder_params) with Popen(video_encoder_params, stdout=PIPE, close_fds=False) as ffmpeg: status = ffmpeg.wait() if status != 0: logger.error('Encoding failed with status code: %d', status) return None temporaries.append(h264_output) h264_ts_filename = f'{filesPrefix}-ts.txt' try: h264_ts_output = open(h264_ts_filename,'w+', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', h264_ts_filename) return None h264_ts_output.write('# timestamp format v2\n') ts = 0 for _ in range(0,nbFrames): ts = ts+ceil(1000/framerate) h264_ts_output.write(f'{ts:d}\n') h264_ts_output.flush() h264_ts_output.seek(0) temporaries.append(h264_ts_output) for memfd in memfds: close(memfd) return h264_output, h264_ts_output, mkv_output else: # Nothing to be done. We are already at a i-frame boundary. return None, None # Merge a list of mkv files passed as input, and produce a new MKV as output def merge_mkvs(mkvmerge_path, inputs, outputName, concatenate=True, timestamps=None): logger = logging.getLogger(__name__) fds = [] try: out = open(outputName, 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', outputName) return None outfd = out.fileno() lseek(outfd, 0, SEEK_SET) fds.append(outfd) set_inheritable(outfd, True) # Timestamps of merged tracks are modified by the length of the preceding track. # The default mode ('file') is using the largest timestamp of the whole file which may create # desynchronize video and sound. merge_params = [mkvmerge_path, '--append-mode', 'track'] first = True partnum = 0 for mkv in inputs: if mkv is not None: fd = mkv.fileno() fds.append(fd) set_inheritable(fd, True) # If we pass a timestamps file associated with the considered track, use it. if timestamps is not None and partnum in timestamps: tsfd = timestamps[partnum].fileno() lseek(tsfd, 0, SEEK_SET) fds.append(tsfd) set_inheritable(tsfd, True) merge_params.extend(['--timestamps', f'{partnum:d}:/proc/self/fd/{tsfd:d}']) if first: merge_params.append(f'/proc/self/fd/{fd:d}') first = False elif concatenate: merge_params.append(f'+/proc/self/fd/{fd:d}') else: merge_params.append(f'/proc/self/fd/{fd:d}') partnum+=1 merge_params.extend(['-o', f'/proc/self/fd/{outfd:d}']) # We merge all files. warnings = [] env = {**os.environ, 'LANG': 'C'} logger.debug('Executing: LANG=C %s', merge_params) with Popen(merge_params, stdout=PIPE, close_fds=False, env=env) as mkvmerge: pb = tqdm(TextIOWrapper(mkvmerge.stdout, encoding="utf-8"), total=100, unit='%', desc='Merging') for line in pb: if line.startswith('Progress :'): p = re.compile('^Progress : (?P[0-9]{1,3})%$') m = p.match(line) if m is None: logger.error('Impossible to parse progress') pb.n = int(m['progress']) pb.update() elif line.startswith('Warning'): warnings.append(line) status = mkvmerge.wait() if status == 1: logger.warning('Extraction returns warning') for w in warnings: logger.warning(w) elif status == 2: logger.error('Extraction returns errors') for fd in fds: set_inheritable(fd, False) return out def find_subtitles_tracks(ffprobe_path:str, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) command = [ffprobe_path, '-loglevel','quiet', '-i', f'/proc/self/fd/{infd:d}', '-select_streams', 's', '-show_entries', 'stream=index:stream_tags=language', '-of', 'json'] logger.debug('Executing: %s', command) with Popen(command, stdout=PIPE, close_fds=False) as ffprobe: out, _ = ffprobe.communicate() out = json.load(BytesIO(out)) if 'streams' in out: return out['streams'] else: logger.error('Impossible to retrieve format of file') ffprobe.wait() def extract_track_from_mkv(mkvextract_path, inputFile, index, outputFile, timestamps): logger = logging.getLogger(__name__) infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) outfd = outputFile.fileno() lseek(outfd, 0, SEEK_SET) set_inheritable(outfd, True) tsfd = timestamps.fileno() lseek(tsfd, 0, SEEK_SET) set_inheritable(tsfd, True) params = [ mkvextract_path, f'/proc/self/fd/{infd:d}', 'tracks', f'{index:d}:/proc/self/fd/{outfd:d}', 'timestamps_v2', f'{index:d}:/proc/self/fd/{tsfd:d}'] env = {**os.environ, 'LANG': 'C'} logger.debug('Executing: LANG=C %s', params) with Popen(params, stdout=PIPE, close_fds=False, env=env) as extract: pb = tqdm(TextIOWrapper(extract.stdout, encoding="utf-8"), total=100, unit='%', desc='Extraction of track') for line in pb: if line.startswith('Progress :'): p = re.compile('^Progress : (?P[0-9]{1,3})%$') m = p.match(line) if m is None: logger.error('Impossible to parse progress') pb.update(int(m['progress'])-pb.n) pb.update(100-pb.n) pb.refresh() pb.close() extract.wait() if extract.returncode != 0: logger.error('Mkvextract returns an error code: %d', extract.returncode) return None else: logger.info('Track %d was succesfully extracted.', index) def remove_video_tracks_from_mkv(mkvmerge_path, inputFile, outputFile): logger = logging.getLogger(__name__) outfd = outputFile.fileno() infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) lseek(outfd, 0, SEEK_SET) set_inheritable(infd, True) set_inheritable(outfd, True) params = [ mkvmerge_path, '-o', f'/proc/self/fd/{outfd:d}', '-D', f'/proc/self/fd/{infd:d}'] logger.debug('Executing: LANG=C %s', params) env = {**os.environ, 'LANG': 'C'} with Popen(params, stdout=PIPE, close_fds=False, env=env) as remove: pb = tqdm(TextIOWrapper(remove.stdout, encoding="utf-8"), total=100, unit='%', desc='Removal of video track:') for line in pb: if line.startswith('Progress :'): p = re.compile('^Progress : (?P[0-9]{1,3})%$') m = p.match(line) if m is None: logger.error('Impossible to parse progress') pb.update(int(m['progress'])-pb.n) pb.update(100-pb.n) pb.refresh() pb.close() remove.wait() if remove.returncode != 0: logger.error('Mkvmerge returns an error code: %d', remove.returncode) return None else: logger.info('Video tracks were succesfully extracted.') def remux_srt_subtitles(mkvmerge_path, inputFile, outputFileName, subtitles): logger = logging.getLogger(__name__) try: out = open(outputFileName, 'w', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', outputFileName) return None outfd = out.fileno() infd = inputFile.fileno() lseek(infd, 0, SEEK_SET) set_inheritable(infd, True) set_inheritable(outfd, True) mkv_merge_params = [mkvmerge_path, f'/proc/self/fd/{infd:d}'] for fd, lang in subtitles: lseek(fd, 0, SEEK_SET) set_inheritable(fd, True) mkv_merge_params.extend(['--language', f'0:{lang}', f'/proc/self/fd/{fd:d}']) mkv_merge_params.extend(['-o', f'/proc/self/fd/{outfd:d}']) warnings = [] env = {**os.environ, 'LANG': 'C'} logger.info('Remux subtitles: %s', mkv_merge_params) with Popen(mkv_merge_params, stdout=PIPE, close_fds=False, env=env) as mkvmerge: pb = tqdm(TextIOWrapper(mkvmerge.stdout, encoding="utf-8"), total=100, unit='%', desc='Remux subtitles:') for line in pb: if line.startswith('Progress :'): p = re.compile('^Progress : (?P[0-9]{1,3})%$') m = p.match(line) if m is None: logger.error('Impossible to parse progress') pb.n = int(m['progress']) pb.update() elif line.startswith('Warning'): warnings.append(line) status = mkvmerge.wait() if status == 1: logger.warning('Remux subtitles returns warning') for w in warnings: logger.warning(w) elif status == 2: logger.error('Remux subtitles returns errors') def concatenate_h264_parts(h264parts, output): logger = logging.getLogger(__name__) total_length = 0 for h264 in h264parts: fd = h264.fileno() total_length += fstat(fd).st_size logger.info('Total length: %d', total_length) outfd = output.fileno() lseek(outfd, 0, SEEK_SET) pb = tqdm(total=total_length, unit='bytes', desc='Concatenation') for h264 in h264parts: fd = h264.fileno() lseek(fd, 0, SEEK_SET) while True: buf = read(fd, 1000000) if buf is None or len(buf) == 0: break pos = 0 while pos < len(buf): nb_bytes = write(outfd, buf[pos:]) pb.update(nb_bytes) pos += nb_bytes def concatenate_h264_ts_parts(h264TSParts, output): logger = logging.getLogger(__name__) header = '# timestamp format v2\n' output.write(header) last = 0. first = True for part in h264TSParts: if first: offset = last else: # TODO: take framerate into account offset = last + 40 logger.debug('Parsing file: %s. Offset=%d', part, offset) isheader = part.readline() if (not isheader) or (isheader != header): logger.error('Impossible to find a valid header: "%s"', isheader) exit(-1) while True: line = part.readline() if not line: break ts = offset + float(line) last = max(last,ts) output.write(f'{ts:f}\n') if first: first = False # TODO: finish this procedure def do_coarse_processing(ffmpeg_path, ffprobe_path, mkvmerge_path, inputFile, begin, end, nbFrames, framerate, filesPrefix, streams, width, height, temporaries, dumpMemFD): # pylint: disable=W0613 logger = logging.getLogger(__name__) # Internal video with all streams (video, audio and subtitles) internal_mkv_name = f'{filesPrefix}.mkv' try: internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', internal_mkv_name) exit(-1) # Extract internal part of MKV extract_mkv_part(mkvmerge_path=mkvmerge_path, inputFile=inputFile, outputFile=internal_mkv, begin=begin, end=end) temporaries.append(internal_mkv) pass def main(): logger = logging.getLogger(__name__) coloredlogs.install() parser = argparse.ArgumentParser() parser.add_argument("-i", "--input", dest='input_file', type=str, required=True, help="Input file to process (can be .ts, .mp4 or .mkv).") parser.add_argument("-o", "--output", dest='outputFile', type=str, required=True, help="Output MKV file to produce.") parser.add_argument("-p", "--part", dest='parts', nargs='+', required=False, action='append', metavar="hh:mm:ss[.mmm]-hh:mm:ss[.mmm]", help="Extract this exact part of the original file.") parser.add_argument("-k", "--keep", action='store_true', help="Do not cleanup temporary files after processing.") parser.add_argument("-t", "--threshold", action='store', type=int, help="Suppress headers and trailers that are smaller than the threshold.") parser.add_argument("-c", "--coarse", action='store_true', dest='coarse', help="Do not take trailers and headers into account.") parser.add_argument("--dump-memory", action='store_true', dest='dump', help="For debug purpose, dump all memory mapping of headers (and trailers)\ before (after) each part. They are kept in memory only otherwise.") parser.add_argument("-s","--srt", action='store_true', dest='srt', help="Dump subtitles and make OCR and finally remux them in the movie\ (as SRT).") parser.add_argument("-v","--verbose", action='store_true', dest='verbose', help="Debug.") parser.add_argument("-f","--framerate", action='store', type=int, help="Override frame rate estimator.") args = parser.parse_args() logger.info('Arguments: %s', args) if args.verbose: logger.info('Setting logging to debug mode') coloredlogs.set_level(level=logging.DEBUG) logger.debug('Arguments: %s', args) if args.coarse and args.threshold is not None: logger.error('--coarse and threshold arguments are exclusive.') exit(-1) if (not args.coarse) and args.threshold is None: args.threshold = 0 all_optional_tools, paths = check_required_tools() # Flatten args.parts intervals = [] if args.parts is not None: for part in args.parts: for subpart in part: intervals.append(subpart) parts=[] # Parse each interval for interval in intervals: ts1, ts2 = parse_time_interval(interval) if ts1 is None or ts2 is None: logger.error("Illegal time interval: %s", interval) exit(-1) parts.append((ts1,ts2)) # Sort intervals parts.sort(key=cmp_to_key(compare_time_interval)) # Check that no intervals are overlapping prevts = timedelta(0) for part in parts: ts1, ts2 = part if prevts > ts1: logger.error('Intervals are overlapping') exit(-1) prevts = ts2 nb_parts = len(parts) temporaries = [] basename = os.path.splitext(os.path.basename(args.input_file))[0] mp4filename = basename+'.mp4' mkvfilename = basename+'.mkv' try: input_file = open(args.input_file, mode='r', encoding='utf8') except IOError: logger.error("Impossible to open %s", args.input_file) exit(-1) format_of_file = get_format(paths['ffprobe'], input_file) if format_of_file is None: exit(-1) duration = timedelta(seconds=float(format_of_file['duration'])) logger.info("Durée de l'enregistrement: %s", duration) if args.framerate is None: framerate = get_frame_rate(paths['ffprobe'], input_file) if framerate is None: logger.error('Impossible to estimate frame rate !') exit(-1) else: framerate = args.framerate logger.info('Frame rate: %.1f fps', framerate) found = False for f in SupportedFormat: if 'format_name' in format_of_file: if format_of_file['format_name'] == str(f): found = True format_of_file = f break if not found: logger.error('Unsupported format of file') if format_of_file == SupportedFormat.TS: logger.info("Converting TS to MP4 (to fix timestamps).") try: with open(mp4filename, 'w+', encoding='utf8') as mp4: ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mpegts', mp4, 'mp4', duration) temporaries.append(mp4) logger.info("Converting MP4 to MKV.") try: mkv = open(mkvfilename, 'w+', encoding='utf8') except IOError: logger.error('') ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], mp4, 'mp4', mkv, 'matroska', duration) if nb_parts > 0: temporaries.append(mkv) except IOError: logger.error('') elif format_of_file == SupportedFormat.MP4: logger.info("Converting MP4 to MKV") try: mkv = open(mkvfilename, 'w+', encoding='utf8') except IOError: logger.error('') ffmpeg_convert(paths['ffmpeg'], paths['ffprobe'], input_file, 'mp4', mkv, 'matroska', duration) if nb_parts > 0: temporaries.append(mkv) else: logger.info("Already in MKV") mkv = input_file streams = get_streams(paths['ffprobe'], mkv) logger.debug('Streams: %s', streams) main_video = None nb_videos = 0 for stream in streams: if stream['codec_type'] == 'video': if stream['disposition']['default'] == 1: main_video = stream width = stream['width'] height = stream['height'] break nb_videos+=1 if nb_videos == 1: main_video = stream width = stream['width'] height = stream['height'] else: main_video = None if main_video is None: logger.error('Impossible to find main video stream.') exit(-1) # We retrieve the main private codec data _, main_codec_private_data = get_codec_private_data_from_mkv(mkvinfo_path=paths['mkvinfo'], inputFile=mkv) logger.debug('Main video stream has following private data: %s', hexdump.dump(main_codec_private_data, sep=':')) # We parse them main_avc_config = parse_codec_private(main_codec_private_data) logger.debug('AVC configuration: %s', main_avc_config) # We check if the parse and dump operations are idempotent. private_data = dump_codec_private_data(main_avc_config) logger.debug('Redump AVC configuration: %s', hexdump.dump(private_data, sep=':')) # In rare occasion, the PPS has trailing zeroes that do not seem to be related to useful data # but they differ from the private data we generate that do not contain them. # In that case we try to redecode our own private data to see if both AVC configurations are # the same. if main_codec_private_data != private_data: logger.warning('Difference detected in bitstream !!') iso_avc_config = parse_codec_private(private_data) logger.debug('Reread AVC configuration: %s', iso_avc_config) # If there exists a difference between our own reconstructed AVC configuration and the # original one, we abandon if iso_avc_config != main_avc_config: logger.error('AVC configurations are different: %s\n%s\n', main_avc_config, iso_avc_config) exit(-1) # Pour chaque portion partnum = 0 mkvparts = [] h264parts = [] h264_ts = [] checks = [] pos = timedelta() other_avc_configs = [] for ts1, ts2 in parts: # TODO: translate comment in english # Trouver l'estampille de la trame 'I' la plus proche (mais postérieure) au début # de la portion. # Trouver l'estampille de la trame 'I' la plus proche (mais antérieure) à la fin # de la portion. # On a alors # debut ----- trame --------- trame --------- fin fin+1 # 'B/P' 'B/P'* 'I' 'I' 'B/P'* 'B/P' 'I/B/P' # Si la trame de début est déjà 'I', il n'y a rien à faire. # Sinon on extrait les trames 'B' ou 'P' depuis le début jusqu'à la trame 'I' non incluse. # Si la trame de fin précède une trame I, on n'a rien à faire. # Sinon on extrait toutes les trames depuis la dernière trame I jusqu'à la trame de fin. partnum = partnum + 1 # Get the nearest I-frame whose timestamp is greater or equal to the beginning. head_frames = get_nearest_iframe(paths['ffprobe'], mkv, ts1, before=False) if head_frames is None: logger.error('Impossible to retrieve I-frame') exit(-1) # Get the nearest I-frame whose timestamp ... # TODO: wrong here ... tail_frames = get_nearest_iframe(paths['ffprobe'], mkv, ts2, before=True) if tail_frames is None: logger.error('Impossible to retrieve I-frame') exit(-1) nb_head_frames, head_iframe = head_frames nb_tail_frames, tail_iframe = tail_frames logger.info("Found %d frames between beginning of current part and first I-frame", nb_head_frames) logger.info("Found %d frames between last I-frame and end of current part", nb_tail_frames) head_iframe_ts = get_ts_frame(head_iframe) if head_iframe_ts is None: exit(-1) tail_iframe_ts = get_ts_frame(tail_iframe) if tail_iframe_ts is None: exit(-1) checks.append(pos+head_iframe_ts-ts1) subparts = [] # TODO: separate pipeline processing between coarse and not fine grain options. # if args.coarse: # do_coarse_processing(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], inputFile=mkv, # begin=ts1, end=head_iframe_ts, nbFrames=nb_head_frames-1, # frameRate=frameRate, filesPrefix='part-%d-head' % (partnum), # streams=streams, width=width, height=height, # temporaries=temporaries, dumpMemFD=args.dump) # else: # doFineGrainProcessing(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], # inputFile=mkv, begin=ts1, end=head_iframe_ts, # nbFrames=nb_head_frames-1, frameRate=frameRate, # filesPrefix='part-%d-head' % (partnum), streams=streams, # width=width, height=height, temporaries=temporaries, # dumpMemFD=args.dump) if (not args.coarse) and (nb_head_frames > args.threshold): # We extract all frames between the beginning upto the frame that immediately preceeds # the I-frame. h264_head, h264_head_ts, mkv_head = extract_all_streams(ffmpeg_path=paths['ffmpeg'], ffprobe_path=paths['ffprobe'], inputFile=mkv, begin=ts1, end=head_iframe_ts, nbFrames=nb_head_frames-1, framerate=framerate, filesPrefix=f'part-{partnum:d}-head', streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump) # If we are not at an exact boundary: if mkv_head is not None: subparts.append(mkv_head) if h264_head is not None: avcconfig = get_avc_config_from_h264(h264_head) other_avc_configs.append(avcconfig) h264parts.append(h264_head) if h264_head_ts is not None: h264_ts.append(h264_head_ts) # Creating MKV file that corresponds to current part between I-frames # Internal video with all streams (video, audio and subtitles) internal_mkv_name = f'part-{partnum:d}-internal.mkv' # Internal video stream as a raw H264 stream internal_h264_name = f'part-{partnum:d}-internal.h264' # Internal video timestamps internal_h264_ts_name = f'part-{partnum:d}-internal-ts.txt' # Internal video with only audio and subtitles streams internal_novideo_mkv_name = f'part-{partnum:d}-internal-novideo.mkv' try: internal_mkv = open(internal_mkv_name, 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', internal_mkv_name) exit(-1) try: internal_novideo_mkv = open(internal_novideo_mkv_name, 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', internal_novideo_mkv_name) exit(-1) try: internal_h264 = open(internal_h264_name, 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', internal_h264_name) exit(-1) try: internal_h264_ts = open(internal_h264_ts_name, 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file: %s', internal_h264_ts_name) exit(-1) # logger.info('Merge header, middle and trailer subpart into: %s' % internal_mkv_name) # Extract internal part of MKV extract_mkv_part(mkvmerge_path=paths['mkvmerge'], inputFile=mkv, outputFile=internal_mkv, begin=head_iframe_ts, end=tail_iframe_ts) # Extract video stream of internal part as a raw H264 and its timestamps. logger.info('Extract video track as raw H264 file.') extract_track_from_mkv(mkvextract_path=paths['mkvextract'], inputFile=internal_mkv, index=0, outputFile=internal_h264, timestamps=internal_h264_ts) # Remove video track from internal part of MKV logger.info('Remove video track from %s', internal_mkv_name) remove_video_tracks_from_mkv(mkvmerge_path=paths['mkvmerge'], inputFile=internal_mkv, outputFile=internal_novideo_mkv) temporaries.append(internal_mkv) temporaries.append(internal_h264) temporaries.append(internal_h264_ts) temporaries.append(internal_novideo_mkv) h264parts.append(internal_h264) h264_ts.append(internal_h264_ts) subparts.append(internal_novideo_mkv) if (not args.coarse) and (nb_tail_frames > args.threshold): # We extract all frames between the I-frame (including it) upto the end. h264_tail, h264_tail_ts, mkv_tail = extract_all_streams(ffmpeg_path=paths['ffmpeg'], ffprobe_path=paths['ffprobe'], inputFile=mkv, begin=tail_iframe_ts, end=ts2, nbFrames=nb_tail_frames, framerate=framerate, filesPrefix=f'part-{partnum:d}-tail', streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump) if mkv_tail is not None: subparts.append(mkv_tail) if h264_tail is not None: avcconfig = get_avc_config_from_h264(h264_tail) other_avc_configs.append(avcconfig) h264parts.append(h264_tail) if h264_tail_ts is not None: h264_ts.append(h264_tail_ts) logger.info('Merging MKV: %s', subparts) part = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=subparts, outputName=f'part-{partnum:d}.mkv', concatenate=True) mkvparts.append(part) temporaries.append(part) pos = pos+tail_iframe_ts-ts1 # We need to check the end also checks.append(pos) # When using coarse option there is a single AVC configuration. for avc_config in other_avc_configs: main_avc_config.merge(avc_config) logger.debug('Merged AVC configuration: %s', main_avc_config) nb_mkv_parts = len(mkvparts) if nb_mkv_parts > 0: try: full_h264 = open(f'{basename}-full.h264', 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file full H264 stream.') exit(-1) logger.info('Merging all H264 tracks') concatenate_h264_parts(h264parts=h264parts, output=full_h264) temporaries.append(full_h264) try: full_h264_ts = open(f'{basename}-ts.txt', 'w+', encoding='utf8') except IOError: logger.error('Impossible to create file containing all video timestamps.') exit(-1) logger.info('Merging H264 timestamps') concatenate_h264_ts_parts(h264TSParts=h264_ts, output=full_h264_ts) temporaries.append(full_h264_ts) final_novideo_name = f'{basename}-novideo.mkv' final_with_video_name = f'{basename}-video.mkv' if nb_mkv_parts > 1: logger.info('Merging all audio and subtitles parts: %s', mkvparts) merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=mkvparts, outputName=final_novideo_name, concatenate=True) elif nb_mkv_parts == 1: copyfile('part-1.mkv', final_novideo_name) else: logger.info("Nothing else to do.") copyfile(mkvfilename, final_with_video_name) if nb_mkv_parts >=1 : try: final_novideo = open(final_novideo_name, 'r', encoding='utf8') except IOError: logger.error('Impossible to open file: %s.', final_novideo_name) exit(-1) temporaries.append(final_novideo) full_h264_ts.seek(0) logger.info('Merging final video track and all other tracks together') final_with_video = merge_mkvs(mkvmerge_path=paths['mkvmerge'], inputs=[full_h264, final_novideo], outputName=final_with_video_name, concatenate=False, timestamps={0: full_h264_ts}) final_codec_private_data = dump_codec_private_data(main_avc_config) logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data, sep=':')) logger.info('Changing codec private data with the new one.') change_codec_private_data(paths['mkvinfo'], final_with_video, final_codec_private_data) if args.srt: if not all_optional_tools: logger.warning("Missing tools for extracting subtitles.") move(final_with_video_name, args.outputFile) else: # Final cut is not any more the final step. temporaries.append(final_with_video) duration = get_movie_duration(paths['ffprobe'], final_with_video) supported_langs = get_tesseract_supported_lang(paths['tesseract']) logger.info('Supported lang: %s', supported_langs) logger.info('Find subtitles tracks and language.') subtitles = find_subtitles_tracks(paths['ffprobe'], final_with_video) logger.info(subtitles) sts = {} for subtitle in subtitles: index = subtitle['index'] if 'tags' in subtitle: if 'language' in subtitle['tags']: lang = subtitle['tags']['language'] if lang in sts: sts[lang].append(index) else: sts[lang] = [index] else: logger.error("Dropping subtitle: %s because it is missing language\ indication") else: logger.error("Dropping subtitle: %s because it is missing language indication", subtitle) logger.info(sts) if len(sts) > 0: list_of_subtitles = extract_srt(paths['mkvextract'], final_with_video_name, sts, supported_langs) logger.info(list_of_subtitles) for idx_name, sub_name, _, _ in list_of_subtitles: try: idx = open(idx_name,'r', encoding='utf8') except IOError: logger.error("Impossible to open %s.", idx_name) exit(-1) try: sub = open(sub_name,'r', encoding='utf8') except IOError: logger.error("Impossible to open %s.", sub_name) exit(-1) temporaries.append(idx) temporaries.append(sub) ocr = do_ocr(paths['vobsubocr'], list_of_subtitles, duration, temporaries, args.dump) logger.info(ocr) # Remux SRT subtitles remux_srt_subtitles(paths['mkvmerge'], final_with_video, args.outputFile, ocr) else: copyfile(final_with_video_name, args.outputFile) else: move(final_with_video_name, args.outputFile) if not args.keep: logger.info("Cleaning temporary files") for f in temporaries: path = os.path.realpath(f.name) logger.info("Removing: %s", path) f.close() unlink(path) d = datetime(1,1,1) for c in checks: logger.info("Please check cut smoothness at %s", (c+d).strftime("%H:%M:%S")) if __name__ == "__main__": main()