Files
panasonic/panasonic.py
2024-06-08 14:32:56 +02:00

376 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
import requests
import xmltodict
from datetime import datetime,timedelta,time
import re
from sys import exit,stdout
import json
from io import BytesIO, StringIO, IOBase
from subprocess import Popen, PIPE
import argparse
import locale
from math import floor, ceil, log
from tempfile import NamedTemporaryFile
from os import mkdir, set_inheritable, lseek, SEEK_SET, SEEK_END, read, fstat
import coloredlogs, logging
from tqdm import tqdm
# Control URL of the ContentDirectory service used when no --url is given.
default_url = 'http://tv:55000/dms/control_0'

# HTTP headers sent with every SOAP Browse request.
headers = dict([
    ('SOAPAction', '"urn:schemas-upnp-org:service:ContentDirectory:1#Browse"'),
    ('Accept-Language', 'fr-fr;q=1, fr;q=0.5'),
    ('Accept-Encoding', 'gzip'),
    ('Content-Type', 'text/xml; charset="utf-8"'),
    ('User-Agent', 'kodi'),
    ('Connection', 'Keep-Alive'),
])

# SOAP envelope template for a Browse action. The five placeholders are, in
# order: ObjectID, BrowseFlag, Filter, StartingIndex and RequestedCount.
request = (
    '<?xml version="1.0"?>'
    '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"'
    ' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
    '<s:Body>'
    '<u:Browse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1">'
    '<ObjectID>%s</ObjectID>'
    '<BrowseFlag>%s</BrowseFlag>'
    '<Filter>%s</Filter>'
    '<StartingIndex>%d</StartingIndex>'
    '<RequestedCount>%d</RequestedCount>'
    '<SortCriteria></SortCriteria>'
    '</u:Browse>'
    '</s:Body>'
    '</s:Envelope>'
)
def followTag(xml, tag):
    """Return the value stored under ``tag`` in a parsed XML node.

    Args:
        xml: Node to look into, normally a mapping. ``None`` and other
            non-mapping values (for instance the result of a previous failed
            lookup) are accepted and treated as "tag not found".
        tag: Key to look up (element name, ``@attribute`` or ``#text``).

    Returns:
        The value found under ``tag``, or ``None`` when it is absent. A
        warning is logged in the latter case.
    """
    logger = logging.getLogger(__name__)
    try:
        return xml[tag]
    except (KeyError, TypeError, IndexError):
        # KeyError: tag missing from a mapping.
        # TypeError/IndexError: xml is None, a string or a list, none of
        # which can be indexed by a tag name.
        logger.warning('No %s tag found in xml' % tag)
        return None
def filter(text):
    """Return ``text`` with every character whose code point is outside
    32..127 replaced by a single space.

    Note: this function shadows the ``filter`` builtin within this module.
    """
    def sanitize(char):
        # Keep characters in the 32..127 range untouched, blank the others.
        if 32 <= ord(char) <= 127:
            return char
        return ' '

    return ''.join(sanitize(char) for char in text)
def getChildren(url, parent, childType):
    """Browse the direct children of a ContentDirectory node.

    Posts a ``BrowseDirectChildren`` SOAP request (starting index 0,
    requested count 64) to ``url`` and extracts the entries named
    ``childType`` from the DIDL-Lite document embedded in the response.

    Args:
        url: Control URL of the ContentDirectory service.
        parent: Object id of the node whose children are requested.
        childType: Name of the DIDL-Lite element to return (the callers in
            this file use ``'container'`` and ``'item'``).

    Returns:
        Whatever xmltodict stored under ``childType``, or ``None`` when any
        expected tag is missing from the response.

    Exits the process with status -1 when the HTTP status is not 200.
    """
    logger = logging.getLogger(__name__)
    data = request % (parent, 'BrowseDirectChildren', '@childCount', 0, 64)
    r = requests.post(url, headers=headers, data=data)
    if r.status_code != 200:
        logger.error('Impossible to connect to get children of node %s' % parent)
        exit(-1)
    # Filtering characters that may be non parsable by xmltodict
    response = xmltodict.parse(filter(r.text))
    for tag in ('s:Envelope', 's:Body', 'u:BrowseResponse', 'Result'):
        response = followTag(response, tag)
        if response is None:
            # Stop at the first missing tag instead of feeding None to the
            # next lookup or to xmltodict.parse, which would raise.
            return None
    # The Result element carries a second, escaped XML document (DIDL-Lite).
    children = xmltodict.parse(response)
    for tag in ('DIDL-Lite', childType):
        children = followTag(children, tag)
        if children is None:
            return None
    return children
