Compare commits

...

2 Commits

  1. 6
      .gitignore
  2. 117
      batch_process.py
  3. 66
      get_link.py
  4. 103
      price_finder.py
  5. 21
      proxy_class.py
  6. 27
      xpaths.json

6
.gitignore

@ -1,2 +1,6 @@
__pycache__
.gitignore
.gitignore
/uBlock0.chromium
/bg.html
/test_this_bullshit.py
/output

117
batch_process.py

@ -1,12 +1,25 @@
from price_finder import price_finder,BS
from price_finder import ParseResult
from lxml import etree
from bs4 import BeautifulSoup as BS
from itertools import cycle
import requests
import requests_html
from urllib.parse import urlparse
# import requests_html
import sys
from ipaddress import ip_address
def get_proxies(country = 'United States'):
ses = requests_html.HTMLSession()
r = ses.get('https://free-proxy-list.net/')
page = BS(r.html.raw_html,'lxml')
from get_link import get_link
import json
with open('xpaths.json') as file:
xpaths_data = json.load(file)
parser = etree.HTMLParser()
def text2tree(text):
return etree.fromstring(text,parser)
def get_proxies(link='https://free-proxy-list.net/',country = 'United States'):
## ses = requests_html.HTMLSession()
r = requests.get(link)
page = BS(r.content,'lxml')
table = page.find(id='proxylisttable')
headers,*rows = table.find_all('tr')
headers = list(tag.text.lower() for tag in headers.find_all('th'))
@ -21,55 +34,61 @@ def get_proxies(country = 'United States'):
try:
ip_address(tr[ip])
assert int(port) >= 0 and int(port) < 2**16
if tr[https_support] == "yes" and tr[country_id] == country:
if (tr[https_support] == "yes" or False) and tr[country_id] == country:
proxies.append('{}:{}'.format(tr[ip],tr[port]))
except (ValueError,AssertionError):
pass
except Exception as e:
print(row)
raise e
return cycle(proxies)
def get_prices(links):
proxies = get_proxies()
s = requests_html.HTMLSession()
return proxies
class proxy_iter:
def __init__(self,proxies):
self._proxies = set(proxies)
self.proxies = self._proxies.copy()
self.bad_proxies = set()
# self.used_proxies = {}
def __next__(self):
self.proxies -= self.bad_proxies
if len(self.proxies) == 0:
raise StopIteration
elem = self.proxies.pop()
if len(self.proxies) == 0:
self.proxies = self._proxies.copy()
return elem
def __iter__(self):
return self
def blacklist(self,proxy):
self.bad_proxies.add(proxy)
def get_prices(links,use_proxies = True):
pages = {}
xpaths = {link:xpaths_data[urlparse(link).netloc] for link in links}
# print(xpaths)
if use_proxies:
proxies = proxy_iter(get_proxies() + get_proxies('https://www.us-proxy.org/'))
for link in links:
for proxy in proxies:
print(link,proxy)
try:
page = get_link(link,xpaths,proxy=proxy)
pages[link] = page
break
except Exception as e:
print(type(e),e,file=sys.stdout)
proxies.blacklist(proxy)
if len(links) != len(pages.keys()):
raise Exception('all proxies suck')
else:
pages = get_link(links,xpaths)
ret = []
bad_proxies= set()
for link in links:
page = None
render_tries = 0
print(link)
while not page:
proxy = next(proxies)
while proxy in bad_proxies:
proxy = next(proxies)
print(proxy)
try:
r = s.get(link,proxies={'http':proxy,'https':proxy})
print('got')
try:
render_tries += 1
r.html.render()
print('rendered')
except requests_html.MaxRetries:
if render_tries > 2:
pass
else:
print('!'+proxy)
bad_proxies.update([proxy])
continue
page = r.html.raw_html
ret.append(price_finder(link,bs=BS(page,'lxml')))
except (requests.exceptions.ProxyError,requests.exceptions.SSLError):
print('!'+proxy)
bad_proxies.update([proxy])
print(bad_proxies)
s.close()
tree = text2tree(pages[link])
ret.append(
ParseResult(link,tree)
)
return ret
if __name__ == "__main__":
import saveto
import random
ql = saveto.load('quad_links')
random.shuffle(ql)
products = get_prices(ql)

