diff --git a/main.py b/main.py
index 7ad2df6..b7ca48d 100644
--- a/main.py
+++ b/main.py
@@ -1,12 +1,8 @@
-from random import randint
-
-from fastapi import FastAPI
-import os
 from contextlib import asynccontextmanager
+from random import randint
 from typing import AsyncGenerator
 
 from fastapi import FastAPI
-
 from tortoise import Tortoise, generate_config
 from tortoise.contrib.fastapi import RegisterTortoise
 
@@ -49,9 +45,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     async with RegisterTortoise(
         app=app,
         config=config,
-        generate_schemas=True,
-        add_exception_handlers=True,
-        _create_db=True,
     ):
         # db connected
         yield
diff --git a/requirements.txt b/requirements.txt
index 610361c..f6327fc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,5 @@
 requests~=2.32.3
 tortoise-orm~=0.21.7
 httpx~=0.27.2
-websockets~=13.1
-fastapi~=0.115.3
-uvicorn~=0.20.0
+
+fastapi~=0.115.5
\ No newline at end of file
diff --git a/web_spider/__init__.py b/web_spider/__init__.py
new file mode 100644
index 0000000..eb2a70f
--- /dev/null
+++ b/web_spider/__init__.py
@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/11/13 9:25 AM
+# @Author  : 河瞬
+# @FileName: __init__.py
+# @Software: PyCharm
+
+# from .run_spiders import *
+from .base_spider import BaseSpider
+from .httpx_spider import HttpxSpider
diff --git a/web_spider/base_spider.py b/web_spider/base_spider.py
new file mode 100644
index 0000000..b223ef2
--- /dev/null
+++ b/web_spider/base_spider.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/11/15 4:09 PM
+# @Author  : 河瞬
+# @FileName: base_spider.py
+# @Software: PyCharm
+from typing import List, Dict
+
+
+class BaseSpider:
+    name: str
+    index_url: List[str]
+    timeout: int
+    use_proxy_default: bool
+
+    def __init__(self, proxy_url: str = None, timeout: int = 6):
+        self.use_proxy = proxy_url is not None
+        self.proxy_url = proxy_url
+        self.timeout = timeout
+
+    def set_proxy(self, proxy_url: str):
+        self.use_proxy = True
+        self.proxy_url = proxy_url
+
+    async def get_news_urls(self, page_cnt: int) -> List[Dict | None]:
+        """
+
+        :param page_cnt: number of index pages to fetch
+        :return: List[Dict] where each dict must contain the keys 'title' and 'url'
+        """
+        raise NotImplementedError("Subclass method not implemented:get_news_urls")
+
+    async def get_news_content(self, urls: List[str]) -> List[str | None]:
+        raise NotImplementedError("Subclass method not implemented:get_news_content")
diff --git a/web_spider/db.sqlite3 b/web_spider/db.sqlite3
new file mode 100644
index 0000000..f30f98d
Binary files /dev/null and b/web_spider/db.sqlite3 differ
diff --git a/web_spider/httpx_spider.py b/web_spider/httpx_spider.py
new file mode 100644
index 0000000..e7de977
--- /dev/null
+++ b/web_spider/httpx_spider.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/11/24 6:26 PM
+# @Author  : 河瞬
+# @FileName: httpx_spider.py
+# @Software: PyCharm
+from httpx import AsyncClient, Proxy
+
+from .base_spider import BaseSpider
+
+
+class HttpxSpider(BaseSpider):
+    headers = {}
+    proxy = None
+
+    def __init__(self, proxy_url: str = None):
+        super().__init__(proxy_url)
+        self.proxy = Proxy(proxy_url) if self.use_proxy else None
+
+    def set_proxy(self, proxy_url: str):
+        super().set_proxy(proxy_url)
+        self.proxy = Proxy(proxy_url)
+
+    def get_session(self) -> AsyncClient:
+        return AsyncClient(headers=self.headers, proxy=self.proxy, timeout=self.timeout, follow_redirects=True)
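Note: BaseSpider defines the contract (get_news_urls must return dicts with 'title' and 'url' keys; get_news_content returns one string per URL), while HttpxSpider only contributes header/proxy handling plus get_session(), which hands back a fresh, unclosed AsyncClient on every call. The sketch below shows how a concrete spider is apparently meant to combine the two; ExampleSpider, its index URL and the parsing step are hypothetical placeholders, not part of this change.

    # Sketch only: ExampleSpider, its URL and the parsing step are placeholders.
    from typing import Dict, List

    from web_spider import HttpxSpider


    class ExampleSpider(HttpxSpider):
        name = "Example"
        use_proxy_default = False
        index_url = ["https://example.com/news?page=1"]  # hypothetical index page

        async def get_news_urls(self, page_cnt: int) -> List[Dict | None]:
            news: List[Dict] = []
            async with self.get_session() as s:  # AsyncClient is an async context manager
                for i in range(page_cnt):
                    response = await s.get(f"https://example.com/news?page={i + 1}")
                    response.raise_for_status()
                    # parse the page here and append {'title': ..., 'url': ...} dicts
            return news

        async def get_news_content(self, urls: List[str]) -> List[str | None]:
            contents: List[str | None] = []
            async with self.get_session() as s:
                for url in urls:
                    response = await s.get(url)
                    contents.append(response.text if response.is_success else None)
            return contents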
diff --git a/web_spider/run_spiders.py b/web_spider/run_spiders.py
new file mode 100644
index 0000000..df28f1f
--- /dev/null
+++ b/web_spider/run_spiders.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/11/14 9:26 AM
+# @Author  : 河瞬
+# @FileName: run_spiders.py
+# @Software: PyCharm
+
+import asyncio
+import logging
+import pkgutil
+import inspect
+import time
+from typing import List
+from tortoise import Tortoise, transactions
+
+from models import Source, RawData
+from web_spider import BaseSpider, HttpxSpider
+import web_spider.spiders as spiders
+
+
+def initialize_spiders(package) -> List[BaseSpider]:
+    objs = []
+    for importer, modname, ispkg in pkgutil.iter_modules(package.__path__):
+        module = importer.find_module(modname).load_module(modname)
+        for name, obj in inspect.getmembers(module):
+            if inspect.isclass(obj) and issubclass(obj, BaseSpider) and obj is not BaseSpider and obj is not HttpxSpider:
+                objs.append(obj())
+    return objs
+
+
+async def init_database(spider_objs: List[BaseSpider]):
+    await Tortoise.init(
+        db_url="sqlite://db.sqlite3",
+        modules={"models": ["models", ]},
+    )
+    await Tortoise.generate_schemas()
+    for obj in spider_objs:
+        await Source.get_or_create(name=obj.name, index_url=','.join(obj.index_url))
+
+
+async def close_database():
+    await Tortoise.close_connections()
+
+
+def set_proxy(proxy_url: str, spider_objs: List[BaseSpider]):
+    for spider in spider_objs:
+        spider.set_proxy(proxy_url)
+
+
+async def spider_exception_handler(spider_task_func, *args, max_retries: int = 3):
+    retries = 0
+    while retries < max_retries:
+        try:
+            result = await spider_task_func(*args)
+            if isinstance(result[0], str) and any(msg := filter(lambda x: x.startswith("[ERROR]"), result)):
+                raise Exception(" ".join(list(msg)))
+            return result
+        except Exception as e:
+            retries += 1
+            logging.error(f"{spider_task_func.__name__} raised an exception: {e}, retrying ({retries}/{max_retries})")
+            if retries == max_retries:
+                logging.error(f"{spider_task_func.__name__} failed after {max_retries} retries")
+    return None
+
+
+async def process_content_spider(obj, threads):
+    source = await Source.get(name=obj.name)
+    urls = [record.url for record in await RawData.filter(source=source, content=None)]
+    batch_size = len(urls) // threads + 1
+    coros = []
+    for t in range(threads):
+        start_index = t * batch_size
+        end_index = min((t + 1) * batch_size, len(urls))
+        thread_urls = urls[start_index:end_index]
+        task = spider_exception_handler(obj.get_news_content, thread_urls)
+        coros.append(task)
+    return coros, urls
+
+
+async def run_spiders(page_cnt: int = 2, use_proxy: bool = False, threads: int = 4):
+    try:
+        spider_objs: List[BaseSpider] = initialize_spiders(spiders)
+
+        await init_database(spider_objs)
+
+        # coros = [spider_exception_handler(spider.get_news_urls, page_cnt) for spider in spider_objs]
+        # results = await asyncio.gather(*coros)
+        #
+        # for i, obj in enumerate(spider_objs):
+        #     source = await Source.get(name=obj.name)
+        #     for item in results[i]:
+        #         if not await RawData.filter(url=item['url']).exists():
+        #             await RawData.create(title=item['title'], url=item['url'], source=source)
+        all_coros = []
+        all_urls = []
+
+        for obj in spider_objs:
+            coros, urls = await process_content_spider(obj, threads)
+            all_coros.extend(coros)
+            all_urls.extend(urls)
+
+        all_results = await asyncio.gather(*all_coros)
+
+        all_results = [item for sublist in all_results for item in sublist if item is not None]
+
+        async with transactions.in_transaction() as conn:
+            for j, content in enumerate(all_results):
+                record = await RawData.get(url=all_urls[j])
+                record.content = content
+                await record.save(using_db=conn)
+    finally:
+        await close_database()
+
+
+if __name__ == '__main__':
+    start_time = time.time()
+    asyncio.run(run_spiders(page_cnt=5, use_proxy=False, threads=1))
+    print(f"Total run time: {time.time() - start_time} seconds")
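Note: run_spiders.py imports Source and RawData from a models module that is not part of this diff. The sketch below is only an inference of what those Tortoise models could look like, reconstructed from the fields used above (Source.name, Source.index_url, RawData.title/url/content/source); the field types, lengths and constraints are assumptions, not the project's actual definitions.

    # Inferred sketch of the models used by run_spiders.py; not the real models module.
    from tortoise import fields
    from tortoise.models import Model


    class Source(Model):
        id = fields.IntField(pk=True)
        name = fields.CharField(max_length=64, unique=True)  # matched against spider.name
        index_url = fields.TextField()                       # ','.join(spider.index_url)


    class RawData(Model):
        id = fields.IntField(pk=True)
        title = fields.TextField()
        url = fields.CharField(max_length=512, unique=True)  # looked up via RawData.get(url=...)
        content = fields.TextField(null=True)                # filled in by the content pass
        source = fields.ForeignKeyField("models.Source", related_name="raw_data")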
diff --git a/web_spider/spiders/__init__.py b/web_spider/spiders/__init__.py
new file mode 100644
index 0000000..aea6bbc
--- /dev/null
+++ b/web_spider/spiders/__init__.py
@@ -0,0 +1,11 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/11/15 7:32 PM
+# @Author  : 河瞬
+# @FileName: __init__.py
+# @Software: PyCharm
+
+from .caltech import PsycologySpider
+
+__all__ = [
+    "PsycologySpider"
+]
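Note: initialize_spiders() in run_spiders.py discovers the modules in this package through importer.find_module(modname).load_module(modname), an API pair that has been deprecated since Python 3.4 and is removed in Python 3.12. A sketch of an importlib-based variant with the same discovery logic:

    # Sketch of a drop-in replacement for initialize_spiders() using importlib.
    import importlib
    import inspect
    import pkgutil
    from typing import List

    from web_spider import BaseSpider, HttpxSpider


    def initialize_spiders(package) -> List[BaseSpider]:
        objs = []
        for _, modname, _ in pkgutil.iter_modules(package.__path__):
            # Import the module through the package instead of find_module/load_module.
            module = importlib.import_module(f"{package.__name__}.{modname}")
            for _, obj in inspect.getmembers(module, inspect.isclass):
                if issubclass(obj, BaseSpider) and obj not in (BaseSpider, HttpxSpider):
                    objs.append(obj())
        return objs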
diff --git a/web_spider/spiders/caltech.py b/web_spider/spiders/caltech.py
new file mode 100644
index 0000000..e19e96a
--- /dev/null
+++ b/web_spider/spiders/caltech.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+import re
+
+from bs4 import BeautifulSoup
+from crawl4ai import AsyncWebCrawler
+
+from typing import List, Dict
+
+from web_spider import HttpxSpider
+import asyncio
+
+
+class PsycologySpider(HttpxSpider):
+    name = "Psycology"
+    use_proxy_default = False
+    index_url = ["https://www.xinli001.com/info/emot?page=1"]
+    headers = {"Host": "www.xinli001.com",
+               "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0",
+               "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+               "Accept-Language": "en-US,en;q=0.5",
+               "Accept-Encoding": "gzip",  # , deflate, br, zstd",
+               "Referer": "https://www.xinli001.com",
+               "DNT": "1",
+               "Sec-GPC": "1", "Connection": "keep-alive",
+               "Upgrade-Insecure-Requests": "1",
+               "Sec-Fetch-Dest": "document",
+               "Sec-Fetch-Mode": "navigate",
+               "Sec-Fetch-Site": "same-origin", "Sec-Fetch-User": "?1",
+               "Priority": "u=0, i"}
+
+    async def get_news_urls(self, page_cnt: int = 8) -> List[Dict | None]:
+        news: List[Dict] = []
+        s = self.get_session()
+        for i in range(page_cnt):
+            response = await s.get(f"https://www.xinli001.com/info/emot?page={i + 1}")
+            response.raise_for_status()
+            soup = BeautifulSoup(response.text, 'html.parser')
+            article_list = soup.select_one("#articleListM")
+            articles = article_list.find_all(class_='item')
+
+            for article in articles:
+                link_tag = article.find(class_="right").find('a', class_='title')
+
+                title = link_tag.get_text().strip()
+                if link_tag['href'].startswith("http"):
+                    url = link_tag['href']
+                else:
+                    url = "https://" + self.headers['Host'] + link_tag['href']
+
+                result_dict = {'title': title, 'url': url}
+                news.append(result_dict)
+
+        return news
+
+    def filter_content(self, content: str):
+        # content = content.replace("#", "")
+        content = content.replace("*", "")
+        content = content.replace("> \n", "")
+        content = content.replace(" \n", "\n")
+        content = content.replace("\n\n", "\n")
+        content = content.replace("\n\n", "\n")
+        pattern = r'!\[\]\(https?://[^\s)]+\)'
+        cleaned_text = re.sub(pattern, '', content)
+        pattern2 = r'#+ '
+        cleaned_text = re.sub(pattern2, '', cleaned_text)
+        return cleaned_text
+
+    async def get_news_content(self, urls: List[str]) -> List[str]:
+        contents = []
+        # s = self.get_session()
+        # for url in urls:
+        #     attempt = 0
+        #     max_attempts = 3
+        #     while attempt < max_attempts:
+        #         try:
+        #             response = await s.get(url)
+        #             response.raise_for_status()
+        #             contents.append(response.text)
+        #             await asyncio.sleep(1)
+        #             break  # exit the retry loop on success
+        #         except Exception as e:
+        #             attempt += 1
+        #             if attempt == max_attempts:
+        #                 print(f"Failed to fetch {url} after {max_attempts} attempts: {e}")
+        #             await asyncio.sleep(1)  # wait 1 second before retrying
+        # await s.aclose()
+        sessionid = "asdf"
+        async with AsyncWebCrawler(proxy=self.proxy_url if self.use_proxy else None) as crawler:
+            for url in urls:
+                result = await crawler.arun(
+                    url=url,
+                    page_timeout=self.timeout * 1000,
+                    magic=True,
+                    # bypass_cache=True,
+                    wait_for="css:.yxl-editor-article ",
+                    css_selector=".yxl-editor-article",
+                    exclude_external_links=True,
+                    session_id=sessionid,
+                )
+                t = self.filter_content(result.markdown)
+                # print(t)
+                contents.append(t)
+                await asyncio.sleep(2)
+        return contents
+
+
+async def main():
+    s = PsycologySpider()
+    # urls = await s.get_news_urls(2)
+    # print(urls)
+    # print(len(urls))
+    contents = await s.get_news_content(["https://www.xinli001.com/info/100497752"])
+    print(contents)
+
+
+if __name__ == "__main__":
+    import asyncio
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main())
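Note: get_news_content() above keeps an earlier httpx-based fetch loop only as commented-out code and now relies on crawl4ai for rendering and markdown extraction. For reference, a hedged sketch of that abandoned fallback as a standalone coroutine returning raw HTML; the retry count and sleep intervals mirror the commented code, and appending None for URLs that never succeed is an assumption that keeps the result aligned with the input list.

    # Sketch of the commented-out httpx fallback, rewritten as a working coroutine.
    import asyncio
    from typing import List

    from web_spider.spiders import PsycologySpider


    async def fetch_raw_html(spider: PsycologySpider, urls: List[str],
                             max_attempts: int = 3) -> List[str | None]:
        contents: List[str | None] = []
        async with spider.get_session() as s:
            for url in urls:
                html = None
                for attempt in range(1, max_attempts + 1):
                    try:
                        response = await s.get(url)
                        response.raise_for_status()
                        html = response.text
                        break  # success, stop retrying
                    except Exception as e:
                        if attempt == max_attempts:
                            print(f"Failed to fetch {url} after {max_attempts} attempts: {e}")
                        await asyncio.sleep(1)  # wait before retrying, as in the original loop
                contents.append(html)
        return contents

It could be driven the same way as main() above, e.g. asyncio.run(fetch_raw_html(PsycologySpider(), [...])).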