How to use the streamlink.compat.urljoin function in streamlink

To help you get started, we’ve selected a few streamlink examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github back-to / generic / plugins / generic.py View on Github external
# remove \
        new_url = url.replace('\\', '')
        # repairs broken scheme
        if new_url.startswith('http://'):
            new_url = 'http:' + new_url[9:]
        elif new_url.startswith('https://'):
            new_url = 'https:' + new_url[10:]
        new_url = unquote(new_url)
        # creates a valid url from path only urls
        # and adds missing scheme for // urls
        if stream_base and new_url[1] != '/':
            if new_url[0] == '/':
                new_url = new_url[1:]
            new_url = urljoin(stream_base, new_url)
        else:
            new_url = urljoin(base_url, new_url)
        return new_url
github back-to / generic / plugins / generic.py View on Github external
def repair_url(self, url, base_url, stream_base=''):
        """Turn a URL scraped out of page markup into an absolute URL.

        Steps, in order:

        1. Drop every backslash (e.g. JSON-escaped ``\\/`` separators).
        2. Repair a scheme whose colon is still written as the HTML
           numeric entity ``&#58;`` (``http&#58;//host`` -> ``http://host``).
        3. Percent-decode the result.
        4. Resolve it against ``stream_base`` when one is given and the URL
           is not protocol-relative (``//host/...``); otherwise resolve it
           against ``base_url``, which also supplies the missing scheme for
           ``//`` URLs.

        :param url: raw URL text as found in the page
        :param base_url: URL of the page, used as the default join base
        :param stream_base: optional base that path-only URLs are joined to
        :return: the repaired URL
        """
        # remove \
        new_url = url.replace('\\', '')
        # repairs broken scheme: the ':' is still an HTML entity. The slice
        # length is derived from the prefix so the two cannot drift apart,
        # and an already valid 'http://' URL is left untouched.
        for scheme in ('http', 'https'):
            broken = scheme + '&#58;'
            if new_url.startswith(broken + '//'):
                new_url = scheme + ':' + new_url[len(broken):]
                break
        new_url = unquote(new_url)
        # creates a valid url from path only urls
        # and adds missing scheme for // urls
        # (slicing instead of indexing keeps empty and one-character
        # URLs from raising IndexError)
        if stream_base and new_url[1:2] != '/':
            # a leading '/' would make urljoin discard the path of stream_base
            if new_url.startswith('/'):
                new_url = new_url[1:]
            new_url = urljoin(stream_base, new_url)
        else:
            new_url = urljoin(base_url, new_url)
        return new_url
github horacio9a / streamlink-bongacams / bongacams_.py View on Github external
sys.exit()

        if not r.ok:
            self.logger.debug('Status code for {0}: {1}', r.url, r.status_code)
            raise NoStreamsError(self.url)
        if len(http_session.cookies) == 0:
            raise PluginError("Can't get a cookies")

        if urlparse(r.url).netloc != stream_page_domain:
            # then redirected to regional subdomain
            country_code = urlparse(r.url).netloc.split('.')[0].lower()

        # time to set variables
        baseurl = urlunparse((stream_page_scheme, urlparse(r.url).netloc, '', '', '', ''))
        amf_gateway_url = urljoin(baseurl, CONST_AMF_GATEWAY_LOCATION)
        stream_page_url = urljoin(baseurl, stream_page_path)

        headers = {
            'User-Agent': useragents.CHROME,
            'Referer': stream_page_url,
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest'
        }

        data = 'method=getRoomData&args%5B%5D={0}&args%5B%5D=false'.format(stream_page_path)
        self.logger.debug('DATA: {0}'.format(str(data)))
        # send request and close http-session
        r = http_session.post(url=amf_gateway_url,
                              headers=headers,
                              params={CONST_AMF_GATEWAY_PARAM: country_code},
                              data=data)
        http_session.close()
github streamlink / streamlink / src / streamlink / plugins / kanal7.py View on Github external
def find_iframe(self, url):
        """Fetch ``url`` and return the iframe URL found in its body.

        The first capture group of ``self.iframe_re`` is taken as the iframe
        URL. A URL without a network location is joined onto ``self.url``,
        and the result is passed through ``update_scheme`` together with
        ``self.url``.

        :param url: address of the page to search
        :return: the iframe URL, or the falsy search/capture result when the
                 pattern does not match or captures nothing
        """
        page = self.session.http.get(url)
        # find iframe url
        match = self.iframe_re.search(page.text)
        if not match:
            return match

        target = match.group(1)
        if not target:
            return target

        if not urlparse(target).netloc:
            # no host in the captured URL: resolve it against the plugin URL
            target = urljoin(self.url, target)
        target = update_scheme(self.url, target)
        log.debug("Found iframe: {0}".format(target))
        return target
github streamlink / streamlink / src / streamlink / plugins / dogan.py View on Github external
def _get_hls_url(self, content_id):
        """Look up ``content_id`` through the content API and build its URL.

        The API path template (``self.new_content_api`` when ``self.url``
        contains "cnnturk" or "teve2.com.tr", ``self.content_api`` otherwise)
        is resolved against ``self.url``. The JSON response is validated with
        ``self.content_api_schema``.

        :param content_id: identifier substituted for ``{id}`` in the template
        :return: ``SecurePath`` joined onto ``ServiceUrl``, or onto
                 ``DefaultServiceUrl`` when ``ServiceUrl`` is empty
        """
        # make the api url relative to the current domain
        uses_new_api = any(site in self.url for site in ("cnnturk", "teve2.com.tr"))
        if uses_new_api:
            self.logger.debug("Using new content API url")
            api_template = self.new_content_api
        else:
            api_template = self.content_api
        api_url = urljoin(self.url, api_template.format(id=content_id))

        response = self.session.http.get(api_url)
        media = self.session.http.json(response, schema=self.content_api_schema)

        link = media["Media"]["Link"]
        service_url = link["ServiceUrl"] or link["DefaultServiceUrl"]
        return urljoin(service_url, link["SecurePath"])