66
get_link.py

@ -0,0 +1,66 @@
import pyppeteer
import pyppeteer.errors
import asyncio
import os
async def _get_link(browser,link,xpath):
pages = await browser.pages()
page = pages[0]
await page.goto(link,waitUntil='documentloaded')
xpath = [xpath['name'],xpath['price']]
for _xpath in xpath:
print(repr(_xpath))
try:
await page.waitForXPath(_xpath)
except pyppeteer.errors.TimeoutError:
pass
await asyncio.sleep(1)
webpage = None
for i in range(20):
try:
webpage = await page.content()
break
except:
await asyncio.sleep(1)
return webpage
async def _single_link(browser,link,xpath):
webpage = await _get_link(browser,link,xpath)
await browser.close()
return webpage
async def _multi_link(browser,links,xpaths):
results = {}
for link in links:
xpath = xpaths[link]
webpage = await _get_link(browser,link,xpath)
results[link] = webpage
await browser.close()
return results
def get_link(links,xpaths,headless = False,proxy = None):
loop = asyncio.get_event_loop()
run = loop.run_until_complete
opts = {
'headless':headless,
}
if proxy:
opts['args'] = [f'--proxy-server={proxy}']
else:
opts['args'] = []
ext = os.path.join(os.path.dirname(__file__),'uBlock0.chromium')
opts['args'] += [f'--disable-extensions-except={ext}', f'--load-extension={ext}']
# print(opts)
browser = run(pyppeteer.launch(**opts))
try:
if isinstance(links,list):
result = run(_multi_link(browser,links,xpaths))
else:
result = run(_single_link(browser,links,xpaths[links]))
return result
except Exception as e:
run(browser.close())
raise e

103
price_finder.py

