Compare commits
	
		
			2 Commits
		
	
	
		
			ffe1cb8a01
			...
			5b05972d40
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 5b05972d40 | ||
|  | e3b95c3450 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| venv/ | ||||
							
								
								
									
										357
									
								
								panasonic.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										357
									
								
								panasonic.py
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,357 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| import requests | ||||
| import xmltodict | ||||
| from datetime import datetime,timedelta,time | ||||
| import re | ||||
| from sys import exit,stdout | ||||
| import json | ||||
| from io import BytesIO, StringIO, IOBase | ||||
| from subprocess import Popen, PIPE | ||||
| import argparse | ||||
| import locale | ||||
| from math import floor, ceil, log | ||||
| from tempfile import NamedTemporaryFile | ||||
| from os import mkdir, set_inheritable, lseek, SEEK_SET, SEEK_END, read, fstat | ||||
| import coloredlogs, logging | ||||
| from tqdm import tqdm | ||||
|  | ||||
|  | ||||
| default_url='http://tv:55000/dms/control_0' | ||||
|  | ||||
| headers={ | ||||
| 'SOAPAction': '"urn:schemas-upnp-org:service:ContentDirectory:1#Browse"', | ||||
| 'Accept-Language': 'fr-fr;q=1, fr;q=0.5', | ||||
| 'Accept-Encoding': 'gzip', | ||||
| 'Content-Type': 'text/xml; charset="utf-8"', | ||||
| 'User-Agent': 'kodi', | ||||
| 'Connection': 'Keep-Alive'} | ||||
|  | ||||
| request = '<?xml version="1.0"?><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:Browse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1"><ObjectID>%s</ObjectID><BrowseFlag>%s</BrowseFlag><Filter>%s</Filter><StartingIndex>%d</StartingIndex><RequestedCount>%d</RequestedCount><SortCriteria></SortCriteria></u:Browse></s:Body></s:Envelope>' | ||||
|  | ||||
|  | ||||
| def followTag(xml, tag): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     if tag in xml: | ||||
|         return xml[tag] | ||||
|     else: | ||||
|         logger.warning('No %s tag found in xml' % tag) | ||||
|         return None | ||||
|  | ||||
| def filter(text): | ||||
|     return ''.join(map(lambda x: x if ord(x) in range(32, 128) else ' ', text)) | ||||
|  | ||||
| def getChildren(url, parent, childType): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     data = request % (parent,'BrowseDirectChildren','@childCount',0,64) | ||||
|     r = requests.post(url, headers=headers, data=data) | ||||
|     if (r.status_code != 200): | ||||
|         logger.error('Impossible to connect to get children of node %s' % parent) | ||||
|         exit(-1) | ||||
|   | ||||
|     # Filtering characters that may be non parsable by xmltodict | ||||
|     text = filter(r.text) | ||||
|     response = xmltodict.parse(text) | ||||
|  | ||||
|     tags=['s:Envelope', 's:Body', 'u:BrowseResponse', 'Result'] | ||||
|     for tag in tags: | ||||
|         response = followTag(response, tag) | ||||
|  | ||||
|     didl = xmltodict.parse(response) | ||||
|  | ||||
|     children = didl | ||||
|     tags=['DIDL-Lite', childType] | ||||
|     for tag in tags: | ||||
|         children = followTag(children, tag) | ||||
|  | ||||
|     return children | ||||
|  | ||||
| def getMetaData(url, item): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     data = request % (item,'BrowseMetadata','*',0,64) | ||||
|     r = requests.post(url, headers=headers, data=data) | ||||
|      | ||||
|     if (r.status_code != 200): | ||||
|         logger.error('Impossible to connect to get metadata of node %s' % parent) | ||||
|         exit(-1) | ||||
|  | ||||
|     text = filter(r.text) | ||||
|     response = xmltodict.parse(text) | ||||
|     tags=['s:Envelope', 's:Body', 'u:BrowseResponse', 'Result'] | ||||
|     for tag in tags: | ||||
|         response = followTag(response, tag) | ||||
|     didl = xmltodict.parse(response) | ||||
|  | ||||
|     metadata = didl | ||||
|     tags=['DIDL-Lite', 'item'] | ||||
|     for tag in tags: | ||||
|         metadata = followTag(metadata, tag) | ||||
|  | ||||
|     return metadata | ||||
|  | ||||
| def getFilmsList(url): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     disk=getChildren(url, parent='0', childType='container') | ||||
|     if( followTag(disk,'dc:title') != 'EnregistreurTV'): | ||||
|         logger.error('Impossible to find disk') | ||||
|         exit(-1) | ||||
|  | ||||
|     name = followTag(disk, '@id') | ||||
|  | ||||
|     directories=getChildren(url, parent=name, childType='container') | ||||
|    | ||||
|     nameRec=None | ||||
|     for directory in directories: | ||||
|         if( followTag(directory,'dc:title') == 'Recorded TV'): | ||||
|             nameRec = followTag(directory, '@id') | ||||
|             break | ||||
|      | ||||
|     if (nameRec == None): | ||||
|         logger.error('Impossible to find TV recorder directory') | ||||
|         exit(-1) | ||||
|  | ||||
|     films=getChildren(url, parent=nameRec, childType='item') | ||||
|     if films == None: | ||||
|         films = [] | ||||
|    | ||||
|     if not isinstance(films, list): | ||||
|         films = [ films ] | ||||
|    | ||||
|     res = [] | ||||
|     for film in films: | ||||
|         title = followTag(film, 'dc:title') | ||||
|         fid = followTag(film, '@id') | ||||
|         metadata=getMetaData(url, fid) | ||||
|         date = datetime.fromisoformat(followTag(metadata, 'dc:date')) | ||||
|         channel = followTag(metadata, 'upnp:channelName') | ||||
|         streams = followTag(metadata, 'res') | ||||
|         duration = followTag(streams[0], '@duration') | ||||
|         m = re.match('(?P<hour>[0-9]*):(?P<minute>[0-9]*):(?P<second>[0-9]*)', duration) | ||||
|         if m == None: | ||||
|             logger.error('Impossible to parse date: %s' % duration) | ||||
|             exit(-1) | ||||
|          | ||||
|         duration = timedelta(hours=int(m.group('hour')), minutes=int(m.group('minute')), seconds=int(m.group('second'))) | ||||
|         uri=followTag(streams[0],'#text') | ||||
|         res.append({'title':title, 'date':date, 'channel':channel, 'duration':duration, 'url':uri}) | ||||
|  | ||||
|     return res | ||||
|  | ||||
|  | ||||
| def dumpFilms(films): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     fid = 1 | ||||
|     for film in films: | ||||
|         logger.info('%d - %s enregistré le %s sur %s. Durée: %s. %s' % (fid, film['title'], film['date'], film['channel'], film['duration'], film['url'])) | ||||
|         fid = fid + 1 | ||||
|  | ||||
| def getInitialTSFromBuffer(content): | ||||
|     logger = logging.getLogger(__name__) | ||||
|  | ||||
|     with Popen(['ffprobe','/dev/stdin','-loglevel', 'quiet', '-read_intervals', '0%+0.000001', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe: | ||||
|         out, _ = ffprobe.communicate(input=content) | ||||
|         frames = json.load(BytesIO(out)) | ||||
|         if 'frames' in frames: | ||||
|             frames = frames['frames'] | ||||
|             if len(frames) > 0: | ||||
|                 for frame in frames: | ||||
|                     if 'best_effort_timestamp_time' in frame: | ||||
|                         ts = float(frame['best_effort_timestamp_time']) | ||||
|                         ts = timedelta(seconds=ts) | ||||
|                         return ts | ||||
|             else: | ||||
|                 return None | ||||
|         else: | ||||
|             return None | ||||
|  | ||||
|  | ||||
| def getLastTSFromBuffer(content): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     length = len(content) | ||||
|     with Popen(['ffprobe', '/dev/stdin', '-loglevel', 'quiet', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe: | ||||
|         out, _ = ffprobe.communicate(input=content[max(0,length-100000):]) | ||||
|         frames = json.load(BytesIO(out)) | ||||
|         if 'frames' in frames: | ||||
|             frames = frames['frames'] | ||||
|             if len(frames) > 0: | ||||
|                 lastTS = None | ||||
|                 for frame in frames: | ||||
|                     if 'best_effort_timestamp_time' in frame: | ||||
|                         ts = float(frame['best_effort_timestamp_time']) | ||||
|                         ts = timedelta(seconds=ts) | ||||
|                         if (lastTS == None) or (lastTS < ts): | ||||
|                             lastTS = ts | ||||
|                 return lastTS | ||||
|             else: | ||||
|                 return None | ||||
|         else: | ||||
|             return None | ||||
|  | ||||
| def getLastTSFromFD(fd): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     set_inheritable(fd, True) | ||||
|     with Popen(['ffprobe', '/dev/stdin' , '-loglevel', 'quiet', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE, close_fds=False) as ffprobe: | ||||
|         back = 200000 | ||||
|         length = fstat(fd).st_size | ||||
|         pos = lseek(fd, -back, SEEK_END) | ||||
|         content = read(fd, back) | ||||
|         out, _ = ffprobe.communicate(input=content) | ||||
|         frames = json.load(BytesIO(out)) | ||||
|         if 'frames' in frames: | ||||
|             frames = frames['frames'] | ||||
|             if len(frames) > 0: | ||||
|                 lastTS = None | ||||
|                 for frame in frames: | ||||
|                     if 'best_effort_timestamp_time' in frame: | ||||
|                         ts = float(frame['best_effort_timestamp_time']) | ||||
|                         ts = timedelta(seconds=ts) | ||||
|                         if (lastTS == None) or (lastTS < ts): | ||||
|                             lastTS = ts | ||||
|                 return lastTS | ||||
|             else: | ||||
|                 return None | ||||
|         else: | ||||
|             return None | ||||
|  | ||||
| def downloadMovie(url, outputFileName, duration): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     try: | ||||
|         output = open(outputFileName, mode='wb+') | ||||
|     except IOException: | ||||
|         logger.error('Impossible to create file: %s' % outputFileName) | ||||
|         exit(-1) | ||||
|          | ||||
|     outputFD = output.fileno() | ||||
|      | ||||
|     sample = 2. | ||||
|     headers = {'TimeSeekRange.dlna.org': 'npt=0.000-%.03f' % sample} | ||||
|     r = requests.get(url, headers=headers) | ||||
|     if r.status_code != 200: | ||||
|         logger.error('Impossible to download first %f seconds: %d' % (sample,r.status_code)) | ||||
|         exit(-1) | ||||
|      | ||||
|     if 'TimeSeekRange.dlna.org' not in r.headers: | ||||
|         logger.error('TimeSeekRange.dlna.org is not in header: %s' % r.headers) | ||||
|         exit(-1) | ||||
|      | ||||
|     p = re.compile('^.*/(?P<totalsecs>[0-9]+)\.(?P<totalms>[0-9]+)$') | ||||
|     m = p.match(r.headers['TimeSeekRange.dlna.org']) | ||||
|     if m == None: | ||||
|         logger.error('Impossible to parse timestamps' % r.headers['TimeSeekRange.dlna.org']) | ||||
|         exit(-1) | ||||
|          | ||||
|     total = int(m.group('totalsecs'))+(int(m.group('totalms')))/1000 | ||||
|     total = timedelta(seconds=total) | ||||
|      | ||||
|     initialTS = getInitialTSFromBuffer(r.content) | ||||
|     lastTS = getLastTSFromBuffer(r.content) | ||||
|     delta = lastTS-initialTS     | ||||
|     ratio = delta/total | ||||
|     estimatedLength = ceil(len(r.content)/ratio) | ||||
|      | ||||
|     logger.info('Estimated length: %d' % estimatedLength) | ||||
|      | ||||
|     pb = tqdm(total=estimatedLength, unit='bytes', desc='Download', unit_scale=True, unit_divisor=1024) | ||||
|  | ||||
|     nbiters = 0 | ||||
|     with requests.get(url, stream=True) as r: | ||||
|         chunks = r.iter_content(chunk_size = 100000) | ||||
|         for chunk in chunks: | ||||
|             output.write(chunk) | ||||
|             if nbiters == 1000: | ||||
|                 lastTS = getLastTSFromFD(outputFD) | ||||
|                 delta = lastTS-initialTS     | ||||
|                 ratio = delta/total | ||||
|                 length = fstat(outputFD).st_size | ||||
|                 estimatedLength = ceil(length/ratio) | ||||
|                 pb.total = estimatedLength | ||||
|                 lseek(outputFD, 0, SEEK_END) | ||||
|                 nbiters = 0 | ||||
|             pb.update(len(chunk)) | ||||
|             nbiters+=1 | ||||
|          | ||||
|          | ||||
|  | ||||
| def head(url): | ||||
|     logger = logging.getLogger(__name__) | ||||
|      | ||||
|     # headers = {'TimeSeekRange.dlna.org': 'npt=0.00-'} | ||||
|     headers = { 'getcontentFeatures.dlna.org': '1'} | ||||
|      | ||||
|     r = requests.head(url, headers=headers) | ||||
|     logger.info('Return code:%d Headers:%s Content:%s' % (r.status_code, r.headers, r.content)) | ||||
|  | ||||
|  | ||||
| def main(): | ||||
|     locale.setlocale(locale.LC_ALL, 'fr_FR.UTF8') | ||||
|     logger = logging.getLogger(__name__) | ||||
|     coloredlogs.install() | ||||
|      | ||||
|     parser = argparse.ArgumentParser() | ||||
|     parser.add_argument("-u", "--url", help="URL to connect to.") | ||||
|     parser.add_argument("-l", "--list", action="store_true", help="List available movies.") | ||||
|     parser.add_argument("-s", "--streams", type=int, metavar="#", help="List streams in # movie.") | ||||
|     parser.add_argument("--head", type=int, help="Return HEAD ") | ||||
|     parser.add_argument("-d", "--download", type=int, metavar="#", help="Download # movie.") | ||||
|     parser.add_argument("-t", "--ts", type=int, metavar="#", help="Display initial timestamp of # movie.") | ||||
|     parser.add_argument("-o", "--output", help="Destination file.") | ||||
|     parser.add_argument("-v", "--verbose",  action='store_true', dest='verbose', help="Debug.") | ||||
|    | ||||
|     args = parser.parse_args() | ||||
|  | ||||
|     if args.verbose: | ||||
|         logger.info('Setting logging level to debug.') | ||||
|         coloredlogs.set_level(level=logging.DEBUG) | ||||
|      | ||||
|     logger.debug('Arguments: %s' % args) | ||||
|  | ||||
|     if args.url == None: | ||||
|         args.url = default_url | ||||
|  | ||||
|     if args.list == True: | ||||
|         films = getFilmsList(args.url) | ||||
|         dumpFilms(films) | ||||
|         exit(0) | ||||
|          | ||||
|     if args.head != None: | ||||
|         films = getFilmsList(args.url) | ||||
|          | ||||
|         if args.head < 1 or args.head > len(films): | ||||
|             logger.error('Movie id must be in [1,%d]' % len(films)) | ||||
|             exit(-1) | ||||
|          | ||||
|         head(films[args.head-1]['url']) | ||||
|  | ||||
|     if args.download != None: | ||||
|         films = getFilmsList(args.url) | ||||
|          | ||||
|         if args.download < 1 or args.download > len(films): | ||||
|             logger.error('Movie id must be in [1,%d]' % len(films)) | ||||
|             exit(-1) | ||||
|          | ||||
|         mid = args.download-1 | ||||
|         title =  films[mid]['title'] | ||||
|          | ||||
|         if args.output == None: | ||||
|             args.output = '%s.ts' % title | ||||
|             logger.info('Setting output file to %s\n' % args.output) | ||||
|   | ||||
|         duration = int(films[mid]['duration'].total_seconds()) | ||||
|         # start = getInitialTS(films[mid]['url']) | ||||
|          | ||||
|         t = timedelta(seconds=duration) | ||||
|          | ||||
|         logger.info('Downloading movie "%s" whose duration is %s' % (title, t)) | ||||
|         downloadMovie(url=films[mid]['url'], outputFileName=args.output, duration=duration) | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|   main() | ||||
							
								
								
									
										4
									
								
								requirements.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								requirements.txt
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | ||||
| xmltodict | ||||
| requests | ||||
| coloredlogs | ||||
| tqdm | ||||
		Reference in New Issue
	
	Block a user