From 40ca3e136b67936c612874efd1652e098389e0ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Tronel?= Date: Wed, 29 Oct 2025 14:54:42 +0100 Subject: [PATCH] More linting: no more camelcase for function names. --- removeads.py | 116 ++++++++++++++++++++++++++------------------------- 1 file changed, 59 insertions(+), 57 deletions(-) diff --git a/removeads.py b/removeads.py index 74f151d..a47ea13 100755 --- a/removeads.py +++ b/removeads.py @@ -18,6 +18,7 @@ from dataclasses import dataclass, field from math import floor, ceil, log from io import BytesIO, TextIOWrapper import json +from typing import IO # Third party libraries import coloredlogs @@ -353,7 +354,7 @@ class SupportedFormat(IntEnum): # -report -loglevel 0 -f null - # Found codec private data using mkvinfo -def get_codec_private_data_from_mkv(mkvinfo_path, inputFile): +def get_codec_private_data_from_mkv(mkvinfo_path, inputFile: IO[bytes]): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -1581,7 +1582,8 @@ def get_avc_config_from_h264(inputFile): return avcconfig -def getCodecPrivateDataFromH264(inputFile): +# Unused ? +def get_codec_private_data_from_h264(inputFile): avcconfig = get_avc_config_from_h264(inputFile) res = dump_codec_private_data(avcconfig) @@ -1717,7 +1719,7 @@ def dump_codec_private_data(AVCDecoderConfiguration): return res -def changeEBMLElementSize(inputFile, position, addendum): +def change_ebml_element_size(inputFile, position, addendum): logger = logging.getLogger(__name__) initial_position = position @@ -1809,7 +1811,7 @@ def changeEBMLElementSize(inputFile, position, addendum): # We return the potential increase in size of the file if the length field had to be increased. return delta -def changeCodecPrivateData(mkvinfo, inputFile, codecData): +def change_codec_private_data(mkvinfo, inputFile, codecData): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -1878,9 +1880,9 @@ def changeCodecPrivateData(mkvinfo, inputFile, codecData): # Changing an element can increase its size (in very rare case). # In that case, we update the new delta that will be larger (because the element has # been resized). - delta+=changeEBMLElementSize(inputFile, pos, delta) + delta+=change_ebml_element_size(inputFile, pos, delta) -def getFormat(ffprobe_path, inputFile): +def get_format(ffprobe_path:str, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -1898,7 +1900,7 @@ def getFormat(ffprobe_path, inputFile): return None -def getMovieDuration(ffprobe_path, inputFile): +def get_movie_duration(ffprobe_path:str, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -1918,7 +1920,7 @@ def getMovieDuration(ffprobe_path, inputFile): return None # ffprobe -loglevel quiet -select_streams v:0 -show_entries stream=width,height -of json sample.ts -def getVideoDimensions(ffprobe_path, inputFile): +def get_video_dimensions(ffprobe_path, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -1955,7 +1957,7 @@ def get_streams(ffprobe_path, inputFile): return None -def withSubtitles(ffprobe_path, inputFile): +def with_subtitles(ffprobe_path, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -2016,7 +2018,7 @@ def parse_timestamp(ts): return ts -def parseTimeInterval(interval): +def parse_time_interval(interval): logger = logging.getLogger(__name__) interval_reg_exp = (r'^(?P[0-9]{1,2}):(?P[0-9]{1,2}):(?P[0-9]{1,2})' @@ -2089,7 +2091,7 @@ def parseTimeInterval(interval): return (ts1, ts2) -def compareTimeInterval(interval1, interval2): +def compare_time_interval(interval1, interval2): ts11,ts12 = interval1 ts21,ts22 = interval2 @@ -2103,8 +2105,8 @@ def compareTimeInterval(interval1, interval2): def ffmpeg_convert(ffmpeg_path, ffprobe_path, inputFile, inputFormat, outputFile, outputFormat, duration): logger = logging.getLogger(__name__) - width, height = getVideoDimensions(ffprobe_path, inputFile) - subtitles = withSubtitles(ffprobe_path, inputFile) + width, height = get_video_dimensions(ffprobe_path, inputFile) + subtitles = with_subtitles(ffprobe_path, inputFile) infd = inputFile.fileno() outfd = outputFile.fileno() @@ -2144,7 +2146,7 @@ def ffmpeg_convert(ffmpeg_path, ffprobe_path, inputFile, inputFormat, outputFile if status != 0: logger.error('Conversion failed with status code: %d', status) -def getTSFrame(frame): +def get_ts_frame(frame): logger = logging.getLogger(__name__) if 'pts_time' in frame: @@ -2158,7 +2160,7 @@ def getTSFrame(frame): ts = timedelta(seconds=pts_time) return ts -def getPacketDuration(packet): +def get_packet_duration(packet): logger = logging.getLogger(__name__) if 'duration' in packet: @@ -2171,7 +2173,7 @@ def getPacketDuration(packet): return duration -def getFramesInStream(ffprobe_path, inputFile, begin, end, streamKind, subStreamId=0): +def get_frames_in_stream(ffprobe_path, inputFile, begin, end, streamKind, subStreamId=0): logger = logging.getLogger(__name__) infd = inputFile.fileno() set_inheritable(infd, True) @@ -2194,7 +2196,7 @@ def getFramesInStream(ffprobe_path, inputFile, begin, end, streamKind, subStream if 'frames' in frames: frames = frames['frames'] for frame in frames: - ts = getTSFrame(frame) + ts = get_ts_frame(frame) if ts is None: return None if begin <= ts <= end: @@ -2210,7 +2212,7 @@ def getFramesInStream(ffprobe_path, inputFile, begin, end, streamKind, subStream return None # TODO: Finish implementation of this function and use it. -def getNearestIDRFrame(ffprobe_path, inputFile, timestamp, before=True, delta=timedelta(seconds=2)): +def get_nearest_idr_frame(ffprobe_path, inputFile, timestamp, before=True, delta=timedelta(seconds=2)): # pylint: disable=W0613 logger = logging.getLogger(__name__) @@ -2241,7 +2243,7 @@ def getNearestIDRFrame(ffprobe_path, inputFile, timestamp, before=True, delta=ti if 'frames' in frames: frames = frames['frames'] for frame in frames: - ts = getTSFrame(frame) + ts = get_ts_frame(frame) if ts is None: return None if tbegin <= ts <= tend: @@ -2251,7 +2253,7 @@ def getNearestIDRFrame(ffprobe_path, inputFile, timestamp, before=True, delta=ti tbegin, tend) return None -def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=timedelta(seconds=15)): +def get_nearest_iframe(ffprobe_path, inputFile, timestamp, before=True, deltaMax=timedelta(seconds=15)): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -2274,7 +2276,7 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t tbegin = zero logger.debug('Looking for an iframe in [%s, %s]', tbegin, tend) - frames = getFramesInStream(ffprobe_path, inputFile=inputFile, begin=tbegin, end=tend, + frames = get_frames_in_stream(ffprobe_path, inputFile=inputFile, begin=tbegin, end=tend, streamKind='v') if frames is None: logger.debug('Found no frame in [%s, %s]', tbegin, tend) @@ -2288,7 +2290,7 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t found = False for frame in iframes: - ts = getTSFrame(frame) + ts = get_ts_frame(frame) if ts is None: logger.warning('I-frame with no timestamp: %s', frame) continue @@ -2309,10 +2311,10 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t continue if iframe is not None: - its = getTSFrame(iframe) + its = get_ts_frame(iframe) nb_frames = 0 for frame in frames: - ts = getTSFrame(frame) + ts = get_ts_frame(frame) if ts is None: logger.warning('Frame without timestamp: %s', frame) continue @@ -2331,7 +2333,7 @@ def getNearestIFrame(ffprobe_path, inputFile, timestamp, before=True, deltaMax=t return(nb_frames, iframe) -def extractMKVPart(mkvmerge_path, inputFile, outputFile, begin, end): +def extract_mkv_part(mkvmerge_path, inputFile, outputFile, begin, end): logger = logging.getLogger(__name__) logger.info('Extract video between I-frames at %s and %s', begin,end) @@ -2372,7 +2374,7 @@ def extractMKVPart(mkvmerge_path, inputFile, outputFile, begin, end): elif status == 2: logger.error('Extraction returns errors') -def extractPictures(ffmpeg_path, inputFile, begin, nbFrames, width=640, height=480): +def extract_pictures(ffmpeg_path, inputFile, begin, nbFrames, width=640, height=480): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -2408,7 +2410,7 @@ def extractPictures(ffmpeg_path, inputFile, begin, nbFrames, width=640, height=4 lseek(outfd, 0, SEEK_SET) return images, outfd -def extractSound(ffmpeg_path, inputFile, begin, outputFileName, packet_duration, subChannel=0, +def extract_sound(ffmpeg_path, inputFile, begin, outputFileName, packet_duration, subChannel=0, nb_packets=0, sample_rate=48000, nb_channels=2): logger = logging.getLogger(__name__) @@ -2443,7 +2445,7 @@ def extractSound(ffmpeg_path, inputFile, begin, outputFileName, packet_duration, return sound, outfd -def dumpPPM(pictures, prefix, temporaries): +def dump_ppm(pictures, prefix, temporaries): logger = logging.getLogger(__name__) # "P6\nWIDTH HEIGHT\n255\n" @@ -2490,7 +2492,7 @@ def dumpPPM(pictures, prefix, temporaries): logger.error('Impossible to create file: %s', filename) -def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, filesPrefix, nbFrames, +def extract_all_streams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, filesPrefix, nbFrames, framerate, width, height, temporaries, dumpMemFD=False): logger = logging.getLogger(__name__) @@ -2550,7 +2552,7 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, logger.warning('Missing DAR adjustment for: %s', dar) logger.warning('Missing treatment for chroma location: %s', chroma_location) codec = stream['codec_name'] - images_bytes, memfd = extractPictures(ffmpeg_path, inputFile=inputFile, begin=begin, + images_bytes, memfd = extract_pictures(ffmpeg_path, inputFile=inputFile, begin=begin, nbFrames=nbFrames, width=width, height=height) if images_bytes is None: logger.error('Impossible to extract picture from video stream.') @@ -2558,7 +2560,7 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, memfds.append(memfd) if dumpMemFD: - dumpPPM(images_bytes, f'{filesPrefix}-{video_id:d}', temporaries) + dump_ppm(images_bytes, f'{filesPrefix}-{video_id:d}', temporaries) # We rewind to zero the memory file descriptor lseek(memfd, 0, SEEK_SET) @@ -2587,12 +2589,12 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, if 'language' in stream['tags']: generic_codec_params.extend([f'-metadata:s:a:{audio_id:d}', f"language={stream['tags']['language']}"]) - packets = getFramesInStream(ffprobe_path, inputFile=inputFile, begin=begin, end=end, + packets = get_frames_in_stream(ffprobe_path, inputFile=inputFile, begin=begin, end=end, streamKind='a', subStreamId=audio_id) nb_packets = len(packets) logger.debug("Found %d packets to be extracted from audio track.", nb_packets) if nb_packets > 0: - packet_duration = getPacketDuration(packets[0]) + packet_duration = get_packet_duration(packets[0]) if packet_duration is None: return None else: @@ -2601,7 +2603,7 @@ def extractAllStreams(ffmpeg_path, ffprobe_path, inputFile, begin, end, streams, logger.info("Extracting %d packets of audio stream: a:%d" , nb_packets, audio_id) tmpname = f'{filesPrefix}-{audio_id:d}.pcm' - sound_bytes, memfd = extractSound(ffmpeg_path=ffmpeg_path, inputFile=inputFile, begin=begin, + sound_bytes, memfd = extract_sound(ffmpeg_path=ffmpeg_path, inputFile=inputFile, begin=begin, nb_packets=nb_packets, packet_duration=packet_duration, outputFileName=tmpname, sample_rate=sample_rate, @@ -2808,7 +2810,7 @@ def merge_mkvs(mkvmerge_path, inputs, outputName, concatenate=True, timestamps=N return out -def findSubtitlesTracks(ffprobe_path, inputFile): +def find_subtitles_tracks(ffprobe_path:str, inputFile): logger = logging.getLogger(__name__) infd = inputFile.fileno() @@ -2873,7 +2875,7 @@ def extract_track_from_mkv(mkvextract_path, inputFile, index, outputFile, timest else: logger.info('Track %d was succesfully extracted.', index) -def removeVideoTracksFromMKV(mkvmerge_path, inputFile, outputFile): +def remove_video_tracks_from_mkv(mkvmerge_path, inputFile, outputFile): logger = logging.getLogger(__name__) outfd = outputFile.fileno() @@ -2909,7 +2911,7 @@ def removeVideoTracksFromMKV(mkvmerge_path, inputFile, outputFile): else: logger.info('Video tracks were succesfully extracted.') -def remuxSRTSubtitles(mkvmerge_path, inputFile, outputFileName, subtitles): +def remux_srt_subtitles(mkvmerge_path, inputFile, outputFileName, subtitles): logger = logging.getLogger(__name__) try: @@ -2957,7 +2959,7 @@ def remuxSRTSubtitles(mkvmerge_path, inputFile, outputFileName, subtitles): elif status == 2: logger.error('Remux subtitles returns errors') -def concatenateH264Parts(h264parts, output): +def concatenate_h264_parts(h264parts, output): logger = logging.getLogger(__name__) total_length = 0 @@ -2984,7 +2986,7 @@ def concatenateH264Parts(h264parts, output): pb.update(nb_bytes) pos += nb_bytes -def concatenateH264TSParts(h264TSParts, output): +def concatenate_h264_ts_parts(h264TSParts, output): logger = logging.getLogger(__name__) header = '# timestamp format v2\n' @@ -3029,7 +3031,7 @@ def do_coarse_processing(ffmpeg_path, ffprobe_path, mkvmerge_path, inputFile, be exit(-1) # Extract internal part of MKV - extractMKVPart(mkvmerge_path=mkvmerge_path, inputFile=inputFile, outputFile=internal_mkv, begin=begin, + extract_mkv_part(mkvmerge_path=mkvmerge_path, inputFile=inputFile, outputFile=internal_mkv, begin=begin, end=end) temporaries.append(internal_mkv) @@ -3091,14 +3093,14 @@ def main(): parts=[] # Parse each interval for interval in intervals: - ts1, ts2 = parseTimeInterval(interval) + ts1, ts2 = parse_time_interval(interval) if ts1 is None or ts2 is None: logger.error("Illegal time interval: %s", interval) exit(-1) parts.append((ts1,ts2)) # Sort intervals - parts.sort(key=cmp_to_key(compareTimeInterval)) + parts.sort(key=cmp_to_key(compare_time_interval)) # Check that no intervals are overlapping prevts = timedelta(0) @@ -3122,7 +3124,7 @@ def main(): logger.error("Impossible to open %s", args.input_file) exit(-1) - format_of_file = getFormat(paths['ffprobe'], input_file) + format_of_file = get_format(paths['ffprobe'], input_file) if format_of_file is None: exit(-1) @@ -3262,14 +3264,14 @@ def main(): partnum = partnum + 1 # Get the nearest I-frame whose timestamp is greater or equal to the beginning. - head_frames = getNearestIFrame(paths['ffprobe'], mkv, ts1, before=False) + head_frames = get_nearest_iframe(paths['ffprobe'], mkv, ts1, before=False) if head_frames is None: logger.error('Impossible to retrieve I-frame') exit(-1) # Get the nearest I-frame whose timestamp ... # TODO: wrong here ... - tail_frames = getNearestIFrame(paths['ffprobe'], mkv, ts2, before=True) + tail_frames = get_nearest_iframe(paths['ffprobe'], mkv, ts2, before=True) if tail_frames is None: logger.error('Impossible to retrieve I-frame') exit(-1) @@ -3282,10 +3284,10 @@ def main(): logger.info("Found %d frames between last I-frame and end of current part", nb_tail_frames) - head_iframe_ts = getTSFrame(head_iframe) + head_iframe_ts = get_ts_frame(head_iframe) if head_iframe_ts is None: exit(-1) - tail_iframe_ts = getTSFrame(tail_iframe) + tail_iframe_ts = get_ts_frame(tail_iframe) if tail_iframe_ts is None: exit(-1) @@ -3312,7 +3314,7 @@ def main(): if (not args.coarse) and (nb_head_frames > args.threshold): # We extract all frames between the beginning upto the frame that immediately preceeds # the I-frame. - h264_head, h264_head_ts, mkv_head = extractAllStreams(ffmpeg_path=paths['ffmpeg'], + h264_head, h264_head_ts, mkv_head = extract_all_streams(ffmpeg_path=paths['ffmpeg'], ffprobe_path=paths['ffprobe'], inputFile=mkv, begin=ts1, end=head_iframe_ts, @@ -3370,7 +3372,7 @@ def main(): # logger.info('Merge header, middle and trailer subpart into: %s' % internal_mkv_name) # Extract internal part of MKV - extractMKVPart(mkvmerge_path=paths['mkvmerge'], inputFile=mkv, outputFile=internal_mkv, + extract_mkv_part(mkvmerge_path=paths['mkvmerge'], inputFile=mkv, outputFile=internal_mkv, begin=head_iframe_ts, end=tail_iframe_ts) # Extract video stream of internal part as a raw H264 and its timestamps. @@ -3380,7 +3382,7 @@ def main(): # Remove video track from internal part of MKV logger.info('Remove video track from %s', internal_mkv_name) - removeVideoTracksFromMKV(mkvmerge_path=paths['mkvmerge'], inputFile=internal_mkv, + remove_video_tracks_from_mkv(mkvmerge_path=paths['mkvmerge'], inputFile=internal_mkv, outputFile=internal_novideo_mkv) temporaries.append(internal_mkv) @@ -3394,7 +3396,7 @@ def main(): if (not args.coarse) and (nb_tail_frames > args.threshold): # We extract all frames between the I-frame (including it) upto the end. - h264_tail, h264_tail_ts, mkv_tail = extractAllStreams(ffmpeg_path=paths['ffmpeg'], + h264_tail, h264_tail_ts, mkv_tail = extract_all_streams(ffmpeg_path=paths['ffmpeg'], ffprobe_path=paths['ffprobe'], inputFile=mkv, begin=tail_iframe_ts, end=ts2, nbFrames=nb_tail_frames, @@ -3439,7 +3441,7 @@ def main(): exit(-1) logger.info('Merging all H264 tracks') - concatenateH264Parts(h264parts=h264parts, output=full_h264) + concatenate_h264_parts(h264parts=h264parts, output=full_h264) temporaries.append(full_h264) try: @@ -3449,7 +3451,7 @@ def main(): exit(-1) logger.info('Merging H264 timestamps') - concatenateH264TSParts(h264TSParts=h264_ts, output=full_h264_ts) + concatenate_h264_ts_parts(h264TSParts=h264_ts, output=full_h264_ts) temporaries.append(full_h264_ts) final_novideo_name = f'{basename}-novideo.mkv' @@ -3483,7 +3485,7 @@ def main(): final_codec_private_data = dump_codec_private_data(main_avc_config) logger.debug('Final codec private data: %s', hexdump.dump(final_codec_private_data, sep=':')) logger.info('Changing codec private data with the new one.') - changeCodecPrivateData(paths['mkvinfo'], final_with_video, final_codec_private_data) + change_codec_private_data(paths['mkvinfo'], final_with_video, final_codec_private_data) if args.srt: if not all_optional_tools: @@ -3492,11 +3494,11 @@ def main(): else: # Final cut is not any more the final step. temporaries.append(final_with_video) - duration = getMovieDuration(paths['ffprobe'], final_with_video) + duration = get_movie_duration(paths['ffprobe'], final_with_video) supported_langs = get_tesseract_supported_lang(paths['tesseract']) logger.info('Supported lang: %s', supported_langs) logger.info('Find subtitles tracks and language.') - subtitles = findSubtitlesTracks(paths['ffprobe'], final_with_video) + subtitles = find_subtitles_tracks(paths['ffprobe'], final_with_video) logger.info(subtitles) sts = {} for subtitle in subtitles: @@ -3539,7 +3541,7 @@ def main(): logger.info(ocr) # Remux SRT subtitles - remuxSRTSubtitles(paths['mkvmerge'], final_with_video, args.outputFile, ocr) + remux_srt_subtitles(paths['mkvmerge'], final_with_video, args.outputFile, ocr) else: copyfile(final_with_video_name, args.outputFile) else: