# -*- coding: utf-8 -*-
# @Time    : 2024/11/14 9:26 AM
# @Author  : 河瞬
# @FileName: scraper.py
# @Software: PyCharm

import asyncio
import importlib
import inspect
import logging
import pkgutil
import time
from typing import List

from tortoise import Tortoise, transactions

from models import Source, RawData
from web_spider import BaseSpider, HttpxSpider
import web_spider.spiders as spiders
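
# Spider classes are discovered dynamically from the web_spider.spiders package.
# As used in this module, each concrete spider exposes: .name, .index_url,
# .set_proxy(proxy_url), .get_news_urls(page_cnt) and .get_news_content(urls).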


def initialize_spiders(package) -> List[BaseSpider]:
    """Import every module in ``package`` and instantiate each concrete BaseSpider subclass."""
    objs = []
    for _, modname, _ in pkgutil.iter_modules(package.__path__):
        # importlib replaces the deprecated importer.find_module().load_module() pattern.
        module = importlib.import_module(f"{package.__name__}.{modname}")
        for _, obj in inspect.getmembers(module):
            if (inspect.isclass(obj) and issubclass(obj, BaseSpider)
                    and obj is not BaseSpider and obj is not HttpxSpider):
                objs.append(obj())
    return objs


async def init_database(spider_objs: List[BaseSpider]):
    """Initialise Tortoise ORM on a local SQLite file and ensure one Source row per spider."""
    await Tortoise.init(
        db_url="sqlite://db.sqlite3",
        modules={"models": ["models"]},
    )
    await Tortoise.generate_schemas()
    for obj in spider_objs:
        await Source.get_or_create(name=obj.name, index_url=','.join(obj.index_url))


async def close_database():
    await Tortoise.close_connections()


def set_proxy(proxy_url: str, spider_objs: List[BaseSpider]):
    for spider in spider_objs:
        spider.set_proxy(proxy_url)


async def spider_exception_handler(spider_task_func, *args, max_retries: int = 3):
    """Run a spider coroutine, retrying on failure.

    Spider tasks signal failure by returning strings prefixed with "[ERROR]"; those
    are converted into exceptions so they go through the same retry path. Returns
    None once ``max_retries`` attempts have failed.
    """
    retries = 0
    while retries < max_retries:
        try:
            result = await spider_task_func(*args)
            # Collect error markers in one pass; any "[ERROR]" string means the batch failed.
            errors = [item for item in result if isinstance(item, str) and item.startswith("[ERROR]")]
            if errors:
                raise Exception(" ".join(errors))
            return result
        except Exception as e:
            retries += 1
            logging.error(
                f"{spider_task_func.__name__} raised an exception: {e}, retrying ({retries}/{max_retries})"
            )
            if retries == max_retries:
                logging.error(f"{spider_task_func.__name__} failed after {max_retries} retries")
                return None
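

# Usage sketch (mirrors how it is used below): wrap a spider coroutine so failures
# are retried, e.g.
#     result = await spider_exception_handler(spider.get_news_content, urls)
# A None result means the task still failed after max_retries attempts.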


async def process_content_spider(obj, threads):
    """Build content-fetching coroutines for one spider, splitting its pending URLs into batches."""
    source = await Source.get(name=obj.name)
    # URLs that have been stored but whose content has not been fetched yet.
    urls = [record.url for record in await RawData.filter(source=source, content=None)]
    batch_size = len(urls) // threads + 1
    coros = []
    for t in range(threads):
        start_index = t * batch_size
        end_index = min((t + 1) * batch_size, len(urls))
        thread_urls = urls[start_index:end_index]
        task = spider_exception_handler(obj.get_news_content, thread_urls)
        coros.append(task)
    return coros, urls
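

# NOTE: the coroutines returned above are awaited later in run_spiders(); the contents
# they yield are assumed to come back in the same order as ``urls``, which is what the
# save loop in run_spiders relies on when matching content to RawData rows.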


async def run_spiders(page_cnt: int = 2, use_proxy: bool = False, threads: int = 4):
    """Fetch article content for every stored URL that has none yet, then persist it."""
    try:
        spider_objs: List[BaseSpider] = initialize_spiders(spiders)

        await init_database(spider_objs)
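
        # use_proxy is accepted but not applied anywhere in the current flow. A minimal
        # sketch of wiring it up (the proxy URL below is a placeholder assumption, not
        # part of the project):
        # if use_proxy:
        #     set_proxy("http://127.0.0.1:7890", spider_objs)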

        # The URL-collection phase is currently disabled; only content fetching for
        # URLs already stored in RawData runs.
        # coros = [spider_exception_handler(spider.get_news_urls, page_cnt) for spider in spider_objs]
        # results = await asyncio.gather(*coros)
        #
        # for i, obj in enumerate(spider_objs):
        #     source = await Source.get(name=obj.name)
        #     for item in results[i]:
        #         if not await RawData.filter(url=item['url']).exists():
        #             await RawData.create(title=item['title'], url=item['url'], source=source)

        all_coros = []
        all_urls = []

        for obj in spider_objs:
            coros, urls = await process_content_spider(obj, threads)
            all_coros.extend(coros)
            all_urls.extend(urls)

        all_results = await asyncio.gather(*all_coros)

        # Flatten the per-batch results, skipping batches that failed (None) so the
        # iteration below does not crash on them.
        all_results = [
            item
            for sublist in all_results
            if sublist is not None
            for item in sublist
            if item is not None
        ]

        # Write the fetched content back; results are matched to URLs by position,
        # which assumes every batch succeeded and returned one content item per URL.
        async with transactions.in_transaction() as conn:
            for j, content in enumerate(all_results):
                record = await RawData.get(url=all_urls[j])
                record.content = content
                await record.save(using_db=conn)

        # record = await RawData.get(id=1)
        # print(record.title, record.content)
    finally:
        await close_database()


if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(run_spiders(page_cnt=5, use_proxy=False, threads=1))
    print(f"Total elapsed time: {time.time() - start_time} seconds")