# -*- coding: utf-8 -*-
# @Time    : 2024/11/14 9:26 AM
# @Author  : 河瞬
# @FileName: scraper.py
# @Software: PyCharm
import asyncio
import importlib
import inspect
import logging
import pkgutil
import time
from typing import List

from tortoise import Tortoise, transactions

from models import Source, RawData
from web_spider import BaseSpider, HttpxSpider
import web_spider.spiders as spiders


def initialize_spiders(package) -> List[BaseSpider]:
    """Discover and instantiate every concrete spider class defined in the given package."""
    objs = []
    for _, modname, _ in pkgutil.iter_modules(package.__path__):
        module = importlib.import_module(f"{package.__name__}.{modname}")
        for _, obj in inspect.getmembers(module):
            if (inspect.isclass(obj) and issubclass(obj, BaseSpider)
                    and obj is not BaseSpider and obj is not HttpxSpider):
                objs.append(obj())
    return objs


async def init_database(spider_objs: List[BaseSpider]):
    """Initialize the SQLite database and register one Source row per spider."""
    await Tortoise.init(
        db_url="sqlite://db.sqlite3",
        modules={"models": ["models"]},
    )
    await Tortoise.generate_schemas()
    for obj in spider_objs:
        await Source.get_or_create(name=obj.name, index_url=','.join(obj.index_url))


async def close_database():
    await Tortoise.close_connections()


def set_proxy(proxy_url: str, spider_objs: List[BaseSpider]):
    for spider in spider_objs:
        spider.set_proxy(proxy_url)


async def spider_exception_handler(spider_task_func, *args, max_retries: int = 3):
    """Run a spider coroutine with retries; return its result, or None if every attempt fails."""
    retries = 0
    while retries < max_retries:
        try:
            result = await spider_task_func(*args)
            # Spiders report failures as "[ERROR] ..." strings; treat them as exceptions so the call is retried.
            error_msgs = [item for item in result if isinstance(item, str) and item.startswith("[ERROR]")]
            if error_msgs:
                raise Exception(" ".join(error_msgs))
            return result
        except Exception as e:
            retries += 1
            logging.error(f"{spider_task_func.__name__} raised an exception: {e}, retrying ({retries}/{max_retries})")
    logging.error(f"{spider_task_func.__name__} failed after {max_retries} retries")
    return None


async def process_content_spider(obj, threads):
    """Split one source's pending URLs into `threads` batches and build a retry-wrapped coroutine per batch."""
    source = await Source.get(name=obj.name)
    urls = [record.url for record in await RawData.filter(source=source, content=None)]
    batch_size = len(urls) // threads + 1
    coros = []
    url_batches = []
    for t in range(threads):
        batch_urls = urls[t * batch_size:(t + 1) * batch_size]
        if not batch_urls:
            continue
        coros.append(spider_exception_handler(obj.get_news_content, batch_urls))
        url_batches.append(batch_urls)
    return coros, url_batches


async def run_spiders(page_cnt: int = 2, use_proxy: bool = False, threads: int = 4):
    try:
        spider_objs: List[BaseSpider] = initialize_spiders(spiders)
        await init_database(spider_objs)
        # NOTE: use_proxy is not wired up yet; call set_proxy(proxy_url, spider_objs) here if needed.

        # URL collection phase (currently disabled):
        # coros = [spider_exception_handler(spider.get_news_urls, page_cnt) for spider in spider_objs]
        # results = await asyncio.gather(*coros)
        #
        # for i, obj in enumerate(spider_objs):
        #     source = await Source.get(name=obj.name)
        #     for item in results[i]:
        #         if not await RawData.filter(url=item['url']).exists():
        #             await RawData.create(title=item['title'], url=item['url'], source=source)

        # Content collection phase: fetch article bodies for every URL that has no content yet.
        all_coros = []
        all_url_batches = []
        for obj in spider_objs:
            coros, url_batches = await process_content_spider(obj, threads)
            all_coros.extend(coros)
            all_url_batches.extend(url_batches)

        batch_results = await asyncio.gather(*all_coros)

        # Save contents batch by batch so a failed batch (None) cannot shift the
        # URL/content alignment of the remaining batches.
        async with transactions.in_transaction() as conn:
            for batch_urls, batch_contents in zip(all_url_batches, batch_results):
                if batch_contents is None:
                    continue
                for url, content in zip(batch_urls, batch_contents):
                    if content is None:
                        continue
                    record = await RawData.get(url=url)
                    record.content = content
                    await record.save(using_db=conn)
    finally:
        await close_database()


if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(run_spiders(page_cnt=5, use_proxy=False, threads=1))
    print(f"Total elapsed time: {time.time() - start_time} seconds")