def getMetaData(url, item):
    """Fetch the metadata of a single ContentDirectory item.

    Posts a ``BrowseMetadata`` SOAP request with filter ``*`` to ``url`` and
    extracts the ``item`` element from the DIDL-Lite document embedded in
    the response.

    Args:
        url: Control URL of the ContentDirectory service.
        item: Object id of the item to describe.

    Returns:
        Whatever xmltodict stored under ``DIDL-Lite/item``, or ``None`` when
        any expected tag is missing from the response.

    Exits the process with status -1 when the HTTP status is not 200.
    """
    logger = logging.getLogger(__name__)
    data = request % (item, 'BrowseMetadata', '*', 0, 64)
    r = requests.post(url, headers=headers, data=data)
    if r.status_code != 200:
        # The original message formatted an undefined name here, raising
        # NameError instead of reporting the failure.
        logger.error('Impossible to connect to get metadata of node %s' % item)
        exit(-1)
    response = xmltodict.parse(filter(r.text))
    for tag in ('s:Envelope', 's:Body', 'u:BrowseResponse', 'Result'):
        response = followTag(response, tag)
        if response is None:
            # Stop at the first missing tag instead of feeding None to the
            # next lookup or to xmltodict.parse, which would raise.
            return None
    # The Result element carries a second, escaped XML document (DIDL-Lite).
    metadata = xmltodict.parse(response)
    for tag in ('DIDL-Lite', 'item'):
        metadata = followTag(metadata, tag)
        if metadata is None:
            return None
    return metadata
