Compare commits

...

12 Commits

Author SHA1 Message Date
Frédéric Tronel
4f9c3ea08b Improvement of how we determine the initial timestamp. 2025-10-23 11:12:20 +02:00
Frédéric Tronel
1e6167a1ae Regexp strings shoulb de raw strings to preserve escape character. 2025-09-07 17:17:25 +02:00
Frédéric Tronel
31f8cf2047 Retrieving initial timestamp is more robust. 2024-06-08 14:32:56 +02:00
Frédéric Tronel
f5393cda3b Improve robustness in case of bad packets. 2024-06-02 15:13:28 +02:00
Frédéric Tronel
3e8f868d87 Correction d'un bug de nommage de module. 2024-01-06 16:24:21 +01:00
Frédéric Tronel
6b9999a414 Merge branch 'main' of git.tronel.org:Public/panasonic 2024-01-06 16:16:44 +01:00
Frédéric Tronel
de952cc314 Suppression d'un paramètre inutilisé. 2024-01-06 16:16:20 +01:00
Frédéric Tronel
bded5e8990 Utilisation d'une session. 2024-01-06 16:15:47 +01:00
Frédéric Tronel
2ea2cf0fc6 Correction d'un bug qui empêchait de retrouver correctement les timestamps initiaux et finaux. 2024-01-06 15:56:00 +01:00
Frédéric Tronel
dc7c8acb00 A first version of README. 2024-01-05 14:58:33 +01:00
Frédéric Tronel
5b05972d40 Merge branch 'main' of git.tronel.org:Public/panasonic 2024-01-05 14:33:09 +01:00
Frédéric Tronel
e3b95c3450 Import de code d'un projet plus complet pour le limiter au téléchargement depuis la TV. Ajout d'une barre de progression. 2024-01-05 14:27:56 +01:00
4 changed files with 389 additions and 1 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
venv/

View File

@@ -1,2 +1,9 @@
# panasonic
# Panasonic SMART TV download recordings
This project allows to download recordings from a Panasonic smart TV (model ...).
This TV model is DLNA compatible. To download a recordings it is possible to request
the TV to stream a MPEG-TS file through a HTTP get request.
# Usage
# Progress bar

376
panasonic.py Executable file
View File

