More linting.

This commit is contained in:
Frédéric Tronel
2025-10-27 15:48:41 +01:00
parent e192c66157
commit 960de11b1b

View File

@@ -26,6 +26,9 @@ import hexdump
from iso639 import Lang from iso639 import Lang
from iso639.exceptions import InvalidLanguageValue from iso639.exceptions import InvalidLanguageValue
# Local modules
# TODO: create local modules for MP4, MKV
# Useful SPS/PPS discussion. # Useful SPS/PPS discussion.
# https://copyprogramming.com/howto/including-sps-and-pps-in-a-raw-h264-track # https://copyprogramming.com/howto/including-sps-and-pps-in-a-raw-h264-track
# https://gitlab.com/mbunkus/mkvtoolnix/-/issues/2390 # https://gitlab.com/mbunkus/mkvtoolnix/-/issues/2390
@@ -2503,7 +2506,7 @@ def dumpPPM(pictures, prefix, temporaries):
def extractAllStreams(ffmpeg, ffprobe, inputFile, begin, end, streams, filesPrefix, nbFrames, def extractAllStreams(ffmpeg, ffprobe, inputFile, begin, end, streams, filesPrefix, nbFrames,
frameRate, width, height, temporaries, dumpMemFD=False): framerate, width, height, temporaries, dumpMemFD=False):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# The command line for encoding only video track # The command line for encoding only video track
@@ -2576,7 +2579,7 @@ def extractAllStreams(ffmpeg, ffprobe, inputFile, begin, end, streams, filesPref
lseek(memfd, 0, SEEK_SET) lseek(memfd, 0, SEEK_SET)
set_inheritable(memfd, True) set_inheritable(memfd, True)
video_input_params.extend(['-framerate', f'{frameRate:f}', '-f', 'image2pipe', '-i', video_input_params.extend(['-framerate', f'{framerate:f}', '-f', 'image2pipe', '-i',
f'/proc/self/fd/{memfd:d}']) f'/proc/self/fd/{memfd:d}'])
video_codec_params.extend([f'-c:v:{video_id:d}', codec, f'-level:v:{video_id:d}', video_codec_params.extend([f'-c:v:{video_id:d}', codec, f'-level:v:{video_id:d}',
level, '-pix_fmt', pixel_format]) level, '-pix_fmt', pixel_format])
@@ -2727,7 +2730,7 @@ def extractAllStreams(ffmpeg, ffprobe, inputFile, begin, end, streams, filesPref
h264_ts_output.write('# timestamp format v2\n') h264_ts_output.write('# timestamp format v2\n')
ts = 0 ts = 0
for _ in range(0,nbFrames): for _ in range(0,nbFrames):
ts = ts+ceil(1000/frameRate) ts = ts+ceil(1000/framerate)
h264_ts_output.write(f'{ts:d}\n') h264_ts_output.write(f'{ts:d}\n')
h264_ts_output.flush() h264_ts_output.flush()
h264_ts_output.seek(0) h264_ts_output.seek(0)
@@ -3027,7 +3030,7 @@ def concatenateH264TSParts(h264TSParts, output):
first = False first = False
# TODO: finish this procedure # TODO: finish this procedure
def doCoarseProcessing(ffmpeg, ffprobe, mkvmerge, inputFile, begin, end, nbFrames, frameRate, def doCoarseProcessing(ffmpeg, ffprobe, mkvmerge, inputFile, begin, end, nbFrames, framerate,
filesPrefix, streams, width, height, temporaries, dumpMemFD): filesPrefix, streams, width, height, temporaries, dumpMemFD):
# pylint: disable=W0613 # pylint: disable=W0613
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -3135,36 +3138,36 @@ def main():
logger.error("Impossible to open %s", args.input_file) logger.error("Impossible to open %s", args.input_file)
exit(-1) exit(-1)
formatOfFile = getFormat(paths['ffprobe'], input_file) format_of_file = getFormat(paths['ffprobe'], input_file)
if formatOfFile is None: if format_of_file is None:
exit(-1) exit(-1)
duration = timedelta(seconds=float(formatOfFile['duration'])) duration = timedelta(seconds=float(format_of_file['duration']))
logger.info("Durée de l'enregistrement: %s", duration) logger.info("Durée de l'enregistrement: %s", duration)
if args.framerate is None: if args.framerate is None:
frameRate = getFrameRate(paths['ffprobe'], input_file) framerate = getFrameRate(paths['ffprobe'], input_file)
if frameRate is None: if framerate is None:
logger.error('Impossible to estimate frame rate !') logger.error('Impossible to estimate frame rate !')
exit(-1) exit(-1)
else: else:
frameRate = args.framerate framerate = args.framerate
logger.info('Frame rate: %.1f fps', frameRate) logger.info('Frame rate: %.1f fps', framerate)
found = False found = False
for f in SupportedFormat: for f in SupportedFormat:
if 'format_name' in formatOfFile: if 'format_name' in format_of_file:
if formatOfFile['format_name'] == str(f): if format_of_file['format_name'] == str(f):
found = True found = True
formatOfFile = f format_of_file = f
break break
if not found: if not found:
logger.error('Unsupported format of file') logger.error('Unsupported format of file')
if formatOfFile == SupportedFormat.TS: if format_of_file == SupportedFormat.TS:
logger.info("Converting TS to MP4 (to fix timestamps).") logger.info("Converting TS to MP4 (to fix timestamps).")
try: try:
with open(mp4filename, 'w+') as mp4: with open(mp4filename, 'w+') as mp4:
@@ -3184,7 +3187,7 @@ def main():
except IOError: except IOError:
logger.error('') logger.error('')
elif formatOfFile == SupportedFormat.MP4: elif format_of_file == SupportedFormat.MP4:
logger.info("Converting MP4 to MKV") logger.info("Converting MP4 to MKV")
try: try:
mkv = open(mkvfilename, 'w+') mkv = open(mkvfilename, 'w+')
@@ -3201,51 +3204,51 @@ def main():
streams = getStreams(paths['ffprobe'], mkv) streams = getStreams(paths['ffprobe'], mkv)
logger.debug('Streams: %s', streams) logger.debug('Streams: %s', streams)
mainVideo = None main_video = None
nbVideos = 0 nb_videos = 0
for stream in streams: for stream in streams:
if stream['codec_type'] == 'video': if stream['codec_type'] == 'video':
if stream['disposition']['default'] == 1: if stream['disposition']['default'] == 1:
mainVideo = stream main_video = stream
width = stream['width'] width = stream['width']
height = stream['height'] height = stream['height']
break break
nbVideos+=1 nb_videos+=1
if nbVideos == 1: if nb_videos == 1:
mainVideo = stream main_video = stream
width = stream['width'] width = stream['width']
height = stream['height'] height = stream['height']
else: else:
mainVideo = None main_video = None
if mainVideo is None: if main_video is None:
logger.error('Impossible to find main video stream.') logger.error('Impossible to find main video stream.')
exit(-1) exit(-1)
# We retrieve the main private codec data # We retrieve the main private codec data
_, mainCodecPrivateData = getCodecPrivateDataFromMKV(mkvinfo=paths['mkvinfo'], inputFile=mkv) _, main_codec_private_data = getCodecPrivateDataFromMKV(mkvinfo=paths['mkvinfo'], inputFile=mkv)
logger.debug('Main video stream has following private data: %s', logger.debug('Main video stream has following private data: %s',
hexdump.dump(mainCodecPrivateData, sep=':')) hexdump.dump(main_codec_private_data, sep=':'))
# We parse them # We parse them
mainAvcConfig = parseCodecPrivate(mainCodecPrivateData) main_avc_config = parseCodecPrivate(main_codec_private_data)
logger.debug('AVC configuration: %s', mainAvcConfig) logger.debug('AVC configuration: %s', main_avc_config)
# We check if the parse and dump operations are idempotent. # We check if the parse and dump operations are idempotent.
privateData = dumpCodecPrivateData(mainAvcConfig) privateData = dumpCodecPrivateData(main_avc_config)
logger.debug('Redump AVC configuration: %s', hexdump.dump(privateData, sep=':')) logger.debug('Redump AVC configuration: %s', hexdump.dump(privateData, sep=':'))
# In rare occasion, the PPS has trailing zeroes that do not seem to be related to useful data # In rare occasion, the PPS has trailing zeroes that do not seem to be related to useful data
# but they differ from the private data we generate that do not contain them. # but they differ from the private data we generate that do not contain them.
# In that case we try to redecode our own private data to see if both AVC configurations are # In that case we try to redecode our own private data to see if both AVC configurations are
# the same. # the same.
if mainCodecPrivateData != privateData: if main_codec_private_data != privateData:
logger.warning('Difference detected in bitstream !!') logger.warning('Difference detected in bitstream !!')
isoAvcConfig = parseCodecPrivate(privateData) isoAvcConfig = parseCodecPrivate(privateData)
logger.debug('Reread AVC configuration: %s', isoAvcConfig) logger.debug('Reread AVC configuration: %s', isoAvcConfig)
# If there exists a difference between our own reconstructed AVC configuration and the # If there exists a difference between our own reconstructed AVC configuration and the
# original one, we abandon # original one, we abandon
if isoAvcConfig != mainAvcConfig: if isoAvcConfig != main_avc_config:
logger.error('AVC configurations are different: %s\n%s\n', mainAvcConfig, isoAvcConfig) logger.error('AVC configurations are different: %s\n%s\n', main_avc_config, isoAvcConfig)
exit(-1) exit(-1)
# Pour chaque portion # Pour chaque portion
@@ -3328,7 +3331,7 @@ def main():
inputFile=mkv, begin=ts1, inputFile=mkv, begin=ts1,
end=headIFrameTS, end=headIFrameTS,
nbFrames=nbHeadFrames-1, nbFrames=nbHeadFrames-1,
frameRate=frameRate, framerate=framerate,
filesPrefix=f'part-{partnum:d}-head', filesPrefix=f'part-{partnum:d}-head',
streams=streams, width=width, streams=streams, width=width,
height=height, height=height,
@@ -3409,7 +3412,7 @@ def main():
ffprobe=paths['ffprobe'], ffprobe=paths['ffprobe'],
inputFile=mkv, begin=tailIFrameTS, inputFile=mkv, begin=tailIFrameTS,
end=ts2, nbFrames=nbTailFrames, end=ts2, nbFrames=nbTailFrames,
frameRate=frameRate, framerate=framerate,
filesPrefix=f'part-{partnum:d}-tail', filesPrefix=f'part-{partnum:d}-tail',
streams=streams, streams=streams,
width=width, height=height, width=width, height=height,
@@ -3438,8 +3441,8 @@ def main():
# When using coarse option there is a single AVC configuration. # When using coarse option there is a single AVC configuration.
for avcConfig in otherAvcConfigs: for avcConfig in otherAvcConfigs:
mainAvcConfig.merge(avcConfig) main_avc_config.merge(avcConfig)
logger.debug('Merged AVC configuration: %s', mainAvcConfig) logger.debug('Merged AVC configuration: %s', main_avc_config)
nbMKVParts = len(mkvparts) nbMKVParts = len(mkvparts)
if nbMKVParts > 0: if nbMKVParts > 0:
@@ -3491,7 +3494,7 @@ def main():
finalWithVideo = mergeMKVs(mkvmerge=paths['mkvmerge'], inputs=[fullH264, finalNoVideo], finalWithVideo = mergeMKVs(mkvmerge=paths['mkvmerge'], inputs=[fullH264, finalNoVideo],
outputName=finalWithVideoName, concatenate=False, outputName=finalWithVideoName, concatenate=False,
timestamps={0: fullH264TS}) timestamps={0: fullH264TS})
finalCodecPrivateData = dumpCodecPrivateData(mainAvcConfig) finalCodecPrivateData = dumpCodecPrivateData(main_avc_config)
logger.debug('Final codec private data: %s', hexdump.dump(finalCodecPrivateData, sep=':')) logger.debug('Final codec private data: %s', hexdump.dump(finalCodecPrivateData, sep=':'))
logger.info('Changing codec private data with the new one.') logger.info('Changing codec private data with the new one.')
changeCodecPrivateData(paths['mkvinfo'], finalWithVideo, finalCodecPrivateData) changeCodecPrivateData(paths['mkvinfo'], finalWithVideo, finalCodecPrivateData)