#!/usr/bin/env python3
"""List and download recordings exposed by a UPnP ContentDirectory server.

The script sends SOAP ``Browse`` requests to the server control URL, walks
the ``EnregistreurTV`` / ``Recorded TV`` containers to list the recordings,
and downloads one of them while using ``ffprobe`` to estimate the final file
size for the progress bar.
"""
import argparse
import json
import locale
import logging
import re
from datetime import datetime, timedelta, time
from io import BytesIO, StringIO, IOBase
from math import floor, ceil, log
from os import mkdir, set_inheritable, lseek, SEEK_SET, SEEK_END, read, fstat
from subprocess import Popen, PIPE
from sys import exit, stdout
from tempfile import NamedTemporaryFile

import coloredlogs
import requests
import xmltodict
from tqdm import tqdm

default_url = 'http://tv:55000/dms/control_0'

headers = {
    'SOAPAction': '"urn:schemas-upnp-org:service:ContentDirectory:1#Browse"',
    'Accept-Language': 'fr-fr;q=1, fr;q=0.5',
    'Accept-Encoding': 'gzip',
    'Content-Type': 'text/xml; charset="utf-8"',
    'User-Agent': 'kodi',
    'Connection': 'Keep-Alive'}

# SOAP body of a ContentDirectory:1 Browse call. Placeholders, in order:
# ObjectID, BrowseFlag, Filter, StartingIndex, RequestedCount.
# NOTE(review): the markup of this template was missing from the reviewed
# copy (only the five placeholders were left). It has been rebuilt from the
# Browse action argument list -- confirm against the device before release.
request = (
    '<?xml version="1.0" encoding="utf-8"?>'
    '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" '
    's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
    '<s:Body>'
    '<u:Browse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">'
    '<ObjectID>%s</ObjectID>'
    '<BrowseFlag>%s</BrowseFlag>'
    '<Filter>%s</Filter>'
    '<StartingIndex>%d</StartingIndex>'
    '<RequestedCount>%d</RequestedCount>'
    '<SortCriteria></SortCriteria>'
    '</u:Browse>'
    '</s:Body>'
    '</s:Envelope>')

# Stream duration as found in the '@duration' attribute (H:M:S...).
_DURATION_RE = re.compile(
    r'(?P<hour>[0-9]*):(?P<minute>[0-9]*):(?P<second>[0-9]*)')

# Total length at the end of the TimeSeekRange.dlna.org response header
# (".../<seconds>.<milliseconds>").
_TOTAL_RE = re.compile(r'^.*/(?P<totalsecs>[0-9]+)\.(?P<totalms>[0-9]+)$')

# Number of bytes handed to ffprobe when probing timestamps.
_PROBE_WINDOW = 200000


def followTag(xml, tag):
    """Return ``xml[tag]``, or ``None`` (with a warning) when it is absent.

    ``xml`` may be ``None`` or any non-dict value, so that a chain of lookups
    can continue after a missing tag instead of raising.
    """
    logger = logging.getLogger(__name__)
    if isinstance(xml, dict) and tag in xml:
        return xml[tag]
    logger.warning('No %s tag found in xml', tag)
    return None


def _followPath(xml, tags):
    """Apply followTag successively for every tag in ``tags``."""
    node = xml
    for tag in tags:
        node = followTag(node, tag)
    return node


def _asList(node):
    """Normalise an xmltodict child (None, single node or list) to a list."""
    if node is None:
        return []
    if isinstance(node, list):
        return node
    return [node]


def _findByTitle(nodes, title):
    """Return the first node whose 'dc:title' equals ``title``, else None."""
    for node in _asList(nodes):
        if followTag(node, 'dc:title') == title:
            return node
    return None


def filter(text):
    """Replace every character outside the code point range 32..127 by a space."""
    return ''.join(ch if 32 <= ord(ch) < 128 else ' ' for ch in text)