github streamlink / streamlink / src / streamlink / plugins / ard_live.py View on Github external
def _get_streams(self):
        """Yield ``(name, stream)`` pairs for the streams listed for ``self.url``.

        The page is fetched with ``self._player_url_schema`` to obtain a data
        URL; nothing is yielded when that is empty. The data document is
        parsed as XML with ``self._livestream_schema`` and each entry is
        dispatched on a substring of its URL:

        * ``.m3u8`` -> every item of the HLS variant playlist
        * ``.f4m``  -> every item of the HDS manifest
        * ``.mp4``  -> one HTTP stream named ``"<bitrate>k"``

        An ``IOError`` raised while handling one entry is logged as a warning
        and the remaining entries are still processed.
        """
        data_url = self.session.http.get(self.url, schema=self._player_url_schema)
        if not data_url:
            return

        res = self.session.http.get(urljoin(self.url, data_url))
        for stream in self.session.http.xml(res, schema=self._livestream_schema):
            url = stream["url"]
            try:
                if ".m3u8" in url:
                    variants = HLSStream.parse_variant_playlist(self.session, url, name_key="bitrate")
                    for named_stream in variants.items():
                        yield named_stream
                elif ".f4m" in url:
                    manifest = HDSStream.parse_manifest(self.session, url, pvswf=self.swf_url, is_akamai=True)
                    for named_stream in manifest.items():
                        yield named_stream
                elif ".mp4" in url:
                    name = "{0}k".format(stream["bitrate"])
                    yield name, HTTPStream(self.session, url)
            except IOError as err:
                self.logger.warning("Error parsing stream: {0}", err)
github streamlink / streamlink / src / streamlink / plugins / dogan.py View on Github external
def _get_hls_url(self, content_id):
        """Resolve the stream URL for ``content_id`` via the content API.

        ``self.new_content_api`` is used as the path template when
        ``self.url`` contains "cnnturk" or "teve2.com.tr"; otherwise
        ``self.content_api`` is used. The formatted path is joined onto
        ``self.url``, fetched, and decoded as JSON under
        ``self.content_api_schema``.

        :param content_id: value formatted into the template as ``id``
        :return: the entry's ``SecurePath`` resolved against its
                 ``ServiceUrl`` (falling back to ``DefaultServiceUrl``)
        """
        # make the api url relative to the current domain
        on_new_api = "cnnturk" in self.url or "teve2.com.tr" in self.url
        if on_new_api:
            self.logger.debug("Using new content API url")
        template = self.new_content_api if on_new_api else self.content_api
        endpoint = template.format(id=content_id)

        apires = self.session.http.get(urljoin(self.url, endpoint))
        link = self.session.http.json(apires, schema=self.content_api_schema)["Media"]["Link"]

        return urljoin(link["ServiceUrl"] or link["DefaultServiceUrl"], link["SecurePath"])
github streamlink / streamlink / src / streamlink / plugins / onetv.py View on Github external
def vod_data(self, vid=None):
        """
        Download the VOD data document referenced by the page at ``self.url``.

        The page text is searched with ``self._vod_re``; the whole match is
        resolved against ``self.url`` and fetched as JSON.

        :param vid: accepted but not used by this method
        :return: the decoded JSON document, or None when the page contains
                 no VOD data URL
        """
        html = self.session.http.get(self.url).text
        match = self._vod_re.search(html)
        if not match:
            return None

        vod_data_url = urljoin(self.url, match.group(0))
        if not vod_data_url:
            return None

        self.logger.debug("Found VOD data url: {0}", vod_data_url)
        return self.session.http.json(self.session.http.get(vod_data_url))
github streamlink / streamlink / src / streamlink / stream / hls_playlist.py View on Github external
def uri(self, uri):
        """Return ``uri`` as an absolute URI where possible.

        A URI that already has a scheme is returned unchanged. Otherwise it
        is joined onto ``self.base_uri`` when both are non-empty; in every
        other case (empty URI or no base) the input is handed back as-is.

        :param uri: URI taken from the playlist, possibly relative or empty
        :return: the resolved URI, or ``uri`` itself when nothing can be done
        """
        if uri and urlparse(uri).scheme:
            # already absolute
            return uri

        base = self.base_uri
        return urljoin(base, uri) if base and uri else uri
github streamlink / streamlink / src / streamlink / plugins / daisuki.py View on Github external
def _get_streams(self):
        page = http.get(self.url, schema=_schema)
        if not page:
            return

        pubkey_pem = get_public_key(self.cache, urljoin(self.url, page["clientlibs"]))
        if not pubkey_pem:
            raise PluginError("Unable to get public key")

        flashvars = page["flashvars"]

        params = {
            "cashPath": int(time.time() * 1000)
        }
        res = http.get(urljoin(self.url, flashvars["country"]), params=params)
        if not res:
            return
        language = http.xml(res, schema=_language_schema)

        api_params = {}
        for key in ("ss_id", "mv_id", "device_cd", "ss1_prm", "ss2_prm", "ss3_prm"):
            if flashvars.get(key, ""):