Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_stock(ticker):
"""
Returns a dictionary containing stock data.
:param ticker: stock symbol
:type ticker: str
:return dict
"""
data = {}
page_parsed, _ = http_request_get(url=STOCK_URL, payload={'t': ticker}, parse=True)
all_rows = [row.xpath('td//text()') for row in page_parsed.cssselect('tr[class="table-dark-row"]')]
for row in all_rows:
for column in range(0, 11):
if column % 2 == 0:
data[row[column]] = row[column + 1]
return data
""" Private function used to return the portfolio url from a given id/name. """
# If the user has provided an ID (Portfolio ID is always an int)
if isinstance(portfolio_name, int):
# Raise error for invalid portfolio ID
if not len(str(portfolio_name)) == PORTFOLIO_DIGIT_COUNT:
raise InvalidPortfolioID(portfolio_name)
else:
return http_request_get(url=f"{PORTFOLIO_URL}?pid={portfolio_name}",
session=self._session,
parse=False)
else: # else the user has passed a name
# We remove the first element, since it's redundant
for portfolio in parse(self._page_content).cssselect('option')[1:]:
if portfolio.text == portfolio_name:
return http_request_get(url=f"{PORTFOLIO_URL}?pid={portfolio.get('value')}",
session=self._session,
parse=False)
# Raise UnexistingPortfolioName if none of the names match
raise UnexistingPortfolioName(portfolio_name)
with open(file, 'r') as infile:
reader = csv.reader(infile)
next(reader, None) # Skip the headers
for row_number, row in enumerate(reader, 0):
row_number_string = str(row_number)
data['ticker' + row_number_string] = row[0]
data['transaction' + row_number_string] = row[1]
data['date' + row_number_string] = row[2]
data['shares' + row_number_string] = row[3]
try:
data['price' + row_number_string] = row[4]
except IndexError:
current_price_page, _ = http_request_get(PRICE_REQUEST_URL, payload={'t': row[0]}, parse=True)
data['price' + row_number_string] = current_price_page.text
self._session.post(PORTFOLIO_SUBMIT_URL, data=data)
def __search_screener(self):
""" Private function used to return data from the FinViz screener. """
self._page_content, self._url = http_request_get('https://finviz.com/screener.ashx', payload={
'v': self._table,
't': ','.join(self._tickers),
'f': ','.join(self._filters),
'o': self._order,
's': self._signal,
'c': ','.join(self._custom)
})
self._rows = self.__check_rows()
self.headers = self.__get_table_headers()
page_urls = scrape.get_page_urls(self._page_content, self._rows, self._url)
async_connector = Connector(scrape.get_table,
page_urls,
self.headers,
self._rows)
def get_insider(ticker):
"""
Returns a list of dictionaries containing all recent insider transactions.
:param ticker: stock symbol
:return: list
"""
page_parsed, _ = http_request_get(url=STOCK_URL, payload={'t': ticker}, parse=True)
table = page_parsed.cssselect('table[class="body-table"]')[0]
headers = table[0].xpath('td//text()')
data = [dict(zip(headers, row.xpath('td//text()'))) for row in table[1:]]
return data
def get_all_news():
"""
Returns a list of sets containing time, headline and url
:return: list
"""
page_parsed, _ = http_request_get(url=NEWS_URL, parse=True)
all_dates = [row.text_content() for row in page_parsed.cssselect('td[class="nn-date"]')]
all_headlines = [row.text_content() for row in page_parsed.cssselect('a[class="nn-tab-link"]')]
all_links = [row.get('href') for row in page_parsed.cssselect('a[class="nn-tab-link"]')]
return list(zip(all_dates, all_headlines, all_links))
def get_news(ticker):
"""
Returns a list of sets containing news headline and url
:param ticker: stock symbol
:return: list
"""
page_parsed, _ = http_request_get(url=STOCK_URL, payload={'t': ticker}, parse=True)
all_news = page_parsed.cssselect('a[class="tab-link-news"]')
dates = []
for i in range(len(all_news)):
tr = all_news[i].getparent().getparent()
date_str = tr[0].text.strip()
if ' ' not in date_str:
# This is only time, need to grab date from upper sibling news line.
tbody = tr.getparent()
previous_date_str = ''
j = 1
while ' ' not in previous_date_str:
try:
previous_date_str = tbody[i-j][0].text.strip()
except IndexError:
break