Compare commits

...

10 Commits

Author SHA1 Message Date
Frédéric Tronel
4f9c3ea08b Improvement of how we determine the initial timestamp. 2025-10-23 11:12:20 +02:00
Frédéric Tronel
1e6167a1ae Regexp strings shoulb de raw strings to preserve escape character. 2025-09-07 17:17:25 +02:00
Frédéric Tronel
31f8cf2047 Retrieving initial timestamp is more robust. 2024-06-08 14:32:56 +02:00
Frédéric Tronel
f5393cda3b Improve robustness in case of bad packets. 2024-06-02 15:13:28 +02:00
Frédéric Tronel
3e8f868d87 Correction d'un bug de nommage de module. 2024-01-06 16:24:21 +01:00
Frédéric Tronel
6b9999a414 Merge branch 'main' of git.tronel.org:Public/panasonic 2024-01-06 16:16:44 +01:00
Frédéric Tronel
de952cc314 Suppression d'un paramètre inutilisé. 2024-01-06 16:16:20 +01:00
Frédéric Tronel
bded5e8990 Utilisation d'une session. 2024-01-06 16:15:47 +01:00
Frédéric Tronel
2ea2cf0fc6 Correction d'un bug qui empêchait de retrouver correctement les timestamps initiaux et finaux. 2024-01-06 15:56:00 +01:00
Frédéric Tronel
dc7c8acb00 A first version of README. 2024-01-05 14:58:33 +01:00
2 changed files with 44 additions and 18 deletions

View File

@@ -1,2 +1,9 @@
# panasonic
# Panasonic SMART TV download recordings
This project allows to download recordings from a Panasonic smart TV (model ...).
This TV model is DLNA compatible. To download a recordings it is possible to request
the TV to stream a MPEG-TS file through a HTTP get request.
# Usage
# Progress bar

View File

@@ -150,12 +150,15 @@ def dumpFilms(films):
logger.info('%d - %s enregistré le %s sur %s. Durée: %s. %s' % (fid, film['title'], film['date'], film['channel'], film['duration'], film['url']))
fid = fid + 1
def getInitialTSFromBuffer(content):
def getInitialTSFromBuffer(content, delta=0.1):
logger = logging.getLogger(__name__)
with Popen(['ffprobe','/dev/stdin','-loglevel', 'quiet', '-read_intervals', '0%+0.000001', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe:
out, _ = ffprobe.communicate(input=content)
logger.debug('Determining initial timestamp from content')
length = len(content)
with Popen(['ffprobe','/dev/stdin','-loglevel', 'quiet', '-read_intervals', ('0%%+%f' % delta), '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe:
out, _ = ffprobe.communicate(input=content[0:min(length,200000)])
frames = json.load(BytesIO(out))
logger.debug('Frames: %s' % frames)
if 'frames' in frames:
frames = frames['frames']
if len(frames) > 0:
@@ -173,10 +176,12 @@ def getInitialTSFromBuffer(content):
def getLastTSFromBuffer(content):
logger = logging.getLogger(__name__)
logger.debug('Determining last timestamp from content.')
length = len(content)
with Popen(['ffprobe', '/dev/stdin', '-loglevel', 'quiet', '-show_entries', 'frame=frame_no,best_effort_timestamp_time', '-of','json'], stdout=PIPE, stdin=PIPE) as ffprobe:
out, _ = ffprobe.communicate(input=content[max(0,length-100000):])
out, _ = ffprobe.communicate(input=content[max(0,length-200000):])
frames = json.load(BytesIO(out))
logger.debug('Frames: %s' % frames)
if 'frames' in frames:
frames = frames['frames']
if len(frames) > 0:
@@ -220,7 +225,7 @@ def getLastTSFromFD(fd):
else:
return None
def downloadMovie(url, outputFileName, duration):
def downloadMovie(url, outputFileName):
logger = logging.getLogger(__name__)
try:
@@ -230,10 +235,11 @@ def downloadMovie(url, outputFileName, duration):
exit(-1)
outputFD = output.fileno()
session = requests.Session()
sample = 2.
headers = {'TimeSeekRange.dlna.org': 'npt=0.000-%.03f' % sample}
r = requests.get(url, headers=headers)
r = session.get(url, headers=headers)
if r.status_code != 200:
logger.error('Impossible to download first %f seconds: %d' % (sample,r.status_code))
exit(-1)
@@ -242,17 +248,28 @@ def downloadMovie(url, outputFileName, duration):
logger.error('TimeSeekRange.dlna.org is not in header: %s' % r.headers)
exit(-1)
p = re.compile('^.*/(?P<totalsecs>[0-9]+)\.(?P<totalms>[0-9]+)$')
p = re.compile(r'^.*/(?P<totalsecs>[0-9]+)\.(?P<totalms>[0-9]+)$')
m = p.match(r.headers['TimeSeekRange.dlna.org'])
if m == None:
logger.error('Impossible to parse timestamps' % r.headers['TimeSeekRange.dlna.org'])
exit(-1)
total = int(m.group('totalsecs'))+(int(m.group('totalms')))/1000
total = timedelta(seconds=total)
initialTS = getInitialTSFromBuffer(r.content)
logger.debug('Header: %s' % r.headers)
initialTS = None
nbRetries = 0
maxRetries = 10
while initialTS == None and nbRetries < maxRetries:
initialTS = getInitialTSFromBuffer(r.content, delta=0.1*nbRetries)
nbRetries = nbRetries+1
logger.debug('Initial timestamp: %s' % initialTS)
if initialTS == None:
logger.error('Impossible to retrieve initial timestamp')
exit(-1)
lastTS = getLastTSFromBuffer(r.content)
logger.debug('Last timestamp: %s' % lastTS)
delta = lastTS-initialTS
ratio = delta/total
estimatedLength = ceil(len(r.content)/ratio)
@@ -262,17 +279,19 @@ def downloadMovie(url, outputFileName, duration):
pb = tqdm(total=estimatedLength, unit='bytes', desc='Download', unit_scale=True, unit_divisor=1024)
nbiters = 0
with requests.get(url, stream=True) as r:
with session.get(url, stream=True) as r:
chunks = r.iter_content(chunk_size = 100000)
for chunk in chunks:
output.write(chunk)
if nbiters == 1000:
lastTS = getLastTSFromFD(outputFD)
delta = lastTS-initialTS
ratio = delta/total
length = fstat(outputFD).st_size
estimatedLength = ceil(length/ratio)
pb.total = estimatedLength
if lastTS != None:
delta = lastTS-initialTS
ratio = delta/total
length = fstat(outputFD).st_size
estimatedLength = ceil(length/ratio)
pb.total = estimatedLength
lseek(outputFD, 0, SEEK_END)
nbiters = 0
pb.update(len(chunk))
@@ -350,7 +369,7 @@ def main():
t = timedelta(seconds=duration)
logger.info('Downloading movie "%s" whose duration is %s' % (title, t))
downloadMovie(url=films[mid]['url'], outputFileName=args.output, duration=duration)
downloadMovie(url=films[mid]['url'], outputFileName=args.output)
if __name__ == "__main__":