Source code for pyarlo.media

# coding: utf-8
"""Implementation of Arlo Media object."""
import logging
from datetime import datetime
from datetime import timedelta
from pyarlo.const import LIBRARY_ENDPOINT, PRELOAD_DAYS
from pyarlo.utils import (
    http_get, http_stream, to_datetime, pretty_timestamp,
    assert_is_dict)

_LOGGER = logging.getLogger(__name__)


[docs]class ArloMediaLibrary(object): """Arlo Library Media module implementation.""" def __init__(self, arlo_session, preload=True, days=PRELOAD_DAYS): """Initialiaze Arlo Media Library object. :param arlo_session: PyArlo shared session :param preload: Boolean to pre-load video library. :param days: If preload, number of days to lookup. :returns ArloMediaLibrary object """ self._session = arlo_session if preload and days: self.videos = self.load(days) else: self.videos = [] def __repr__(self): """Representation string of object.""" return "<{0}: {1}>".format(self.__class__.__name__, self._session.userid)
[docs] def load(self, days=PRELOAD_DAYS, only_cameras=None, date_from=None, date_to=None, limit=None): """Load Arlo videos from the given criteria :param days: number of days to retrieve :param only_cameras: retrieve only <ArloCamera> on that list :param date_from: refine from initial date :param date_to: refine final date :param limit: define number of objects to return """ videos = [] url = LIBRARY_ENDPOINT if not (date_from and date_to): now = datetime.today() date_from = (now - timedelta(days=days)).strftime('%Y%m%d') date_to = now.strftime('%Y%m%d') params = {'dateFrom': date_from, 'dateTo': date_to} data = self._session.query(url, method='POST', extra_params=params).get('data') # get all cameras to append to create ArloVideo object all_cameras = self._session.cameras for video in data: # pylint: disable=cell-var-from-loop try: srccam = \ list(filter( lambda cam: cam.device_id == video.get('deviceId'), all_cameras) )[0] except IndexError: continue # make sure only_cameras is a list if only_cameras and \ not isinstance(only_cameras, list): only_cameras = [(only_cameras)] # filter by camera only if only_cameras: if list(filter(lambda cam: cam.device_id == srccam.device_id, list(only_cameras))): videos.append(ArloVideo(video, srccam, self._session)) else: videos.append(ArloVideo(video, srccam, self._session)) if limit: return videos[:limit] return videos
[docs]class ArloVideo(object): """Object for Arlo Video file.""" def __init__(self, attrs, camera, arlo_session): """Initialiaze Arlo Video object. :param attrs: Video attributes :param camera: Arlo camera which recorded the video :param arlo_session: Arlo shared session """ self._attrs = attrs self._attrs = assert_is_dict(self._attrs) self._camera = camera self._session = arlo_session def __repr__(self): """Representation string of object.""" return "<{0}: {1}>".format(self.__class__.__name__, self._name) @property def _name(self): """Define object name.""" return "{0} {1} {2}".format( self._camera.name, pretty_timestamp(self.created_at), self._attrs.get('mediaDuration')) # pylint: disable=invalid-name @property def id(self): """Return object id.""" if self._attrs is not None: return self._attrs.get('name') return None @property def created_at(self): """Return timestamp.""" if self._attrs is not None: return self._attrs.get('localCreatedDate') return None
[docs] def created_at_pretty(self, date_format=None): """Return pretty timestamp.""" if date_format: return pretty_timestamp(self.created_at, date_format=date_format) return pretty_timestamp(self.created_at)
@property def created_today(self): """Return True if created today.""" if self.datetime.date() == datetime.today().date(): return True return False @property def datetime(self): """Return datetime when video was created.""" return to_datetime(self.created_at) @property def content_type(self): """Return content_type.""" if self._attrs is not None: return self._attrs.get('contentType') return None @property def camera(self): """Return camera object that recorded video.""" return self._camera @property def media_duration_seconds(self): """Return media duration in seconds.""" if self._attrs is not None: return self._attrs.get('mediaDurationSecond') return None @property def triggered_by(self): """Return the reason why video was recorded.""" if self._attrs is not None: return self._attrs.get('reason') return None @property def thumbnail_url(self): """Return thumbnail url.""" if self._attrs is not None: return self._attrs.get('presignedThumbnailUrl') return None @property def video_url(self): """Return video content url.""" if self._attrs is not None: return self._attrs.get('presignedContentUrl') return None @property def motion_type(self): """Returns the type of motion that triggered the camera. Requires subscription.""" if self._attrs is not None: return self._attrs.get("objCategory") return None
[docs] def download_thumbnail(self, filename=None): """Download JPEG thumbnail. :param filename: File to save thumbnail. Default: stdout """ return http_get(self.thumbnail_url, filename)
[docs] def download_video(self, filename=None): """Download video content. :param filename: File to save video. Default: stdout """ return http_get(self.video_url, filename)
@property def stream_video(self): """Stream video.""" return http_stream(self.video_url)
# vim:sw=4:ts=4:et: