Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@return Same as input.
'''
logger.log(DEBUG_OVERVIEW, 'Sorting the groups for better visualization...')
ref_identifiers = re.split('(?0:
# If found more than one group with the reference, use the 'manf#'
# as second order criteria.
if len(component_groups_ref_match)>1:
try:
for item in component_groups_ref_match:
component_groups_order_old.remove(item)
except ValueError:
pass
# Examine 'manf#' and refs to get the order.
# Order by refs that have 'manf#' codes, that ones that don't have stay at the end of the group.
group_manf_list = [new_component_groups[h].fields.get('manf#') for h in component_groups_ref_match]
group_refs_list = [new_component_groups[h].refs for h in component_groups_ref_match]
sorted_groups = sorted(range(len(group_refs_list)), key=lambda k:(group_manf_list[k] is None, group_refs_list[k]))
logger.log(DEBUG_OBSESSIVE, '{} > order: {}'.format( group_manf_list, sorted_groups) )
component_groups_ref_match = [component_groups_ref_match[i] for i in sorted_groups]
# Only set the field if it is not set yet (which indicates a variant
# has been parsed before)
fields[name] = value # Do not create empty fields. This is useful
# when used more than one `manf#` alias in one designator.
else:
# Now look for fields that start with 'kicost' and possibly
# another dot-separated variant field and store their values.
# Anything else is in a non-kicost namespace.
key_re = 'kicost(\.(?P.*))?:(?P.*)'
mtch = re.match(key_re, name, flags=re.IGNORECASE)
if mtch:
v = mtch.group('variant')
if v is not None and not re.match(variant, v, flags=re.IGNORECASE):
continue
if v is not None:
logger.log(DEBUG_OBSESSIVE, 'Matched Variant ... ' + v + mtch.group('name') )
# The field name is anything that came after the leading
# 'kicost' and optional variant field.
name = mtch.group('name')
name = field_name_translations.get(name, name)
# If the field name isn't for a manufacturer's part
# number or a distributors catalog number, then add
# it to 'local' if it doesn't start with a distributor
# name and colon.
#if name not in ('manf#', 'manf', 'desc', 'value', 'comment', 'S1PN', 'S1MN', 'S1PL', 'S2PN', 'S2MN', 'S2PL') and name[:-1] not in distributor_dict:
dist_mtch = re.match('([^:]+):',name)
if dist_mtch and dist_mtch.group(1) not in distributor_dict:
# 'name' is a distibutore (preceded & followed with ':'
logger.log(DEBUG_OBSESSIVE, 'Assigning local: for name "{}" dist "{}" ... '.format(name,dist_mtch.group(1)) )
# Original code supposes that name is a distributor
if SEPRTR not in name: # This field has no distributor.
name = 'local:' + name # Assign it to a local distributor.
#considering "." and "/" marks.
if len(re.sub('[\.\/]','',strings[0])) < len(re.sub('[\.\/]','',strings[1])):
qty = strings[0].strip()
part = strings[1].strip()
else:
qty = strings[1].strip()
part = strings[0].strip()
else:
qty = '1'
part = strings[0].strip() + strings[1].strip()
if qty=='':
qty = '1'
else:
qty = '1'
part = ''.join(strings)
logger.log(DEBUG_OBSESSIVE, 'part/qty>> {}\t\tpart>>{}\tqty>>'.format(subpart, part, qty) )
return qty, part
def show_cookies(self):
    """Write every cookie held by the current scrape session to the log.

    Emitted at the DEBUG_OBSESSIVE level, one line per cookie, showing the
    cookie's domain and name.
    """
    for cookie in self.session.cookies:
        msg = "%s Cookie %s" % (cookie.domain, cookie.name)
        self.logger.log(DEBUG_OBSESSIVE, msg)
class_=('SearchResultsRowOdd', 'SearchResultsRowEven'))
# Extract the product links for the part numbers from the table.
product_links = [p.find('div', class_='mfrDiv').a for p in products]
# Extract all the part numbers from the text portion of the links.
part_numbers = [l.text for l in product_links]
# Look for the part number in the list that most closely matches the requested part number.
match = difflib.get_close_matches(pn, part_numbers, 1, 0.0)[0]
# Now look for the link that goes with the closest matching part number.
for l in product_links:
if l.text == match:
# Get the tree for the linked-to page and return that.
self.logger.log(DEBUG_OBSESSIVE,'Selecting {} from product table for {} from {}'.format(l.text, pn, self.name))
return self.dist_get_part_html_tree(pn, extra_search_terms,
url=l.get('href', ''),
descend=descend-1)
# I don't know what happened here, so give up.
self.logger.log(DEBUG_OBSESSIVE,'Unknown error for {} from {}'.format(pn, self.name))
self.logger.log(DEBUG_HTTP_RESPONSES,'Response was %s' % html)
raise PartHtmlError
self.logger.log(DEBUG_OBSESSIVE,'No part number {} in HTML page from {}'.format(pn, self.name))
raise PartHtmlError
try:
tree = BeautifulSoup(html, 'lxml')
except Exception:
self.logger.log(DEBUG_OBSESSIVE,'No HTML tree for {} from {}'.format(pn, self.name))
raise PartHtmlError
# If the tree contains the tag for a product page, then just return it.
if tree.find('div', id='pdpPricingAvailability') is not None:
return tree, url
# If the tree is for a list of products, then examine the links to try to find the part number.
if tree.find('div', id='searchResultsTbl') is not None:
self.logger.log(DEBUG_OBSESSIVE,'Found product table for {} from {}'.format(pn, self.name))
if descend <= 0:
self.logger.log(DEBUG_OBSESSIVE,'Passed descent limit for {} from {}'.format(pn, self.name))
raise PartHtmlError
else:
# Look for the table of products.
products = tree.find(
'table',
class_='SearchResultsTable').find_all(
'tr',
class_=('SearchResultsRowOdd', 'SearchResultsRowEven'))
# Extract the product links for the part numbers from the table.
product_links = [p.find('div', class_='mfrDiv').a for p in products]
# Extract all the part numbers from the text portion of the links.
part_numbers = [l.text for l in product_links]
continue
if v is not None:
logger.log(DEBUG_OBSESSIVE, 'Matched Variant ... ' + v + mtch.group('name') )
# The field name is anything that came after the leading
# 'kicost' and optional variant field.
name = mtch.group('name')
name = field_name_translations.get(name, name)
# If the field name isn't for a manufacturer's part
# number or a distributors catalog number, then add
# it to 'local' if it doesn't start with a distributor
# name and colon.
#if name not in ('manf#', 'manf', 'desc', 'value', 'comment', 'S1PN', 'S1MN', 'S1PL', 'S2PN', 'S2MN', 'S2PL') and name[:-1] not in distributor_dict:
dist_mtch = re.match('([^:]+):',name)
if dist_mtch and dist_mtch.group(1) not in distributor_dict:
# 'name' is a distibutore (preceded & followed with ':'
logger.log(DEBUG_OBSESSIVE, 'Assigning local: for name "{}" dist "{}" ... '.format(name,dist_mtch.group(1)) )
# Original code supposes that name is a distributor
if SEPRTR not in name: # This field has no distributor.
name = 'local:' + name # Assign it to a local distributor.
value = str(f.string)
if value or v is not None:
# Empty value also propagated to force deleting default value
fields[name] = value
logger.log(DEBUG_OBSESSIVE, 'Field {}={}'.format(name,value))
except AttributeError:
pass # No fields found for this part.
return fields
def start_new_session(self, scrape_base_url=True):
    """Discard any previous HTTP session and open a fresh one.

    A new ``requests`` session is created with a (possibly new) user-agent
    header, the configuration cookies remembered from the previous session
    are replayed into it, and — unless ``scrape_base_url`` is False or no
    domain is configured — the distributor's base URL is fetched once to
    prime the session.

    @param scrape_base_url When True (default) and ``self.domain`` is set,
           immediately scrape the base URL (without retries) after the
           session is created.
    """
    self.userAgent = get_user_agent()
    # Use "requests" instead of "urllib" because "urllib" does not allow
    # to remove "Connection: close" header which causes problems with some servers.
    fresh = requests.session()
    fresh.headers["User-Agent"] = self.userAgent
    self.session = fresh
    # Replay the configuration cookies captured from the previous session;
    # each entry is a (domain, name, value) tuple.
    for dom, name, value in self.config_cookies:
        self.logger.log(DEBUG_OBSESSIVE, "Restore cookie: %s", (dom, name, value))
        self.session.cookies.set(name, value, domain=dom)
    if scrape_base_url and self.domain:
        self.scrape_URL(self.domain, retry=False)
        self.show_cookies()