More linting: no more camelcase for function names.

This commit is contained in:
Frédéric Tronel
2025-10-29 14:54:42 +01:00
parent 367cb440d8
commit 40ca3e136b

View File

@@ -18,6 +18,7 @@ from dataclasses import dataclass, field
from math import floor, ceil, log
from io import BytesIO, TextIOWrapper
import json
from typing import IO
# Third party libraries
import coloredlogs
@@ -353,7 +354,7 @@ class SupportedFormat(IntEnum):
# -report -loglevel 0 -f null -
# Found codec private data using mkvinfo
def get_codec_private_data_from_mkv(mkvinfo_path, inputFile):
def get_codec_private_data_from_mkv(mkvinfo_path, inputFile: IO[bytes]):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -1581,7 +1582,8 @@ def get_avc_config_from_h264(inputFile):
return avcconfig
def getCodecPrivateDataFromH264(inputFile):
# Unused ?
def get_codec_private_data_from_h264(inputFile):
avcconfig = get_avc_config_from_h264(inputFile)
res = dump_codec_private_data(avcconfig)
@@ -1717,7 +1719,7 @@ def dump_codec_private_data(AVCDecoderConfiguration):
return res
def changeEBMLElementSize(inputFile, position, addendum):
def change_ebml_element_size(inputFile, position, addendum):
logger = logging.getLogger(__name__)
initial_position = position
@@ -1809,7 +1811,7 @@ def changeEBMLElementSize(inputFile, position, addendum):
# We return the potential increase in size of the file if the length field had to be increased.
return delta
def changeCodecPrivateData(mkvinfo, inputFile, codecData):
def change_codec_private_data(mkvinfo, inputFile, codecData):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -1878,9 +1880,9 @@ def changeCodecPrivateData(mkvinfo, inputFile, codecData):
# Changing an element can increase its size (in very rare case).
# In that case, we update the new delta that will be larger (because the element has
# been resized).
delta+=changeEBMLElementSize(inputFile, pos, delta)
delta+=change_ebml_element_size(inputFile, pos, delta)
def getFormat(ffprobe_path, inputFile):
def get_format(ffprobe_path:str, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -1898,7 +1900,7 @@ def getFormat(ffprobe_path, inputFile):
return None
def getMovieDuration(ffprobe_path, inputFile):
def get_movie_duration(ffprobe_path:str, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -1918,7 +1920,7 @@ def getMovieDuration(ffprobe_path, inputFile):
return None
# ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json sample.ts
def getVideoDimensions(ffprobe_path, inputFile):
def get_video_dimensions(ffprobe_path, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -1955,7 +1957,7 @@ def get_streams(ffprobe_path, inputFile):
return None
def withSubtitles(ffprobe_path, inputFile):
def with_subtitles(ffprobe_path, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -2016,7 +2018,7 @@ def parse_timestamp(ts):
return ts
def parseTimeInterval(interval):
def parse_time_interval(interval):
logger = logging.getLogger(__name__)
interval_reg_exp = (r'^(?P<hour1>[0-9]{1,2}):(?P<minute1>[0-9]{1,2}):(?P<second1>[0-9]{1,2})'
@@ -2089,7 +2091,7 @@ def parseTimeInterval(interval):
return (ts1, ts2)
def compareTimeInterval(interval1, interval2):
def compare_time_interval(interval1, interval2):
ts11,ts12 = interval1
ts21,ts22 = interval2
@@ -2103,8 +2105,8 @@ def compareTimeInterval(interval1, interval2):
def ffmpeg_convert(ffmpeg_path, ffprobe_path, inputFile, inputFormat, outputFile, outputFormat, duration):
logger = logging.getLogger(__name__)
width, height = getVideoDimensions(ffprobe_path, inputFile)
subtitles = withSubtitles(ffprobe_path, inputFile)
width, height = get_video_dimensions(ffprobe_path, inputFile)
subtitles = with_subtitles(ffprobe_path, inputFile)
infd = inputFile.fileno()
outfd = outputFile.fileno()
@@ -2144,7 +2146,7 @@ def ffmpeg_convert(ffmpeg_path, ffprobe_path, inputFile, inputFormat, outputFile
if status != 0:
logger.error('Conversion failed with status code: %d', status)
def getTSFrame(frame):
def get_ts_frame(frame):
logger = logging.getLogger(__name__)
if 'pts_time' in frame:
@@ -2158,7 +2160,7 @@ def getTSFrame(frame):
ts = timedelta(seconds=pts_time)
return ts
def getPacketDuration(packet):
def get_packet_duration(packet):
logger = logging.getLogger(__name__)
if 'duration' in packet:
@@ -2171,7 +2173,7 @@ def getPacketDuration(packet):
return duration
def getFramesInStream(ffprobe_path, inputFile, begin, end, streamKind, subStreamId=0):
def get_frames_in_stream(ffprobe_path, inputFile, begin, end, streamKind, subStreamId=0):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
set_inheritable(infd, True)
@@ -2194,7 +2196,7 @@ def getFramesInStream(ffprobe_path, inputFile, begin, end, streamKind, subStream
if 'frames' in frames:
frames = frames['frames']
for frame in frames:
ts = getTSFrame(frame)
ts = get_ts_frame(frame)
if ts is None:
return None
if begin <= ts <= end:
@@ -2210,7 +2212,7 @@ def getFramesInStream(ffprobe_path, inputFile, begin, end, streamKind, subStream
return None
# TODO: Finish implementation of this function and use it.
def getNearestIDRFrame(ffprobe_path, inputFile, timestamp, before=True, delta=timedelta(seconds=2)):
def get_nearest_idr_frame(ffprobe_path, inputFile, timestamp, before=True, delta=timedelta(seconds=2)):
# pylint: disable=W0613
logger = logging.getLogger(__name__)
@@ -2241,7 +2243,7 @@ def getNearestIDRFrame(ffprobe_path, inputFile, timestamp, before=True, delta=ti
if 'frames' in frames:
frames = frames['frames']
for frame in frames:
ts = getTSFrame(frame)
ts = get_ts_frame(frame)
if ts is None:
return None
if tbegin <= ts <= tend:
@@ -2251,7 +2253,7 @@ def getNearestIDRFrame(ffprobe_path, inputFile, timestamp, before=True, delta=ti
tbegin, tend)
return None
def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=timedelta(seconds=15)):
def get_nearest_iframe(ffprobe_path, inputFile, timestamp, before=True, deltaMax=timedelta(seconds=15)):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -2274,7 +2276,7 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t
tbegin = zero
logger.debug('Looking for an iframe in [%s, %s]', tbegin, tend)
frames = getFramesInStream(ffprobe_path, inputFile=inputFile, begin=tbegin, end=tend,
frames = get_frames_in_stream(ffprobe_path, inputFile=inputFile, begin=tbegin, end=tend,
streamKind='v')
if frames is None:
logger.debug('Found no frame in [%s, %s]', tbegin, tend)
@@ -2288,7 +2290,7 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t
found = False
for frame in iframes:
ts = getTSFrame(frame)
ts = get_ts_frame(frame)
if ts is None:
logger.warning('I-frame with no timestamp: %s', frame)
continue
@@ -2309,10 +2311,10 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t
continue
if iframe is not None:
its = getTSFrame(iframe)
its = get_ts_frame(iframe)
nb_frames = 0
for frame in frames:
ts = getTSFrame(frame)
ts = get_ts_frame(frame)
if ts is None:
logger.warning('Frame without timestamp: %s', frame)
continue
@@ -2331,7 +2333,7 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t
return(nb_frames, iframe)
def extractMKVPart(mkvmerge_path, inputFile, outputFile, begin, end):
def extract_mkv_part(mkvmerge_path, inputFile, outputFile, begin, end):
logger = logging.getLogger(__name__)
logger.info('Extract video between I-frames at %s and %s', begin,end)
@@ -2372,7 +2374,7 @@ def extractMKVPart(mkvmerge_path, inputFile, outputFile, begin, end):
elif status == 2:
logger.error('Extraction returns errors')
def extractPictures(ffmpeg_path, inputFile, begin, nbFrames, width=640, height=480):
def extract_pictures(ffmpeg_path, inputFile, begin, nbFrames, width=640, height=480):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -2408,7 +2410,7 @@ def extractPictures(ffmpeg_path, inputFile, begin, nbFrames, width=640, height=4
lseek(outfd, 0, SEEK_SET)
return images, outfd
def extractSound(ffmpeg_path, inputFile, begin, outputFileName, packet_duration, subChannel=0,
def extract_sound(ffmpeg_path, inputFile, begin, outputFileName, packet_duration, subChannel=0,
nb_packets=0, sample_rate=48000, nb_channels=2):
logger = logging.getLogger(__name__)
@@ -2443,7 +2445,7 @@ def extractSound(ffmpeg_path, inputFile, begin, outputFileName, packet_duration,
return sound, outfd
def dumpPPM(pictures, prefix, temporaries):
def dump_ppm(pictures, prefix, temporaries):
logger = logging.getLogger(__name__)
# "P6\nWIDTH HEIGHT\n255\n"
@@ -2490,7 +2492,7 @@ def dumpPPM(pictures, prefix, temporaries):
logger.error('Impossible to create file: %s', filename)
def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, filesPrefix, nbFrames,
def extract_all_streams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, filesPrefix, nbFrames,
framerate, width, height, temporaries, dumpMemFD=False):
logger = logging.getLogger(__name__)
@@ -2550,7 +2552,7 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams,
logger.warning('Missing DAR adjustment for: %s', dar)
logger.warning('Missing treatment for chroma location: %s', chroma_location)
codec = stream['codec_name']
images_bytes, memfd = extractPictures(ffmpeg_path, inputFile=inputFile, begin=begin,
images_bytes, memfd = extract_pictures(ffmpeg_path, inputFile=inputFile, begin=begin,
nbFrames=nbFrames, width=width, height=height)
if images_bytes is None:
logger.error('Impossible to extract picture from video stream.')
@@ -2558,7 +2560,7 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams,
memfds.append(memfd)
if dumpMemFD:
dumpPPM(images_bytes, f'{filesPrefix}-{video_id:d}', temporaries)
dump_ppm(images_bytes, f'{filesPrefix}-{video_id:d}', temporaries)
# We rewind to zero the memory file descriptor
lseek(memfd, 0, SEEK_SET)
@@ -2587,12 +2589,12 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams,
if 'language' in stream['tags']:
generic_codec_params.extend([f'-metadata:s:a:{audio_id:d}',
f"language={stream['tags']['language']}"])
packets = getFramesInStream(ffprobe_path, inputFile=inputFile, begin=begin, end=end,
packets = get_frames_in_stream(ffprobe_path, inputFile=inputFile, begin=begin, end=end,
streamKind='a', subStreamId=audio_id)
nb_packets = len(packets)
logger.debug("Found %d packets to be extracted from audio track.", nb_packets)
if nb_packets > 0:
packet_duration = getPacketDuration(packets[0])
packet_duration = get_packet_duration(packets[0])
if packet_duration is None:
return None
else:
@@ -2601,7 +2603,7 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams,
logger.info("Extracting %d packets of audio stream: a:%d" , nb_packets, audio_id)
tmpname = f'{filesPrefix}-{audio_id:d}.pcm'
sound_bytes, memfd = extractSound(ffmpeg_path=ffmpeg_path, inputFile=inputFile, begin=begin,
sound_bytes, memfd = extract_sound(ffmpeg_path=ffmpeg_path, inputFile=inputFile, begin=begin,
nb_packets=nb_packets,
packet_duration=packet_duration,
outputFileName=tmpname, sample_rate=sample_rate,
@@ -2808,7 +2810,7 @@ def merge_mkvs(mkvmerge_path, inputs, outputName, concatenate=True, timestamps=N
return out
def findSubtitlesTracks(ffprobe_path, inputFile):
def find_subtitles_tracks(ffprobe_path:str, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -2873,7 +2875,7 @@ def extract_track_from_mkv(mkvextract_path, inputFile, index, outputFile, timest
else:
logger.info('Track %d was succesfully extracted.', index)
def removeVideoTracksFromMKV(mkvmerge_path, inputFile, outputFile):
def remove_video_tracks_from_mkv(mkvmerge_path, inputFile, outputFile):
logger = logging.getLogger(__name__)
outfd = outputFile.fileno()
@@ -2909,7 +2911,7 @@ def removeVideoTracksFromMKV(mkvmerge_path, inputFile, outputFile):
else:
logger.info('Video tracks were succesfully extracted.')
def remuxSRTSubtitles(mkvmerge_path, inputFile, outputFileName, subtitles):
def remux_srt_subtitles(mkvmerge_path, inputFile, outputFileName, subtitles):
logger = logging.getLogger(__name__)
try:
@@ -2957,7 +2959,7 @@ def remuxSRTSubtitles(mkvmerge_path, inputFile, outputFileName, subtitles):
elif status == 2:
logger.error('Remux subtitles returns errors')
def concatenateH264Parts(h264parts, output):
def concatenate_h264_parts(h264parts, output):
logger = logging.getLogger(__name__)
total_length = 0
@@ -2984,7 +2986,7 @@ def concatenateH264Parts(h264parts, output):
pb.update(nb_bytes)
pos += nb_bytes
def concatenateH264TSParts(h264TSParts, output):
def concatenate_h264_ts_parts(h264TSParts, output):
logger = logging.getLogger(__name__)
header = '# timestamp format v2\n'
@@ -3029,7 +3031,7 @@ def do_coarse_processing(ffmpeg_path, ffprobe_path, mkvmerge_path, inputFile, be
exit(-1)
# Extract internal part of MKV
extractMKVPart(mkvmerge_path=mkvmerge_path, inputFile=inputFile, outputFile=internal_mkv, begin=begin,
extract_mkv_part(mkvmerge_path=mkvmerge_path, inputFile=inputFile, outputFile=internal_mkv, begin=begin,
end=end)
temporaries.append(internal_mkv)
@@ -3091,14 +3093,14 @@ def main():
parts=[]
# Parse each interval
for interval in intervals:
ts1, ts2 = parseTimeInterval(interval)
ts1, ts2 = parse_time_interval(interval)
if ts1 is None or ts2 is None:
logger.error("Illegal time interval: %s", interval)
exit(-1)
parts.append((ts1,ts2))
# Sort intervals
parts.sort(key=cmp_to_key(compareTimeInterval))
parts.sort(key=cmp_to_key(compare_time_interval))
# Check that no intervals are overlapping
prevts = timedelta(0)
@@ -3122,7 +3124,7 @@ def main():
logger.error("Impossible to open %s", args.input_file)
exit(-1)
format_of_file = getFormat(paths['ffprobe'], input_file)
format_of_file = get_format(paths['ffprobe'], input_file)
if format_of_file is None:
exit(-1)
@@ -3262,14 +3264,14 @@ def main():
partnum = partnum + 1
# Get the nearest I-frame whose timestamp is greater or equal to the beginning.
head_frames = getNearestIFrame(paths['ffprobe'], mkv, ts1, before=False)
head_frames = get_nearest_iframe(paths['ffprobe'], mkv, ts1, before=False)
if head_frames is None:
logger.error('Impossible to retrieve I-frame')
exit(-1)
# Get the nearest I-frame whose timestamp ...
# TODO: wrong here ...
tail_frames = getNearestIFrame(paths['ffprobe'], mkv, ts2, before=True)
tail_frames = get_nearest_iframe(paths['ffprobe'], mkv, ts2, before=True)
if tail_frames is None:
logger.error('Impossible to retrieve I-frame')
exit(-1)
@@ -3282,10 +3284,10 @@ def main():
logger.info("Found %d frames between last I-frame and end of current part",
nb_tail_frames)
head_iframe_ts = getTSFrame(head_iframe)
head_iframe_ts = get_ts_frame(head_iframe)
if head_iframe_ts is None:
exit(-1)
tail_iframe_ts = getTSFrame(tail_iframe)
tail_iframe_ts = get_ts_frame(tail_iframe)
if tail_iframe_ts is None:
exit(-1)
@@ -3312,7 +3314,7 @@ def main():
if (not args.coarse) and (nb_head_frames > args.threshold):
# We extract all frames between the beginning upto the frame that immediately preceeds
# the I-frame.
h264_head, h264_head_ts, mkv_head = extractAllStreams(ffmpeg_path=paths['ffmpeg'],
h264_head, h264_head_ts, mkv_head = extract_all_streams(ffmpeg_path=paths['ffmpeg'],
ffprobe_path=paths['ffprobe'],
inputFile=mkv, begin=ts1,
end=head_iframe_ts,
@@ -3370,7 +3372,7 @@ def main():
# logger.info('Merge header, middle and trailer subpart into: %s' % internal_mkv_name)
# Extract internal part of MKV
extractMKVPart(mkvmerge_path=paths['mkvmerge'], inputFile=mkv, outputFile=internal_mkv,
extract_mkv_part(mkvmerge_path=paths['mkvmerge'], inputFile=mkv, outputFile=internal_mkv,
begin=head_iframe_ts, end=tail_iframe_ts)
# Extract video stream of internal part as a raw H264 and its timestamps.
@@ -3380,7 +3382,7 @@ def main():
# Remove video track from internal part of MKV
logger.info('Remove video track from %s', internal_mkv_name)
removeVideoTracksFromMKV(mkvmerge_path=paths['mkvmerge'], inputFile=internal_mkv,
remove_video_tracks_from_mkv(mkvmerge_path=paths['mkvmerge'], inputFile=internal_mkv,
outputFile=internal_novideo_mkv)
temporaries.append(internal_mkv)
@@ -3394,7 +3396,7 @@ def main():
if (not args.coarse) and (nb_tail_frames > args.threshold):
# We extract all frames between the I-frame (including it) upto the end.
h264_tail, h264_tail_ts, mkv_tail = extractAllStreams(ffmpeg_path=paths['ffmpeg'],
h264_tail, h264_tail_ts, mkv_tail = extract_all_streams(ffmpeg_path=paths['ffmpeg'],
ffprobe_path=paths['ffprobe'],
inputFile=mkv, begin=tail_iframe_ts,
end=ts2, nbFrames=nb_tail_frames,
@@ -3439,7 +3441,7 @@ def main():
exit(-1)
logger.info('Merging all H264 tracks')
concatenateH264Parts(h264parts=h264parts, output=full_h264)
concatenate_h264_parts(h264parts=h264parts, output=full_h264)
temporaries.append(full_h264)
try:
@@ -3449,7 +3451,7 @@ def main():
exit(-1)
logger.info('Merging H264 timestamps')
concatenateH264TSParts(h264TSParts=h264_ts, output=full_h264_ts)
concatenate_h264_ts_parts(h264TSParts=h264_ts, output=full_h264_ts)
temporaries.append(full_h264_ts)
final_novideo_name = f'{basename}-novideo.mkv'
@@ -3483,7 +3485,7 @@ def main():
final_codec_private_data = dump_codec_private_data(main_avc_config)
logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data, sep=':'))
logger.info('Changing codec private data with the new one.')
changeCodecPrivateData(paths['mkvinfo'], final_with_video, final_codec_private_data)
change_codec_private_data(paths['mkvinfo'], final_with_video, final_codec_private_data)
if args.srt:
if not all_optional_tools:
@@ -3492,11 +3494,11 @@ def main():
else:
# Final cut is not any more the final step.
temporaries.append(final_with_video)
duration = getMovieDuration(paths['ffprobe'], final_with_video)
duration = get_movie_duration(paths['ffprobe'], final_with_video)
supported_langs = get_tesseract_supported_lang(paths['tesseract'])
logger.info('Supported lang: %s', supported_langs)
logger.info('Find subtitles tracks and language.')
subtitles = findSubtitlesTracks(paths['ffprobe'], final_with_video)
subtitles = find_subtitles_tracks(paths['ffprobe'], final_with_video)
logger.info(subtitles)
sts = {}
for subtitle in subtitles:
@@ -3539,7 +3541,7 @@ def main():
logger.info(ocr)
# Remux SRT subtitles
remuxSRTSubtitles(paths['mkvmerge'], final_with_video, args.outputFile, ocr)
remux_srt_subtitles(paths['mkvmerge'], final_with_video, args.outputFile, ocr)
else:
copyfile(final_with_video_name, args.outputFile)
else: