From e3b95c34501c06f5cd80a3f5115efff6fc88a921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Tronel?= Date: Fri, 5 Jan 2024 14:27:56 +0100 Subject: [PATCH] =?UTF-8?q?Import=20de=20code=20d'un=20projet=20plus=20com?= =?UTF-8?q?plet=20pour=20le=20limiter=20au=20t=C3=A9l=C3=A9chargement=20de?= =?UTF-8?q?puis=20la=20TV.=20Ajout=20d'une=20barre=20de=20progression.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + panasonic.py | 357 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 + 3 files changed, 362 insertions(+) create mode 100644 .gitignore create mode 100755 panasonic.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f7275bb --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +venv/ diff --git a/panasonic.py b/panasonic.py new file mode 100755 index 0000000..1a4d281 --- /dev/null +++ b/panasonic.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python3 + +import requests +import xmltodict +from datetime import datetime,timedelta,time +import re +from sys import exit,stdout +import json +from io import BytesIO, StringIO, IOBase +from subprocess import Popen, PIPE +import argparse +import locale +from math import floor, ceil, log +from tempfile import NamedTemporaryFile +from os import mkdir, set_inheritable, lseek, SEEK_SET, SEEK_END, read, fstat +import coloredlogs, logging +from tqdm import tqdm + + +default_url='http://tv:55000/dms/control_0' + +headers={ +'SOAPAction': '"urn:schemas-upnp-org:service:ContentDirectory:1#Browse"', +'Accept-Language': 'fr-fr;q=1, fr;q=0.5', +'Accept-Encoding': 'gzip', +'Content-Type': 'text/xml; charset="utf-8"', +'User-Agent': 'kodi', +'Connection': 'Keep-Alive'} + +request = '%s%s%s%d%d' + + +def followTag(xml, tag): + logger = logging.getLogger(__name__) + + if tag in xml: + return xml[tag] + else: + logger.warning('No %s tag found in xml' % tag) + return None + +def filter(text): + return ''.join(map(lambda x: x if ord(x) in range(32, 128) else ' ', text)) + +def getChildren(url, parent, childType): + logger = logging.getLogger(__name__) + + data = request % (parent,'BrowseDirectChildren','@childCount',0,64) + r = requests.post(url, headers=headers, data=data) + if (r.status_code != 200): + logger.error('Impossible to connect to get children of node %s' % parent) + exit(-1) + + # Filtering characters that may be non parsable by xmltodict + text = filter(r.text) + response = xmltodict.parse(text) + + tags=['s:Envelope', 's:Body', 'u:BrowseResponse', 'Result'] + for tag in tags: + response = followTag(response, tag) + + didl = xmltodict.parse(response) + + children = didl + tags=['DIDL-Lite', childType] + for tag in tags: + children = followTag(children, tag) + + return children + +def getMetaData(url, item): + logger = logging.getLogger(__name__) + + data = request % (item,'BrowseMetadata','*',0,64) + r = requests.post(url, headers=headers, data=data) + + if (r.status_code != 200): + logger.error('Impossible to connect to get metadata of node %s' % parent) + exit(-1) + + text = filter(r.text) + response = xmltodict.parse(text) + tags=['s:Envelope', 's:Body', 'u:BrowseResponse', 'Result'] + for tag in tags: + response = followTag(response, tag) + didl = xmltodict.parse(response) + + metadata = didl + tags=['DIDL-Lite', 'item'] + for tag in tags: + metadata = followTag(metadata, tag) + + return metadata + +def getFilmsList(url): + logger = logging.getLogger(__name__) + + disk=getChildren(url, parent='0', childType='container') + if( followTag(disk,'dc:title') != 'EnregistreurTV'): + logger.error('Impossible to find disk') + exit(-1) + + name = followTag(disk, '@id') + + directories=getChildren(url, parent=name, childType='container') + + nameRec=None + for directory in directories: + if( followTag(directory,'dc:title') == 'Recorded TV'): + nameRec = followTag(directory, '@id') + break + + if (nameRec == None): + logger.error('Impossible to find TV recorder directory') + exit(-1) + + films=getChildren(url, parent=nameRec, childType='item') + if films == None: + films = [] + + if not isinstance(films, list): + films = [ films ] + + res = [] + for film in films: + title = followTag(film, 'dc:title') + fid = followTag(film, '@id') + metadata=getMetaData(url, fid) + date = datetime.fromisoformat(followTag(metadata, 'dc:date')) + channel = followTag(metadata, 'upnp:channelName') + streams = followTag(metadata, 'res') + duration = followTag(streams[0], '@duration') + m = re.match('(?P[0-9]*):(?P[0-9]*):(?P[0-9]*)', duration) + if m == None: + logger.error('Impossible to parse date: %s' % duration) + exit(-1) + + duration = timedelta(hours=int(m.group('hour')), minutes=int(m.group('minute')), seconds=int(m.group('second'))) + uri=followTag(streams[0],'#text') + res.append({'title':title, 'date':date, 'channel':channel, 'duration':duration, 'url':uri}) + + return res + + +def dumpFilms(films): + logger = logging.getLogger(__name__) + + fid = 1 + for film in films: + logger.info('%d - %s enregistré le %s sur %s. Durée: %s. %s' % (fid, film['title'], film['date'], film['channel'], film['duration'], film['url'])) + fid = fid + 1 + +def getInitialTSFromBuffer(content): + logger = logging.getLogger(__name__) + + with Popen(['ffprobe','/dev/stdin','-loglevel', 'quiet', '-read_intervals', '0%+0.000001', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe: + out, _ = ffprobe.communicate(input=content) + frames = json.load(BytesIO(out)) + if 'frames' in frames: + frames = frames['frames'] + if len(frames) > 0: + for frame in frames: + if 'best_effort_timestamp_time' in frame: + ts = float(frame['best_effort_timestamp_time']) + ts = timedelta(seconds=ts) + return ts + else: + return None + else: + return None + + +def getLastTSFromBuffer(content): + logger = logging.getLogger(__name__) + + length = len(content) + with Popen(['ffprobe', '/dev/stdin', '-loglevel', 'quiet', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe: + out, _ = ffprobe.communicate(input=content[max(0,length-100000):]) + frames = json.load(BytesIO(out)) + if 'frames' in frames: + frames = frames['frames'] + if len(frames) > 0: + lastTS = None + for frame in frames: + if 'best_effort_timestamp_time' in frame: + ts = float(frame['best_effort_timestamp_time']) + ts = timedelta(seconds=ts) + if (lastTS == None) or (lastTS < ts): + lastTS = ts + return lastTS + else: + return None + else: + return None + +def getLastTSFromFD(fd): + logger = logging.getLogger(__name__) + + set_inheritable(fd, True) + with Popen(['ffprobe', '/dev/stdin' , '-loglevel', 'quiet', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE, close_fds=False) as ffprobe: + back = 200000 + length = fstat(fd).st_size + pos = lseek(fd, -back, SEEK_END) + content = read(fd, back) + out, _ = ffprobe.communicate(input=content) + frames = json.load(BytesIO(out)) + if 'frames' in frames: + frames = frames['frames'] + if len(frames) > 0: + lastTS = None + for frame in frames: + if 'best_effort_timestamp_time' in frame: + ts = float(frame['best_effort_timestamp_time']) + ts = timedelta(seconds=ts) + if (lastTS == None) or (lastTS < ts): + lastTS = ts + return lastTS + else: + return None + else: + return None + +def downloadMovie(url, outputFileName, duration): + logger = logging.getLogger(__name__) + + try: + output = open(outputFileName, mode='wb+') + except IOException: + logger.error('Impossible to create file: %s' % outputFileName) + exit(-1) + + outputFD = output.fileno() + + sample = 2. + headers = {'TimeSeekRange.dlna.org': 'npt=0.000-%.03f' % sample} + r = requests.get(url, headers=headers) + if r.status_code != 200: + logger.error('Impossible to download first %f seconds: %d' % (sample,r.status_code)) + exit(-1) + + if 'TimeSeekRange.dlna.org' not in r.headers: + logger.error('TimeSeekRange.dlna.org is not in header: %s' % r.headers) + exit(-1) + + p = re.compile('^.*/(?P[0-9]+)\.(?P[0-9]+)$') + m = p.match(r.headers['TimeSeekRange.dlna.org']) + if m == None: + logger.error('Impossible to parse timestamps' % r.headers['TimeSeekRange.dlna.org']) + exit(-1) + + total = int(m.group('totalsecs'))+(int(m.group('totalms')))/1000 + total = timedelta(seconds=total) + + initialTS = getInitialTSFromBuffer(r.content) + lastTS = getLastTSFromBuffer(r.content) + delta = lastTS-initialTS + ratio = delta/total + estimatedLength = ceil(len(r.content)/ratio) + + logger.info('Estimated length: %d' % estimatedLength) + + pb = tqdm(total=estimatedLength, unit='bytes', desc='Download', unit_scale=True, unit_divisor=1024) + + nbiters = 0 + with requests.get(url, stream=True) as r: + chunks = r.iter_content(chunk_size = 100000) + for chunk in chunks: + output.write(chunk) + if nbiters == 1000: + lastTS = getLastTSFromFD(outputFD) + delta = lastTS-initialTS + ratio = delta/total + length = fstat(outputFD).st_size + estimatedLength = ceil(length/ratio) + pb.total = estimatedLength + lseek(outputFD, 0, SEEK_END) + nbiters = 0 + pb.update(len(chunk)) + nbiters+=1 + + + +def head(url): + logger = logging.getLogger(__name__) + + # headers = {'TimeSeekRange.dlna.org': 'npt=0.00-'} + headers = { 'getcontentFeatures.dlna.org': '1'} + + r = requests.head(url, headers=headers) + logger.info('Return code:%d Headers:%s Content:%s' % (r.status_code, r.headers, r.content)) + + +def main(): + locale.setlocale(locale.LC_ALL, 'fr_FR.UTF8') + logger = logging.getLogger(__name__) + coloredlogs.install() + + parser = argparse.ArgumentParser() + parser.add_argument("-u", "--url", help="URL to connect to.") + parser.add_argument("-l", "--list", action="store_true", help="List available movies.") + parser.add_argument("-s", "--streams", type=int, metavar="#", help="List streams in # movie.") + parser.add_argument("--head", type=int, help="Return HEAD ") + parser.add_argument("-d", "--download", type=int, metavar="#", help="Download # movie.") + parser.add_argument("-t", "--ts", type=int, metavar="#", help="Display initial timestamp of # movie.") + parser.add_argument("-o", "--output", help="Destination file.") + parser.add_argument("-v", "--verbose", action='store_true', dest='verbose', help="Debug.") + + args = parser.parse_args() + + if args.verbose: + logger.info('Setting logging level to debug.') + coloredlogs.set_level(level=logging.DEBUG) + + logger.debug('Arguments: %s' % args) + + if args.url == None: + args.url = default_url + + if args.list == True: + films = getFilmsList(args.url) + dumpFilms(films) + exit(0) + + if args.head != None: + films = getFilmsList(args.url) + + if args.head < 1 or args.head > len(films): + logger.error('Movie id must be in [1,%d]' % len(films)) + exit(-1) + + head(films[args.head-1]['url']) + + if args.download != None: + films = getFilmsList(args.url) + + if args.download < 1 or args.download > len(films): + logger.error('Movie id must be in [1,%d]' % len(films)) + exit(-1) + + mid = args.download-1 + title = films[mid]['title'] + + if args.output == None: + args.output = '%s.ts' % title + logger.info('Setting output file to %s\n' % args.output) + + duration = int(films[mid]['duration'].total_seconds()) + # start = getInitialTS(films[mid]['url']) + + t = timedelta(seconds=duration) + + logger.info('Downloading movie "%s" whose duration is %s' % (title, t)) + downloadMovie(url=films[mid]['url'], outputFileName=args.output, duration=duration) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c3336e4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +xmltodict +requests +coloredlogs +tqdm