@@ -0,0 +1,376 @@
#!/usr/bin/env python3
import requests
import xmltodict
from datetime import datetime,timedelta,time
import re
from sys import exit,stdout
import json
from io import BytesIO, StringIO, IOBase
from subprocess import Popen, PIPE
import argparse
import locale
from math import floor, ceil, log
from tempfile import NamedTemporaryFile
from os import mkdir, set_inheritable, lseek, SEEK_SET, SEEK_END, read, fstat
import coloredlogs, logging
from tqdm import tqdm
default_url='http://tv:55000/dms/control_0'
headers={
'SOAPAction': '"urn:schemas-upnp-org:service:ContentDirectory:1#Browse"',
'Accept-Language': 'fr-fr;q=1, fr;q=0.5',
'Accept-Encoding': 'gzip',
'Content-Type': 'text/xml; charset="utf-8"',
'User-Agent': 'kodi',
'Connection': 'Keep-Alive'}
request = '<?xml version="1.0"?><s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:Browse xmlns:u="urn:schemas-upnp-org:service:ContentDirectory:1"><ObjectID>%s</ObjectID><BrowseFlag>%s</BrowseFlag><Filter>%s</Filter><StartingIndex>%d</StartingIndex><RequestedCount>%d</RequestedCount><SortCriteria></SortCriteria></u:Browse></s:Body></s:Envelope>'
def followTag(xml, tag):
logger = logging.getLogger(__name__)
if tag in xml:
return xml[tag]
else:
logger.warning('No %s tag found in xml' % tag)
return None
def filter(text):
return ''.join(map(lambda x: x if ord(x) in range(32, 128) else ' ', text))
def getChildren(url, parent, childType):
logger = logging.getLogger(__name__)
data = request % (parent,'BrowseDirectChildren','@childCount',0,64)
r = requests.post(url, headers=headers, data=data)
if (r.status_code != 200):
logger.error('Impossible to connect to get children of node %s' % parent)
exit(-1)
# Filtering characters that may be non parsable by xmltodict
text = filter(r.text)
response = xmltodict.parse(text)
tags=['s:Envelope', 's:Body', 'u:BrowseResponse', 'Result']
for tag in tags:
response = followTag(response, tag)
didl = xmltodict.parse(response)
children = didl
tags=['DIDL-Lite', childType]
for tag in tags:
children = followTag(children, tag)
return children
def getMetaData(url, item):
logger = logging.getLogger(__name__)
data = request % (item,'BrowseMetadata','*',0,64)
r = requests.post(url, headers=headers, data=data)
if (r.status_code != 200):
logger.error('Impossible to connect to get metadata of node %s' % parent)
exit(-1)
text = filter(r.text)
response = xmltodict.parse(text)
tags=['s:Envelope', 's:Body', 'u:BrowseResponse', 'Result']
for tag in tags:
response = followTag(response, tag)
didl = xmltodict.parse(response)
metadata = didl
tags=['DIDL-Lite', 'item']
for tag in tags:
metadata = followTag(metadata, tag)
return metadata
def getFilmsList(url):
logger = logging.getLogger(__name__)
disk=getChildren(url, parent='0', childType='container')
if( followTag(disk,'dc:title') != 'EnregistreurTV'):
logger.error('Impossible to find disk')
exit(-1)
name = followTag(disk, '@id')
directories=getChildren(url, parent=name, childType='container')
nameRec=None
for directory in directories:
if( followTag(directory,'dc:title') == 'Recorded TV'):
nameRec = followTag(directory, '@id')
break
if (nameRec == None):
logger.error('Impossible to find TV recorder directory')
exit(-1)
films=getChildren(url, parent=nameRec, childType='item')
if films == None:
films = []
if not isinstance(films, list):
films = [ films ]
res = []
for film in films:
title = followTag(film, 'dc:title')
fid = followTag(film, '@id')
metadata=getMetaData(url, fid)
date = datetime.fromisoformat(followTag(metadata, 'dc:date'))
channel = followTag(metadata, 'upnp:channelName')
streams = followTag(metadata, 'res')
duration = followTag(streams[0], '@duration')
m = re.match('(?P<hour>[0-9]*):(?P<minute>[0-9]*):(?P<second>[0-9]*)', duration)
if m == None:
logger.error('Impossible to parse date: %s' % duration)
exit(-1)
duration = timedelta(hours=int(m.group('hour')), minutes=int(m.group('minute')), seconds=int(m.group('second')))
uri=followTag(streams[0],'#text')
res.append({'title':title, 'date':date, 'channel':channel, 'duration':duration, 'url':uri})
return res
def dumpFilms(films):
logger = logging.getLogger(__name__)
fid = 1
for film in films:
logger.info('%d - %s enregistré le %s sur %s. Durée: %s. %s' % (fid, film['title'], film['date'], film['channel'], film['duration'], film['url']))
fid = fid + 1
def getInitialTSFromBuffer(content, delta=0.1):
logger = logging.getLogger(__name__)
logger.debug('Determining initial timestamp from content')
length = len(content)
with Popen(['ffprobe','/dev/stdin','-loglevel', 'quiet', '-read_intervals', ('0%%+%f' % delta), '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe:
out, _ = ffprobe.communicate(input=content[0:min(length,200000)])
frames = json.load(BytesIO(out))
logger.debug('Frames: %s' % frames)
if 'frames' in frames:
frames = frames['frames']
if len(frames) > 0:
for frame in frames:
if 'best_effort_timestamp_time' in frame:
ts = float(frame['best_effort_timestamp_time'])
ts = timedelta(seconds=ts)
return ts
else:
return None
else:
return None
def getLastTSFromBuffer(content):
logger = logging.getLogger(__name__)
logger.debug('Determining last timestamp from content.')
length = len(content)
with Popen(['ffprobe', '/dev/stdin', '-loglevel', 'quiet', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe:
out, _ = ffprobe.communicate(input=content[max(0,length-200000):])
frames = json.load(BytesIO(out))
logger.debug('Frames: %s' % frames)
if 'frames' in frames:
frames = frames['frames']
if len(frames) > 0:
lastTS = None
for frame in frames:
if 'best_effort_timestamp_time' in frame:
ts = float(frame['best_effort_timestamp_time'])
ts = timedelta(seconds=ts)
if (lastTS == None) or (lastTS < ts):
lastTS = ts
return lastTS
else:
return None
else:
return None
def getLastTSFromFD(fd):
logger = logging.getLogger(__name__)
set_inheritable(fd, True)
with Popen(['ffprobe', '/dev/stdin' , '-loglevel', 'quiet', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE, close_fds=False) as ffprobe:
back = 200000
length = fstat(fd).st_size
pos = lseek(fd, -back, SEEK_END)
content = read(fd, back)
out, _ = ffprobe.communicate(input=content)
frames = json.load(BytesIO(out))
if 'frames' in frames:
frames = frames['frames']
if len(frames) > 0:
lastTS = None
for frame in frames:
if 'best_effort_timestamp_time' in frame:
ts = float(frame['best_effort_timestamp_time'])
ts = timedelta(seconds=ts)
if (lastTS == None) or (lastTS < ts):
lastTS = ts
return lastTS
else:
return None
else:
return None
def downloadMovie(url, outputFileName):
logger = logging.getLogger(__name__)
try:
output = open(outputFileName, mode='wb+')
except IOException:
logger.error('Impossible to create file: %s' % outputFileName)
exit(-1)
outputFD = output.fileno()
session = requests.Session()
sample = 2.
headers = {'TimeSeekRange.dlna.org': 'npt=0.000-%.03f' % sample}
r = session.get(url, headers=headers)
if r.status_code != 200:
logger.error('Impossible to download first %f seconds: %d' % (sample,r.status_code))
exit(-1)
if 'TimeSeekRange.dlna.org' not in r.headers:
logger.error('TimeSeekRange.dlna.org is not in header: %s' % r.headers)
exit(-1)
p = re.compile(r'^.*/(?P<totalsecs>[0-9]+)\.(?P<totalms>[0-9]+)$')
m = p.match(r.headers['TimeSeekRange.dlna.org'])
if m == None:
logger.error('Impossible to parse timestamps' % r.headers['TimeSeekRange.dlna.org'])
exit(-1)
total = int(m.group('totalsecs'))+(int(m.group('totalms')))/1000
total = timedelta(seconds=total)
logger.debug('Header: %s' % r.headers)
initialTS = None
nbRetries = 0
maxRetries = 10
while initialTS == None and nbRetries < maxRetries:
initialTS = getInitialTSFromBuffer(r.content, delta=0.1*nbRetries)
nbRetries = nbRetries+1
logger.debug('Initial timestamp: %s' % initialTS)
if initialTS == None:
logger.error('Impossible to retrieve initial timestamp')
exit(-1)
lastTS = getLastTSFromBuffer(r.content)
logger.debug('Last timestamp: %s' % lastTS)
delta = lastTS-initialTS
ratio = delta/total
estimatedLength = ceil(len(r.content)/ratio)
logger.info('Estimated length: %d' % estimatedLength)
pb = tqdm(total=estimatedLength, unit='bytes', desc='Download', unit_scale=True, unit_divisor=1024)
nbiters = 0
with session.get(url, stream=True) as r:
chunks = r.iter_content(chunk_size = 100000)
for chunk in chunks:
output.write(chunk)
if nbiters == 1000:
lastTS = getLastTSFromFD(outputFD)
if lastTS != None:
delta = lastTS-initialTS
ratio = delta/total
length = fstat(outputFD).st_size
estimatedLength = ceil(length/ratio)
pb.total = estimatedLength
lseek(outputFD, 0, SEEK_END)
nbiters = 0
pb.update(len(chunk))
nbiters+=1
def head(url):
logger = logging.getLogger(__name__)
# headers = {'TimeSeekRange.dlna.org': 'npt=0.00-'}
headers = { 'getcontentFeatures.dlna.org': '1'}
r = requests.head(url, headers=headers)
logger.info('Return code:%d Headers:%s Content:%s' % (r.status_code, r.headers, r.content))
def main():
locale.setlocale(locale.LC_ALL, 'fr_FR.UTF8')
logger = logging.getLogger(__name__)
coloredlogs.install()
parser = argparse.ArgumentParser()
parser.add_argument("-u", "--url", help="URL to connect to.")
parser.add_argument("-l", "--list", action="store_true", help="List available movies.")
parser.add_argument("-s", "--streams", type=int, metavar="#", help="List streams in # movie.")
parser.add_argument("--head", type=int, help="Return HEAD ")
parser.add_argument("-d", "--download", type=int, metavar="#", help="Download # movie.")
parser.add_argument("-t", "--ts", type=int, metavar="#", help="Display initial timestamp of # movie.")
parser.add_argument("-o", "--output", help="Destination file.")
parser.add_argument("-v", "--verbose", action='store_true', dest='verbose', help="Debug.")
args = parser.parse_args()
if args.verbose:
logger.info('Setting logging level to debug.')
coloredlogs.set_level(level=logging.DEBUG)
logger.debug('Arguments: %s' % args)
if args.url == None:
args.url = default_url
if args.list == True:
films = getFilmsList(args.url)
dumpFilms(films)
exit(0)
if args.head != None:
films = getFilmsList(args.url)
if args.head < 1 or args.head > len(films):
logger.error('Movie id must be in [1,%d]' % len(films))
exit(-1)
head(films[args.head-1]['url'])
if args.download != None:
films = getFilmsList(args.url)
if args.download < 1 or args.download > len(films):
logger.error('Movie id must be in [1,%d]' % len(films))
exit(-1)
mid = args.download-1
title = films[mid]['title']
if args.output == None:
args.output = '%s.ts' % title
logger.info('Setting output file to %s\n' % args.output)
duration = int(films[mid]['duration'].total_seconds())
# start = getInitialTS(films[mid]['url'])
t = timedelta(seconds=duration)
logger.info('Downloading movie "%s" whose duration is %s' % (title, t))
downloadMovie(url=films[mid]['url'], outputFileName=args.output)
if __name__ == "__main__":
main()

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
xmltodict
requests
coloredlogs
tqdm