Ajout d'une large partie du code nécessaire à l'extraction des sous-titres via OCR.

This commit is contained in:
Frédéric Tronel
2023-12-12 11:57:03 +01:00
parent cb600b920d
commit 4070f34a60

View File

@@ -27,15 +27,143 @@ from shutil import copyfile, which
def checkRequiredTools():
logger = logging.getLogger(__name__)
allOptionalTools = True
paths = {}
required = ['ffmpeg', 'ffprobe', 'mkvmerge']
optional = ['mkvextract', 'vobsubocr']
optional = ['mkvextract', 'vobsubocr','tesseract']
for tool in required:
if which(tool) == None:
path = which(tool)
if path == None:
logger.error('Required tool: %s is missing.' % tool)
exit(-1)
else:
paths[tool] = path
for tool in optional:
if which(tool) == None:
path = which(tool)
if path == None:
logger.info('Optional tool: %s is missing.' % tool)
allOptionalTools = False
else:
paths[tool] = path
return allOptionalTools, paths
def getTesseractSupportedLang(tesseract):
logger = logging.getLogger(__name__)
res = {}
with Popen([tesseract, '--list-langs'], stdout=PIPE) as tesseract:
for line in tesseract.stdout:
line = line.decode('utf8')
p = re.compile('(?P<lang>[a-z]{3})\n')
m = re.match(p,line)
if m != None:
try:
lang = m.group('lang')
key = Lang(lang)
res[key] = lang
except InvalidLanguageValue as e:
pass
tesseract.wait()
if tesseract.returncode != 0:
logger.error("Tesseract returns an error code: %d" % tesseract.returncode)
return None
return res
def getSubTitlesTracks(ffprobe, mkvPath):
logger = logging.getLogger(__name__)
tracks={}
nbSubTitles = 0
with Popen([ffprobe, '-loglevel', 'quiet', '-select_streams', 's', '-show_entries', 'stream=index,codec_name:stream_tags=language', '-of', 'json', mkvPath], stdout=PIPE) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
for stream in out['streams']:
index = stream['index']
codec = stream['codec']
lang = stream['tags']['language']
if codec == 'dvd_subtitle':
if lang not in tracks:
tracks[lang] = [track]
else:
l = tracks[lang]
l.append(track)
tracks[lang] = l
else:
return None
ffprobe.wait()
if ffprobe.returncode != 0:
logger.error("ffprobe returns an error code: %d" % ffprobe.returncode)
return None
return tracks
def extractSRT(mkvextract, mkvPath, destPath, tracks, langs):
logger = logging.getLogger(__name__)
params = [mkvextract, mkvPath, 'tracks']
res = []
for lang in tracks:
iso = Lang(lang)
if iso in langs:
ocrlang = langs[iso]
else:
logger.warning("Language not supported by Tesseract: %s" % iso.name)
ocrlang ='osd'
if len(tracks[lang]) == 1:
params.append('%d:%s/%s' % (tracks[lang][0], destPath ,lang))
res.append(('%s/%s.idx' % (destPath, lang), lang, ocrlang))
else:
count = 1
for track in tracks[lang]:
params.append('%d:%s/%s-%d' % (track, destPath, lang, count))
res.append(('%s/%s-%d.idx' % (destPath, lang,count), lang, ocrlang))
count = count+1
with Popen(params) as extract:
extract.wait()
if extract.returncode != 0:
print("Erreur de mkvextract: %d" % extract.returncode)
else:
print("Extracted")
return res
def doOCR(vobsubocr, idxs):
res = []
for filename, lang, iso in idxs:
print(filename)
srtname = '%s.srt' % os.path.splitext(filename)[0]
print(srtname)
# Tesseract reconnaît la chaîne de caractères ... comme le texte 'su'
p = re.compile('^su\n$')
if not os.path.isfile(srtname):
with open(srtname, 'w+') as srt:
with Popen([vobsubocr, '--lang', iso, filename], stdout=PIPE) as ocr:
for line in ocr.stdout:
line = line.decode('utf8')
m = re.match(p,line)
if m != None:
srt.write('...')
else:
srt.write(line)
res.append((srtname, lang))
return res
@unique
@@ -59,13 +187,13 @@ class SupportedFormat(IntEnum):
# ffmpeg -i <InputFile (before concatenation)> -c:v copy -an -sn -bsf:v trace_headers -t 0.01 -report -loglevel 0 -f null -
def getFormat(inputFile):
def getFormat(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
inputFile.seek(0,0)
set_inheritable(infd, True)
with Popen(['ffprobe', '-loglevel', 'quiet', '-show_format', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
with Popen([ffprobe, '-loglevel', 'quiet', '-show_format', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'format' in out:
@@ -75,13 +203,13 @@ def getFormat(inputFile):
return None
def getStreams(inputFile):
def getStreams(ffprobe, inputFile):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
inputFile.seek(0,0)
set_inheritable(infd, True)
with Popen(['ffprobe', '-loglevel', 'quiet', '-show_streams', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
with Popen([ffprobe, '-loglevel', 'quiet', '-show_streams', '-of', 'json', '-i', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
@@ -216,7 +344,7 @@ def compareTimeInterval(interval1, interval2):
def ffmpegConvert(inputFile, inputFormat, outputFile, outputFormat, duration):
def ffmpegConvert(ffmpeg, inputFile, inputFormat, outputFile, outputFormat, duration):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
@@ -224,7 +352,7 @@ def ffmpegConvert(inputFile, inputFormat, outputFile, outputFormat, duration):
set_inheritable(infd, True)
set_inheritable(outfd, True)
# TODO: canvas size to be fixed !
with Popen(['ffmpeg', '-y', '-loglevel', 'quiet', '-progress', '/dev/stdout', '-canvas_size', '720x560', '-f', inputFormat, '-i', '/proc/self/fd/%d' % infd,
with Popen([ffmpeg, '-y', '-loglevel', 'quiet', '-progress', '/dev/stdout', '-canvas_size', '720x560', '-f', inputFormat, '-i', '/proc/self/fd/%d' % infd,
'-map', '0:v', '-map', '0:a', '-map', '0:s', '-bsf:v', 'h264_mp4toannexb,dump_extra=freq=keyframe', '-vcodec', 'copy', '-acodec', 'copy', '-scodec', 'dvdsub',
'-f', outputFormat, '/proc/self/fd/%d' % outfd], stdout=PIPE, close_fds=False) as ffmpeg:
pb = tqdm(TextIOWrapper(ffmpeg.stdout, encoding="utf-8"), total=int(duration/timedelta(seconds=1)), unit='s', desc='Conversion')
@@ -262,12 +390,12 @@ def getPacketDuration(packet):
return duration
def getFramesInStream(inputFile, begin, end, streamKind, subStreamId=0):
def getFramesInStream(ffprobe, inputFile, begin, end, streamKind, subStreamId=0):
logger = logging.getLogger(__name__)
infd = inputFile.fileno()
set_inheritable(infd, True)
with Popen(['ffprobe', '-loglevel', 'quiet', '-read_intervals', ('%s%%%s' %(begin, end)), '-show_entries', 'frame', '-select_streams', '%s:%d' % (streamKind, subStreamId), '-of', 'json', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
with Popen([ffprobe, '-loglevel', 'quiet', '-read_intervals', ('%s%%%s' %(begin, end)), '-show_entries', 'frame', '-select_streams', '%s:%d' % (streamKind, subStreamId), '-of', 'json', '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
frames = json.load(BytesIO(out))
status = ffprobe.wait()
@@ -288,7 +416,7 @@ def getFramesInStream(inputFile, begin, end, streamKind, subStreamId=0):
logger.error('Impossible to retrieve frames inside file around [%s,%s]' % (begin, end))
return None
def getNearestIFrame(inputFile, timestamp, before=True, delta=timedelta(seconds=2)):
def getNearestIFrame(ffprobe, inputFile, timestamp, before=True, delta=timedelta(seconds=2)):
logger = logging.getLogger(__name__)
zero = timedelta()
@@ -302,7 +430,7 @@ def getNearestIFrame(inputFile, timestamp, before=True, delta=timedelta(seconds=
logger.debug('Looking for iframe in [%s, %s]' % (tbegin, tend))
frames = getFramesInStream(inputFile=inputFile, begin=tbegin, end=tend, streamKind='v')
frames = getFramesInStream(ffprobe, inputFile=inputFile, begin=tbegin, end=tend, streamKind='v')
if frames == None:
return None
@@ -352,7 +480,7 @@ def getNearestIFrame(inputFile, timestamp, before=True, delta=timedelta(seconds=
return(nbFrames, iframe)
def extractMKVPart(inputFile, outputFile, begin, end):
def extractMKVPart(mkvmerge, inputFile, outputFile, begin, end):
logger = logging.getLogger(__name__)
logger.info('Extract video between I-frames at %s and %s' % (begin,end))
@@ -363,7 +491,7 @@ def extractMKVPart(inputFile, outputFile, begin, end):
set_inheritable(infd, True)
set_inheritable(outfd, True)
warnings = []
with Popen(['mkvmerge', '-o', '/proc/self/fd/%d' % outfd, '--split', 'parts:%s-%s' % (begin, end), '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as mkvmerge:
with Popen([mkvmerge, '-o', '/proc/self/fd/%d' % outfd, '--split', 'parts:%s-%s' % (begin, end), '/proc/self/fd/%d' % infd], stdout=PIPE, close_fds=False) as mkvmerge:
pb = tqdm(TextIOWrapper(mkvmerge.stdout, encoding="utf-8"), total=100, unit='%', desc='Extraction')
for line in pb:
if line.startswith('Progression :'):
@@ -371,10 +499,13 @@ def extractMKVPart(inputFile, outputFile, begin, end):
m = p.match(line)
if m == None:
logger.error('Impossible to parse progress')
pb.n = int(m['progress'])
pb.update()
pb.update(int(m['progress'])-pb.n)
elif line.startswith('Avertissement'):
warnings.append(line)
pb.update(100-pb.n)
pb.refresh()
pb.close()
status = mkvmerge.wait()
if status == 1:
@@ -383,9 +514,9 @@ def extractMKVPart(inputFile, outputFile, begin, end):
logger.warning(w)
elif status == 2:
logger.error('Extraction returns errors')
def extractPictures(inputFile, begin, nbFrames, width=640, height=480):
def extractPictures(ffmpeg, inputFile, begin, nbFrames, width=640, height=480):
logger = logging.getLogger(__name__)
inputFile.seek(0,0)
@@ -401,7 +532,7 @@ def extractPictures(inputFile, begin, nbFrames, width=640, height=480):
logger.debug("Estimated length: %d" % length)
images = bytes()
with Popen(['ffmpeg', '-loglevel', 'quiet' ,'-y', '-ss', '%s'%begin, '-i', '/proc/self/fd/%d' % infd, '-s', '%dx%d'%(width, height), '-vframes', '%d'%nbFrames, '-c:v', 'ppm', '-f', 'image2pipe', '/proc/self/fd/%d' % outfd ], stdout=PIPE, close_fds=False) as ffmpeg:
with Popen([ffmpeg, '-loglevel', 'quiet' ,'-y', '-ss', '%s'%begin, '-i', '/proc/self/fd/%d' % infd, '-s', '%dx%d'%(width, height), '-vframes', '%d'%nbFrames, '-c:v', 'ppm', '-f', 'image2pipe', '/proc/self/fd/%d' % outfd ], stdout=PIPE, close_fds=False) as ffmpeg:
status = ffmpeg.wait()
if status != 0:
logger.error('Conversion failed with status code: %d' % status)
@@ -416,7 +547,7 @@ def extractPictures(inputFile, begin, nbFrames, width=640, height=480):
lseek(outfd, 0, SEEK_SET)
return images, outfd
def extractSound(inputFile, begin, outputFileName, packetDuration, subChannel=0, nbPackets=0, sampleRate=48000, nbChannels=2):
def extractSound(ffmpeg, inputFile, begin, outputFileName, packetDuration, subChannel=0, nbPackets=0, sampleRate=48000, nbChannels=2):
logger = logging.getLogger(__name__)
inputFile.seek(0,0)
@@ -427,7 +558,7 @@ def extractSound(inputFile, begin, outputFileName, packetDuration, subChannel=0,
sound = bytes()
length = int(nbChannels*sampleRate*4*nbPackets*packetDuration/1000)
with Popen(['ffmpeg', '-y', '-loglevel', 'quiet', '-ss', '%s'%begin, '-i', '/proc/self/fd/%d' % infd, '-frames:a:%d' % subChannel, '%d' % (nbPackets+1),
with Popen([ffmpeg, '-y', '-loglevel', 'quiet', '-ss', '%s'%begin, '-i', '/proc/self/fd/%d' % infd, '-frames:a:%d' % subChannel, '%d' % (nbPackets+1),
'-c:a', 'pcm_s32le', '-sample_rate', '%d' % sampleRate, '-channels', '%d' % nbChannels, '-f', 's32le', '/proc/self/fd/%d' % outfd], stdout=PIPE, close_fds=False) as ffmpeg:
status = ffmpeg.wait()
if status != 0:
@@ -483,7 +614,7 @@ def dumpPPM(pictures, prefix, temporaries):
pos+=length
picture+=1
def extractAllStreams(inputFile, begin, end, streams, filesPrefix, nbFrames, width, height, temporaries, dumpMemFD=False):
def extractAllStreams(ffmpeg, ffprobe, inputFile, begin, end, streams, filesPrefix, nbFrames, width, height, temporaries, dumpMemFD=False):
logger = logging.getLogger(__name__)
encoderParams = [ 'ffmpeg', '-y', '-loglevel', 'quiet' ]
inputParams = []
@@ -519,7 +650,7 @@ def extractAllStreams(inputFile, begin, end, streams, filesPrefix, nbFrames, wid
# TODO: adjust SAR and DAR
# https://superuser.com/questions/907933/correct-aspect-ratio-without-re-encoding-video-file
codec = stream['codec_name']
imagesBytes, memfd = extractPictures(inputFile=inputFile, begin=begin, nbFrames=nbFrames, width=width, height=height)
imagesBytes, memfd = extractPictures(ffmpeg, inputFile=inputFile, begin=begin, nbFrames=nbFrames, width=width, height=height)
if imagesBytes == None:
exit(-1)
@@ -544,18 +675,21 @@ def extractAllStreams(inputFile, begin, end, streams, filesPrefix, nbFrames, wid
if 'tags' in stream:
if 'language' in stream['tags']:
codecsParams.extend(['-metadata:s:a:%d' % audioID, 'language=%s' % stream['tags']['language']])
packets = getFramesInStream(inputFile=inputFile, begin=begin, end=end, streamKind='a', subStreamId=audioID)
packets = getFramesInStream(ffprobe, inputFile=inputFile, begin=begin, end=end, streamKind='a', subStreamId=audioID)
nbPackets = len(packets)
logger.debug("Found %d packets to be extracted from audio track." % nbPackets)
if(nbPackets > 0):
packetDuration = getPacketDuration(packets[0])
if packetDuration == None:
return None
else:
packetDuration = 0
logger.info("Extracting %d packets of audio stream: a:%d" % (nbPackets, audioID))
tmpname = '%s-%d.pcm' % (filesPrefix,audioID)
soundBytes , memfd = extractSound(inputFile=inputFile, begin=begin, nbPackets=nbPackets, packetDuration=packetDuration, outputFileName=tmpname, sampleRate=sampleRate, nbChannels=nbChannels)
soundBytes, memfd = extractSound(ffmpeg=ffmpeg, inputFile=inputFile, begin=begin, nbPackets=nbPackets, packetDuration=packetDuration, outputFileName=tmpname, sampleRate=sampleRate, nbChannels=nbChannels)
if soundBytes == None:
exit(-1)
@@ -687,11 +821,11 @@ def mergeMKVs(inputs, outputName):
return out
def findSubtitlesTracks(filename):
def findSubtitlesTracks(ffprobe, filename):
# ffprobe -loglevel quiet -select_streams s -show_entries stream=index:stream_tags=language -of json corgi.ts
logger = logging.getLogger(__name__)
with Popen(['ffprobe', '-i', filename, '-select_streams', 's', '-show_entries', 'stream=index:stream_tags=language', '-of', 'json'], stdout=PIPE, close_fds=False) as ffprobe:
with Popen([ffprobe, '-i', filename, '-select_streams', 's', '-show_entries', 'stream=index:stream_tags=language', '-of', 'json'], stdout=PIPE, close_fds=False) as ffprobe:
out, _ = ffprobe.communicate()
out = json.load(BytesIO(out))
if 'streams' in out:
@@ -700,11 +834,11 @@ def findSubtitlesTracks(filename):
logger.error('Impossible to retrieve format of file')
pass
def extractSubTitleTrack(inputFileName, index, lang):
def extractSubTitleTrack(mkvmerge, inputFileName, index, lang):
# mkvextract video.mkv tracks position:nom [position:nom]
logger = logging.getLogger(__name__)
with Popen(['mkvextract', inputFileName, 'tracks', '%d:%s' % (index,lang)], stdout=PIPE, close_fds=False) as mkvextract:
with Popen([mkvmerge, inputFileName, 'tracks', '%d:%s' % (index,lang)], stdout=PIPE, close_fds=False) as mkvextract:
out, _ = mkvextract.communicate()
for lines in out:
logger.info(out)
@@ -714,7 +848,8 @@ def extractSubTitleTrack(inputFileName, index, lang):
def main():
logger = logging.getLogger(__name__)
coloredlogs.install()
locale.setlocale(locale.LC_ALL, 'fr_FR.UTF8')
# Fix the language used by tools to print their messages to make the script independant of environment.
locale.setlocale(locale.LC_ALL, 'C')
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", dest='inputFile', type=str, required=True, help="Input file to process (can be .ts, .mp4 or .mkv).")
parser.add_argument("-o", "--output", dest='outputFile', type=str, required=True, help="Output MKV file to produce.")
@@ -726,7 +861,7 @@ def main():
args = parser.parse_args()
logger.debug("Arguments: %s" % args)
checkRequiredTools()
allOptionalTools, paths = checkRequiredTools()
# Flatten args.parts
intervals = []
@@ -768,7 +903,7 @@ def main():
logger.error("Impossible to open %s" % args.inputFile)
exit(-1)
formatOfFile = getFormat(inputFile)
formatOfFile = getFormat(paths['ffprobe'], inputFile)
if formatOfFile == None:
exit(-1)
@@ -816,7 +951,7 @@ def main():
logger.info("Already in MKV")
mkv = inputFile
streams = getStreams(mkv)
streams = getStreams(paths['ffprobe'], mkv)
mainVideo = None
for stream in streams:
@@ -846,11 +981,11 @@ def main():
partnum = partnum + 1
headFrames = getNearestIFrame(mkv, ts1, before=False)
headFrames = getNearestIFrame(paths['ffprobe'], mkv, ts1, before=False)
if headFrames == None:
exit(-1)
tailFrames = getNearestIFrame(mkv, ts2, before=True)
tailFrames = getNearestIFrame(paths['ffprobe'], mkv, ts2, before=True)
if tailFrames == None:
exit(-1)
@@ -873,7 +1008,7 @@ def main():
if nbHeadFrames > 0:
# We extract all frames between the beginning upto the frame that immediately preceeds the I-frame.
head = extractAllStreams(inputFile=mkv, begin=ts1, end=headIFrameTS, nbFrames=nbHeadFrames-1, filesPrefix='part-%d-head' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
head = extractAllStreams(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], inputFile=mkv, begin=ts1, end=headIFrameTS, nbFrames=nbHeadFrames-1, filesPrefix='part-%d-head' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
subparts.append(head)
# Creating MKV file that corresponds to current part between I-frames
@@ -883,12 +1018,12 @@ def main():
logger.error('Impossible to create file: part-%d-internal.mkv' % partnum)
exit(-1)
temporaries.append(internal)
extractMKVPart(inputFile=mkv, outputFile=internal, begin=headIFrameTS, end=tailIFrameTS)
extractMKVPart(mkvmerge=paths['mkvmerge'], inputFile=mkv, outputFile=internal, begin=headIFrameTS, end=tailIFrameTS)
subparts.append(internal)
if nbTailFrames > 0:
# We extract all frames between the I-frame (including it) upto the end.
tail = extractAllStreams(inputFile=mkv, begin=tailIFrameTS, end=ts2, nbFrames=nbTailFrames, filesPrefix='part-%d-tail' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
tail = extractAllStreams(ffmpeg=paths['ffmpeg'], ffprobe=paths['ffprobe'], inputFile=mkv, begin=tailIFrameTS, end=ts2, nbFrames=nbTailFrames, filesPrefix='part-%d-tail' % (partnum), streams=streams, width=width, height=height, temporaries=temporaries, dumpMemFD=args.dump)
subparts.append(tail)
logger.info('Merging: %s' % subparts)
@@ -911,36 +1046,39 @@ def main():
logger.info("Nothing else to do.")
if args.srt:
logger.info("Find subtitles tracks and language.")
subtitles = findSubtitlesTracks(args.outputFile)
sts = {}
for subtitle in subtitles:
index = subtitle['index']
if 'tags' in subtitle:
if 'language' in subtitle['tags']:
lang = subtitle['tags']['language']
if lang in sts:
sts[lang].append(index)
if not allOptionalTools:
logger.warning("Missing tools for extracting subtitles.")
else:
logger.info("Find subtitles tracks and language.")
subtitles = findSubtitlesTracks(args.outputFile)
sts = {}
for subtitle in subtitles:
index = subtitle['index']
if 'tags' in subtitle:
if 'language' in subtitle['tags']:
lang = subtitle['tags']['language']
if lang in sts:
sts[lang].append(index)
else:
sts[lang] = [index]
else:
sts[lang] = [index]
logger.error("Dropping subtitle: %s because it is missing language indication")
else:
logger.error("Dropping subtitle: %s because it is missing language indication")
else:
logger.error("Dropping subtitle: %s because it is missing language indication")
for lang in sts:
indexes = sts[lang]
if len(indexes) == 0:
# Nothing to do. This should not happen.
continue
if len(indexes) == 1:
index = indexes[0]
filename = 'essai-%s.srt' % lang
elif len(indexes) > 1:
nbsrt = 1
for index in indexes:
filename = 'essai-%s-%d.srt' % (lang, nbsrt)
nbsrt+=1
for lang in sts:
indexes = sts[lang]
if len(indexes) == 0:
# Nothing to do. This should not happen.
continue
if len(indexes) == 1:
index = indexes[0]
filename = 'essai-%s.srt' % lang
elif len(indexes) > 1:
nbsrt = 1
for index in indexes:
filename = 'essai-%s-%d.srt' % (lang, nbsrt)
nbsrt+=1
if not args.keep:
logger.info("Cleaning temporary files")