def _browse(url, objectId, browseFlag, resultFilter, what):
    """Send one Browse request and return the parsed DIDL-Lite document.

    ``what`` only feeds the error message ('children' or 'metadata').
    Exits the program when the HTTP status is not 200; returns ``None`` when
    the response carries no ``Result`` element.
    """
    logger = logging.getLogger(__name__)
    # Only the first 64 entries are requested; there is no pagination.
    data = request % (objectId, browseFlag, resultFilter, 0, 64)
    r = requests.post(url, headers=headers, data=data)
    if r.status_code != 200:
        logger.error('Impossible to connect to get %s of node %s',
                     what, objectId)
        exit(-1)

    # Filtering characters that may be non parsable by xmltodict
    envelope = xmltodict.parse(filter(r.text))
    result = _followPath(
        envelope, ['s:Envelope', 's:Body', 'u:BrowseResponse', 'Result'])
    if result is None:
        return None
    # The Result element is itself an XML document embedded as text.
    return xmltodict.parse(result)


def getChildren(url, parent, childType):
    """Return the ``childType`` children ('container' or 'item') of ``parent``.

    The value has whatever shape xmltodict produced: ``None`` when there is
    no such child, a dict for a single child, a list for several.
    """
    didl = _browse(url, parent, 'BrowseDirectChildren', '@childCount',
                   'children')
    return _followPath(didl, ['DIDL-Lite', childType])


def getMetaData(url, item):
    """Return the parsed 'item' element describing object ``item``."""
    didl = _browse(url, item, 'BrowseMetadata', '*', 'metadata')
    return _followPath(didl, ['DIDL-Lite', 'item'])


def getFilmsList(url):
    """Return the recordings found under 'EnregistreurTV' / 'Recorded TV'.

    Each entry is a dict with the keys 'title', 'date' (datetime), 'channel',
    'duration' (timedelta) and 'url'. Exits the program when the expected
    containers cannot be found or a recording cannot be parsed.
    """
    logger = logging.getLogger(__name__)

    disk = _findByTitle(
        getChildren(url, parent='0', childType='container'), 'EnregistreurTV')
    if disk is None:
        logger.error('Impossible to find disk')
        exit(-1)
    name = followTag(disk, '@id')

    recorded = _findByTitle(
        getChildren(url, parent=name, childType='container'), 'Recorded TV')
    nameRec = followTag(recorded, '@id') if recorded is not None else None
    if nameRec is None:
        logger.error('Impossible to find TV recorder directory')
        exit(-1)

    res = []
    for film in _asList(getChildren(url, parent=nameRec, childType='item')):
        title = followTag(film, 'dc:title')
        fid = followTag(film, '@id')
        metadata = getMetaData(url, fid)
        date = datetime.fromisoformat(followTag(metadata, 'dc:date'))
        channel = followTag(metadata, 'upnp:channelName')

        # A single <res> element is parsed as a dict, several as a list.
        streams = _asList(followTag(metadata, 'res'))
        if not streams:
            logger.error('No stream found for: %s', title)
            exit(-1)

        duration = followTag(streams[0], '@duration')
        m = _DURATION_RE.match(duration or '')
        if m is None:
            logger.error('Impossible to parse duration: %s', duration)
            exit(-1)
        duration = timedelta(hours=int(m.group('hour')),
                             minutes=int(m.group('minute')),
                             seconds=int(m.group('second')))
        uri = followTag(streams[0], '#text')
        res.append({'title': title, 'date': date, 'channel': channel,
                    'duration': duration, 'url': uri})
    return res


def dumpFilms(films):
    """Log one numbered line (starting at 1) per recording."""
    logger = logging.getLogger(__name__)
    for fid, film in enumerate(films, start=1):
        logger.info('%d - %s enregistré le %s sur %s. Durée: %s. %s',
                    fid, film['title'], film['date'], film['channel'],
                    film['duration'], film['url'])


def _runFfprobe(content, extraArgs=()):
    """Feed ``content`` to ffprobe on stdin and return its JSON report.

    Returns an empty dict when ffprobe produced no parsable JSON.
    """
    command = ['ffprobe', '/dev/stdin', '-loglevel', 'quiet', *extraArgs,
               '-show_entries', 'frame=frame_no,best_effort_timestamp_time',
               '-of', 'json']
    with Popen(command, stdout=PIPE, stdin=PIPE) as ffprobe:
        out, _ = ffprobe.communicate(input=content)
    try:
        report = json.loads(out)
    except ValueError:
        return {}
    return report if isinstance(report, dict) else {}


