Files
removeads/removeads.py
2025-10-25 16:47:28 +02:00

3353 lines
138 KiB
Python
Executable File

#!/usr/bin/env python3
'''A module to remove parts of video (.e.g advertisements) with single frame precision.'''
import argparse
import re
from sys import exit
from datetime import datetime,timedelta
import coloredlogs, logging
from functools import cmp_to_key
from subprocess import Popen, PIPE
from os import read, write, lseek, pipe, set_inheritable, memfd_create, SEEK_SET, close, unlink, fstat, ftruncate
import os.path
from io import BytesIO, TextIOWrapper
import json
from enum import Enum, IntEnum, unique, auto
import shutil
from tqdm import tqdm, trange
from select import select
from math import floor, ceil, log
from shutil import copyfile, which, move
import hexdump
from iso639 import Lang
from iso639.exceptions import InvalidLanguageValue
from dataclasses import dataclass, field
# Useful SPS/PPS discussion
# TODO: improve situation of SPS and PPS header mismatch when merging MVK with mkvmerge to remove warnings.
# https://copyprogramming.com/howto/including-sps-and-pps-in-a-raw-h264-track
# https://gitlab.com/mbunkus/mkvtoolnix/-/issues/2390
# New strategy: a possible way of handling multiple SPS/PPS gracefully.
# Encode each head and trailer with FFMPEG using only I-frame (to be sure the NAL unit will never refer to another image).
# Encode using an different SPS-ID all of them (using sps-id parameter of libx264 library, e.g 1 instead of 0).
# For the video track produce only a raw H264 file and a file containing timestamps of the different frames.
# For the rest of the tracks (audio, subtitles) produce directly a MKV (this is already done).
# Concatenate all raw H264 in a giant one (like cat), and the same for timestamps of video frames (to keep
# sound and video synchronized).
# Then use mkvmerge to remux the H264 track and the rest of tracks.
# MKVmerge "concatenate" subcommand is able to concatenate different SPS/PPS data into a bigger Private Codec Data.
# However, this is proved to be not reliable. Sometimes it results in a AVC context containing a single SPS/PPS.
# So we have to rely on a manual parsing of the H264 AVC context of original movie
# and the ones produced for headers and trailers, and then merging them into a bigger AVC context.
# Then finally, change the Private Codec Data in the final MKV.
def checkRequiredTools():
logger = logging.getLogger(__name__)
allOptionalTools = True
paths = {}
required = ['ffmpeg', 'ffprobe', 'mkvmerge', 'mkvinfo']
optional = ['mkvextract', 'vobsubocr','tesseract']
for tool in required:
path = which(tool)
if path is None:
logger.error('Required tool: %s is missing.',tool)
exit(-1)
else:
paths[tool] = path
for tool in optional:
path = which(tool)
if path is None:
logger.info('Optional tool: %s is missing.',tool)
allOptionalTools = False
else:
paths[tool] = path
return allOptionalTools, paths
def getTesseractSupportedLang(tesseract):
logger = logging.getLogger(__name__)
res = {}
with Popen([tesseract, '--list-langs'], stdout=PIPE) as tesseract:
for line in tesseract.stdout:
line = line.decode('utf8')
p = re.compile('(?P<lang>[a-z]{3})\n')
m = re.match(p,line)
if m is not None:
try:
lang = m.group('lang')
key = Lang(lang)
res[key] = lang
except InvalidLanguageValue as e:
pass
tesseract.wait()
if tesseract.returncode != 0:
logger.error("Tesseract returns an error code: %d",tesseract.returncode)
return None
return res
def getFrameRate(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
meanDuration = 0.
nbFrames1 = 0
nbFrames2 = 0
meanInterframes = 0.
minTs = None
maxTs = None
interlaced = False
params = [ffprobe, '-loglevel', 'quiet', '-select_streams', 'v', '-show_frames',
'-read_intervals', '00%+30', '-of', 'json', '/proc/self/fd/%d' % infd]
env = {**os.environ, 'LANG': 'C'}
with Popen(params, stdout=PIPE, close_fds=False, env=env) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'frames' in out:
for frame in out['frames']:
if 'interlaced_frame' in frame:
if frame['interlaced_frame'] == 1:
interlaced = True
if 'pts_time' in frame:
ts = float(frame['pts_time'])
if minTs is None:
minTs = ts
if maxTs is None:
maxTs = ts
if ts < minTs:
minTs = ts
if ts > maxTs:
maxTs = ts
nbFrames1+=1
if 'duration_time' in frame:
meanDuration+=float(frame['duration_time'])
nbFrames2+=1
else:
return None
ffprobe.wait()
if ffprobe.returncode != 0:
logger.error("ffprobe returns an error code: %d", ffprobe.returncode)
return None
frameRate1 = nbFrames1/(maxTs-minTs)
frameRate2 = nbFrames2 / meanDuration
if abs(frameRate1 - frameRate2) > 0.2:
if not interlaced:
logger.error('Video is not interlaced and the disperancy between frame rates is too big: %f / %f', frameRate1, frameRate2)
return None
if abs(frameRate1*2 - frameRate2) < 0.2:
return frameRate2/2
else:
logger.error('Video is interlaced and the disperancy between frame rates is too big: %f / %f', frameRate1, frameRate2)
return None
else:
return frameRate2
return None
def getSubTitlesTracks(ffprobe, mkvPath):
logger = logging.getLogger(__name__)
tracks={}
nbSubTitles = 0
with Popen([ffprobe, '-loglevel', 'quiet', '-select_streams', 's', '-show_entries',
'stream=index,codec_name:stream_tags=language', '-of', 'json', mkvPath], stdout=PIPE) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
for stream in out['streams']:
index = stream['index']
codec = stream['codec']
lang = stream['tags']['language']
if codec == 'dvd_subtitle':
if lang not in tracks:
tracks[lang] = [index]
else:
l = tracks[lang]
l.append(index)
tracks[lang] = l
else:
return None
ffprobe.wait()
if ffprobe.returncode != 0:
logger.error("ffprobe returns an error code: %d", ffprobe.returncode)
return None
return tracks
def extractSRT(mkvextract, fileName, subtitles, langs):
logger = logging.getLogger(__name__)
params = [mkvextract, fileName, 'tracks']
res = []
for lang in subtitles:
iso = Lang(lang)
if iso in langs:
ocrlang = langs[iso]
else:
logger.warning("Language not supported by Tesseract: %s", iso.name)
ocrlang ='osd'
if len(subtitles[lang]) == 1:
params.append('%d:%s' % (subtitles[lang][0], lang))
res.append(('%s.idx' % lang, '%s.sub' % lang, lang, ocrlang))
else:
count = 1
for track in subtitles[lang]:
params.append('%d:%s-%d' % (track, lang, count))
res.append(('%s-%d.idx' % (lang,count), '%s-%d.sub' % (lang,count), lang, ocrlang))
count = count+1
logger.debug('Executing %s' % params)
env = {**os.environ, 'LANG': 'C'}
with Popen(params, stdout=PIPE, close_fds=False, env=env) as extract:
pb = tqdm(TextIOWrapper(extract.stdout, encoding="utf-8"), total=100, unit='%', desc='Extraction:')
for line in pb:
if line.startswith('Progress :'):
p = re.compile('^Progress : (?P<progress>[0-9]{1,3})%$')
m = p.match(line)
if m is None:
logger.error('Impossible to parse progress')
pb.update(int(m['progress'])-pb.n)
pb.update(100-pb.n)
pb.refresh()
pb.close()
extract.wait()
# mkvextract returns 0, 1 or 2 as error code.
if extract.returncode == 0:
logger.info('Subtitle tracks were succesfully extracted.')
return res
elif extract.returncode == 1:
logger.warning('Mkvextract returns warning')
return res
else:
logger.error('Mkvextract returns an error code: %d', extract.returncode)
return None
def doOCR(vobsubocr, idxs, duration, temporaries, dumpMemFD=False):
logger = logging.getLogger(__name__)
res = []
for idxName, subName, lang, iso in idxs:
srtname = '%s.srt' % os.path.splitext(idxName)[0]
# Tesseract seems to recognize the three dots ... as "su"
ldots = re.compile('^su\n$')
timestamps = re.compile(r'^[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} \-\-> (?P<hours>[0-9]{2}):(?P<minutes>[0-9]{2}):(?P<seconds>[0-9]{2}),[0-9]{3}$')
srtfd = memfd_create(srtname, flags=0)
with Popen([vobsubocr, '--lang', iso, idxName], stdout=PIPE) as ocr:
pb = tqdm(TextIOWrapper(ocr.stdout, encoding="utf-8"), total=int(duration/timedelta(seconds=1)), unit='s', desc='OCR')
for line in pb:
m = re.match(ldots,line)
if m is not None:
write(srtfd, '...'.encode(encoding='UTF-8'))
else:
write(srtfd, line.encode(encoding='UTF-8'))
m = re.match(timestamps, line)
if m!=None:
hours = int(m.group('hours'))
minutes = int(m.group('hours'))
seconds = int(m.group('seconds'))
ts = timedelta(hours=hours, minutes=minutes, seconds=seconds)
pb.n = int(ts/timedelta(seconds=1))
pb.update()
status = ocr.wait()
if status != 0:
logger.error('OCR failed with status code: %d', status)
if dumpMemFD:
try:
dumpSrt = open(srtname,'w')
except IOError:
logger.error('Impossible to create file: %s', srtname)
return None
lseek(srtfd, 0, SEEK_SET)
srtLength = fstat(srtfd).st_size
buf = read(srtfd, srtLength)
outfd = dumpSrt.fileno()
pos = 0
while pos < srtLength:
pos+=write(outfd, buf[pos:])
temporaries.append(dumpSrt)
srtLength = fstat(srtfd).st_size
if srtLength > 0:
res.append((srtfd, lang))
return res
@unique
class SupportedFormat(IntEnum):
TS = 1
MP4 = 2
Matroska = 3
def __str__(self):
if self is SupportedFormat.TS:
return 'mpegts'
elif self is SupportedFormat.MP4:
return 'mov,mp4,m4a,3gp,3g2,mj2'
elif self is SupportedFormat.Matroska:
return 'matroska,webm'
else:
return 'Unsupported format'
# Extract SPS/PPS
# https://gitlab.com/mbunkus/mkvtoolnix/-/issues/2390
# ffmpeg -i <InputFile (before concatenation)> -c:v copy -an -sn -bsf:v trace_headers -t 0.01 -report -loglevel 0 -f null -
# Found codec private data using mkvinfo
def getCodecPrivateDataFromMKV(mkvinfo, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
found = False
env = {**os.environ, 'LANG': 'C'}
# Output example
# Codec's private data: size 48 (H.264 profile: High @L4.0) hexdump 01 64 00 28 ff e1 00 1b 67 64 00 28 ac d9 40 78 04 4f dc d4 04 04 05 00 00 92 ef 00 1d ad a6 1f 16 2d 96 01 00 06 68 fb a3 cb 22 c0 fd f8 f8 00 at 406 size 51 data size 48
with Popen([mkvinfo, '-z', '-X', '-P', '/proc/self/fd/%d' % infd ], stdout=PIPE, close_fds=False, env=env) as mkvinfo:
out, _ = mkvinfo.communicate()
out = out.decode('utf8')
regExp = r"^.*Codec's private data: size ([0-9]+) \(H.264.*\) hexdump (?P<hexdump>([0-9a-f]{2} )+)at (?P<position>[0-9]+) size (?P<size>[0-9]+).*$"
p = re.compile(regExp)
for line in out.splitlines():
m = p.match(line)
if m is not None:
size = int(m.group('size'))
position = int(m.group('position'))
logger.debug("Found codec private data at position: %s, size: %d", position, size)
found = True
mkvinfo.wait()
break
if found:
lseek(infd, position, SEEK_SET)
data = read(infd, size)
return position, data
else:
return None, None
# All the following code is a transposition of documents:
# ISO/IEC H.264-201602
# ISO/IEC 14496-15
def readBit(buf, bitPosition):
logger = logging.getLogger(__name__)
bytePosition = floor(floor(bitPosition/8))
byte = buf[bytePosition]
bit = (byte >> (7-(bitPosition % 8))) & 1
return bitPosition+1, bit
def readBoolean(buf, bitPosition):
bitPosition, b = readBit(buf, bitPosition)
return bitPosition, b==1
def readBits(buf, bitPosition, nbBits):
logger = logging.getLogger(__name__)
v = 0
for i in range(0, nbBits):
bitPosition, bit = readBit(buf, bitPosition)
v = v*2+bit
return bitPosition, v
def readByte(buf, bitPosition):
bitPosition, b = readBits(buf, bitPosition, 8)
return bitPosition, b
def readWord(buf, bitPosition):
bitPosition, w = readBits(buf, bitPosition, 16)
return bitPosition, w
def readLong(buf, bitPosition):
bitPosition, l = readBits(buf, bitPosition, 32)
return bitPosition, l
def readUnsignedExpGolomb(buf, bitPosition):
nbZeroes=0
while True:
bitPosition, b = readBit(buf, bitPosition)
if b!=0:
break
nbZeroes+=1
v1 = 1
bitPosition, v2 = readBits(buf, bitPosition, nbZeroes)
v = (v1<<nbZeroes)+v2
return bitPosition, v-1
def readSignedExpGolomb(buf, bitPosition):
logger = logging.getLogger(__name__)
bitPosition, v = readUnsignedExpGolomb(buf, bitPosition)
if v%2 == 0:
return bitPosition, -(v>>1)
else:
return bitPosition, (v+1)>>1
def writeBit(buf, bitPosition, b):
logger = logging.getLogger(__name__)
bufLength = len(buf)
bytePosition = floor(bitPosition/8)
if bytePosition >= bufLength:
extension = bytearray(bytePosition+1-bufLength)
buf.extend(extension)
buf[bytePosition] |= (b<<(7-(bitPosition % 8)))
bitPosition+=1
return bitPosition
def writeBoolean(buf, bitPosition, b):
if b:
bitPosition = writeBit(buf, bitPosition, 1)
else:
bitPosition = writeBit(buf, bitPosition, 0)
return bitPosition
def writeBits(buf, bitPosition, v, size):
for i in range(size-1,-1,-1):
b = (v>>i)&1
bitPosition = writeBit(buf, bitPosition, b)
return bitPosition
def writeByte(buf, bitPosition, v):
bitPosition = writeBits(buf, bitPosition, v, 8)
return bitPosition
def writeWord(buf, bitPosition, v):
bitPosition = writeBits(buf, bitPosition, v, 16)
return bitPosition
def writeLong(buf, bitPosition, v):
bitPosition = writeBits(buf, bitPosition, v, 32)
return bitPosition
def writeUnsignedExpGolomb(buf, bitPosition, v):
logger = logging.getLogger(__name__)
n = floor(log(v+1)/log(2))+1
# Write zeroes
bitPosition = writeBits(buf, bitPosition, 0, n-1)
bitPosition = writeBit(buf, bitPosition, 1)
bitPosition = writeBits(buf, bitPosition, v+1, n-1)
return bitPosition
def writeSignedExpGolomb(buf, bitPosition, v):
if v <= 0:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, -v*2)
else:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, v*2-1)
return bitPosition
def parseRBSPTrailingBits(buf, bitPosition):
logger = logging.getLogger(__name__)
bitPosition, one = readBit(buf, bitPosition)
if one==0:
raise(Exception('Stop bit should be equal to one. Read: %d' % one))
while bitPosition%8 != 0:
bitPosition, zero = readBit(buf, bitPosition)
if zero==1:
raise(Exception('Trailing bit should be equal to zero'))
return bitPosition
def writeRBSPTrailingBits(buf, bitPosition):
bitPosition = writeBit(buf, bitPosition, 1)
while bitPosition%8 != 0:
bitPosition = writeBit(buf, bitPosition, 0)
return bitPosition
def moreRBSPData(buf, bitPosition):
logger = logging.getLogger(__name__)
logger.debug('Is there more data in buffer of length: %d at bitPosition: %d' % (len(buf), bitPosition))
byteLength = len(buf)
bitLength = byteLength*8
# We are at the end of buffer
if bitPosition == bitLength:
return False
else:
found = False
for i in range(bitLength-1,-1,-1):
pos, b = readBit(buf, i)
if b == 1:
found = True
break
if not found:
raise(Exception('Impossible to find trailing stop bit !'))
# No more data
if bitPosition == pos:
return False
return True
# Convert from RBSP (Raw Byte Sequence Payload) to SODB (String Of Data Bits)
def RBSP2SODB(buf):
logger = logging.getLogger(__name__)
logger.debug('RBSP: %s' % hexdump.dump(buf, sep=':'))
res = buf
for b in [ b'\x00', b'\x01', b'\x02', b'\x03']:
pattern = b'\x00\x00\x03'+b
replacement = b'\x00\x00' + b
res = res.replace(pattern, replacement)
logger.debug('SODB: %s' % hexdump.dump(res, sep=':'))
return res
# Reverse operation SODB to RBSP.
def SODB2RBSP(buf):
logger = logging.getLogger(__name__)
logger.debug('SODB: %s' % hexdump.dump(buf, sep=':'))
res = buf
for b in [ b'\x03', b'\x00', b'\x01', b'\x02']:
pattern = b'\x00\x00'+b
replacement = b'\x00\x00\x03' + b
res = res.replace(pattern, replacement)
logger.debug('RBSP: %s', hexdump.dump(res, sep=':'))
return res
# Useful for SPS and PPS
def parseScalingList(buf, bitPosition, size):
logger = logging.getLogger(__name__)
res = []
lastScale = 8
nextScale = 8
for i in range(0, size):
if nextScale != 0:
bitPosition, delta_scale = readSignedExpGolomb(buf, bitPosition)
nextScale = (lastScale+delta_scale+256) % 256
v = lastScale if nextScale==0 else nextScale
res.append(v)
lastScale = v
return bitPosition,res
# TODO: test optimized version.
# The ISO/IEC H.264-201602 seems to take into account the case where the end of the deltas list is full of zeroes.
def writeScalingList(buf, bitPosition, size, matrix, optimized=False):
logger = logging.getLogger(__name__)
logger.debug('Dumping matrix: %s of size: %d, size parameter: %d.', matrix, len(matrix), size)
prev = 8
deltas = []
for i in range(0, size):
v = matrix[i]
delta = v - prev
deltas.append(delta)
prev = v
if not optimized:
for delta in deltas:
bitPosition = writeSignedExpGolomb(buf, bitPosition, delta)
else:
logger.error('Not yet implemented')
exit(-1)
# reverse = deltas.reverse()
# compressed = False
# while len(reverse)>0:
# if reverse[0] == 0:
# compressed = True
# reverse.pop()
# else:
# break
# deltas = reverse.reverse()
# if compressed:
# deltas.append(0)
# for delta in deltas:
# bitPosition = writeSignedExpGolomb(buf, bitPosition, delta)
return bitPosition
@dataclass
class HRD:
cpb_cnt_minus1: int=0
bit_rate_scale: int=0
cpb_size_scale: int=0
bit_rate_value_minus1: dict = field(default_factory=dict)
cpb_size_value_minus1: dict = field(default_factory=dict)
cbr_flag: dict = field(default_factory=dict)
initial_cpb_removal_delay_length_minus1: int=0
cpb_removal_delay_length_minus1: int=0
dpb_output_delay_length_minus1: int=0
time_offset_length: int=0
def __init__(self):
self.bit_rate_value_minus1 = {}
self.cpb_size_value_minus1 = {}
self.cbr_flag = {}
def fromBytes(self, buf, bitPosition):
bitPosition, self.cpb_cnt_minus1 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.bit_rate_scale = readBits(buf, bitPosition, 4)
bitPosition, self.cpb_size_scale = readBits(buf, bitPosition, 4)
for i in range(0, self.cpb_cnt_minus1+1):
bitPosition, v = readUnsignedExpGolomb(buf, bitPosition)
self.bit_rate_value_minus1[i] = v
bitPosition, v = readUnsignedExpGolomb(buf, bitPosition)
self.cpb_size_value_minus1[i] = v
bitPosition, b = readBoolean(buf, bitPosition)
self.cbr_flag[i] = b
bitPosition, self.initial_cpb_removal_delay_length_minus1 = readBits(buf, bitPosition, 5)
bitPosition, self.cpb_removal_delay_length_minus1 = readBits(buf, bitPosition, 5)
bitPosition, self.dpb_output_delay_length_minus1 = readBits(buf, bitPosition, 5)
bitPosition, self.time_offset_length = readBits(buf, bitPosition, 5)
return bitPosition
def toBytes(self, buf, bitPosition):
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.cpb_cnt_minus1)
bitPosition = writeBits(buf, bitPosition, self.bit_rate_scale, 4)
bitPosition = writeBits(buf, bitPosition, self.cpb_size_scale, 4)
for i in range(0, self.cpb_cnt_minus1+1):
v = self.bit_rate_value_minus1[i]
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, v)
v = self.cpb_size_value_minus1[i]
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, v)
b = self.cbr_flag[i]
bitPosition = writeBoolean(buf, bitPosition, b)
bitPosition = writeBits(buf, bitPosition, self.initial_cpb_removal_delay_length_minus1, 5)
bitPosition = writeBits(buf, bitPosition, self.cpb_removal_delay_length_minus1, 5)
bitPosition = writeBits(buf, bitPosition, self.dpb_output_delay_length_minus1, 5)
bitPosition = writeBits(buf, bitPosition, self.time_offset_length, 5)
return bitPosition
@dataclass
class VUI:
aspect_ratio_info_present_flag:bool=False
aspect_ratio_idc:int=0
sar_width:int=0
sar_height:int=0
overscan_info_present_flag:bool=False
overscan_appropriate_flag:bool=False
video_signal_type_present_flag:bool=False
video_format:int=0
video_full_range_flag:bool=False
colour_description_present_flag:bool=False
colour_primaries:int=0
transfer_characteristics:int=0
matrix_coefficients:int=0
chroma_loc_info_present_flag:bool=False
chroma_sample_loc_type_top_field:int=0
chroma_sample_loc_type_bottom_field:int=0
timing_info_present_flag:bool=False
num_units_in_tick:int=0
time_scale:int=0
fixed_frame_rate_flag:bool=False
nal_hrd_parameters_present_flag:bool=False
hrd_parameters:HRD=None
vcl_hrd_parameters_present_flag:bool=False
vcl_hrd_parameters:HRD=None
low_delay_hrd_flag:bool=False
pic_struct_present_flag:bool=False
bitstream_restriction_flag:bool=False
motion_vectors_over_pic_boundaries_flag:bool=False
max_bytes_per_pic_denom:int=0
max_bits_per_mb_denom:int=0
log2_max_mv_length_horizontal:int=0
log2_max_mv_length_vertical:int=0
max_num_reorder_frames:int=0
max_dec_frame_buffering:int=0
# This structure is not guaranteed to be located at a byte boundary.
# We must explicitely indicate bit offset.
def fromBytes(self, buf, bitPosition):
logger = logging.getLogger(__name__)
bitPosition, self.aspect_ratio_info_present_flag = readBoolean(buf, bitPosition)
if self.aspect_ratio_info_present_flag:
bitPosition, self.aspect_ratio_idc = readByte(buf, bitPosition)
if self.aspect_ratio_idc == 255: # Extended_SAR
bitPosition, self.sar_width = readWord(buf, bitPosition)
bitPosition, self.sar_height = readWord(buf, bitPosition)
bitPosition, self.overscan_info_present_flag = readBoolean(buf, bitPosition)
if self.overscan_info_present_flag:
bitPosition, self.overscan_appropriate_flag = readBoolean(buf, bitPosition)
bitPosition, self.video_signal_type_present_flag = readBoolean(buf, bitPosition)
if self.video_signal_type_present_flag:
bitPosition, self.video_format = readBits(buf, bitPosition, 3)
bitPosition, self.video_full_range_flag = readBoolean(buf, bitPosition)
bitPosition, self.colour_description_present_flag = readBoolean(buf, bitPosition)
if self.colour_description_present_flag:
bitPosition, self.colour_primaries = readByte(buf, bitPosition)
bitPosition, self.transfer_characteristics = readByte(buf, bitPosition)
bitPosition, self.matrix_coefficients = readByte(buf, bitPosition)
bitPosition, self.chroma_loc_info_present_flag = readBoolean(buf, bitPosition)
if self.chroma_loc_info_present_flag:
bitPosition, self.chroma_sample_loc_type_top_field = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.chroma_sample_loc_type_bottom_field = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.timing_info_present_flag = readBoolean(buf, bitPosition)
if self.timing_info_present_flag:
bitPosition, self.num_units_in_tick = readLong(buf, bitPosition)
bitPosition, self.time_scale = readLong(buf, bitPosition)
bitPosition, self.fixed_frame_rate_flag = readBoolean(buf, bitPosition)
bitPosition, self.nal_hrd_parameters_present_flag = readBoolean(buf, bitPosition)
if self.nal_hrd_parameters_present_flag:
hrd = HRD()
bitPosition = hrd.fromBytes(buf, bitPosition)
self.hrd_parameters = hrd
bitPosition, self.vcl_hrd_parameters_present_flag = readBoolean(buf, bitPosition)
if self.vcl_hrd_parameters_present_flag:
hrd = HRD()
bitPosition = hrd.fromBytes(buf, bitPosition)
self.vcl_hrd_parameters = hrd
if self.nal_hrd_parameters_present_flag or self.vcl_hrd_parameters_present_flag:
bitPosition, self.low_delay_hrd_flag = readBoolean(buf, bitPosition)
bitPosition, self.pic_struct_present_flag = readBoolean(buf, bitPosition)
bitPosition, self.bitstream_restriction_flag = readBoolean(buf, bitPosition)
if self.bitstream_restriction_flag:
bitPosition, self.motion_vectors_over_pic_boundaries_flag = readBoolean(buf, bitPosition)
bitPosition, self.max_bytes_per_pic_denom = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.max_bits_per_mb_denom = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.log2_max_mv_length_horizontal = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.log2_max_mv_length_vertical = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.max_num_reorder_frames = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.max_dec_frame_buffering = readUnsignedExpGolomb(buf, bitPosition)
return bitPosition
def toBytes(self, buf, bitPosition):
logger = logging.getLogger(__name__)
bitPosition = writeBoolean(buf, bitPosition, self.aspect_ratio_info_present_flag)
if self.aspect_ratio_info_present_flag:
bitPosition = writeByte(buf, bitPosition, self.aspect_ratio_idc)
if self.aspect_ratio_idc == 255: # Extended_SAR
bitPosition = writeWord(buf, bitPosition, self.sar_width)
bitPosition = writeWord(buf, bitPosition, self.sar_height)
bitPosition = writeBoolean(buf, bitPosition, self.overscan_info_present_flag)
if self.overscan_info_present_flag:
bitPosition = writeBoolean(buf, bitPosition, self.overscan_appropriate_flag)
bitPosition = writeBoolean(buf, bitPosition, self.video_signal_type_present_flag)
if self.video_signal_type_present_flag:
bitPosition = writeBits(buf, bitPosition, self.video_format, 3)
bitPosition = writeBoolean(buf, bitPosition, self.video_full_range_flag)
bitPosition = writeBoolean(buf, bitPosition, self.colour_description_present_flag)
if self.colour_description_present_flag:
bitPosition = writeByte(buf, bitPosition, self.colour_primaries)
bitPosition = writeByte(buf, bitPosition, self.transfer_characteristics)
bitPosition = writeByte(buf, bitPosition, self.matrix_coefficients)
bitPosition = writeBoolean(buf, bitPosition, self.chroma_loc_info_present_flag)
if self.chroma_loc_info_present_flag:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.chroma_sample_loc_type_top_field)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.chroma_sample_loc_type_bottom_field)
bitPosition = writeBoolean(buf, bitPosition, self.timing_info_present_flag )
if self.timing_info_present_flag:
bitPosition = writeLong(buf, bitPosition, self.num_units_in_tick )
bitPosition = writeLong(buf, bitPosition, self.time_scale)
bitPosition = writeBoolean(buf, bitPosition, self.fixed_frame_rate_flag)
bitPosition = writeBoolean(buf, bitPosition, self.nal_hrd_parameters_present_flag)
if self.nal_hrd_parameters_present_flag:
bitPosition = self.hrd_parameters.toBytes(buf, bitPosition)
bitPosition = writeBoolean(buf, bitPosition, self.vcl_hrd_parameters_present_flag)
if self.vcl_hrd_parameters_present_flag:
bitPosition = self.vcl_hrd_parameters.toBytes(buf, bitPosition)
if self.nal_hrd_parameters_present_flag or self.vcl_hrd_parameters_present_flag:
bitPosition = writeBoolean(buf, bitPosition, self.low_delay_hrd_flag)
bitPosition = writeBoolean(buf, bitPosition, self.pic_struct_present_flag)
bitPosition = writeBoolean(buf, bitPosition, self.bitstream_restriction_flag)
if self.bitstream_restriction_flag:
bitPosition = writeBoolean(buf, bitPosition, self.motion_vectors_over_pic_boundaries_flag)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.max_bytes_per_pic_denom)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.max_bits_per_mb_denom)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.log2_max_mv_length_horizontal)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.log2_max_mv_length_vertical)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.max_num_reorder_frames)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.max_dec_frame_buffering)
return bitPosition
@dataclass
class SPS:
profile_idc:int=0 # u(8)
constraint_set0_flag:bool=False # u(1)
constraint_set1_flag:bool=False # u(1)
constraint_set2_flag:bool=False # u(1)
constraint_set3_flag:bool=False # u(1)
constraint_set4_flag:bool=False # u(1)
constraint_set5_flag:bool=False # u(1)
level_idc:int=0 # u(8)
seq_parameter_set_id:int=0 # ue(v)
chroma_format_idc:int=0 # ue(v)
separate_colour_plane_flag:bool=False # u(1)
bit_depth_luma_minus8:int=0 # ue(v)
bit_depth_chroma_minus8:int=0 # ue(v)
qpprime_y_zero_transform_bypass_flag:bool=False # u(1)
seq_scaling_matrix_present_flag:bool=False # u(1)
scaling_list: dict = field(default_factory=dict)
log2_max_frame_num_minus4:int=0 # ue(v)
pic_order_cnt_type:int=0 # ue(v)
log2_max_pic_order_cnt_lsb_minus4:int=0 # ue(v)
delta_pic_order_always_zero_flag:bool=False # ue(1)
offset_for_non_ref_pic:int=0 # se(v)
offset_for_top_to_bottom_field:int=0 # se(v)
num_ref_frames_in_pic_order_cnt_cycle:int=0 # ue(v)
offset_for_ref_frame:dict[int] = field(default_factory=dict)
max_num_ref_frames:int=9 # ue(v)
gaps_in_frame_num_value_allowed_flag:bool=False # u(1)
pic_width_in_mbs_minus1:int=0 # ue(v)
pic_height_in_map_units_minus1:int=0 # ue(v)
frame_mbs_only_flag:bool=False # u(1)
mb_adaptive_frame_field_flag:bool=False # u(1)
direct_8x8_inference_flag:bool=False # u(1)
frame_cropping_flag:bool=False # u(1)
frame_crop_left_offset:int=0 # ue(v)
frame_crop_right_offset:int=0 # ue(v)
frame_crop_top_offset:int=0 # ue(v)
frame_crop_bottom_offset:int=0 # ue(v)
vui_parameters_present_flag:bool=False # u(1)
vui:VUI=None # VUI object
def __init__(self):
self.scaling_list={}
self.offset_for_ref_frame={}
# TODO: ...
# Compute options to pass to ffmpeg so as to reproduce the same SPS.
# Very complex since some codec configuration are not provided by ffmpeg and/or libx264.
# This is only an attempt.
def ffmpegOptions(self, videoID=0):
logger = logging.getLogger(__name__)
x264opts = []
if self.profile_idc in [ 0x42, 0x4D, 0x64, 0x6E, 0x7A, 0xF4, 0x2C]:
if self.profile_idc == 0x42:
profile = 'baseline'
elif self.profile_idc == 0x4D:
profile = 'main'
elif self.profile_idc == 0x64 :
profile = 'high'
elif self.profile_idc == 0x6E:
profile = 'high10'
elif self.profile_idc == 0x7A:
profile = 'high422'
elif self.profile_idc == 0xF4:
profile = 'high444'
else:
logger.error('Unknow profile: %x', self.profile_idc)
return []
level = '%d.%d' % (floor(self.level_idc/10), self.level_idc % 10)
x264opts.extend(['sps-id=%d' % self.seq_parameter_set_id] )
if self.bit_depth_chroma_minus8 not in [0,1,2,4,6,8]:
logger.error('Bit depth of chrominance is not supported: %d', self.bit_depth_chroma_minus8+8)
return []
if self.chroma_format_idc in range(0,4):
if self.chroma_format_idc == 0:
# Monochrome
pass
elif self.chroma_format_idc == 1:
# YUV:4:2:0
pass
elif self.chroma_format_idc == 2:
# YUV:4:2:2
pass
elif self.chroma_format_idc == 3:
# YUV:4:4:4
pass
else:
logger.error('Unknow chrominance format: %x', self.chroma_format_idc)
return []
res = ['-profile:v:%d' % videoID, self.profile_idc, '-level:v:%d' % videoID, level]
return res
def fromBytes(self, buf):
logger = logging.getLogger(__name__)
logger.debug('Parsing: %s' % (hexdump.dump(buf,sep=':')))
bitPosition=0
# NAL Unit SPS
bitPosition, zero = readBit(buf, bitPosition)
if zero != 0:
raise(Exception('Reserved bit is not equal to 0: %d' % zero ))
bitPosition, nal_ref_idc = readBits(buf, bitPosition,2)
if nal_ref_idc != 3:
raise(Exception('NAL ref idc is not equal to 3: %d' % nal_ref_idc ))
bitPosition, nal_unit_type = readBits(buf, bitPosition,5)
if nal_unit_type != 7:
raise(Exception('NAL unit type is not a SPS: %d' % nal_unit_type ))
bitPosition, self.profile_idc = readByte(buf, bitPosition)
bitPosition, self.constraint_set0_flag = readBit(buf,bitPosition)
bitPosition, self.constraint_set1_flag = readBit(buf,bitPosition)
bitPosition, self.constraint_set2_flag = readBit(buf,bitPosition)
bitPosition, self.constraint_set3_flag = readBit(buf,bitPosition)
bitPosition, self.constraint_set4_flag = readBit(buf,bitPosition)
bitPosition, self.constraint_set5_flag = readBit(buf,bitPosition)
bitPosition, v = readBits(buf, bitPosition, 2)
if v!=0:
raise(Exception('Reserved bits different from 0b00: %x' % v))
bitPosition, self.level_idc = readByte(buf, bitPosition)
bitPosition, self.seq_parameter_set_id = readUnsignedExpGolomb(buf, bitPosition)
if self.profile_idc in [44, 83, 86, 100, 110, 118, 122, 128, 134, 135, 138, 139, 244]:
bitPosition, self.chroma_format_idc = readUnsignedExpGolomb(buf, bitPosition)
if self.chroma_format_idc==3:
bitPositionn, self.separate_colour_plane_flag=readBit(buf, bitPosition)
bitPosition, self.bit_depth_luma_minus8 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.bit_depth_chroma_minus8 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.qpprime_y_zero_transform_bypass_flag = readBoolean(buf, bitPosition)
bitPosition, self.seq_scaling_matrix_present_flag = readBoolean(buf, bitPosition)
if self.seq_scaling_matrix_present_flag:
nbMatrices = 12 if self.chroma_format_idc == 3 else 8
for i in range(0, nbMatrices):
bitPosition, present = readBoolean(buf, bitPosition)
if present:
if i<6:
bitPosition, matrix = parseScalingList(buf, bitPosition, 16)
self.scaling_list[i] = matrix
else:
bitPosition, matrix = parseScalingList(buf, bitPosition, 64)
self.scaling_list[i] = matrix
else:
self.scaling_list[i] = []
bitPosition, self.log2_max_frame_num_minus4 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition , self.pic_order_cnt_type = readUnsignedExpGolomb(buf, bitPosition)
if self.pic_order_cnt_type == 0:
bitPosition, self.log2_max_pic_order_cnt_lsb_minus4 = readUnsignedExpGolomb(buf, bitPosition)
elif self.pic_order_cnt_type == 1:
bitPosition, self.delta_pic_order_always_zero_flag = readBoolean(buf, bitPosition)
bitPosition, self.offset_for_non_ref_pic = readSignedExpGolomb(buf, bitPosition)
bitPosition, self.offset_for_top_to_bottom_field = readSignedExpGolomb(buf, bitPosition)
bitPosition, self.num_ref_frames_in_pic_order_cnt_cycle = readUnsignedExpGolomb(buf, bitPosition)
for i in range(0, self.num_ref_frames_in_pic_order_cnt_cycle):
bitPosition, v = readUnsignedExpGolomb(buf, bitPosition)
self.offset_for_ref_frame[i]=v
bitPosition, self.max_num_ref_frames = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.gaps_in_frame_num_value_allowed_flag = readBoolean(buf, bitPosition)
bitPosition, self.pic_width_in_mbs_minus1 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.pic_height_in_map_units_minus1 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.frame_mbs_only_flag = readBoolean(buf, bitPosition)
if not self.frame_mbs_only_flag:
bitPosition, self.mb_adaptive_frame_field_flag = readBoolean(buf, bitPosition)
bitPosition, self.direct_8x8_inference_flag = readBoolean(buf, bitPosition)
bitPosition, self.frame_cropping_flag = readBoolean(buf, bitPosition)
if self.frame_cropping_flag:
bitPosition, self.frame_crop_left_offset = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.frame_crop_right_offset = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.frame_crop_top_offset = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.frame_crop_bottom_offset = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.vui_parameters_present_flag = readBoolean(buf, bitPosition)
if self.vui_parameters_present_flag:
self.vui = VUI()
bitPosition = self.vui.fromBytes(buf,bitPosition)
logger.debug('VUI present: %s', self.vui)
logger.debug('Parse end of SPS. Bit position: %d. Remaining bytes: %s.' % (bitPosition, hexdump.dump(buf[floor(bitPosition/8):], sep=':')))
bitPosition = parseRBSPTrailingBits(buf, bitPosition)
logger.debug('End of SPS: %d. Remaining bytes: %s' % (bitPosition, hexdump.dump(buf[floor(bitPosition/8):], sep=':')))
return bitPosition
def toBytes(self):
logger = logging.getLogger(__name__)
buf = bytearray()
bitPosition = 0
bitPosition = writeBit(buf, bitPosition,0)
bitPosition = writeBits(buf, bitPosition, 3, 2)
bitPosition = writeBits(buf, bitPosition, 7, 5)
bitPosition = writeByte(buf, bitPosition, self.profile_idc)
bitPosition = writeBit(buf, bitPosition, self.constraint_set0_flag)
bitPosition = writeBit(buf, bitPosition, self.constraint_set1_flag)
bitPosition = writeBit(buf, bitPosition, self.constraint_set2_flag)
bitPosition = writeBit(buf, bitPosition, self.constraint_set3_flag)
bitPosition = writeBit(buf, bitPosition, self.constraint_set4_flag)
bitPosition = writeBit(buf, bitPosition, self.constraint_set5_flag)
bitPosition = writeBits(buf, bitPosition, 0, 2)
bitPosition = writeByte(buf, bitPosition, self.level_idc)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.seq_parameter_set_id)
if self.profile_idc in [44, 83, 86, 100, 110, 118, 122, 128, 134, 135, 138, 139, 244]:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.chroma_format_idc)
if self.chroma_format_idc==3:
bitPosition = writeBit(buf, bitPosition, self.separate_colour_plane_flag)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.bit_depth_luma_minus8)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.bit_depth_chroma_minus8)
bitPosition = writeBoolean(buf, bitPosition, self.qpprime_y_zero_transform_bypass_flag )
bitPosition = writeBoolean(buf, bitPosition, self.seq_scaling_matrix_present_flag)
if self.seq_scaling_matrix_present_flag:
nbMatrices = 12 if self.chroma_format_idc == 3 else 8
for i in range(0, nbMatrices):
matrix = self.scaling_list[i]
present = (len(matrix))!=0
bitPosition = writeBoolean(buf, bitPosition, present)
if present:
if i<6:
bitPosition = writeScalingList(buf, bitPosition, 16, matrix)
else:
bitPosition = writeScalingList(buf, bitPosition, 64, matrix)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.log2_max_frame_num_minus4)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.pic_order_cnt_type)
if self.pic_order_cnt_type == 0:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.log2_max_pic_order_cnt_lsb_minus4)
elif self.pic_order_cnt_type == 1:
bitPosition = writeBoolean(buf, bitPosition, self.delta_pic_order_always_zero_flag)
bitPosition = writeSignedExpGolomb(buf, bitPosition, self.offset_for_non_ref_pic)
bitPosition = writeSignedExpGolomb(buf, bitPosition, self.offset_for_top_to_bottom_field)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.num_ref_frames_in_pic_order_cnt_cycle)
for i in range(0, self.num_ref_frames_in_pic_order_cnt_cycle):
v = self.offset_for_ref_frame[i]
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, v)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.max_num_ref_frames)
bitPosition = writeBoolean(buf, bitPosition, self.gaps_in_frame_num_value_allowed_flag)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.pic_width_in_mbs_minus1)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.pic_height_in_map_units_minus1)
bitPosition = writeBoolean(buf, bitPosition, self.frame_mbs_only_flag)
if not self.frame_mbs_only_flag:
bitPosition = writeBoolean(buf, bitPosition, self.mb_adaptive_frame_field_flag)
bitPosition = writeBoolean(buf, bitPosition, self.direct_8x8_inference_flag)
bitPosition = writeBoolean(buf, bitPosition, self.frame_cropping_flag)
if self.frame_cropping_flag:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.frame_crop_left_offset)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.frame_crop_right_offset)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.frame_crop_top_offset)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.frame_crop_bottom_offset)
bitPosition = writeBoolean(buf, bitPosition, self.vui_parameters_present_flag)
if self.vui_parameters_present_flag:
logger.debug('SPS has VUI. Writing VUI at position: %d', bitPosition)
bitPosition = self.vui.toBytes(buf, bitPosition)
logger.debug('VUI written. New bit position: %d', bitPosition)
bitPosition = writeRBSPTrailingBits(buf, bitPosition)
return buf
@dataclass
class PPS:
pic_parameter_set_id:int=0
seq_parameter_set_id:int=0
entropy_coding_mode_flag:bool=False
bottom_field_pic_order_in_frame_present_flag:bool=False
num_slice_groups_minus1:int=0
slice_group_map_type:int=0
run_length_minus1:dict = field(default_factory=dict)
top_left:dict = field(default_factory=dict)
bottom_right:dict = field(default_factory=dict)
slice_group_change_direction_flag:bool=False
slice_group_change_rate_minus1:int=0
pic_size_in_map_units_minus1:int=0
slice_group_id:dict = field(default_factory=dict)
num_ref_idx_l0_default_active_minus1:int=0
num_ref_idx_l1_default_active_minus1:int=0
weighted_pred_flag:bool=False
weighted_bipred_idc:int=0
pic_init_qp_minus26:int=0
pic_init_qs_minus26:int=0
chroma_qp_index_offset:int=0
deblocking_filter_control_present_flag:bool=False
constrained_intra_pred_flag:bool=False
redundant_pic_cnt_present_flag:bool=False
transform_8x8_mode_flag:bool=False
pic_scaling_matrix_present_flag:bool=False
pic_scaling_list:list[list[int]] = field(default_factory=list)
second_chroma_qp_index_offset:int=0
def __init__(self):
self.run_length_minus1={}
self.top_left={}
self.bottom_right={}
self.slice_group_id={}
self.pic_scaling_list=[]
# PPS are located at byte boundary
def fromBytes(self, buf, chroma_format_idc):
logger = logging.getLogger(__name__)
logger.debug('Parsing: %s' % (hexdump.dump(buf,sep=':')))
bitPosition=0
# NAL Unit PPS
bitPosition, zero = readBit(buf, bitPosition)
if zero != 0:
raise(Exception('Reserved bit is not equal to 0: %d' % zero ))
bitPosition, nal_ref_idc = readBits(buf, bitPosition,2)
if nal_ref_idc != 3:
raise(Exception('NAL ref idc is not equal to 3: %d' % nal_ref_idc ))
bitPosition, nal_unit_type = readBits(buf, bitPosition,5)
if nal_unit_type != 8:
raise(Exception('NAL unit type is not a PPS: %d' % nal_unit_type ))
bitPosition, self.pic_parameter_set_id = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.seq_parameter_set_id = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.entropy_coding_mode_flag = readBoolean(buf, bitPosition)
bitPosition, self.bottom_field_pic_order_in_frame_present_flag = readBoolean(buf, bitPosition)
bitPosition, self.num_slice_groups_minus1 = readUnsignedExpGolomb(buf, bitPosition)
if self.num_slice_groups_minus1>0:
bitPosition, self.slice_group_map_type = readUnsignedExpGolomb(buf, bitPosition)
if self.slice_group_map_type == 0:
for i in range(0, self.num_slice_groups_minus1):
bitPosition, v = readUnsignedExpGolomb(buf, bitPosition)
self.run_length_minus1[i]=v
elif self.slice_group_map_type == 2:
for i in range(0, self.num_slice_groups_minus1):
bitPosition, v = readUnsignedExpGolomb(buf, bitPosition)
self.top_left[i] = v
bitPosition, v = readUnsignedExpGolomb(buf, bitPosition)
self.bottom_right[i] = v
elif self.slice_group_map_type in [3,4,5]:
bitPosition, self.slice_group_change_direction_flag = readBoolean(buf, bitPosition)
bitPosition, self.slice_group_change_rate_minus1 = readUnsignedExpGolomb(buf, bitPosition)
elif self.slice_group_map_type == 6:
bitPosition, self.pic_size_in_map_units_minus1 = readUnsignedExpGolomb(buf, bitPosition)
l = ceil(log(self.num_slice_groups_minus1+1))
for i in range(0, self.pic_size_in_map_units_minus1):
bitPosition, v = readBits(buf, bitPosition, l)
self.slice_group_id[i]=v
bitPosition, self.num_ref_idx_l0_default_active_minus1 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.num_ref_idx_l2_default_active_minus1 = readUnsignedExpGolomb(buf, bitPosition)
bitPosition, self.weighted_pred_flag = readBoolean(buf, bitPosition)
bitPosition, self.weighted_bipred_idc = readBits(buf, bitPosition, 2)
bitPosition, self.pic_init_qp_minus26 = readSignedExpGolomb(buf, bitPosition)
bitPosition, self.pic_init_qs_minus26 = readSignedExpGolomb(buf, bitPosition)
bitPosition, self.chroma_qp_index_offset = readSignedExpGolomb(buf, bitPosition)
bitPosition, self.deblocking_filter_control_present_flag = readBoolean(buf, bitPosition)
bitPosition, self.constrained_intra_pred_flag = readBoolean(buf, bitPosition)
bitPosition, self.redundant_pic_cnt_present_flag = readBoolean(buf, bitPosition)
if moreRBSPData(buf, bitPosition):
bitPosition, self.transform_8x8_mode_flag = readBoolean(buf, bitPosition)
bitPosition, self.pic_scaling_matrix_present_flag = readBoolean(buf, bitPosition)
if self.pic_scaling_matrix_present_flag:
nbMatrices = 6 if chroma_format_idc == 3 else 2
if self.transform_8x8_mode_flag:
nbMatrices+=6
else:
nbMatrices = 6
for i in range(0, nbMatrices):
bitPosition, present = readBoolean(buf, bitPosition)
if present:
if i<6:
bitPosition, matrix = parseScalingList(buf, bitPosition, 16)
self.pic_scaling_list.append(matrix)
else:
bitPosition, matrix = parseScalingList(buf, bitPosition, 64)
self.pic_scaling_list.append(matrix)
else:
self.pic_scaling_list.append([])
bitPosition, self.second_chroma_qp_index_offset = readSignedExpGolomb(buf, bitPosition)
logger.info("parse RBSP")
bitPosition = parseRBSPTrailingBits(buf, bitPosition)
return bitPosition
def toBytes(self, chroma_format_idc):
logger = logging.getLogger(__name__)
buf = bytearray()
bitPosition = 0
# NAL Unit PPS
bitPosition = writeBit(buf, bitPosition, 0)
bitPosition = writeBits(buf, bitPosition, 3, 2)
bitPosition = writeBits(buf, bitPosition, 8, 5)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.pic_parameter_set_id)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.seq_parameter_set_id)
bitPosition = writeBoolean(buf, bitPosition, self.entropy_coding_mode_flag)
bitPosition = writeBoolean(buf, bitPosition, self.bottom_field_pic_order_in_frame_present_flag)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.num_slice_groups_minus1)
if self.num_slice_groups_minus1>0:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.slice_group_map_type)
if self.slice_group_map_type == 0:
for i in range(0, self.num_slice_groups_minus1):
v = self.run_length_minus1[i]
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, v)
elif self.slice_group_map_type == 2:
for i in range(0, self.num_slice_groups_minus1):
v = self.top_left[i]
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, v)
v = self.bottom_right[i]
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, v)
elif self.slice_group_map_type in [3,4,5]:
bitPosition = writeBoolean(buf, bitPosition, self.slice_group_change_direction_flag)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.slice_group_change_rate_minus1)
elif self.slice_group_map_type == 6:
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.pic_size_in_map_units_minus1)
l = ceil(log(self.num_slice_groups_minus1+1))
for i in range(0, self.pic_size_in_map_units_minus1):
v = self.slice_group_id[i]
bitPosition, v = writeBits(buf, bitPosition, v, l)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.num_ref_idx_l0_default_active_minus1)
bitPosition = writeUnsignedExpGolomb(buf, bitPosition, self.num_ref_idx_l2_default_active_minus1)
bitPosition = writeBoolean(buf, bitPosition, self.weighted_pred_flag)
bitPosition = writeBits(buf, bitPosition, self.weighted_bipred_idc, 2)
bitPosition = writeSignedExpGolomb(buf, bitPosition, self.pic_init_qp_minus26)
bitPosition = writeSignedExpGolomb(buf, bitPosition, self.pic_init_qs_minus26)
bitPosition = writeSignedExpGolomb(buf, bitPosition, self.chroma_qp_index_offset)
bitPosition = writeBoolean(buf, bitPosition, self.deblocking_filter_control_present_flag)
bitPosition = writeBoolean(buf, bitPosition, self.constrained_intra_pred_flag)
bitPosition = writeBoolean(buf, bitPosition, self.redundant_pic_cnt_present_flag)
bitPosition = writeBoolean(buf, bitPosition, self.transform_8x8_mode_flag)
bitPosition = writeBoolean(buf, bitPosition, self.pic_scaling_matrix_present_flag)
if self.pic_scaling_matrix_present_flag:
nbMatrices = 6 if chroma_format_idc == 3 else 2
if self.transform_8x8_mode_flag:
nbMatrices+=6
else:
nbMatrices = 6
for i in range(0, nbMatrices):
matrix = self.pic_scaling_list[i]
logger.info("Retrieved pic scaling matrix: %s %d" % (matrix, len(matrix)))
present = (len(matrix)!=0)
logger.info("Matrix is present: %s" % present)
bitPosition = writeBoolean(buf, bitPosition, present)
if present:
if i<6:
logger.info("Writing matrix: %s" % matrix)
bitPosition = writeScalingList(buf, bitPosition, 16, matrix)
else:
logger.info("Writing matrix: %s" % matrix)
bitPosition = writeScalingList(buf, bitPosition, 64, matrix)
bitPosition = writeSignedExpGolomb(buf, bitPosition, self.second_chroma_qp_index_offset)
bitPosition = writeRBSPTrailingBits(buf, bitPosition)
return buf
@dataclass
class AVCDecoderConfiguration:
configurationVersion:int=1 # u(8)
AVCProfileIndication:int=0 # u(8)
profile_compatibility:int=0 # u(8)
AVCLevelIndication:int=0 # u(8)
lengthSizeMinusOne:int=0 # u(2) (0,1 or 3)
numOfSequenceParameterSets:int=0 # u(5)
sps:dict = field(default_factory=dict)
numOfPictureParameterSets:int=0 #u(8)
pps:dict = field(default_factory=dict)
chroma_format:int=0 # u(2)
bit_depth_luma_minus8:int=0 # u(3)
bit_depth_chroma_minus8:int=0 # u(3)
numOfSequenceParameterSetExt:int=0 # u(8)
spsext:dict = field(default_factory=dict)
def __init__(self):
self.sps = {}
self.spsext = {}
self.pps = {}
def fromBytes(self, buf):
logger = logging.getLogger(__name__)
logger.debug('Parsing: %s' % (hexdump.dump(buf,sep=':')))
bitPosition = 0
bitPosition, self.configurationVersion = readByte(buf, bitPosition)
bitPosition, self.AVCProfileIndication = readByte(buf, bitPosition)
bitPosition, self.profile_compatibility = readByte(buf, bitPosition)
bitPosition, self.AVCLevelIndication = readByte(buf, bitPosition)
bitPosition, v = readBits(buf, bitPosition, 6)
if v != 0b111111:
raise(Exception('Reserved bits are not equal to 0b111111: %x' % v ))
bitPosition, self.lengthSizeMinusOne = readBits(buf, bitPosition, 2)
bitPosition, v = readBits(buf, bitPosition, 3)
if v != 0b111:
raise(Exception('Reserved bits are not equal to 0b111: %x' % v))
bitPosition, self.numOfSequenceParameterSets= readBits(buf, bitPosition, 5)
logger.debug('Number of SPS: %d' % self.numOfSequenceParameterSets)
for i in range(0,self.numOfSequenceParameterSets):
bitPosition, length = readWord(buf, bitPosition)
if bitPosition % 8 != 0:
raise(Exception('SPS is not located at a byte boundary: %d' % bitPosition ))
sps = SPS()
sodb = RBSP2SODB(buf[floor(bitPosition/8):])
bitLength = sps.fromBytes(sodb)
spsid = sps.seq_parameter_set_id
self.sps[spsid] = sps
parsedLength = floor(bitLength/8)
logger.debug('Expected length of SPS: %d bytes. Parsed: %d bytes' % (length, parsedLength))
# Parse length can be shorter than length because of rewriting from RBSP to SODB (that is shorter).
# So we advance of indicated length.
bitPosition+=length*8
logger.debug('Bit position:%d. Reading one byte of: %s' % (bitPosition, hexdump.dump(buf[floor(bitPosition/8):], sep=':')))
bitPosition, self.numOfPictureParameterSets = readByte(buf, bitPosition)
logger.debug('Number of PPS: %d' % self.numOfPictureParameterSets)
for i in range(0,self.numOfPictureParameterSets):
bitPosition, length = readWord(buf, bitPosition)
if bitPosition % 8 != 0:
raise(Exception('PPS is not located at a byte boundary: %d' % bitPosition ))
pps = PPS()
sodb = RBSP2SODB(buf[floor(bitPosition/8):])
bitLength = pps.fromBytes(sodb, self.chroma_format)
ppsid = pps.pic_parameter_set_id
self.pps[ppsid] = pps
parsedLength = floor(bitLength/8)
logger.debug('Expected length of PPS: %d bytes. Parsed: %d bytes' % (length, parsedLength))
# Parse length can be shorter than length because of rewriting from RBSP to SODB (that is shorter).
# So we advance of indicated length.
bitPosition+=length*8
logger.debug('Remaining bits: %s' % hexdump.dump(buf[floor(bitPosition/8):]))
if self.AVCProfileIndication in [100, 110, 122, 144]:
bitPosition, reserved = readBits(buf, bitPosition, 6)
if reserved != 0b111111:
raise(Exception('Reserved bits are different from 111111: %x' % reserved))
bitPosition, self.chroma_format = readBits(buf, bitPosition, 2)
bitPosition, reserved = readBits(buf, bitPosition, 5)
if reserved != 0b11111:
raise(Exception('Reserved bits are different from 11111: %x' % reserved))
bitPosition, self.bit_depth_luma_minus8 = readBits(buf, bitPosition, 3)
bitPosition, reserved = readBits(buf, bitPosition, 5)
if reserved != 0b11111:
raise(Exception('Reserved bits are different from 11111: %x' % reserved))
bitPosition, self.bit_depth_chroma_minus8 = readBits(buf, bitPosition, 3)
bitPosition, self.numOfSequenceParameterSetExt = readByte(buf, bitPosition)
for i in range(0, self.numOfSequenceParameterSetExt):
# TODO: parse SPSextended
logger.error('Parsing of SPS extended not yet implemented !')
pass
def toBytes(self):
logger = logging.getLogger(__name__)
buf = bytearray()
bitPosition = 0
bitPosition = writeByte(buf, bitPosition, self.configurationVersion)
bitPosition = writeByte(buf, bitPosition, self.AVCProfileIndication)
bitPosition = writeByte(buf, bitPosition, self.profile_compatibility)
bitPosition = writeByte(buf, bitPosition, self.AVCLevelIndication)
bitPosition = writeBits(buf, bitPosition, 0b111111, 6)
bitPosition = writeBits(buf, bitPosition, self.lengthSizeMinusOne, 2)
bitPosition = writeBits(buf, bitPosition, 0b111, 3)
bitPosition = writeBits(buf, bitPosition, self.numOfSequenceParameterSets, 5)
for spsid in self.sps:
sps = self.sps[spsid]
sodb = sps.toBytes()
sodbLength = len(sodb)
rbsp = SODB2RBSP(sodb)
rbspLength = len(rbsp)
logger.debug('SODB length: %d RBSP length:%d' % (sodbLength, rbspLength))
bitPosition = writeWord(buf, bitPosition, rbspLength)
buf.extend(rbsp)
bitPosition+=rbspLength*8
logger.debug('2. Buffer: %s' % hexdump.dump(buf, sep=':'))
bitPosition = writeByte(buf, bitPosition, self.numOfPictureParameterSets)
for ppsid in self.pps:
logger.debug('Writing PPS: %d' % ppsid)
pps = self.pps[ppsid]
# TODO: does chroma_format should come from self ?
sodb = pps.toBytes(self.chroma_format)
sodbLength = len(sodb)
rbsp = SODB2RBSP(sodb)
rbspLength = len(rbsp)
logger.debug('SODB length: %d RBSP length:%d' % (sodbLength, rbspLength))
bitPosition = writeWord(buf, bitPosition, rbspLength)
buf.extend(rbsp)
bitPosition+=rbspLength*8
if self.AVCProfileIndication in [ 100, 110, 122, 144]:
bitPosition = writeBits(buf, bitPosition, 0b111111, 6)
bitPosition = writeBits(buf, bitPosition, self.chroma_format, 2)
bitPosition = writeBits(buf, bitPosition, 0b11111, 5)
bitPosition = writeBits(buf, bitPosition, self.bit_depth_luma_minus8, 3)
bitPosition = writeBits(buf, bitPosition, 0b11111, 5)
bitPosition = writeBits(buf, bitPosition, self.bit_depth_chroma_minus8, 3)
bitPosition = writeByte(buf, bitPosition, self.numOfSequenceParameterSetExt)
for i in range(0, self.numOfSequenceParameterSetExt):
# TODO: dump SPSextended
logger.error('Dumping SPS extended not yet implemented')
pass
return buf
def merge(self, config):
# Check config compatibility
if self.configurationVersion != config.configurationVersion:
raise(Exception('Configuration versions are different: %d vs %s' % (self.configurationVersion, config.configurationVersion)))
if self.AVCProfileIndication != config.AVCProfileIndication:
raise(Exception('AVC profiles are different: %d vs %s' % (self.AVCProfileIndication, config.AVCProfileIndication)))
if self.profile_compatibility != config.profile_compatibility:
raise(Exception('Profile compatilities are different: %d vs %s' % (self.profile_compatibility, config.profile_compatibility)))
if self.AVCLevelIndication != config.AVCLevelIndication:
raise(Exception('Level indications are different: %d vs %s' % (self.AVCLevelIndication, config.AVCLevelIndication)))
if self.lengthSizeMinusOne != config.lengthSizeMinusOne:
raise(Exception('Length units are different: %d vs %s' % (self.lengthSizeMinusOne, config.lengthSizeMinusOne)))
if self.chroma_format != config.chroma_format:
raise(Exception('Colour format are different: %d vs %s' % (self.chroma_format, config.chroma_format)))
if self.bit_depth_luma_minus8 != config.bit_depth_luma_minus8:
raise(Exception('Depth of luminance are different: %d vs %s' % (self.bit_depth_luma_minus8, config.bit_depth_luma_minus8)))
if self.bit_depth_chroma_minus8 != config.bit_depth_chroma_minus8:
raise(Exception('Depth of chromaticity are different: %d vs %s' % (self.bit_depth_chroma_minus8, config.bit_depth_luma_minus8)))
for spsid in config.sps:
sps = config.sps[spsid]
if spsid in self.sps:
localsps = self.sps[spsid]
if sps!=localsps:
raise(Exception('Profile are not compatible. They contain two different SPS with the same identifier (%d): %s\n%s\n' % (spsid, localsps, sps)))
self.sps[spsid] = sps
self.numOfSequenceParameterSets = len(self.sps)
for ppsid in config.pps:
pps = config.pps[ppsid]
if ppsid in self.pps:
localpps = self.pps[ppsid]
if pps!=localpps:
raise(Exception('Profile are not compatible. They contain two different PPS with the same identifier (%d): %s\n%s\n' % (ppsid, localpps, pps)))
self.pps[ppsid] = pps
self.numOfPictureParameterSets = len(self.pps)
# TODO: do the same with extended SPS !
def parseCodecPrivate(codecPrivateData):
if codecPrivateData[0] != 0x63:
raise(Exception('Matroska header is wrong: %x' % codecPrivateData[0]))
if codecPrivateData[1] != 0xA2:
raise(Exception('Matroska header is wrong: %x' % codecPrivateData[1]))
length = codecPrivateData[2]
if length == 0:
raise(Exception('Matroska length cannot start with zero byte.'))
for nbZeroes in range(0,8):
b = readBit(codecPrivateData[2:], nbZeroes)
if b != 0:
break
mask = 2^(7-nbZeroes)-1
length = codecPrivateData[2] and mask
for i in range(0, nbZeroes):
length*=256
length+=(codecPrivateData[3+i])
bytePosition = 3+nbZeroes
avcconfig = AVCDecoderConfiguration()
avcconfig.fromBytes(codecPrivateData[bytePosition:])
return avcconfig
def getAvcConfigFromH264(inputFile):
logger = logging.getLogger(__name__)
# TODO: improve this ...
rbsp = inputFile.read(1000)
sodb = RBSP2SODB(rbsp)
bitPosition = 0
bitPosition, startCode = readLong(sodb, bitPosition)
if startCode != 1:
raise(Exception('Starting code not detected: %x' % startCode))
sps = SPS()
bitLength = sps.fromBytes(sodb[4:])
bitPosition+=bitLength
bitPosition, startCode = readLong(sodb, bitPosition)
if startCode != 1:
raise(Exception('Starting code not detected: %x' % startCode))
pps = PPS()
bitLength = pps.fromBytes(sodb[floor(bitPosition/8):], sps.chroma_format_idc)
logger.debug(pps)
avcconfig = AVCDecoderConfiguration()
avcconfig.configurationVersion = 1
avcconfig.AVCProfileIndication = sps.profile_idc
avcconfig.profile_compatibility = 0
avcconfig.AVCLevelIndication = sps.level_idc
avcconfig.lengthSizeMinusOne = 3
avcconfig.numOfSequenceParameterSets = 1
avcconfig.numOfPictureParameterSets = 1
avcconfig.numOfSequenceParameterSetExt = 0
avcconfig.chroma_format = sps.chroma_format_idc
avcconfig.bit_depth_chroma_minus8 = sps.bit_depth_chroma_minus8
avcconfig.bit_depth_luma_minus8 = sps.bit_depth_luma_minus8
avcconfig.sps[sps.seq_parameter_set_id] = sps
avcconfig.pps[pps.pic_parameter_set_id] = pps
return avcconfig
def getCodecPrivateDataFromH264(inputFile):
logger = logging.getLogger(__name__)
avcconfig = getAvcConfigFromH264(inputFile)
res = dumpCodecPrivateData(avcconfig)
return res
def parseMKVTree(mkvinfo, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
env = {**os.environ, 'LANG': 'C'}
elements = {}
with Popen([mkvinfo, '-z', '-X', '-P', '/proc/self/fd/%d' % infd ], stdout=PIPE, close_fds=False, env=env) as mkvinfo:
out, _ = mkvinfo.communicate()
out = out.decode('utf8')
prefix = []
regExp = r"(^(?P<root>\+)|(\|(?P<depth>[ ]*\+))).*at (?P<position>[0-9]+) size (?P<size>[0-9]+).*$"
p = re.compile(regExp)
prevDepth = -1
for line in out.splitlines():
m = p.match(line)
if m is None:
logger.error("Impossible to match line: %s" % line)
else:
position = int(m.group('position'))
size = int(m.group('size'))
root = (m.group('root')!=None)
if root:
depth = 0
else:
depth = len(m.group('depth'))
if depth > prevDepth:
for i in range(depth-prevDepth):
prefix.append(1)
elif depth == prevDepth:
subid = prefix[-1]
subid+=1
prefix.pop()
prefix.append(subid)
else:
for i in range(prevDepth-depth):
prefix.pop()
subid = prefix[-1]
subid+=1
prefix.pop()
prefix.append(subid)
prevDepth = depth
key=".".join(map(str, prefix))
elements[key] = (position, size)
mkvinfo.wait()
return elements
# MKV is formatted as an EBML file (Extended Binary Markup Langage).
# cf http://matroska-org.github.io/libebml/specs.html
# It is a Type, Length, Value (TLV) kind of binary file.
# Types are encoded as follows:
# 1xxx xxxx - Class A IDs (2^7 -1 possible values)
# 01xx xxxx xxxx xxxx - Class B IDs (2^14-1 possible values)
# 001x xxxx xxxx xxxx xxxx xxxx - Class C IDs (2^21-1 possible values)
# 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx - Class D IDs (2^28-1 possible values)
# Lengths are encoded as follows:
# 1xxx xxxx - value 0 to 2^7-2
# 01xx xxxx xxxx xxxx - value 0 to 2^14-2
# 001x xxxx xxxx xxxx xxxx xxxx - value 0 to 2^21-2
# 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx - value 0 to 2^28-2
# 0000 1xxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx - value 0 to 2^35-2
# 0000 01xx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx - value 0 to 2^42-2
# 0000 001x xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx - value 0 to 2^49-2
# 0000 0001 xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx - value 0 to 2^56-2
def getEBMLLength(length):
logger = logging.getLogger(__name__)
if (0 <= length) and (length <= 2**7-2):
size = 1
elif length <= 2**14-2:
size = 2
elif length <= 2**21-2:
size = 3
elif length <= 2**28-2:
size = 4
elif length <= 2**35-2:
size = 5
elif length <= 2**42-2:
size = 6
elif length <= 2**49-2:
size = 7
elif length <= 2**56-2:
size = 8
elif length < 0:
logger.error('Impossible to encode a negative length with EBML.')
return None
else:
logger.error('Impossible to encode a length larger than 2^56-2 with EBML.')
return None
encodedLength = length + ((128>>(size-1))<<((size-1)*8))
res = (encodedLength).to_bytes(size, byteorder='big')
return res
def dumpCodecPrivateData(AVCDecoderConfiguration):
logger = logging.getLogger(__name__)
# Rebuild a Matroska Codec Private Element
res = bytearray()
# Code private element
res.extend(b'\x63\xA2')
buf = AVCDecoderConfiguration.toBytes()
logger.debug('AVC configuration bitstream: %s (length: %d))' % (hexdump.dump(buf, sep=':'), len(buf)))
EMBLlength = getEBMLLength(len(buf))
logger.debug('EMBL encoded length: %s' % (hexdump.dump(EMBLlength, sep=':')))
res.extend(EMBLlength)
res.extend(buf)
return res
def changeEBMLElementSize(inputFile, position, addendum):
logger = logging.getLogger(__name__)
initialPosition = position
infd = inputFile.fileno()
lseek(infd, position, SEEK_SET)
buf = read(infd, 1)
elementType = int.from_bytes(buf, byteorder='big')
mask=128
found = False
for i in range(1,5):
if elementType&mask:
typeSize = i
found = True
break
else:
mask = mask>>1
if not found:
logger.error('Size of element type cannot be determined: %d', elementType)
exit(-1)
# We seek to size
position+=typeSize
lseek(infd, position, SEEK_SET)
buf = read(infd, 1)
sizeHead = int.from_bytes(buf, byteorder='big')
logger.info('First byte of size: %x' % sizeHead)
mask=128
found = False
for i in range(1,9):
if sizeHead&mask:
sizeOfDataSize = i
found = True
break
else:
mask = mask>>1
if not found:
logger.error('Size of data size cannot be determined: %d', sizeHead)
exit(-1)
else:
logger.info('Size of data size: %d.', sizeOfDataSize)
lseek(infd, position, SEEK_SET)
oldSizeBuf = read(infd, sizeOfDataSize)
maxSize = 2**(sizeOfDataSize*7)-2
sizeOfData = int.from_bytes(oldSizeBuf, byteorder='big')
logger.info('Size of data with mask: %x mask: %d.' % (sizeOfData, mask))
sizeOfData-= (mask<<((sizeOfDataSize-1)*8))
logger.info('Found element at position: %d, size of type: %d size of data: %d maximal size: %d.', initialPosition, typeSize, sizeOfData, maxSize)
newSize = sizeOfData+addendum
delta = 0
if newSize > maxSize:
# TODO: Test this code ...
newEncodedSize = getEBMLLength(newSize)
sizeOfNewEncodedSize = len(newEncodedSize)
if sizeOfNewEncodedSize <= sizeOfDataSize:
logger.error('New encoded size is smaller (%d) or equal than previous size (%d). This should not happen.' , sizeOfNewEncodedSize, sizeOfDataSize)
exit(-1)
# The difference of length between old size field and new one.
delta = sizeOfNewEncodedSize - sizeOfDataSize
fileLength = fstat(infd).st_size
# We seek after actual length field
lseek(infd, position+sizeOfDataSize, SEEK_SET)
# We read the rest of file
tail = read(infd, fileLength-(position+sizeOfDataSize))
# We increase file length
ftruncate(infd, fileLength+delta)
# We go to the beginning of length field
lseek(infd, position, SEEK_SET)
# We write the new length field
write(infd, newEncodedSize)
# We overwrite the rest of file with its previous content that has been offset.
write(infd, tail)
else:
size = newSize + ((128>>(sizeOfDataSize-1))<<((sizeOfDataSize-1)*8))
newSizeBuf = (size).to_bytes(sizeOfDataSize, byteorder='big')
logger.info('Old encoded size: %s New encoded size: %s' % (hexdump.dump(oldSizeBuf,sep=':'), hexdump.dump(newSizeBuf, sep=':')))
lseek(infd, position, SEEK_SET)
write(infd, newSizeBuf)
# We return the potential increase in size of the file if the length field had to be increased.
return delta
def changeCodecPrivateData(mkvinfo, inputFile, codecData):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
currentLength = fstat(infd).st_size
logger.info('Current size of file: %d' % currentLength)
position, currentData = getCodecPrivateDataFromMKV(mkvinfo, inputFile)
currentDataLength = len(currentData)
futureLength = currentLength - currentDataLength + len(codecData)
logger.info('Expected size of file: %d' % futureLength)
logger.info('Current data at position %d: %s' % (position, hexdump.dump(currentData, sep=":")))
logger.info('Future data: %s' % hexdump.dump(codecData, sep=":"))
elements = parseMKVTree(mkvinfo, inputFile)
found = False
for key in elements:
pos, size = elements[key]
if pos == position:
logger.info('Codec private data key: %s' % key)
found = True
break
if not found:
logger.error('Impossible to retrieve the key of codec private data')
exit(-1)
if currentLength < futureLength:
lseek(infd, position+currentDataLength, SEEK_SET)
tail = read(infd, currentLength-(position+currentDataLength))
# We extend the file at the end with zeroes
ftruncate(infd, futureLength)
lseek(infd, position+len(codecData), SEEK_SET)
write(infd, tail)
lseek(infd, position, SEEK_SET)
write(infd, codecData)
elif currentLength == futureLength:
# Almost nothing to do except overwriting old private codec data with new ones.
lseek(infd, position, SEEK_SET)
write(infd, codecData)
else:
lseek(infd, position+currentDataLength, SEEK_SET)
tail = read(infd, currentLength-(position+currentDataLength))
lseek(infd, position+len(codecData), SEEK_SET)
write(infd, tail)
lseek(infd, position, SEEK_SET)
write(infd, codecData)
# We reduce the length of file.
ftruncate(infd, futureLength)
# We have to modify the tree elements up to the root that contains the codec private data.
keys = key.split('.')
logger.info(keys)
delta = futureLength-currentLength
# if there is no modification of the private codec data, no need to change anything.
if delta != 0:
for i in range(0, len(keys)-1):
keys.pop()
key=".".join(map(str, keys))
pos, size = elements[key]
logger.info('Trying to fix element with key: %s at position: %d with actual size: %d.' % (key, pos, size))
# Changing an element can increase its size (in very rare case).
# In that case, we update the new delta that will be larger (because the element has been resized).
delta+=changeEBMLElementSize(inputFile, pos, delta)
def getFormat(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
with Popen([ffprobe, '-loglevel', 'quiet', '-show_format', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'format' in out:
return out['format']
else:
logger.error('Impossible to retrieve format of file')
return None
def getMovieDuration(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
with Popen([ffprobe, '-loglevel', 'quiet', '-show_format', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'format' in out and 'duration' in out['format']:
duration = floor(float(out['format']['duration']))
ts = timedelta(seconds=duration)
return ts
else:
logger.error('Impossible to retrieve duration of movie')
return None
# ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json ./example.ts
def getVideoDimensions(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
with Popen([ffprobe, '-loglevel', 'quiet', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
video = out['streams'][0]
if ('width' in video) and ('height' in video):
return int(video['width']), int(video['height'])
logger.error('Impossible to retrieve dimensions of video')
exit(-1)
def getStreams(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
with Popen([ffprobe, '-loglevel', 'quiet', '-show_streams', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
return out['streams']
else:
logger.error('Impossible to retrieve streams inside file')
return None
def withSubtitles(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
with Popen([ffprobe, '-loglevel', 'quiet', '-show_streams', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
streams = out['streams']
for stream in streams:
if 'codec_type' in stream and stream['codec_type'] == 'subtitle':
return True
else:
logger.error('Impossible to retrieve streams inside file')
return False
def parseTimestamp(ts):
logger = logging.getLogger(__name__)
tsRegExp = r'^(?P<hour>[0-9]{1,2}):(?P<minute>[0-9]{1,2}):(?P<second>[0-9]{1,2})(\.(?P<us>[0-9]{1,6}))?$'
p = re.compile(tsRegExp)
m = p.match(ts)
if m is None:
logger.warning("Impossible to parse timestamp: %s" % ts)
return None
values = m.groupdict()
hour = 0
minute = 0
second = 0
us = 0
if values['hour'] is not None:
hour = int(values['hour'])
if values['minute'] is not None:
minute = int(values['minute'])
if values['second'] is not None:
second = int(values['second'])
if values['us'] is not None:
us = int(values['us'])
if hour < 0 or hour > 23:
logger.error("hour must be in [0,24[")
return None
if minute < 0 or minute > 59:
logger.error("minute must be in [0,60[")
return None
if second < 0 or second > 59:
logger.error("second must be in [0,60[")
return None
if us < 0 or us > 1000000:
logger.error("milliseconds must be in [0,1000000[")
return None
ts = timedelta(hours=hour, minutes=minute, seconds=second, microseconds=us)
return ts
def parseTimeInterval(interval):
logger = logging.getLogger(__name__)
intervalRegExp = r'^(?P<hour1>[0-9]{1,2}):(?P<minute1>[0-9]{1,2}):(?P<second1>[0-9]{1,2})(\.(?P<ms1>[0-9]{1,3}))?-(?P<hour2>[0-9]{1,2}):(?P<minute2>[0-9]{1,2}):(?P<second2>[0-9]{1,2})(\.(?P<ms2>[0-9]{1,3}))?$'
p = re.compile(intervalRegExp)
m = p.match(interval)
if m is None:
logger.error("Impossible to parse time interval")
return None
values = m.groupdict()
hour1 = 0
minute1 = 0
second1 = 0
ms1 = 0
hour2 = 0
minute2 = 0
second2 = 0
ms2 = 0
if values['hour1'] is not None:
hour1 = int(values['hour1'])
if values['minute1'] is not None:
minute1 = int(values['minute1'])
if values['second1'] is not None:
second1 = int(values['second1'])
if values['ms1'] is not None:
ms1 = int(values['ms1'])
if values['hour2'] is not None:
hour2 = int(values['hour2'])
if values['minute2'] is not None:
minute2 = int(values['minute2'])
if values['second2'] is not None:
second2 = int(values['second2'])
if values['ms2'] is not None:
ms2 = int(values['ms2'])
if hour1 < 0 or hour1 > 23:
logger.error("hour must be in [0,24[")
return None, None
if minute1 < 0 or minute1 > 59:
logger.error("minute must be in [0,60[")
return None, None
if second1 < 0 or second1 > 59:
logger.error("second must be in [0,60[")
return None, None
if ms1 < 0 or ms1 > 1000:
logger.error("milliseconds must be in [0,1000[")
return None, None
if hour2 < 0 or hour2 > 23:
logger.error("hour must be in [0,24[")
return None, None
if minute2 < 0 or minute2 > 59:
logger.error("minute must be in [0,60[")
return None, None
if second2 < 0 or second2 > 59:
logger.error("second must be in [0,60[")
return None, None
if ms2 < 0 or ms2 > 1000:
logger.error("milliseconds must be in [0,1000[")
return None, None
ts1 = timedelta(hours=hour1, minutes=minute1, seconds=second1, microseconds=ms1*1000)
ts2 = timedelta(hours=hour2, minutes=minute2, seconds=second2, microseconds=ms2*1000)
if ts2 < ts1:
logger.error("Non monotonic interval")
return None,None
return (ts1, ts2)
def compareTimeInterval(interval1, interval2):
ts11,ts12 = interval1
ts21,ts22 = interval2
if ts12 < ts21:
return -1
elif ts22 < ts11:
return 1
else:
return 0
def ffmpegConvert(ffmpeg, ffprobe, inputFile, inputFormat, outputFile, outputFormat, duration):
logger = logging.getLogger(__name__)
width, height = getVideoDimensions(ffprobe, inputFile)
subtitles = withSubtitles(ffprobe, inputFile)
infd = inputFile.fileno()
outfd = outputFile.fileno()
set_inheritable(infd, True)
set_inheritable(outfd, True)
if logger.getEffectiveLevel() == logging.DEBUG:
log = []
else:
log = [ '-loglevel', 'quiet' ]
params = [ffmpeg, '-y',]+log+['-progress', '/dev/stdout', '-canvas_size', '%dx%d' % (width, height), '-f', inputFormat, '-i', '/proc/self/fd/%d' % infd,
'-map', '0:v', '-map', '0:a']
if subtitles:
params.extend(['-map', '0:s'])
params.extend(['-bsf:v', 'h264_mp4toannexb,dump_extra=freq=keyframe', '-vcodec', 'copy', '-acodec', 'copy'])
if subtitles:
params.extend(['-scodec', 'dvdsub'])
params.extend(['-r:0', '25', '-f', outputFormat, '/proc/self/fd/%d' % outfd])
logger.debug('Executing %s', params)
with Popen(params, stdout=PIPE, close_fds=False) as ffmpeg:
pb = tqdm(TextIOWrapper(ffmpeg.stdout, encoding="utf-8"), total=int(duration/timedelta(seconds=1)), unit='s', desc='Conversion')
for line in pb:
if line.startswith('out_time='):
ts = line.split('=')[1].strip()
ts = parseTimestamp(ts)
if ts is not None:
pb.n = int(ts/timedelta(seconds=1))
pb.update()
status = ffmpeg.wait()
if status != 0:
logger.error('Conversion failed with status code: %d', status)
def getTSFrame(frame):
logger = logging.getLogger(__name__)
if 'pts_time' in frame:
pts_time = float(frame['pts_time'])
elif 'pkt_pts_time' in frame:
pts_time = float(frame['pkt_pts_time'])
else:
logger.error('Impossible to find timestamp of frame %s', frame)
return None
ts = timedelta(seconds=pts_time)
return ts
def getPacketDuration(packet):
logger = logging.getLogger(__name__)
if 'duration' in packet:
duration = int(packet['duration'])
elif 'pkt_duration' in packet:
duration = int(packet['pkt_duration'])
else:
logger.error('Impossible to find duration of packet %s', packet)
return None
return duration
def getFramesInStream(ffprobe, inputFile, begin, end, streamKind, subStreamId=0):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
set_inheritable(infd, True)
command = [ffprobe, '-loglevel', 'quiet', '-read_intervals', ('%s%%%s' %(begin, end)), '-show_entries', 'frame',
'-select_streams', '%s:%d' % (streamKind, subStreamId), '-of', 'json', '/proc/self/fd/%d' % infd]
logger.debug('Executing: %s', command)
with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
frames = json.load(BytesIO(out))
status = ffprobe.wait()
if status != 0:
logger.error('ffprobe failed with status code: %d' % status)
return None
# Sort frames by timestamp
tmp = {}
if 'frames' in frames:
frames = frames['frames']
for frame in frames:
ts = getTSFrame(frame)
if ts is None:
return None
if begin <= ts and ts <= end:
tmp[ts]=frame
res = []
for ts in sorted(tmp):
res.append(tmp[ts])
return res
else:
logger.error('Impossible to retrieve frames inside file around [%s,%s]' % (begin, end))
return None
# TODO:
def getNearestIDRFrame(ffprobe, inputFile, timestamp, before=True, delta=timedelta(seconds=2)):
logger = logging.getLogger(__name__)
zero = timedelta()
tbegin = timestamp-delta
tend = timestamp+delta
if tbegin < zero:
tbegin = zero
infd = inputFile.fileno()
set_inheritable(infd, True)
logger.debug('Looking for IDR frame in [%s, %s]', tbegin, tend)
idrs = []
# Retains only IDR frame
with Popen([ffprobe, '-loglevel', 'quiet', '-read_intervals', ('%s%%%s' %(tbegin, tend)), '-skip_frame', 'nokey', '-show_entries', 'frame', '-select_streams', 'v:0', '-of', 'json', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
frames = json.load(BytesIO(out))
status = ffprobe.wait()
if status != 0:
logger.error('ffprobe failed with status code: %d' % status)
return None
res = []
if 'frames' in frames:
frames = frames['frames']
for frame in frames:
ts = getTSFrame(frame)
if ts is None:
return None
if tbegin <= ts and ts <= tend:
idrs.append(frame)
else:
logger.error('Impossible to retrieve IDR frames inside file around [%s,%s]', tbegin, tend)
return None
def getNearestIFrame(ffprobe, inputFile, timestamp, before=True, deltaMax=timedelta(seconds=15)):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
set_inheritable(infd, True)
delta = timedelta(seconds=1)
iframe = None
while delta < deltaMax:
zero = timedelta()
if before:
tbegin = timestamp-delta
else:
tbegin = timestamp
if not before:
tend = timestamp+delta
else:
tend = timestamp
if tbegin < zero:
tbegin = zero
logger.debug('Looking for an iframe in [%s, %s]', tbegin, tend)
frames = getFramesInStream(ffprobe, inputFile=inputFile, begin=tbegin, end=tend, streamKind='v')
if frames is None:
logger.debug('Found no frame in [%s, %s]', tbegin, tend)
delta+=timedelta(seconds=1)
continue
iframes = []
for frame in frames:
if frame['pict_type'] == 'I':
iframes.append(frame)
found = False
for frame in iframes:
ts = getTSFrame(frame)
if ts is None:
logger.warning('I-frame with no timestamp: %s' % frame)
continue
if before and ts <= timestamp:
found = True
iframe = frame
if not before and ts >= timestamp:
found = True
iframe = frame
break
if found:
logger.info("Found i-frame at: %s" % iframe)
break
else:
delta+=timedelta(seconds=1)
continue
if iframe is not None:
its = getTSFrame(iframe)
nbFrames = 0
for frame in frames:
ts = getTSFrame(frame)
if ts is None:
logger.warning('Frame without timestamp: %s' % frame)
continue
if before:
if its <= ts and ts <= timestamp:
logger.info("Retrieve a frame between %s and %s at %s" % (its, timestamp, ts))
nbFrames = nbFrames+1
else:
if timestamp <= ts and ts <= its:
logger.info("Retrieve a frame between %s and %s at %s" % (timestamp, ts, its))
nbFrames = nbFrames+1
else:
logger.error("Impossible to find I-frame between: %s and %s" % (tbegin, tend))
return 0, None
return(nbFrames, iframe)
def extractMKVPart(mkvmerge, inputFile, outputFile, begin, end):
logger = logging.getLogger(__name__)
logger.info('Extract video between I-frames at %s and %s' % (begin,end))
infd = inputFile.fileno()
outfd = outputFile.fileno()
lseek(infd, 0, SEEK_SET)
lseek(outfd, 0, SEEK_SET)
set_inheritable(infd, True)
set_inheritable(outfd, True)
env = {**os.environ, 'LANG': 'C'}
warnings = []
command = [mkvmerge, '-o', '/proc/self/fd/%d' % outfd, '--split', 'parts:%s-%s' % (begin, end), '/proc/self/fd/%d' % infd]
logger.debug('Executing: %s' % command)
with Popen(command, stdout=PIPE, close_fds=False, env=env) as mkvmerge:
pb = tqdm(TextIOWrapper(mkvmerge.stdout, encoding="utf-8"), total=100, unit='%', desc='Extraction')
for line in pb:
if line.startswith('Progress :'):
p = re.compile('^Progress : (?P<progress>[0-9]{1,3})%$')
m = p.match(line)
if m is None:
logger.error('Impossible to parse progress')
pb.update(int(m['progress'])-pb.n)
elif line.startswith('Warning'):
warnings.append(line)
pb.update(100-pb.n)
pb.refresh()
pb.close()
status = mkvmerge.wait()
if status == 1:
logger.warning('Extraction returns warning')
for w in warnings:
logger.warning(w)
elif status == 2:
logger.error('Extraction returns errors')
def extractPictures(ffmpeg, inputFile, begin, nbFrames, width=640, height=480):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
outfd = memfd_create('pictures', flags=0)
set_inheritable(outfd, True)
# PPM header
# "P6\nWIDTH HEIGHT\n255\n"
headerLen=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1
logger.debug('Header length: %d' % headerLen)
imageLength = width*height*3+headerLen
length = imageLength*nbFrames
logger.debug("Estimated length: %d" % length)
command = [ffmpeg, '-loglevel', 'quiet' ,'-y', '-ss', '%s'%begin, '-i', '/proc/self/fd/%d' % infd, '-s', '%dx%d'%(width, height),
'-vframes', '%d'%nbFrames, '-c:v', 'ppm', '-f', 'image2pipe', '/proc/self/fd/%d' % outfd ]
logger.debug('Executing: %s', command)
images = bytes()
with Popen(command, stdout=PIPE, close_fds=False) as ffmpeg:
status = ffmpeg.wait()
if status != 0:
logger.error('Conversion failed with status code: %d' % status)
return None, None
lseek(outfd, 0, SEEK_SET)
images = read(outfd,length)
if len(images) != length:
logger.error("Received %d bytes but %d were expected." % (len(images), length))
return None, None
lseek(outfd, 0, SEEK_SET)
return images, outfd
def extractSound(ffmpeg, inputFile, begin, outputFileName, packetDuration, subChannel=0, nbPackets=0, sampleRate=48000, nbChannels=2):
logger = logging.getLogger(__name__)
outfd = memfd_create(outputFileName, flags=0)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
set_inheritable(outfd, True)
sound = bytes()
length = int(nbChannels*sampleRate*4*nbPackets*packetDuration/1000)
command = [ffmpeg, '-y', '-loglevel', 'quiet', '-ss', '%s'%begin, '-i', '/proc/self/fd/%d' % infd, '-frames:a:%d' % subChannel, '%d' % (nbPackets+1),
'-c:a', 'pcm_s32le', '-sample_rate', '%d' % sampleRate, '-channels', '%d' % nbChannels, '-f', 's32le', '/proc/self/fd/%d' % outfd]
logger.debug('Executing: %s', command)
with Popen(command, stdout=PIPE, close_fds=False) as ffmpeg:
status = ffmpeg.wait()
if status != 0:
logger.error('Sound extraction returns error code: %d' % status)
return None, None
lseek(outfd, 0, SEEK_SET)
sound = read(outfd, length)
if (len(sound) != length):
logger.info("Received %d bytes but %d were expected (channels=%d, freq=%d, packets=%d, duration=%d ms)." % (len(sound), length, nbChannels, sampleRate, nbPackets, packetDuration))
return None, None
return sound, outfd
def dumpPPM(pictures, prefix, temporaries):
logger = logging.getLogger(__name__)
# "P6\nWIDTH HEIGHT\n255\n"
pos = 0
picture = 0
logger.debug('Dumping %d pictures: %s' % (len(pictures),prefix))
while pos<len(pictures):
filename = '%s-%03d.ppm' % (prefix, picture)
header = BytesIO(pictures[pos:])
magic = header.readline().decode('utf8')
dimensions = header.readline().decode('utf8')
maxvalue = header.readline().decode('utf8')
if magic == 'P6\n':
pattern = re.compile('^(?P<width>[0-9]+) (?P<height>[0-9]+)\n$')
m = pattern.match(dimensions)
if m is not None:
width = int(m['width'])
height = int(m['height'])
else:
logger.error('Impossible to parse dimensions of picture')
return
else:
logger.error('Not a PPM picture')
return
headerLen=2+1+ceil(log(width, 10))+1+ceil(log(height, 10))+1+3+1
try:
out = open(filename, 'w')
outfd = out.fileno()
except IOError:
logger.error('Impossible to create file: %s' % filename)
temporaries.append(out)
length=headerLen+3*width*height
nbBytes = 0
while nbBytes < length:
nbBytes+=write(outfd, pictures[pos+nbBytes:pos+length])
pos+=length
picture+=1
def extractAllStreams(ffmpeg, ffprobe, inputFile, begin, end, streams, filesPrefix, nbFrames, frameRate, width, height, temporaries, dumpMemFD=False):
logger = logging.getLogger(__name__)
# The command line for encoding only video track
videoEncoderParams = [ ffmpeg, '-y', '-loglevel', 'quiet']
videoInputParams = []
videoCodecParams = []
# The command line to create a MKV file with the rest of tracks
genericEncoderParams = [ ffmpeg, '-y', '-loglevel', 'quiet' ]
genericInputParams = []
genericCodecParams = []
if begin < end:
videoID=0
audioID=0
subTitleID=0
memfds = []
for stream in streams:
if stream['codec_type'] == 'video':
logger.info("Extracting %d frames of video stream v:%d" % (nbFrames,videoID))
sar = stream['sample_aspect_ratio']
dar = stream['display_aspect_ratio']
pixelFormat = stream['pix_fmt']
colorRange = stream['color_range']
colorSpace =stream['color_space']
colorTransfer = stream['color_transfer']
colorPrimaries = stream['color_primaries']
level = int(stream['level'])
level = '%d.%d' % (floor(level/10), level%10)
chromaLocation = stream['chroma_location']
fieldOrder = stream
interlacedOptions = []
if fieldOrder == 'progressive':
interlacedOptions = ['-field_order', '0']
elif fieldOrder == 'tt':
interlacedOptions = ['-top', '1', '-flags:v:%d' % videoID, '+ilme+ildct', '-field_order', '1']
elif fieldOrder == 'bb':
interlacedOptions = ['-top', '0', '-flags:v:%d' % videoID, '+ilme+ildct', '-field_order','2']
elif fieldOrder == 'tb':
interlacedOptions = ['-top', '1', '-flags:v:%d' % videoID, '+ilme+ildct', '-field_order', '3']
elif fieldOrder == 'bt':
interlacedOptions = ['-top', '0', '-flags:v:%d' % videoID, '+ilme+ildct', '-field_order', '4']
# ======================================= #
# TODO: adjust SAR and DAR
# https://superuser.com/questions/907933/correct-aspect-ratio-without-re-encoding-video-file
codec = stream['codec_name']
imagesBytes, memfd = extractPictures(ffmpeg, inputFile=inputFile, begin=begin, nbFrames=nbFrames, width=width, height=height)
if imagesBytes is None:
exit(-1)
memfds.append(memfd)
if dumpMemFD:
dumpPPM(imagesBytes, '%s-%d' % (filesPrefix,videoID), temporaries)
# We rewind to zero the memory file descriptor
lseek(memfd, 0, SEEK_SET)
set_inheritable(memfd, True)
videoInputParams.extend(['-framerate', '%f'%frameRate, '-f', 'image2pipe', '-i', '/proc/self/fd/%d' % memfd])
videoCodecParams.extend(['-c:v:%d' % videoID, codec, '-level:v:%d' % videoID, level, '-pix_fmt', pixelFormat])
videoCodecParams.extend(interlacedOptions)
videoCodecParams.extend(['-colorspace:v:%d' % videoID, colorSpace, '-color_primaries:v:%d' % videoID, colorPrimaries, '-color_trc:v:%d' % videoID, colorTransfer, '-color_range:v:%d' % videoID, colorRange])
videoID=videoID+1
elif stream['codec_type'] == 'audio':
logger.debug('Audio stream: %s' % stream)
sampleRate = int(stream['sample_rate'])
nbChannels = int(stream['channels'])
if 'bit_rate' in stream:
bitRate = int(stream['bit_rate'])
else:
bitRate = 128000
codec = stream['codec_name']
if 'tags' in stream:
if 'language' in stream['tags']:
genericCodecParams.extend(['-metadata:s:a:%d' % audioID, 'language=%s' % stream['tags']['language']])
packets = getFramesInStream(ffprobe, inputFile=inputFile, begin=begin, end=end, streamKind='a', subStreamId=audioID)
nbPackets = len(packets)
logger.debug("Found %d packets to be extracted from audio track.", nbPackets)
if(nbPackets > 0):
packetDuration = getPacketDuration(packets[0])
if packetDuration is None:
return None
else:
packetDuration = 0
logger.info("Extracting %d packets of audio stream: a:%d" , nbPackets, audioID)
tmpname = '%s-%d.pcm' % (filesPrefix,audioID)
soundBytes, memfd = extractSound(ffmpeg=ffmpeg, inputFile=inputFile, begin=begin, nbPackets=nbPackets, packetDuration=packetDuration, outputFileName=tmpname, sampleRate=sampleRate, nbChannels=nbChannels)
if soundBytes is None:
exit(-1)
memfds.append(memfd)
if dumpMemFD:
try:
output = open(tmpname,'w')
except IOError:
logger.error('Impossible to create file: %s' % tmpname)
return None
outfd = output.fileno()
pos = 0
while pos < len(soundBytes):
pos+=write(outfd, soundBytes[pos:])
temporaries.append(output)
# We rewind to zero the memory file descriptor
lseek(memfd, 0, SEEK_SET)
set_inheritable(memfd, True)
genericInputParams.extend(['-f', 's32le', '-ar', '%d'%sampleRate, '-ac', '%d'%nbChannels, '-i', '/proc/self/fd/%d' % memfd])
genericCodecParams.extend(['-c:a:%d' % audioID, codec, '-b:a:%d' % audioID, '%d' % bitRate])
audioID=audioID+1
elif stream['codec_type'] == 'subtitle':
logger.info("Extracting a subtitle stream: s:%d" % subTitleID)
codec = stream['codec_name']
genericInputParams.extend(['-i', './empty.idx'])
if 'tags' in stream:
if 'language' in stream['tags']:
genericCodecParams.extend(['-metadata:s:s:%d' % subTitleID, 'language=%s' % stream['tags']['language']])
genericCodecParams.extend(['-c:s:%d' % subTitleID, 'copy'])
subTitleID=subTitleID+1
else:
logger.error("Unknown stream type: %s" % stream['codec_type'])
# Create a new MKV movie with all streams (except videos) that have been extracted.
genericEncoderParams.extend(genericInputParams)
for index in range(0,audioID+subTitleID):
genericEncoderParams.extend(['-map', '%d' % index])
genericEncoderParams.extend(genericCodecParams)
mkvFileName = '%s.mkv' % filesPrefix
try:
mkvOutput = open(mkvFileName,'wb+')
except IOError:
logger.error('Impossible to create file: %s' % mkvFileName)
return None
mkvoutfd = mkvOutput.fileno()
set_inheritable(mkvoutfd, True)
genericEncoderParams.extend(['-f', 'matroska', '/proc/self/fd/%d' % mkvoutfd])
logger.info('Encoding all streams (except video) into a MKV file: %s' % mkvFileName)
logger.debug('Executing: %s' % genericEncoderParams)
with Popen(genericEncoderParams, stdout=PIPE, close_fds=False) as ffmpeg:
status = ffmpeg.wait()
if status != 0:
logger.error('Encoding failed with status code: %d' % status)
return None
temporaries.append(mkvOutput)
h264FileName = '%s.h264' % filesPrefix
try:
h264Output = open(h264FileName,'wb+')
except IOError:
logger.error('Impossible to create file: %s' % h264FileName)
return None
h264outfd = h264Output.fileno()
set_inheritable(h264outfd, True)
videoEncoderParams.extend(videoInputParams)
videoEncoderParams.extend(videoCodecParams)
videoEncoderParams.extend([ '-x264opts', 'keyint=1:sps-id=%d' % 1,'-bsf:v',
'h264_mp4toannexb,dump_extra=freq=keyframe,h264_metadata=overscan_appropriate_flag=1:sample_aspect_ratio=1:video_format=0:chroma_sample_loc_type=0',
'-f', 'h264', '/proc/self/fd/%d' % h264outfd])
logger.info('Encoding video into a H264 file: %s' % h264FileName)
logger.debug('Executing: %s' % videoEncoderParams)
with Popen(videoEncoderParams, stdout=PIPE, close_fds=False) as ffmpeg:
status = ffmpeg.wait()
if status != 0:
logger.error('Encoding failed with status code: %d' % status)
return None
temporaries.append(h264Output)
h264TSFileName = '%s-ts.txt' % filesPrefix
try:
h264TSOutput = open(h264TSFileName,'w+')
except IOError:
logger.error('Impossible to create file: %s', h264TSFileName)
return None
h264TSOutput.write('# timestamp format v2\n')
ts = 0
for frame in range(0,nbFrames):
ts = ts+ceil(1000/frameRate)
h264TSOutput.write('%d\n' % ts)
h264TSOutput.flush()
h264TSOutput.seek(0)
temporaries.append(h264TSOutput)
for memfd in memfds:
close(memfd)
return h264Output, h264TSOutput, mkvOutput
else:
# Nothing to be done. We are already at a i-frame boundary.
return None, None
# Merge a list of mkv files passed as input, and produce a new MKV as output
def mergeMKVs(mkvmerge, inputs, outputName, concatenate=True, timestamps=None):
logger = logging.getLogger(__name__)
fds = []
try:
out = open(outputName, 'w+')
except IOError:
logger.error('Impossible to create file: %s', outputName)
return None
outfd = out.fileno()
lseek(outfd, 0, SEEK_SET)
fds.append(outfd)
set_inheritable(outfd, True)
# Timestamps of merged tracks are modified by the length of the preceding track.
# The default mode ('file') is using the largest timestamp of the whole file which may create desynchronize video and sound.
mergeParams = [mkvmerge, '--append-mode', 'track']
first = True
partNum = 0
for mkv in inputs:
if mkv !=None:
fd = mkv.fileno()
fds.append(fd)
set_inheritable(fd, True)
# If we pass a timestamps file associated with the considered track, use it.
if timestamps is not None and partNum in timestamps:
tsfd = timestamps[partNum].fileno()
lseek(tsfd, 0, SEEK_SET)
fds.append(tsfd)
set_inheritable(tsfd, True)
mergeParams.extend(['--timestamps', ('%d:/proc/self/fd/%d' % (partNum, tsfd))])
if first:
mergeParams.append('/proc/self/fd/%d' % fd)
first = False
elif concatenate:
mergeParams.append('+/proc/self/fd/%d' % fd)
else:
mergeParams.append('/proc/self/fd/%d' % fd)
partNum+=1
mergeParams.extend(['-o', '/proc/self/fd/%d' % outfd])
# We merge all files.
warnings = []
env = {**os.environ, 'LANG': 'C'}
logger.debug('Executing: LANG=C %s', mergeParams)
with Popen(mergeParams, stdout=PIPE, close_fds=False, env=env) as mkvmerge:
pb = tqdm(TextIOWrapper(mkvmerge.stdout, encoding="utf-8"), total=100, unit='%', desc='Merging')
for line in pb:
if line.startswith('Progress :'):
p = re.compile('^Progress : (?P<progress>[0-9]{1,3})%$')
m = p.match(line)
if m is None:
logger.error('Impossible to parse progress')
pb.n = int(m['progress'])
pb.update()
elif line.startswith('Warning'):
warnings.append(line)
status = mkvmerge.wait()
if status == 1:
logger.warning('Extraction returns warning')
for w in warnings:
logger.warning(w)
elif status == 2:
logger.error('Extraction returns errors')
for fd in fds:
set_inheritable(fd, False)
return out
def findSubtitlesTracks(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
command = [ffprobe, '-loglevel','quiet', '-i', '/proc/self/fd/%d' % infd, '-select_streams', 's', '-show_entries', 'stream=index:stream_tags=language', '-of', 'json']
logger.debug('Executing: %s', command)
with Popen(command, stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
return out['streams']
else:
logger.error('Impossible to retrieve format of file')
ffprobe.wait()
def extractTrackFromMKV(mkvextract, inputFile, index, outputFile, timestamps):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
outfd = outputFile.fileno()
lseek(outfd, 0, SEEK_SET)
set_inheritable(outfd, True)
tsfd = timestamps.fileno()
lseek(tsfd, 0, SEEK_SET)
set_inheritable(tsfd, True)
params = [ mkvextract, '/proc/self/fd/%d' % infd, 'tracks', '%d:/proc/self/fd/%d' % (index, outfd), 'timestamps_v2', '%d:/proc/self/fd/%d' % (index, tsfd)]
env = {**os.environ, 'LANG': 'C'}
logger.debug('Executing: LANG=C %s' % params)
with Popen(params, stdout=PIPE, close_fds=False, env=env) as extract:
pb = tqdm(TextIOWrapper(extract.stdout, encoding="utf-8"), total=100, unit='%', desc='Extraction of track')
for line in pb:
if line.startswith('Progress :'):
p = re.compile('^Progress : (?P<progress>[0-9]{1,3})%$')
m = p.match(line)
if m is None:
logger.error('Impossible to parse progress')
pb.update(int(m['progress'])-pb.n)
pb.update(100-pb.n)
pb.refresh()
pb.close()
extract.wait()
if extract.returncode != 0:
logger.error('Mkvextract returns an error code: %d', extract.returncode)
return None
else:
logger.info('Track %d was succesfully extracted.', index)
def removeVideoTracksFromMKV(mkvmerge, inputFile, outputFile):
logger = logging.getLogger(__name__)
outfd = outputFile.fileno()
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
lseek(outfd, 0, SEEK_SET)
set_inheritable(infd, True)
set_inheritable(outfd, True)
params = [ mkvmerge, '-o', '/proc/self/fd/%d' % outfd, '-D', '/proc/self/fd/%d' % infd]
logger.debug('Executing: LANG=C %s', params)
env = {**os.environ, 'LANG': 'C'}
with Popen(params, stdout=PIPE, close_fds=False, env=env) as remove:
pb = tqdm(TextIOWrapper(remove.stdout, encoding="utf-8"), total=100, unit='%', desc='Removal of video track:')
for line in pb:
if line.startswith('Progress :'):
p = re.compile('^Progress : (?P<progress>[0-9]{1,3})%$')
m = p.match(line)
if m is None:
logger.error('Impossible to parse progress')
pb.update(int(m['progress'])-pb.n)
pb.update(100-pb.n)
pb.refresh()
pb.close()
remove.wait()
if remove.returncode != 0:
logger.error('Mkvmerge returns an error code: %d', remove.returncode)
return None
else:
logger.info('Video tracks were succesfully extracted.')
def remuxSRTSubtitles(mkvmerge, inputFile, outputFileName, subtitles):
logger = logging.getLogger(__name__)
try:
out = open(outputFileName, 'w')
except IOError:
logger.error('Impossible to create file: %s' % outputFileName)
return None
outfd = out.fileno()
infd = inputFile.fileno()
lseek(infd, 0, SEEK_SET)
set_inheritable(infd, True)
set_inheritable(outfd, True)
mkvmergeParams = [mkvmerge, '/proc/self/fd/%d' % infd]
for fd, lang in subtitles:
lseek(fd, 0, SEEK_SET)
set_inheritable(fd, True)
mkvmergeParams.extend(['--language', '0:%s' % lang, '/proc/self/fd/%d' % fd])
mkvmergeParams.extend(['-o', '/proc/self/fd/%d' % outfd])
warnings = []
env = {**os.environ, 'LANG': 'C'}
logger.info('Remux subtitles: %s' % mkvmergeParams)
with Popen(mkvmergeParams, stdout=PIPE, close_fds=False, env=env) as mkvmerge:
pb = tqdm(TextIOWrapper(mkvmerge.stdout, encoding="utf-8"), total=100, unit='%', desc='Remux subtitles:')
for line in pb:
if line.startswith('Progress :'):
p = re.compile('^Progress : (?P<progress>[0-9]{1,3})%$')
m = p.match(line)
if m is None:
logger.error('Impossible to parse progress')
pb.n = int(m['progress'])
pb.update()
elif line.startswith('Warning'):
warnings.append(line)
status = mkvmerge.wait()
if status == 1:
logger.warning('Remux subtitles returns warning')
for w in warnings:
logger.warning(w)
elif status == 2:
logger.error('Remux subtitles returns errors')
def concatenateH264Parts(h264parts, output):
logger = logging.getLogger(__name__)
totalLength = 0
for h264 in h264parts:
fd = h264.fileno()
totalLength += fstat(fd).st_size
logger.info('Total length: %d', totalLength)
outfd = output.fileno()
lseek(outfd, 0, SEEK_SET)
pb = tqdm(total=totalLength, unit='bytes', desc='Concatenation')
for h264 in h264parts:
fd = h264.fileno()
lseek(fd, 0, SEEK_SET)
while True:
buf = read(fd, 1000000)
if buf is None or len(buf) == 0:
break
pos = 0
while pos < len(buf):
nbBytes = write(outfd, buf[pos:])
pb.update(nbBytes)
pos += nbBytes
def concatenateH264TSParts(h264TSParts, output):
logger = logging.getLogger(__name__)
header = '# timestamp format v2\n'
output.write(header)
last = 0.
first = True
for part in h264TSParts:
if first:
offset = last
else:
# TODO: take framerate into account
offset = last + 40
logger.debug('Parsing file: %s. Offset=%d' % (part, offset))
isheader = part.readline()
if (not isheader) or (isheader != header):
logger.error('Impossible to find a valid header: "%s"' % isheader)
exit(-1)
while True:
line = part.readline()
if not line:
break
ts = offset + float(line)
last = max(last,ts)
output.write('%f\n' % ts)
if first:
first = False
# TODO: finish this procedure
def doCoarseProcessing(ffmpeg, ffprobe, mkvmerge, inputFile, begin, end, nbFrames, frameRate, filesPrefix, streams, width, height, temporaries, dumpMemFD):
logger = logging.getLogger(__name__)
# Internal video with all streams (video, audio and subtitles)
internalMKVName = '%s.mkv' % filesPrefix
try:
internalMKV = open(internalMKVName, 'w+')
except IOError:
logger.error('Impossible to create file: %s', internalMKVName)
exit(-1)
# Extract internal part of MKV
extractMKVPart(mkvmerge=mkvmerge, inputFile=inputFile, outputFile=internalMKV, begin=begin, end=end)
temporaries.append(internalMKV)
pass
def main():
logger = logging.getLogger(__name__)
coloredlogs.install()
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", dest='inputFile', type=str, required=True, help="Input file to process (can be .ts, .mp4 or .mkv).")
parser.add_argument("-o", "--output", dest='outputFile', type=str, required=True, help="Output MKV file to produce.")
parser.add_argument("-p", "--part", dest='parts', nargs='+', required=False, action='append', metavar="hh:mm:ss[.mmm]-hh:mm:ss[.mmm]", help="Extract this exact part of the original file.")
parser.add_argument("-k", "--keep", action='store_true', help="Do not cleanup temporary files after processing.")
parser.add_argument("-t", "--threshold", action='store', type=int, help="Suppress headers and trailers that are smaller than the threshold.")
parser.add_argument("-c", "--coarse", action='store_true', dest='coarse', help="Do not take trailers and headers into account.")
parser.add_argument("--dump-memory", action='store_true', dest='dump', help="For debug purpose, dump all memory mapping of headers (and trailers) before (after) each part. They are kept in memory only otherwise.")
parser.add_argument("-s","--srt", action='store_true', dest='srt', help="Dump subtitles and make OCR and finally remux them in the movie (as SRT).")
parser.add_argument("-v","--verbose", action='store_true', dest='verbose', help="Debug.")
parser.add_argument("-f","--framerate", action='store', type=int, help="Override frame rate estimator.")
args = parser.parse_args()
logger.info('Arguments: %s' % args)
if args.verbose:
logger.info('Setting logging to debug mode')
coloredlogs.set_level(level=logging.DEBUG)
logger.debug('Arguments: %s', args)
if args.coarse and args.threshold is not None:
logger.error('--coarse and threshold arguments are exclusive.')
exit(-1)
if (not args.coarse) and args.threshold is None:
args.threshold = 0
allOptionalTools, paths = checkRequiredTools()
# Flatten args.parts
intervals = []
if args.parts is not None:
for part in args.parts:
for subpart in part:
intervals.append(subpart)
parts=[]
# Parse each interval
for interval in intervals:
ts1, ts2 = parseTimeInterval(interval)
if ts1 is None or ts2 is None:
logger.error("Illegal time interval: %s" % interval)
exit(-1)
parts.append((ts1,ts2))
# Sort intervals
parts.sort(key=cmp_to_key(compareTimeInterval))
# Check that no intervals are overlapping
prevts = timedelta(0)
for part in parts:
ts1, ts2 = part
if prevts > ts1:
logger.error('Intervals are overlapping')
exit(-1)
prevts = ts2
nbParts = len(parts)
temporaries = []
basename = os.path.splitext(os.path.basename(args.inputFile))[0]
mp4filename = basename+'.mp4'
mkvfilename = basename+'.mkv'
try:
inputFile = open(args.inputFile, mode='r')
except IOError:
logger.error("Impossible to open %s" % args.inputFile)
exit(-1)
formatOfFile = getFormat(paths['ffprobe'], inputFile)
if formatOfFile is None:
exit(-1)
duration = timedelta(seconds=float(formatOfFile['duration']))
logger.info("Durée de l'enregistrement: %s" % duration)
if args.framerate is None:
frameRate = getFrameRate(paths['ffprobe'], inputFile)
if frameRate is None:
logger.error('Impossible to estimate frame rate !')
exit(-1)
else:
frameRate = args.framerate
logger.info('Frame rate: %.1f fps' % frameRate)
found = False
for f in SupportedFormat:
if 'format_name' in formatOfFile:
if formatOfFile['format_name'] == str(f):
found = True
formatOfFile = f
break
if not found:
logger.error('Unsupported format of file')
if formatOfFile == SupportedFormat.TS:
logger.info("Converting TS to MP4 (to fix timestamps).")
try:
with open(mp4filename, 'w+') as mp4:
ffmpegConvert(paths['ffmpeg'], paths['ffprobe'], inputFile, 'mpegts', mp4, 'mp4', duration)
temporaries.append(mp4)
logger.info("Converting MP4 to MKV.")
try:
mkv = open(mkvfilename, 'w+')
except IOError:
logger.error('')
ffmpegConvert(paths['ffmpeg'], paths['ffprobe'], mp4, 'mp4', mkv, 'matroska', duration)
if nbParts > 0:
temporaries.append(mkv)
except IOError:
logger.error('')
elif formatOfFile == SupportedFormat.MP4:
logger.info("Converting MP4 to MKV")
try:
mkv = open(mkvfilename, 'w+')
except IOError:
logger.error('')
ffmpegConvert(paths['ffmpeg'], paths['ffprobe'], inputFile, 'mp4', mkv, 'matroska', duration)
if nbParts > 0:
temporaries.append(mkv)
else:
logger.info("Already in MKV")
mkv = inputFile
streams = getStreams(paths['ffprobe'], mkv)
logger.debug('Streams: %s' % streams)
mainVideo = None
nbVideos = 0
for stream in streams:
if stream['codec_type'] == 'video':
if stream['disposition']['default'] == 1:
mainVideo = stream
width = stream['width']
height = stream['height']
break
nbVideos+=1
if nbVideos == 1:
mainVideo = stream
width = stream['width']
height = stream['height']
else:
mainVideo = None
if mainVideo is None:
logger.error('Impossible to find main video stream.')
exit(-1)
# We retrieve the main private codec data
_, mainCodecPrivateData = getCodecPrivateDataFromMKV(mkvinfo=paths['mkvinfo'], inputFile=mkv)
logger.debug('Main video stream has following private data: %s' % hexdump.dump(mainCodecPrivateData, sep=':'))
# We parse them
mainAvcConfig = parseCodecPrivate(mainCodecPrivateData)
logger.debug('AVC configuration: %s' % mainAvcConfig)
# We check if the parse and dump operations are idempotent.
privateData = dumpCodecPrivateData(mainAvcConfig)
logger.debug('Redump AVC configuration: %s' % hexdump.dump(privateData, sep=':'))
# In rare occasion, the PPS has trailing zeroes that do not seem to be related to useful data but they differ from the private data we generate that do not contain them.
# In that case we try to redecode our own private data to see if both AVC configurations are the same.
if mainCodecPrivateData != privateData:
logger.warning('Difference detected in bitstream !!')
isoAvcConfig = parseCodecPrivate(privateData)
logger.debug('Reread AVC configuration: %s' % isoAvcConfig)
# If there exists a difference between our own reconstructed AVC configuration and the original one, we abandon
if isoAvcConfig != mainAvcConfig:
logger.error('AVC configurations are different: %s\n%s\n' % (mainAvcConfig, isoAvcConfig))
exit(-1)
# Pour chaque portion
partnum = 0
mkvparts = []
h264parts = []
h264TS = []
checks = []
pos = timedelta()
otherAvcConfigs = []
for ts1, ts2 in parts:
# Trouver l'estampille de la trame 'I' la plus proche (mais postérieure) au début de la portion.
# Trouver l'estampille de la trame 'I' la plus proche (mais antérieure) à la fin de la portion.
# On a alors
# debut ----- trame --------- trame --------- fin fin+1
# 'B/P' 'B/P'* 'I' 'I' 'B/P'* 'B/P' 'I/B/P'
# Si la trame de début est déjà 'I', il n'y a rien à faire.
# Sinon on extrait les trames 'B' ou 'P' depuis le début jusqu'à la trame 'I' non incluse.
# Si la trame de fin précède une trame I, on n'a rien à faire.
# Sinon on extrait toutes les trames depuis la dernière trame I jusqu'à la trame de fin.
partnum = partnum + 1
# Get the nearest I-frame whose timestamp is greater or equal to the beginning.
headFrames = getNearestIFrame(paths['ffprobe'], mkv, ts1, before=False)
if headFrames is None:
exit(-1)
# Get the nearest I-frame whose timestamp ...
# TODO: wrong here ...
tailFrames = getNearestIFrame(paths['ffprobe'], mkv, ts2, before=True)
if tailFrames is None:
exit(-1)
nbHeadFrames, headIFrame = headFrames
nbTailFrames, tailIFrame = tailFrames
logger.info("Found %d frames between beginning of current part and first I-frame", nbHeadFrames)
logger.info("Found %d frames between last I-frame and end of current part", nbTailFrames)
headIFrameTS = getTSFrame(headIFrame)
if headIFrameTS is None:
exit(-1)
tailIFrameTS = getTSFrame(tailIFrame)
if tailIFrameTS is None:
exit(-1)
checks.append(pos+headIFrameTS-ts1)
subparts = []
# TODO: separate pipeline processing between coarse and not fine grain options.
# if args.coarse:
# doCoarseProcessing(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], inputFile=mkv, begin=ts1, end=headIFrameTS, nbFrames=nbHeadFrames-1, frameRate=frameRate, filesPrefix='part-%d-head' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
# else:
# doFineGrainProcessing(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], inputFile=mkv, begin=ts1, end=headIFrameTS, nbFrames=nbHeadFrames-1, frameRate=frameRate, filesPrefix='part-%d-head' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
if (not args.coarse) and (nbHeadFrames > args.threshold):
# We extract all frames between the beginning upto the frame that immediately preceeds the I-frame.
h264Head, h264HeadTS, mkvHead = extractAllStreams(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], inputFile=mkv, begin=ts1, end=headIFrameTS, nbFrames=nbHeadFrames-1, frameRate=frameRate, filesPrefix='part-%d-head' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
# If we are not at an exact boundary:
if mkvHead is not None:
subparts.append(mkvHead)
if h264Head is not None:
avcconfig = getAvcConfigFromH264(h264Head)
otherAvcConfigs.append(avcconfig)
h264parts.append(h264Head)
if h264HeadTS is not None:
h264TS.append(h264HeadTS)
# Creating MKV file that corresponds to current part between I-frames
# Internal video with all streams (video, audio and subtitles)
internalMKVName = 'part-%d-internal.mkv' % partnum
# Internal video stream as a raw H264 stream
internalH264Name = 'part-%d-internal.h264' % partnum
# Internal video timestamps
internalH264TSName = 'part-%d-internal-ts.txt' % partnum
# Internal video with only audio and subtitles streams
internalNoVideoMKVName = 'part-%d-internal-novideo.mkv' % partnum
try:
internalMKV = open(internalMKVName, 'w+')
except IOError:
logger.error('Impossible to create file: %s', internalMKVName)
exit(-1)
try:
internalNoVideoMKV = open(internalNoVideoMKVName, 'w+')
except IOError:
logger.error('Impossible to create file: %s', internalNoVideoMKVName)
exit(-1)
try:
internalH264 = open(internalH264Name, 'w+')
except IOError:
logger.error('Impossible to create file: %s', internalH264Name)
exit(-1)
try:
internalH264TS = open(internalH264TSName, 'w+')
except IOError:
logger.error('Impossible to create file: %s', internalH264TSName)
exit(-1)
# logger.info('Merge header, middle and trailer subpart into: %s' % internalMKVName)
# Extract internal part of MKV
extractMKVPart(mkvmerge=paths['mkvmerge'], inputFile=mkv, outputFile=internalMKV, begin=headIFrameTS, end=tailIFrameTS)
# Extract video stream of internal part as a raw H264 and its timestamps.
logger.info('Extract video track as raw H264 file.')
extractTrackFromMKV(mkvextract=paths['mkvextract'], inputFile=internalMKV, index=0, outputFile=internalH264, timestamps=internalH264TS)
# Remove video track from internal part of MKV
logger.info('Remove video track from %s', internalMKVName)
removeVideoTracksFromMKV(mkvmerge=paths['mkvmerge'], inputFile=internalMKV, outputFile=internalNoVideoMKV)
temporaries.append(internalMKV)
temporaries.append(internalH264)
temporaries.append(internalH264TS)
temporaries.append(internalNoVideoMKV)
h264parts.append(internalH264)
h264TS.append(internalH264TS)
subparts.append(internalNoVideoMKV)
if (not args.coarse) and (nbTailFrames > args.threshold):
# We extract all frames between the I-frame (including it) upto the end.
h264Tail, h264TailTS, mkvTail = extractAllStreams(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], inputFile=mkv, begin=tailIFrameTS, end=ts2, nbFrames=nbTailFrames, frameRate=frameRate, filesPrefix='part-%d-tail' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
if mkvTail is not None:
subparts.append(mkvTail)
if h264Tail is not None:
avcconfig = getAvcConfigFromH264(h264Tail)
otherAvcConfigs.append(avcconfig)
h264parts.append(h264Tail)
if h264TailTS is not None:
h264TS.append(h264TailTS)
logger.info('Merging MKV: %s', subparts)
part = mergeMKVs(mkvmerge=paths['mkvmerge'], inputs=subparts, outputName="part-%d.mkv" % partnum, concatenate=True)
mkvparts.append(part)
temporaries.append(part)
pos = pos+tailIFrameTS-ts1
# We need to check the end also
checks.append(pos)
# When using coarse option there is a single AVC configuration.
for avcConfig in otherAvcConfigs:
mainAvcConfig.merge(avcConfig)
logger.debug('Merged AVC configuration: %s', mainAvcConfig)
nbMKVParts = len(mkvparts)
if nbMKVParts > 0:
try:
fullH264 = open('%s-full.h264' % basename, 'w+')
except IOError:
logger.error('Impossible to create file full H264 stream.')
exit(-1)
logger.info('Merging all H264 tracks')
concatenateH264Parts(h264parts=h264parts, output=fullH264)
temporaries.append(fullH264)
try:
fullH264TS = open('%s-ts.txt' % basename, 'w+')
except IOError:
logger.error('Impossible to create file containing all video timestamps.')
exit(-1)
logger.info('Merging H264 timestamps')
concatenateH264TSParts(h264TSParts=h264TS, output=fullH264TS)
temporaries.append(fullH264TS)
finalNoVideoName = '%s-novideo.mkv' % basename
finalWithVideoName = '%s-video.mkv' % basename
if nbMKVParts > 1:
logger.info('Merging all audio and subtitles parts: %s' % mkvparts)
mergeMKVs(mkvmerge=paths['mkvmerge'], inputs=mkvparts, outputName=finalNoVideoName, concatenate=True)
elif nbMKVParts == 1:
copyfile('part-1.mkv', finalNoVideoName)
else:
logger.info("Nothing else to do.")
copyfile(mkvfilename, finalWithVideoName)
if nbMKVParts >=1 :
try:
finalNoVideo = open(finalNoVideoName, 'r')
except IOError:
logger.error('Impossible to open file: %s.' % finalNoVideoName)
exit(-1)
temporaries.append(finalNoVideo)
fullH264TS.seek(0)
logger.info('Merging final video track and all other tracks together')
finalWithVideo = mergeMKVs(mkvmerge=paths['mkvmerge'], inputs=[fullH264, finalNoVideo], outputName=finalWithVideoName, concatenate=False, timestamps={0: fullH264TS})
finalCodecPrivateData = dumpCodecPrivateData(mainAvcConfig)
logger.debug('Final codec private data: %s' % hexdump.dump(finalCodecPrivateData, sep=':'))
logger.info('Changing codec private data with the new one.')
changeCodecPrivateData(paths['mkvinfo'], finalWithVideo, finalCodecPrivateData)
if args.srt:
if not allOptionalTools:
logger.warning("Missing tools for extracting subtitles.")
move(finalWithVideoName, args.outputFile)
else:
# Final cut is not any more the final step.
temporaries.append(finalWithVideo)
duration = getMovieDuration(paths['ffprobe'], finalWithVideo)
supportedLangs = getTesseractSupportedLang(paths['tesseract'])
logger.info('Supported lang: %s' % supportedLangs)
logger.info('Find subtitles tracks and language.')
subtitles = findSubtitlesTracks(paths['ffprobe'], finalWithVideo)
logger.info(subtitles)
sts = {}
for subtitle in subtitles:
index = subtitle['index']
if 'tags' in subtitle:
if 'language' in subtitle['tags']:
lang = subtitle['tags']['language']
if lang in sts:
sts[lang].append(index)
else:
sts[lang] = [index]
else:
logger.error("Dropping subtitle: %s because it is missing language indication")
else:
logger.error("Dropping subtitle: %s because it is missing language indication")
logger.info(sts)
if len(sts) > 0:
listOfSubtitles = extractSRT(paths['mkvextract'], finalWithVideoName, sts, supportedLangs)
logger.info(listOfSubtitles)
for idxName, subName, _, _ in listOfSubtitles:
try:
idx = open(idxName,'r')
except IOError:
logger.error("Impossible to open %s.", idxName)
exit(-1)
try:
sub = open(subName,'r')
except IOError:
logger.error("Impossible to open %s.", subName)
exit(-1)
temporaries.append(idx)
temporaries.append(sub)
ocr = doOCR(paths['vobsubocr'], listOfSubtitles, duration, temporaries, args.dump)
logger.info(ocr)
# Remux SRT subtitles
remuxSRTSubtitles(paths['mkvmerge'], finalWithVideo, args.outputFile, ocr)
else:
copyfile(finalWithVideoName, args.outputFile)
else:
move(finalWithVideoName, args.outputFile)
if not args.keep:
logger.info("Cleaning temporary files")
for f in temporaries:
path = os.path.realpath(f.name)
logger.info("Removing: %s" % path)
f.close()
unlink(path)
d = datetime(1,1,1)
for c in checks:
logger.info("Please check cut smoothness at %s" % (c+d).strftime("%H:%M:%S"))
if __name__ == "__main__":
main()