def __check_rows(self):
    """
    Checks whether the user-supplied row number is valid.
    Otherwise, caps it at the total row count or raises a NoResults error.
    """
    self._total_rows = scrape.get_total_rows(self._page_content)

    if self._total_rows == 0:
        raise NoResults(self._url.split('?')[1])
    elif self._rows is None or self._rows > self._total_rows:
        return self._total_rows
    else:
        return self._rows

# Fragment of a classmethod that rebuilds Screener arguments from a parsed URL
# query string; `splitted_query` and `rows` are produced earlier in the part of
# the method not shown here.
tickers = None
if 't' in splitted_query:
    tickers = splitted_query['t'][0].split(',')

filters = None
if 'f' in splitted_query:
    filters = splitted_query['f'][0].split(',')

table = None
if 'v' in splitted_query:
    table_numbers_types = {v: k for k, v in TABLE_TYPES.items()}
    table_number_string = splitted_query['v'][0][0:3]

    try:
        table = table_numbers_types[table_number_string]
    except KeyError:
        raise InvalidTableType(splitted_query['v'][0])
else:
    table = 'Overview'

custom = None
if 'c' in splitted_query:
    custom = splitted_query['c'][0].split(',')

order = ''
if 'o' in splitted_query:
    order = splitted_query['o'][0]

signal = ''
if 's' in splitted_query:
    signal = splitted_query['s'][0]

return cls(tickers, filters, rows, order, signal, table, custom)
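
# Illustrative sketch (not part of the snippet above): `splitted_query` has the
# shape produced by urllib.parse.parse_qs, a dict mapping each query key to a
# list of values, which is why the code above indexes with [0].
from urllib.parse import parse_qs, urlparse

example_url = 'https://finviz.com/screener.ashx?v=111&t=AAPL,MSFT&f=exch_nasd&o=price'
splitted_query = parse_qs(urlparse(example_url).query)
# -> {'v': ['111'], 't': ['AAPL,MSFT'], 'f': ['exch_nasd'], 'o': ['price']}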

def test():
    print("testing now")

    testObject = FinViz()
    data = testObject.getTrends()

    for i in data:
        print(i)

def get_stock(ticker):
    """
    Returns a dictionary containing stock data.

    :param ticker: stock symbol
    :type ticker: str
    :return: dict
    """
    data = {}
    page_parsed, _ = http_request_get(url=STOCK_URL, payload={'t': ticker}, parse=True)
    all_rows = [row.xpath('td//text()') for row in page_parsed.cssselect('tr[class="table-dark-row"]')]

    for row in all_rows:
        for column in range(0, 11):
            if column % 2 == 0:
                data[row[column]] = row[column + 1]

    return data
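
# Example usage (illustrative sketch): the keys of the returned dict mirror the
# labels of finviz's fundamentals snapshot table, so 'Price' and 'P/E' below are
# assumptions about the page layout rather than guaranteed keys.
aapl = get_stock('AAPL')
print(aapl.get('Price'), aapl.get('P/E'))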
""" Private function used to return the portfolio url from a given id/name. """
# If the user has provided an ID (Portfolio ID is always an int)
if isinstance(portfolio_name, int):
# Raise error for invalid portfolio ID
if not len(str(portfolio_name)) == PORTFOLIO_DIGIT_COUNT:
raise InvalidPortfolioID(portfolio_name)
else:
return http_request_get(url=f"{PORTFOLIO_URL}?pid={portfolio_name}",
session=self._session,
parse=False)
else: # else the user has passed a name
# We remove the first element, since it's redundant
for portfolio in parse(self._page_content).cssselect('option')[1:]:
if portfolio.text == portfolio_name:
return http_request_get(url=f"{PORTFOLIO_URL}?pid={portfolio.get('value')}",
session=self._session,
parse=False)
# Raise UnexistingPortfolioName if none of the names match
raise UnexistingPortfolioName(portfolio_name)

# Fragment of a Portfolio method that uploads positions from a CSV file; `data`
# is a dict of form fields assembled earlier in the part of the method not shown.
with open(file, 'r') as infile:
    reader = csv.reader(infile)
    next(reader, None)  # Skip the headers

    for row_number, row in enumerate(reader, 0):
        row_number_string = str(row_number)

        data['ticker' + row_number_string] = row[0]
        data['transaction' + row_number_string] = row[1]
        data['date' + row_number_string] = row[2]
        data['shares' + row_number_string] = row[3]

        try:
            data['price' + row_number_string] = row[4]
        except IndexError:
            # No price column: fall back to the current quote from finviz
            current_price_page, _ = http_request_get(PRICE_REQUEST_URL, payload={'t': row[0]}, parse=True)
            data['price' + row_number_string] = current_price_page.text

self._session.post(PORTFOLIO_SUBMIT_URL, data=data)
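
# Illustrative sketch of a CSV file the upload code above accepts: one header row
# (skipped by next(reader, None)) followed by ticker, transaction, date and shares
# columns, with price optional (the current quote is fetched when it is missing).
# The file name and field values below are hypothetical.
import csv

with open('portfolio.csv', 'w', newline='') as outfile:
    writer = csv.writer(outfile)
    writer.writerow(['Ticker', 'Transaction', 'Date', 'Shares', 'Price'])
    writer.writerow(['AAPL', '1', '2021-01-04', '10', '129.41'])
    writer.writerow(['MSFT', '1', '2021-01-04', '5'])  # no price column for this row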

def __search_screener(self):
    """ Private function used to return data from the FinViz screener. """
    self._page_content, self._url = http_request_get('https://finviz.com/screener.ashx', payload={
        'v': self._table,
        't': ','.join(self._tickers),
        'f': ','.join(self._filters),
        'o': self._order,
        's': self._signal,
        'c': ','.join(self._custom)
    })

    self._rows = self.__check_rows()
    self.headers = self.__get_table_headers()

    page_urls = scrape.get_page_urls(self._page_content, self._rows, self._url)
    async_connector = Connector(scrape.get_table,
                                page_urls,
                                self.headers,
                                self._rows)
    # The rest of the method, which runs the connector and collects the rows,
    # is not included in this snippet.
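
# Illustrative sketch (not library code): the payload assembled above becomes the
# screener query string; urlencode shows the equivalent URL shape, with '111'
# assumed to be the Overview view code.
from urllib.parse import urlencode

payload = {'v': '111', 't': 'AAPL,MSFT', 'f': 'exch_nasd,idx_sp500', 'o': 'price', 's': '', 'c': ''}
print('https://finviz.com/screener.ashx?' + urlencode(payload))
# -> https://finviz.com/screener.ashx?v=111&t=AAPL%2CMSFT&f=exch_nasd%2Cidx_sp500&o=price&s=&c=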

def get_insider(ticker):
    """
    Returns a list of dictionaries containing all recent insider transactions.

    :param ticker: stock symbol
    :return: list
    """
    page_parsed, _ = http_request_get(url=STOCK_URL, payload={'t': ticker}, parse=True)
    table = page_parsed.cssselect('table[class="body-table"]')[0]
    headers = table[0].xpath('td//text()')
    data = [dict(zip(headers, row.xpath('td//text()'))) for row in table[1:]]

    return data
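
# Example usage (illustrative sketch): each dict's keys come from the header row
# of finviz's insider-trading table, so the exact column names depend on the page.
for transaction in get_insider('AAPL'):
    print(transaction)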

def get_all_news():
    """
    Returns a list of tuples containing time, headline and url.

    :return: list
    """
    page_parsed, _ = http_request_get(url=NEWS_URL, parse=True)
    all_dates = [row.text_content() for row in page_parsed.cssselect('td[class="nn-date"]')]
    all_headlines = [row.text_content() for row in page_parsed.cssselect('a[class="nn-tab-link"]')]
    all_links = [row.get('href') for row in page_parsed.cssselect('a[class="nn-tab-link"]')]

    return list(zip(all_dates, all_headlines, all_links))
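
# Example usage (illustrative sketch): each entry is a (time, headline, url) tuple.
for published, headline, link in get_all_news()[:5]:
    print(published, headline, link)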