def _frameTimestamps(report):
    """Yield a timedelta for every frame of ``report`` carrying a timestamp."""
    for frame in report.get('frames', []):
        if 'best_effort_timestamp_time' in frame:
            yield timedelta(seconds=float(frame['best_effort_timestamp_time']))


def getInitialTSFromBuffer(content, delta=0.1):
    """Return the timestamp of the first frame ffprobe reports, or None.

    Only the first ``_PROBE_WINDOW`` bytes of ``content`` are probed, and
    ffprobe is restricted to the read interval ``0%+<delta>``.
    """
    logger = logging.getLogger(__name__)
    logger.debug('Determining initial timestamp from content')
    report = _runFfprobe(content[:_PROBE_WINDOW],
                         ['-read_intervals', '0%%+%f' % delta])
    logger.debug('Frames: %s', report)
    return next(_frameTimestamps(report), None)


def getLastTSFromBuffer(content):
    """Return the highest frame timestamp in the tail of ``content``, or None."""
    logger = logging.getLogger(__name__)
    logger.debug('Determining last timestamp from content.')
    report = _runFfprobe(content[max(0, len(content) - _PROBE_WINDOW):])
    logger.debug('Frames: %s', report)
    return max(_frameTimestamps(report), default=None)


def getLastTSFromFD(fd):
    """Return the highest frame timestamp in the tail of file ``fd``, or None.

    The file offset of ``fd`` is moved by this call; the caller has to
    restore it before writing again.
    """
    size = fstat(fd).st_size
    # Clamp the start: seeking before the beginning of a short file fails.
    lseek(fd, max(0, size - _PROBE_WINDOW), SEEK_SET)
    content = read(fd, _PROBE_WINDOW)
    return max(_frameTimestamps(_runFfprobe(content)), default=None)


def _estimateSize(nbBytes, initialTS, lastTS, total):
    """Extrapolate the full size from ``nbBytes`` covering initialTS..lastTS.

    Returns None when no meaningful ratio can be computed.
    """
    if lastTS is None:
        return None
    span = lastTS - initialTS
    if span <= timedelta(0) or total <= timedelta(0):
        return None
    return ceil(nbBytes / (span / total))


def downloadMovie(url, outputFileName):
    """Download ``url`` into ``outputFileName`` with a progress bar.

    A two second sample is fetched first through the TimeSeekRange.dlna.org
    header: it gives the total duration and, through ffprobe, the ratio of
    bytes to seconds used to estimate the final size. The estimate is
    refreshed from the written file while the download goes on. Exits the
    program on any failure.
    """
    logger = logging.getLogger(__name__)
    try:
        output = open(outputFileName, mode='wb+')
    except OSError:
        logger.error('Impossible to create file: %s', outputFileName)
        exit(-1)

    with output, requests.Session() as session:
        outputFD = output.fileno()

        sample = 2.
        seekHeaders = {'TimeSeekRange.dlna.org': 'npt=0.000-%.03f' % sample}
        r = session.get(url, headers=seekHeaders)
        if r.status_code != 200:
            logger.error('Impossible to download first %f seconds: %d',
                         sample, r.status_code)
            exit(-1)
        if 'TimeSeekRange.dlna.org' not in r.headers:
            logger.error('TimeSeekRange.dlna.org is not in header: %s',
                         r.headers)
            exit(-1)

        m = _TOTAL_RE.match(r.headers['TimeSeekRange.dlna.org'])
        if m is None:
            logger.error('Impossible to parse timestamps: %s',
                         r.headers['TimeSeekRange.dlna.org'])
            exit(-1)
        total = timedelta(
            seconds=int(m.group('totalsecs')) + int(m.group('totalms')) / 1000)
        logger.debug('Header: %s', r.headers)

        sampleData = r.content

        # Widen the ffprobe read interval step by step until a frame shows up.
        initialTS = None
        for attempt in range(10):
            initialTS = getInitialTSFromBuffer(sampleData, delta=0.1 * attempt)
            if initialTS is not None:
                break
        logger.debug('Initial timestamp: %s', initialTS)
        if initialTS is None:
            logger.error('Impossible to retrieve initial timestamp')
            exit(-1)

        lastTS = getLastTSFromBuffer(sampleData)
        logger.debug('Last timestamp: %s', lastTS)

        estimatedLength = _estimateSize(
            len(sampleData), initialTS, lastTS, total)
        if estimatedLength is None:
            logger.warning('Impossible to estimate length from sample')
        else:
            logger.info('Estimated length: %d', estimatedLength)

        with tqdm(total=estimatedLength, unit='bytes', desc='Download',
                  unit_scale=True, unit_divisor=1024) as pb:
            with session.get(url, stream=True) as r:
                if r.status_code != 200:
                    logger.error('Impossible to download movie: %d',
                                 r.status_code)
                    exit(-1)
                nbiters = 0
                for chunk in r.iter_content(chunk_size=100000):
                    output.write(chunk)
                    if nbiters == 1000:
                        # The probe reads through the raw descriptor: push
                        # buffered data to the file first.
                        output.flush()
                        lastTS = getLastTSFromFD(outputFD)
                        refreshed = _estimateSize(
                            fstat(outputFD).st_size, initialTS, lastTS, total)
                        if refreshed is not None:
                            pb.total = refreshed
                        # The probe moved the offset; go back to the end.
                        lseek(outputFD, 0, SEEK_END)
                        nbiters = 0
                    pb.update(len(chunk))
                    nbiters += 1