def _asList(value):
    """Normalise a parsed XML value to a list (None -> [], scalar -> [scalar])."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def getFilmsList(url):
    """List the recordings found in the 'Recorded TV' directory.

    Walks the tree root -> container titled 'EnregistreurTV' -> container
    titled 'Recorded TV' and fetches the metadata of every item in it.

    Args:
        url: Control URL of the ContentDirectory service.

    Returns:
        A list of dicts with keys ``title``, ``date`` (datetime), ``channel``,
        ``duration`` (timedelta) and ``url`` (text of the first ``res``
        element). Items without metadata or without any ``res`` element are
        skipped with a warning.

    Exits the process with status -1 when the disk or the recordings
    directory cannot be found, or when a duration cannot be parsed.
    """
    logger = logging.getLogger(__name__)

    # A lone child element is parsed as a mapping and several as a list, so
    # every level is normalised to a list before being searched.
    name = None
    for disk in _asList(getChildren(url, parent='0', childType='container')):
        if followTag(disk, 'dc:title') == 'EnregistreurTV':
            name = followTag(disk, '@id')
            break
    if name is None:
        logger.error('Impossible to find disk')
        exit(-1)

    nameRec = None
    for directory in _asList(getChildren(url, parent=name, childType='container')):
        if followTag(directory, 'dc:title') == 'Recorded TV':
            nameRec = followTag(directory, '@id')
            break
    if nameRec is None:
        logger.error('Impossible to find TV recorder directory')
        exit(-1)

    films = _asList(getChildren(url, parent=nameRec, childType='item'))
    # '+' rather than '*': an empty group would make int('') fail below.
    durationPattern = re.compile('(?P<hour>[0-9]+):(?P<minute>[0-9]+):(?P<second>[0-9]+)')
    res = []
    for film in films:
        title = followTag(film, 'dc:title')
        fid = followTag(film, '@id')
        metadata = getMetaData(url, fid)
        if metadata is None:
            logger.warning('No metadata for film: %s' % title)
            continue
        date = datetime.fromisoformat(followTag(metadata, 'dc:date'))
        channel = followTag(metadata, 'upnp:channelName')
        streams = _asList(followTag(metadata, 'res'))
        if not streams:
            logger.warning('No stream for film: %s' % title)
            continue
        stream = streams[0]
        duration = followTag(stream, '@duration')
        m = durationPattern.match(duration or '')
        if m is None:
            logger.error('Impossible to parse duration: %s' % duration)
            exit(-1)
        duration = timedelta(hours=int(m.group('hour')),
                             minutes=int(m.group('minute')),
                             seconds=int(m.group('second')))
        uri = followTag(stream, '#text')
        res.append({'title': title, 'date': date, 'channel': channel,
                    'duration': duration, 'url': uri})
    return res
def dumpFilms(films):
    """Log one numbered line (starting at 1) per film at INFO level.

    Each element of ``films`` must provide the keys ``title``, ``date``,
    ``channel``, ``duration`` and ``url``.
    """
    log = logging.getLogger(__name__)
    for number, entry in enumerate(films, start=1):
        fields = (number, entry['title'], entry['date'], entry['channel'],
                  entry['duration'], entry['url'])
        log.info('%d - %s enregistré le %s sur %s. Durée: %s. %s' % fields)
def getInitialTSFromBuffer(content, delta=0.1):
    """Return the timestamp of the first frame found in ``content``.

    ``content`` is piped to ``ffprobe`` (read from ``/dev/stdin``), which is
    asked for the ``best_effort_timestamp_time`` of the frames in the read
    interval ``0%+<delta>``.

    Args:
        content: Bytes of the beginning of a stream.
        delta: Value inserted into the ``-read_intervals`` argument.

    Returns:
        A ``timedelta`` built from the first frame carrying a timestamp, or
        ``None`` when ffprobe reports no such frame or its output cannot be
        parsed as JSON.
    """
    logger = logging.getLogger(__name__)
    logger.debug('Determining initial timestamp from content')
    command = ['ffprobe', '/dev/stdin', '-loglevel', 'quiet',
               '-read_intervals', ('0%%+%f' % delta),
               '-show_entries', 'frame=frame_no,best_effort_timestamp_time',
               '-of', 'json']
    with Popen(command, stdout=PIPE, stdin=PIPE) as ffprobe:
        out, _ = ffprobe.communicate(input=content)
    try:
        frames = json.loads(out)
    except ValueError:
        # Empty or truncated output: report "no timestamp" so that the
        # caller can retry instead of crashing on a decode error.
        logger.debug('Impossible to parse ffprobe output: %r' % out)
        return None
    logger.debug('Frames: %s' % frames)
    if not isinstance(frames, dict):
        return None
    for frame in frames.get('frames', []):
        if 'best_effort_timestamp_time' in frame:
            return timedelta(seconds=float(frame['best_effort_timestamp_time']))
    return None
def getLastTSFromBuffer(content):
    """Return the highest frame timestamp found at the end of ``content``.

    The last 200000 bytes of ``content`` (or all of it when shorter) are
    piped to ``ffprobe`` (read from ``/dev/stdin``), which is asked for the
    ``best_effort_timestamp_time`` of every frame.

    Args:
        content: Bytes of a stream.

    Returns:
        The largest timestamp as a ``timedelta``, or ``None`` when ffprobe
        reports no frame with a timestamp or its output cannot be parsed as
        JSON.
    """
    logger = logging.getLogger(__name__)
    logger.debug('Determining last timestamp from content.')
    tail = content[max(0, len(content) - 200000):]
    command = ['ffprobe', '/dev/stdin', '-loglevel', 'quiet',
               '-show_entries', 'frame=frame_no,best_effort_timestamp_time',
               '-of', 'json']
    with Popen(command, stdout=PIPE, stdin=PIPE) as ffprobe:
        out, _ = ffprobe.communicate(input=tail)
    try:
        frames = json.loads(out)
    except ValueError:
        # Empty or truncated output is treated as "no timestamp found".
        logger.debug('Impossible to parse ffprobe output: %r' % out)
        return None
    logger.debug('Frames: %s' % frames)
    if not isinstance(frames, dict):
        return None
    timestamps = [timedelta(seconds=float(frame['best_effort_timestamp_time']))
                  for frame in frames.get('frames', [])
                  if 'best_effort_timestamp_time' in frame]
    return max(timestamps) if timestamps else None
def getLastTSFromFD(fd):
    """Return the highest frame timestamp found at the end of an open file.

    Reads up to the last 200000 bytes from file descriptor ``fd`` and pipes
    them to ``ffprobe`` (read from ``/dev/stdin``), which is asked for the
    ``best_effort_timestamp_time`` of every frame.

    Side effects: ``fd`` is marked inheritable and its file offset is moved;
    callers that keep writing through ``fd`` must reposition it afterwards.

    Args:
        fd: File descriptor opened for reading.

    Returns:
        The largest timestamp as a ``timedelta``, or ``None`` when the file
        is empty, when ffprobe reports no frame with a timestamp, or when
        its output cannot be parsed as JSON.
    """
    logger = logging.getLogger(__name__)
    # NOTE(review): inheritance of fd looks unnecessary since the data is
    # handed to ffprobe through its stdin pipe; kept as in the original.
    set_inheritable(fd, True)
    # Never seek further back than the file is long: a negative resulting
    # offset makes lseek fail on files shorter than the window.
    back = min(200000, fstat(fd).st_size)
    if back == 0:
        return None
    lseek(fd, -back, SEEK_END)
    content = read(fd, back)
    command = ['ffprobe', '/dev/stdin', '-loglevel', 'quiet',
               '-show_entries', 'frame=frame_no,best_effort_timestamp_time',
               '-of', 'json']
    with Popen(command, stdout=PIPE, stdin=PIPE, close_fds=False) as ffprobe:
        out, _ = ffprobe.communicate(input=content)
    try:
        frames = json.loads(out)
    except ValueError:
        # Empty or truncated output is treated as "no timestamp found".
        logger.debug('Impossible to parse ffprobe output: %r' % out)
        return None
    if not isinstance(frames, dict):
        return None
    timestamps = [timedelta(seconds=float(frame['best_effort_timestamp_time']))
                  for frame in frames.get('frames', [])
                  if 'best_effort_timestamp_time' in frame]
    return max(timestamps) if timestamps else None
def _estimateLength(size, elapsed, total):
    """Extrapolate a full size from ``size`` bytes covering ``elapsed`` out of ``total``.

    Returns ``None`` when no meaningful ratio can be computed.
    """
    if elapsed is None or elapsed <= timedelta(0) or total <= timedelta(0):
        return None
    return ceil(size / (elapsed / total))


def downloadMovie(url, outputFileName):
    """Download the stream at ``url`` into ``outputFileName`` with a progress bar.

    The first 2 seconds are requested with a ``TimeSeekRange.dlna.org``
    header; the total duration is read from the matching response header and
    combined with the first and last frame timestamps of that sample to
    estimate the final size in bytes. The whole stream is then fetched in
    100000-byte chunks, and the estimate is refreshed every 1000 chunks from
    the timestamps found at the end of the file written so far.

    Args:
        url: Address of the stream to download.
        outputFileName: Path of the file to create (truncated if it exists).

    Exits the process with status -1 when the file cannot be created, a
    request fails, the duration header is missing or malformed, or no
    initial timestamp can be determined.
    """
    logger = logging.getLogger(__name__)
    try:
        output = open(outputFileName, mode='wb+')
    except OSError:
        logger.error('Impossible to create file: %s' % outputFileName)
        exit(-1)
    with output, requests.Session() as session:
        outputFD = output.fileno()
        sample = 2.
        headers = {'TimeSeekRange.dlna.org': 'npt=0.000-%.03f' % sample}
        r = session.get(url, headers=headers)
        if r.status_code != 200:
            logger.error('Impossible to download first %f seconds: %d' % (sample, r.status_code))
            exit(-1)
        if 'TimeSeekRange.dlna.org' not in r.headers:
            logger.error('TimeSeekRange.dlna.org is not in header: %s' % r.headers)
            exit(-1)
        p = re.compile(r'^.*/(?P<totalsecs>[0-9]+)\.(?P<totalms>[0-9]+)$')
        m = p.match(r.headers['TimeSeekRange.dlna.org'])
        if m is None:
            logger.error('Impossible to parse timestamps: %s' % r.headers['TimeSeekRange.dlna.org'])
            exit(-1)
        # Parse "secs.fraction" as one decimal number so that the fraction
        # is right whatever its number of digits.
        total = timedelta(seconds=float('%s.%s' % (m.group('totalsecs'), m.group('totalms'))))
        logger.debug('Header: %s' % r.headers)

        # ffprobe may find no frame in a very short interval, so the probe
        # is retried with a growing read interval.
        initialTS = None
        nbRetries = 0
        maxRetries = 10
        while initialTS is None and nbRetries < maxRetries:
            initialTS = getInitialTSFromBuffer(r.content, delta=0.1 * nbRetries)
            nbRetries = nbRetries + 1
        logger.debug('Initial timestamp: %s' % initialTS)
        if initialTS is None:
            logger.error('Impossible to retrieve initial timestamp')
            exit(-1)

        lastTS = getLastTSFromBuffer(r.content)
        logger.debug('Last timestamp: %s' % lastTS)
        elapsed = None if lastTS is None else lastTS - initialTS
        estimatedLength = _estimateLength(len(r.content), elapsed, total)
        if estimatedLength is None:
            logger.warning('Impossible to estimate length')
        else:
            logger.info('Estimated length: %d' % estimatedLength)

        with tqdm(total=estimatedLength, unit='bytes', desc='Download',
                  unit_scale=True, unit_divisor=1024) as pb:
            nbiters = 0
            with session.get(url, stream=True) as r:
                if r.status_code >= 400:
                    logger.error('Impossible to download movie: %d' % r.status_code)
                    exit(-1)
                for chunk in r.iter_content(chunk_size=100000):
                    output.write(chunk)
                    if nbiters == 1000:
                        # Push buffered data to disk so that the probe and
                        # fstat see everything written so far.
                        output.flush()
                        lastTS = getLastTSFromFD(outputFD)
                        if lastTS is not None:
                            refined = _estimateLength(fstat(outputFD).st_size,
                                                      lastTS - initialTS, total)
                            if refined is not None:
                                pb.total = refined
                        # The probe moved the descriptor offset; put it back
                        # at the end before writing the next chunk.
                        lseek(outputFD, 0, SEEK_END)
                        nbiters = 0
                    pb.update(len(chunk))
                    nbiters += 1
def head(url):
    """Send a HEAD request to ``url`` and log its status code, headers and body."""
    log = logging.getLogger(__name__)
    # Alternative probe header left disabled: {'TimeSeekRange.dlna.org': 'npt=0.00-'}
    response = requests.head(url, headers={'getcontentFeatures.dlna.org': '1'})
    summary = (response.status_code, response.headers, response.content)
    log.info('Return code:%d Headers:%s Content:%s' % summary)
def main():
    """Command-line entry point.

    Options handled here:
        -u/--url       control URL (defaults to ``default_url``)
        -l/--list      log the available movies, then exit
        --head N       send a HEAD request for movie N
        -d/--download N  download movie N
        -o/--output    destination file (defaults to '<title>.ts')
        -v/--verbose   switch logging to DEBUG

    ``-s/--streams`` and ``-t/--ts`` are parsed but not acted upon in this
    function. Movie numbers are 1-based positions in the list returned by
    ``getFilmsList``.
    """
    # A missing fr_FR locale must not prevent the tool from running; the
    # failure is reported once logging is set up.
    try:
        locale.setlocale(locale.LC_ALL, 'fr_FR.UTF8')
        localeError = None
    except locale.Error as error:
        localeError = error
    logger = logging.getLogger(__name__)
    coloredlogs.install()
    if localeError is not None:
        logger.warning('Impossible to set locale fr_FR.UTF8: %s' % localeError)

    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--url", help="URL to connect to.")
    parser.add_argument("-l", "--list", action="store_true", help="List available movies.")
    parser.add_argument("-s", "--streams", type=int, metavar="#", help="List streams in # movie.")
    parser.add_argument("--head", type=int, help="Return HEAD ")
    parser.add_argument("-d", "--download", type=int, metavar="#", help="Download # movie.")
    parser.add_argument("-t", "--ts", type=int, metavar="#", help="Display initial timestamp of # movie.")
    parser.add_argument("-o", "--output", help="Destination file.")
    parser.add_argument("-v", "--verbose", action='store_true', dest='verbose', help="Debug.")
    args = parser.parse_args()

    if args.verbose:
        logger.info('Setting logging level to debug.')
        coloredlogs.set_level(level=logging.DEBUG)
    logger.debug('Arguments: %s' % args)

    if args.url is None:
        args.url = default_url

    if args.list:
        films = getFilmsList(args.url)
        dumpFilms(films)
        exit(0)

    if args.head is not None:
        films = getFilmsList(args.url)
        if args.head < 1 or args.head > len(films):
            logger.error('Movie id must be in [1,%d]' % len(films))
            exit(-1)
        head(films[args.head - 1]['url'])

    if args.download is not None:
        films = getFilmsList(args.url)
        if args.download < 1 or args.download > len(films):
            logger.error('Movie id must be in [1,%d]' % len(films))
            exit(-1)
        mid = args.download - 1
        title = films[mid]['title']
        if args.output is None:
            # A '/' in the title would be read as a directory separator and
            # make the file creation fail.
            args.output = '%s.ts' % str(title).replace('/', '_')
            logger.info('Setting output file to %s\n' % args.output)
        duration = int(films[mid]['duration'].total_seconds())
        t = timedelta(seconds=duration)
        logger.info('Downloading movie "%s" whose duration is %s' % (title, t))
        downloadMovie(url=films[mid]['url'], outputFileName=args.output)


if __name__ == "__main__":
    main()