PsycologyAPI/web_spider/run_spiders.py

# -*- coding: utf-8 -*-
# @Time : 2024/11/14 9:26 AM
# @Author : 河瞬
# @FileName: run_spiders.py
# @Software: PyCharm
import asyncio
import importlib
import inspect
import logging
import pkgutil
import time
from typing import List

from tortoise import Tortoise, transactions

from models import Source, RawData
from web_spider import BaseSpider, HttpxSpider
import web_spider.spiders as spiders
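
# The ORM models imported above live in models.py (not shown here). The sketch below is
# an assumption inferred from the fields this module reads and writes, not the project's
# actual definitions:
#
#     class Source(Model):
#         name = fields.TextField()              # spider name, used as the lookup key
#         index_url = fields.TextField()         # comma-separated index URLs
#
#     class RawData(Model):
#         title = fields.TextField()
#         url = fields.TextField()               # looked up via RawData.get(url=...)
#         content = fields.TextField(null=True)  # NULL until the article body is fetched
#         source = fields.ForeignKeyField("models.Source")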


def initialize_spiders(package) -> List[BaseSpider]:
    """Discover and instantiate every concrete spider class in the given package."""
    objs = []
    for _importer, modname, _ispkg in pkgutil.iter_modules(package.__path__):
        # Import each submodule by its fully qualified name (find_module/load_module
        # are deprecated and removed in Python 3.12).
        module = importlib.import_module(f"{package.__name__}.{modname}")
        for _name, obj in inspect.getmembers(module, inspect.isclass):
            # Keep only concrete spiders; skip the base classes themselves.
            if issubclass(obj, BaseSpider) and obj not in (BaseSpider, HttpxSpider):
                objs.append(obj())
    return objs


async def init_database(spider_objs: List[BaseSpider]):
    """Initialise the ORM, create tables, and register each spider as a Source."""
    await Tortoise.init(
        db_url="sqlite://db.sqlite3",
        modules={"models": ["models"]},
    )
    await Tortoise.generate_schemas()
    for obj in spider_objs:
        # Store the spider's index URLs as a single comma-separated string.
        await Source.get_or_create(name=obj.name, index_url=','.join(obj.index_url))


async def close_database():
    await Tortoise.close_connections()


def set_proxy(proxy_url: str, spider_objs: List[BaseSpider]):
    for spider in spider_objs:
        spider.set_proxy(proxy_url)


async def spider_exception_handler(spider_task_func, *args, max_retries: int = 3):
    """Await a spider coroutine, retrying when it raises or returns "[ERROR]" entries."""
    retries = 0
    while retries < max_retries:
        try:
            result = await spider_task_func(*args)
            # Spiders report per-URL failures as strings prefixed with "[ERROR]";
            # collect them all and raise so the whole batch is retried.
            if result and isinstance(result[0], str):
                errors = [item for item in result if item.startswith("[ERROR]")]
                if errors:
                    raise Exception(" ".join(errors))
            return result
        except Exception as e:
            retries += 1
            logging.error(f"{spider_task_func.__name__} raised an exception: {e}, retrying ({retries}/{max_retries})")
    logging.error(f"{spider_task_func.__name__} failed after {max_retries} retries")
    return None


async def process_content_spider(obj, threads):
    """Build one content-fetching coroutine per URL batch for a spider's unfetched pages."""
    source = await Source.get(name=obj.name)
    urls = [record.url for record in await RawData.filter(source=source, content=None)]
    # Split the URLs into `threads` roughly equal, ordered batches.
    batch_size = len(urls) // threads + 1
    coros = []
    url_batches = []
    for t in range(threads):
        start_index = t * batch_size
        end_index = min((t + 1) * batch_size, len(urls))
        batch_urls = urls[start_index:end_index]
        coros.append(spider_exception_handler(obj.get_news_content, batch_urls))
        url_batches.append(batch_urls)
    return coros, url_batches


async def run_spiders(page_cnt: int = 2, use_proxy: bool = False, threads: int = 4):
    """Crawl every registered spider and persist the fetched article contents."""
    try:
        spider_objs: List[BaseSpider] = initialize_spiders(spiders)
        await init_database(spider_objs)
        # page_cnt is only consumed by the URL-collection stage below, which is currently
        # commented out; use_proxy is accepted but not yet wired to set_proxy().
        # coros = [spider_exception_handler(spider.get_news_urls, page_cnt) for spider in spider_objs]
        # results = await asyncio.gather(*coros)
        #
        # for i, obj in enumerate(spider_objs):
        #     source = await Source.get(name=obj.name)
        #     for item in results[i]:
        #         if not await RawData.filter(url=item['url']).exists():
        #             await RawData.create(title=item['title'], url=item['url'], source=source)
        all_coros = []
        all_url_batches = []
        for obj in spider_objs:
            coros, url_batches = await process_content_spider(obj, threads)
            all_coros.extend(coros)
            all_url_batches.extend(url_batches)
        all_results = await asyncio.gather(*all_coros)
        # Each coroutine yields one content string per URL of its batch (in order),
        # or None if the whole batch failed after all retries.
        async with transactions.in_transaction() as conn:
            for batch_urls, batch_contents in zip(all_url_batches, all_results):
                if batch_contents is None:
                    continue
                for url, content in zip(batch_urls, batch_contents):
                    if content is None:
                        continue
                    record = await RawData.get(url=url)
                    record.content = content
                    await record.save(using_db=conn)
    finally:
        await close_database()


if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(run_spiders(page_cnt=5, use_proxy=False, threads=1))
    print(f"Total elapsed time: {time.time() - start_time}")