def head(url):
    """Send a HEAD request asking for DLNA content features and log the reply."""
    logger = logging.getLogger(__name__)
    featureHeaders = {'getcontentFeatures.dlna.org': '1'}
    r = requests.head(url, headers=featureHeaders)
    logger.info('Return code:%d Headers:%s Content:%s',
                r.status_code, r.headers, r.content)


def _selectFilm(films, number):
    """Return the recording numbered ``number`` (1-based) or exit on bad id."""
    logger = logging.getLogger(__name__)
    if number < 1 or number > len(films):
        logger.error('Movie id must be in [1,%d]', len(films))
        exit(-1)
    return films[number - 1]


def main():
    """Parse the command line and run the requested action."""
    logger = logging.getLogger(__name__)
    coloredlogs.install()
    try:
        locale.setlocale(locale.LC_ALL, 'fr_FR.UTF8')
    except locale.Error:
        # A missing system locale should not prevent listing or downloading.
        logger.warning('Locale fr_FR.UTF8 is not available.')

    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--url", help="URL to connect to.")
    parser.add_argument("-l", "--list", action="store_true",
                        help="List available movies.")
    parser.add_argument("-s", "--streams", type=int, metavar="#",
                        help="List streams in # movie.")
    parser.add_argument("--head", type=int, help="Return HEAD ")
    parser.add_argument("-d", "--download", type=int, metavar="#",
                        help="Download # movie.")
    parser.add_argument("-t", "--ts", type=int, metavar="#",
                        help="Display initial timestamp of # movie.")
    parser.add_argument("-o", "--output", help="Destination file.")
    parser.add_argument("-v", "--verbose", action='store_true',
                        dest='verbose', help="Debug.")
    args = parser.parse_args()

    if args.verbose:
        logger.info('Setting logging level to debug.')
        coloredlogs.set_level(level=logging.DEBUG)
    logger.debug('Arguments: %s', args)

    if args.url is None:
        args.url = default_url

    if args.list:
        dumpFilms(getFilmsList(args.url))
        exit(0)

    if args.head is not None:
        film = _selectFilm(getFilmsList(args.url), args.head)
        head(film['url'])

    if args.download is not None:
        film = _selectFilm(getFilmsList(args.url), args.download)
        title = film['title']
        if args.output is None:
            args.output = '%s.ts' % title
            logger.info('Setting output file to %s\n', args.output)
        # Drop the sub-second part for display.
        t = timedelta(seconds=int(film['duration'].total_seconds()))
        logger.info('Downloading movie "%s" whose duration is %s', title, t)
        downloadMovie(url=film['url'], outputFileName=args.output)


if __name__ == "__main__":
    main()