Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init_db():
insert_list = [
Video(yt_videoid="0", title="title1", description="description1", publisher="id_publisher1", publish_date=1488286166,
watched=False),
Video(yt_videoid="0", title="title1", description="description1", publisher="id_publisher1", publish_date=1488286167,
watched=False),
Video(yt_videoid="1", title="title2", description="description1", publisher="id_publisher1", publish_date=1488286168,
watched=False),
Video(yt_videoid="1", title="title2", description="description2", publisher="id_publisher2", publish_date=1488286170,
watched=False),
Video(yt_videoid="2", title="title3", description="description3", publisher="id_publisher2", publish_date=1488286171,
watched=False)
]
db = Database(":memory:")
db.add_channel(Channel(displayname="publisher1", yt_channelid="id_publisher1"))
db.add_channel(Channel(displayname="publisher2", yt_channelid="id_publisher2"))
db.add_channel(Channel(displayname="publisher3", yt_channelid="id_publisher3"))
db.add_videos(insert_list)
return db
def setUp(self):
self.current_dir = os.path.dirname(__file__)
self.ytcc = Ytcc(os.path.join(self.current_dir, "data/ytcc_test.conf"))
self.db_conn = self.ytcc.database
self.db_conn.engine.execute("delete from video")
self.db_conn.engine.execute("delete from channel")
self.db_conn.add_channel(Channel(displayname="Webdriver Torso", yt_channelid="UCsLiV4WJfkTEHH0b9PmRklw"))
self.db_conn.add_channel(Channel(displayname="Webdriver YPP", yt_channelid="UCxexYYtOetqikZqriLuTS-g"))
insert_list = [
Video(yt_videoid="0", title="title1", description="description1", publisher="id_publisher1", publish_date=1488286166,
watched=False),
Video(yt_videoid="0", title="title1", description="description1", publisher="id_publisher1", publish_date=1488286167,
watched=False),
Video(yt_videoid="1", title="title2", description="description1", publisher="id_publisher1", publish_date=1488286168,
watched=False),
Video(yt_videoid="1", title="title2", description="description2", publisher="id_publisher2", publish_date=1488286170,
watched=False),
Video(yt_videoid="2", title="title3", description="description3", publisher="id_publisher2", publish_date=1488286171,
watched=False)
]
db = Database(":memory:")
db.add_channel(Channel(displayname="publisher1", yt_channelid="id_publisher1"))
db.add_channel(Channel(displayname="publisher2", yt_channelid="id_publisher2"))
db.add_channel(Channel(displayname="publisher3", yt_channelid="id_publisher3"))
db.add_videos(insert_list)
return db
parser = etree.HTMLParser()
root = etree.parse(StringIO(response), parser).getroot()
site_name_node = root.xpath('//meta[@property="og:site_name"]')
channel_id_node = root.xpath('//meta[@itemprop="channelId"]')
if not site_name_node or site_name_node[0].attrib.get("content", "") != "YouTube":
raise BadURLException(f"{channel_url} does not seem to be a YouTube URL")
if not channel_id_node:
raise ChannelDoesNotExistException(f"{channel_url} does not seem to be a YouTube URL")
yt_channelid = channel_id_node[0].attrib.get("content")
try:
self.database.add_channel(Channel(displayname=displayname, yt_channelid=yt_channelid))
except sqlalchemy.exc.IntegrityError:
raise DuplicateChannelException(f"Channel already subscribed: {displayname}")
def rename_channel(self, oldname: str, newname: str) -> None:
"""Rename the given channel.
:param oldname: The name of the channel.
:param newname: The new name of the channel.
:raises ChannelDoesNotExistException: If the given channel does not exist.
:raises DuplicateChannelException: If new name already exists.
"""
query = self.session.query(Channel).filter(Channel.displayname == newname)
if query.one_or_none() is not None:
raise DuplicateChannelException()
try:
channel = self.session.query(Channel).filter(Channel.displayname == oldname).one()
channel.displayname = newname
except NoResultFound:
raise ChannelDoesNotExistException()
def list_videos(self) -> List[Video]:
"""Return a list of videos that match the filters set by the set_*_filter methods.
:return: A list of videos.
"""
if self.video_id_filter:
return self.database.session.query(Video) \
.join(Channel, Channel.yt_channelid == Video.publisher) \
.filter(Video.id.in_(self.video_id_filter)) \
.order_by(*self.config.order_by).all()
if not self.date_end_filter[1]:
date_end_filter = time.mktime(time.gmtime()) + 20
else:
date_end_filter = self.date_end_filter[0]
query = self.database.session.query(Video) \
.join(Channel, Channel.yt_channelid == Video.publisher) \
.filter(Video.publish_date > self.date_begin_filter) \
.filter(Video.publish_date < date_end_filter)
if self.channel_filter:
query = query.filter(Channel.displayname.in_(self.channel_filter))
def _create_channel(elem: etree.Element) -> Channel:
rss_url = urlparse(elem.attrib["xmlUrl"])
query_dict = parse_qs(rss_url.query, keep_blank_values=False)
channel_id = query_dict.get("channel_id", [])
if len(channel_id) != 1:
message = f"'{file.name}' is not a valid YouTube export file"
raise InvalidSubscriptionFileError(message)
return Channel(displayname=elem.attrib["title"], yt_channelid=channel_id[0])