@ -1,77 +1,62 @@
import urllib
from fake_useragent import UserAgent
from bs4 import BeautifulSoup as BS
from requests_html import HTMLSession
import re
import datetime
import copy
import json
with open('xpaths.json') as file:
xpaths = json.load(file)
user_agent = UserAgent().chrome
debug = None
def get_words(string,n):
words = re.finditer(r"(\b[^ \n]+\b)",string)
def get_words(raw,n):
words = re.finditer(r"(\b[^ \n]+\b)",raw)
word_list = list(match.group(0) for match in words)
if len(word_list) > n:
word_list = word_list[:n]
return ' '.join(word_list)
def get_page(url):
page = None
while not page:
page = urllib.request.Request(url,headers = {"User-Agent":user_agent})
page = str(urllib.request.urlopen(page).read())
return page
def get_BS(url):
return BS(get_page(url),"lxml")
def format_price(raw):
return re.search(r'\d+(\.\d)?',raw).group(0)
class ParseResult:
class price_finder:
page_funcs = {
"www.amazon.com":{
"name":lambda page: re.sub(r"( {2,}|\n|\\n)","",page.find("span",id="productTitle").text),
"price":lambda page: page.find(name = "span",id = re.compile("priceblock.*")).text
},
"www.banggood.com":{
"name":lambda page: page.find("h1",attrs = {"itemprop":"name"}).text,
"price":lambda page: page.find("div",attrs = {"class":"now"}).get("oriprice")
},
"www.dalprops.com":{
"name":lambda page: page.find("h1",attrs = {"class":"product_title"}).text,
"price":lambda page: page.find("meta",attrs = {"itemprop":"price"}).get("content")
},
"www.gearbest.com":{
"name":lambda page:re.sub(" {2,}|\n","",page.find("div",attrs = {"class":"goodsIntro_titleWrap"}).find("h1").text),
"price":lambda page: page.find("span",attrs={"class":"goodsIntro_price"}).text
},
"hobbyking.com":{
"name":lambda page: page.find("h1",attrs={"class":"product-name"}).text,
"price":lambda page: page.find("span",id = re.compile(r"product-price.*")).find("span",attrs={"class":"price"}).text
},
"www.getfpv.com":{
"name": lambda page: re.sub(r"\\n|\n","", page.find("div",attrs={"class":"product-name"}).text),
"price": lambda page: re.sub(r"\\n|\n","", page.find("span",attrs={"id":re.compile("product-price.*")}).text)
}
}
def __init__(self,url,space_seperated_categories = 7,bs=None):
def __init__(self,url,tree,space_seperated_categories = 7,):
self.url=url
self.info_url = urllib.parse.urlparse(url)
self.word_len = space_seperated_categories
if self.info_url.netloc not in price_finder.page_funcs.keys():
raise NotImplementedError("Not implemented for {}".format(self.info_url.netloc))
if bs:
self.bs= bs
else:
self.bs = get_BS(url)
# self.words = re_words(space_seperated_categories)
self.tree = tree
self.time = datetime.datetime.today()
self.info_product = self._get_product_info_()
def _get_product_info_(self):
funcs = price_finder.page_funcs[self.info_url.netloc]
# print(self.url)
host = self.info_url.netloc
product_name = get_words(
self.tree.xpath(xpaths[host]['name'])[0].text
)
other_raw = None
try:
other_raw = self.tree.xpath(xpaths[host]['other'])[0].text
except KeyError:
pass
if host in ['www.gearbest.com']:
if other.raw:
price = '0.00'
else:
price = format_price(
self.tree.xpath(xpaths[host]['price'])[0].text
)
else:
price = format_price(
self.tree.xpath(xpaths[host]['price'])[0].text
)
return {
"product_name":get_words(funcs["name"](self.bs),self.word_len),
"price":funcs["price"](self.bs).replace("$",""),
}
"product_name":product_name,
"price":price,
}

21
proxy_class.py

@ -0,0 +1,21 @@
class proxy_iter:
def __init__(self,proxies):
self._proxies = set(proxies)
self.proxies = self._proxies.copy()
self.bad_proxies = set()
# self.used_proxies = {}
def __next__(self):
self.proxies -= self.bad_proxies
if len(self.proxies) == 0:
raise StopIteration
elem = self.proxies.pop()
if len(self.proxies) == 0:
self.proxies = self._proxies.copy()
return elem
def __iter__(self):
return self
def blacklist(self,proxy):
self.bad_proxies.add(proxy)

27
xpaths.json

@ -0,0 +1,27 @@
{
"www.banggood.com": {
"name": "//h1[@itemprop='name']",
"price": "//div[@class='now']"
},
"www.gearbest.com": {
"name": "//h1[@class='goodsIntro_title']",
"price": "//span[contains(@class,'goodsIntro_price')]",
"other": "//div[@class='goodsIntro_noticeSubmit']"
},
"www.amazon.com": {
"name": "//span[@id='priceblock_dealprice' or @id='priceblock_ourprice']",
"price": "//span[@id='productTitle']"
},
"www.getfpv.com": {
"name": "//div[@class='product-name']/span",
"price": "//div[@class='price-box']/p[@class='special-price']/span[@class='price'] | //div[@class='price-box']/span[@class='regular-price']/span"
},
"www.dalprops.com": {
"name": "//h1[@itemprop='name']",
"price": "//*[@id='product-price']"
},
"hobbyking.com": {
"name": "//h1[contains(@class,'product-name')]",
"price": "//p[@class='special-price']/span[@class='price'] | //span[@class='regular-price']/span[@class='price']"
}
}
Loading…
Cancel
Save