5 changed files with 101 additions and 160 deletions
.gitignore (4 changes)
batch_process.py (102 changes)
get_link.py (44 changes)
price_finder.py (105 changes)
xpaths.json (6 changes)
```diff
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
 __pycache__
 .gitignore
 /uBlock0.chromium
 /bg.html
+/test_this_bullshit.py
+/output
```
```diff
--- a/get_link.py
+++ b/get_link.py
@@ -1,50 +1,66 @@
 import pyppeteer
 import pyppeteer.errors
 import asyncio
 import os
 
-async def _get_link(browser,link):
+async def _get_link(browser,link,xpath):
     pages = await browser.pages()
     page = pages[0]
-    await page.goto(link,timeout=60_000)
+    await page.goto(link,waitUntil='domcontentloaded')
+
+    xpath = [xpath['name'],xpath['price']]
+    for _xpath in xpath:
+        print(repr(_xpath))
+        try:
+            await page.waitForXPath(_xpath)
+        except pyppeteer.errors.TimeoutError:
+            pass
     await asyncio.sleep(1)
     webpage = None
     for i in range(20):
         try:
             webpage = await page.content()
             break
         except:
-            time.sleep(1)
+            await asyncio.sleep(1)
     return webpage
 
-async def _single_link(browser,link):
-    webpage = await _get_link(browser,link)
+async def _single_link(browser,link,xpath):
+    webpage = await _get_link(browser,link,xpath)
     await browser.close()
     return webpage
 
-async def _multi_link(browser,links):
+async def _multi_link(browser,links,xpaths):
     results = {}
     for link in links:
-        webpage = await _get_link(browser,link)
+        xpath = xpaths[link]
+        webpage = await _get_link(browser,link,xpath)
         results[link] = webpage
     await browser.close()
     return results
 
-def get_link(links,headless = True,proxy = None):
-    ext = os.path.join(os.path.dirname(__file__),'uBlock0.chromium')
+def get_link(links,xpaths,headless = False,proxy = None):
     loop = asyncio.get_event_loop()
     run = loop.run_until_complete
     opts = {
         'headless':headless,
     }
-    opts['args'] = [f'--disable-extensions-except={ext}', f'--load-extension={ext}']
     if proxy:
-        opts['args'] += [f'--proxy-server={proxy}']
+        opts['args'] = [f'--proxy-server={proxy}']
+    else:
+        opts['args'] = []
+    ext = os.path.join(os.path.dirname(__file__),'uBlock0.chromium')
+    opts['args'] += [f'--disable-extensions-except={ext}', f'--load-extension={ext}']
     # print(opts)
     browser = run(pyppeteer.launch(**opts))
     try:
         if isinstance(links,list):
-            result = run(_multi_link(browser,links))
+            result = run(_multi_link(browser,links,xpaths))
         else:
-            result = run(_single_link(browser,links))
+            result = run(_single_link(browser,links,xpaths[links]))
         return result
     except Exception as e:
         run(browser.close())
         raise e
```
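The batch_process.py hunk is not rendered in this view, so for review context here is a minimal sketch of how the new `get_link(links, xpaths, ...)` signature would be driven. Note that `_multi_link` looks `xpaths` up by full URL (`xpaths[link]`); the URL and XPath strings below are hypothetical placeholders, not entries from the repo's actual xpaths.json:

```python
# Hypothetical driver for the new get_link(links, xpaths, ...) signature.
# _multi_link indexes xpaths by full link, so the dict is keyed by URL here;
# the URL and selectors are placeholders for illustration only.
from get_link import get_link

xpaths = {
    "https://www.example.com/product/123": {
        "name": "//span[@id='productTitle']",
        "price": "//span[@id='priceblock_ourprice']",
    },
}

# Returns {link: rendered HTML or None}; headless stays at the new
# default (False) since Chromium extensions generally need a headed browser.
pages = get_link(list(xpaths.keys()), xpaths)
for link, html in pages.items():
    print(link, "ok" if html else "failed")
```

Passing a single URL string instead of a list takes the `_single_link` path and returns the raw HTML directly rather than a dict.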
```diff
--- a/price_finder.py
+++ b/price_finder.py
@@ -1,81 +1,62 @@
 import urllib
 from fake_useragent import UserAgent
 from bs4 import BeautifulSoup as BS
 from requests_html import HTMLSession
 import re
 import datetime
 # import pytz
 import copy
+import json
+
+with open('xpaths.json') as file:
+    xpaths = json.load(file)
 
 user_agent = UserAgent().chrome
 debug = None
-def get_words(string,n):
-    words = re.finditer(r"(\b[^ \n]+\b)",string)
+
+def get_words(raw,n):
+    words = re.finditer(r"(\b[^ \n]+\b)",raw)
     word_list = list(match.group(0) for match in words)
     if len(word_list) > n:
         word_list = word_list[:n]
     return ' '.join(word_list)
-def get_page(url):
-    page = None
-    while not page:
-        page = urllib.request.Request(url,headers = {"User-Agent":user_agent})
-        page = str(urllib.request.urlopen(page).read())
-    return page
 
-def get_BS(url):
-    return BS(get_page(url),"lxml")
+def format_price(raw):
+    return re.search(r'\d+(\.\d+)?',raw).group(0)
 
-class price_finder:
-    page_funcs = {
-        "www.amazon.com":{
-            "name":lambda page: re.sub(r"( {2,}|\n|\\n)","",page.find("span",id="productTitle").text),
-            "price":lambda page: page.find(name = "span",id = re.compile("priceblock.*")).text
-        },
-        "www.banggood.com":{
-            "name":lambda page: page.find("h1",attrs = {"itemprop":"name"}).text,
-            "price":lambda page: page.find("div",attrs = {"class":"now"}).get("oriprice")
-        },
-        "www.dalprops.com":{
-            "name":lambda page: page.find("h1",attrs = {"class":"product_title"}).text,
-            "price":lambda page: page.find("meta",attrs = {"itemprop":"price"}).get("content")
-        },
-        "www.gearbest.com":{
-            "name":lambda page:re.sub(" {2,}|\n","",page.find("div",attrs = {"class":"goodsIntro_titleWrap"}).find("h1").text),
-            "price":lambda page: page.find("span",attrs={"class":"goodsIntro_price"}).text
-        },
-        "hobbyking.com":{
-            "name":lambda page: page.find("h1",attrs={"class":"product-name"}).text,
-            "price":lambda page: page.find("span",id = re.compile(r"product-price.*")).find("span",attrs={"class":"price"}).text
-        },
-        "www.getfpv.com":{
-            "name": lambda page: re.sub(r"\\n|\n","", page.find("div",attrs={"class":"product-name"}).text),
-            "price": lambda page: re.sub(r"\\n|\n","", page.find("span",attrs={"id":re.compile("product-price.*")}).text)
-        }
-    }
-    def __init__(self,url,space_seperated_categories = 7,bs=None):
+class ParseResult:
+    def __init__(self,url,tree,space_seperated_categories = 7):
         self.url=url
         self.info_url = urllib.parse.urlparse(url)
         self.word_len = space_seperated_categories
-        if self.info_url.netloc not in price_finder.page_funcs.keys():
-            raise NotImplementedError("Not implemented for {}".format(self.info_url.netloc))
-        if bs:
-            self.bs= bs
-        else:
-            self.bs = get_BS(url)
-        # self.words = re_words(space_seperated_categories)
+        self.tree = tree
         self.time = datetime.datetime.today()
         self.info_product = self._get_product_info_()
 
     def _get_product_info_(self):
-        funcs = price_finder.page_funcs[self.info_url.netloc]
-        # print(self.url)
+        host = self.info_url.netloc
+        product_name = get_words(
+            self.tree.xpath(xpaths[host]['name'])[0].text,
+            self.word_len
+        )
+
+        other_raw = None
+        try:
+            other_raw = self.tree.xpath(xpaths[host]['other'])[0].text
+        except KeyError:
+            pass
+
+        if host in ['www.gearbest.com']:
+            if other_raw:
+                price = '0.00'
+            else:
+                price = format_price(
+                    self.tree.xpath(xpaths[host]['price'])[0].text
+                )
+        else:
+            price = format_price(
+                self.tree.xpath(xpaths[host]['price'])[0].text
+            )
+
         return {
-            "product_name":get_words(funcs["name"](self.bs),self.word_len),
-            "price":funcs["price"](self.bs).replace("$",""),
+            "product_name":product_name,
+            "price":price,
         }
-    # def to_json(self):
-    #     ret = copy.deepcopy(self.__dict__)
-    #     ret['time'] = ret['time'].
```
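This hunk never shows where `tree` comes from; given the `self.tree.xpath(...)[0].text` calls, an lxml document is the natural fit, but that is an assumption. A sketch of the glue between the two modules, with placeholder URL and XPaths:

```python
# Hypothetical glue between get_link and ParseResult; assumes `tree` is an
# lxml element, since ParseResult calls self.tree.xpath(...)[0].text.
# The URL and XPaths are placeholders, and ParseResult additionally needs a
# matching entry in xpaths.json, which it indexes by netloc rather than by URL.
import lxml.html

from get_link import get_link
from price_finder import ParseResult

url = "https://www.example.com/product/123"
html = get_link(url, {url: {"name": "//h1", "price": "//span[@class='price']"}})
if html:  # _get_link returns None if page.content() never succeeds
    tree = lxml.html.fromstring(html)
    result = ParseResult(url, tree)
    print(result.info_product)  # {'product_name': ..., 'price': ...}
```

Worth noting in review: get_link.py keys its `xpaths` dict by full link, while price_finder.py keys the loaded xpaths.json by hostname, so the unrendered batch_process.py presumably bridges the two.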