Push it all up!

高子兴 2024-11-29 17:09:32 +08:00
parent fea29e4d71
commit 69d751897d
9 changed files with 317 additions and 11 deletions


@@ -1,12 +1,8 @@
-from random import randint
-from fastapi import FastAPI
-import os
 from contextlib import asynccontextmanager
-from random import randint
 from typing import AsyncGenerator
 from fastapi import FastAPI
 from tortoise import Tortoise, generate_config
 from tortoise.contrib.fastapi import RegisterTortoise
@@ -49,9 +45,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     async with RegisterTortoise(
         app=app,
         config=config,
-        generate_schemas=True,
-        add_exception_handlers=True,
-        _create_db=True,
     ):
         # db connected
         yield


@@ -4,6 +4,5 @@ requests~=2.32.3
 tortoise-orm~=0.21.7
 httpx~=0.27.2
-websockets~=13.1
-fastapi~=0.115.3
+fastapi~=0.115.5
 uvicorn~=0.20.0

web_spider/__init__.py Normal file

@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
# @Time : 2024/11/13 9:25 AM
# @Author : 河瞬
# @FileName: __init__.py
# @Software: PyCharm
# from .run_spiders import *
from .base_spider import BaseSpider
from .httpx_spider import HttpxSpider

web_spider/base_spider.py Normal file

@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# @Time : 2024/11/15 4:09 PM
# @Author : 河瞬
# @FileName: base_spider.py
# @Software: PyCharm
from typing import List, Dict


class BaseSpider:
    name: str
    index_url: List[str]
    timeout: int
    use_proxy_default: bool

    def __init__(self, proxy_url: str = None, timeout: int = 6):
        self.use_proxy = proxy_url is not None
        self.proxy_url = proxy_url
        self.timeout = timeout

    def set_proxy(self, proxy_url: str):
        self.use_proxy = True
        self.proxy_url = proxy_url

    async def get_news_urls(self, page_cnt: int) -> List[Dict | None]:
        """
        :param page_cnt: number of index pages to crawl
        :return: List[Dict] - each dict must contain the keys 'title' and 'url'
        """
        raise NotImplementedError("Subclass method not implemented: get_news_urls")

    async def get_news_content(self, urls: List[str]) -> List[str | None]:
        raise NotImplementedError("Subclass method not implemented: get_news_content")
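
For reference, a minimal sketch (not part of this commit) of a subclass that satisfies the contract documented in get_news_urls; ExampleSpider and the example.com URLs are placeholders:

# Hypothetical subclass sketch; ExampleSpider and the example.com URLs are placeholders.
from typing import Dict, List

from web_spider import BaseSpider


class ExampleSpider(BaseSpider):
    name = "Example"
    index_url = ["https://example.com/news"]
    use_proxy_default = False

    async def get_news_urls(self, page_cnt: int) -> List[Dict | None]:
        # Each dict must carry the 'title' and 'url' keys that run_spiders.py (below) relies on.
        return [{"title": "placeholder", "url": f"https://example.com/news/{i}"} for i in range(page_cnt)]

    async def get_news_content(self, urls: List[str]) -> List[str | None]:
        # Return one content string (or None) per input URL, in the same order.
        return [None for _ in urls]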

web_spider/db.sqlite3 Normal file

Binary file not shown.

web_spider/httpx_spider.py Normal file

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# @Time : 2024/11/24 6:26 PM
# @Author : 河瞬
# @FileName: httpx_spider.py
# @Software: PyCharm
from httpx import AsyncClient, Proxy

from .base_spider import BaseSpider


class HttpxSpider(BaseSpider):
    headers = {}
    proxy = None

    def __init__(self, proxy_url: str = None):
        super().__init__(proxy_url)
        self.proxy = Proxy(proxy_url) if self.use_proxy else None

    def set_proxy(self, proxy_url: str):
        super().set_proxy(proxy_url)
        self.proxy = Proxy(proxy_url)

    def get_session(self) -> AsyncClient:
        return AsyncClient(headers=self.headers, proxy=self.proxy, timeout=self.timeout, follow_redirects=True)
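
A minimal usage sketch, not part of this commit, of how code built on HttpxSpider might use get_session(); fetch_one and the example.com URL are placeholders:

# Hypothetical usage sketch; fetch_one and the URL are placeholders.
import asyncio

from web_spider import HttpxSpider


async def fetch_one(url: str) -> str:
    spider = HttpxSpider()
    # get_session() returns an httpx.AsyncClient preconfigured with the spider's
    # headers, proxy, timeout and redirect handling.
    async with spider.get_session() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.text


# Example call: asyncio.run(fetch_one("https://example.com"))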

web_spider/run_spiders.py Normal file

@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
# @Time : 2024/11/14 9:26 AM
# @Author : 河瞬
# @FileName: run_spiders.py
# @Software: PyCharm
import asyncio
import logging
import pkgutil
import inspect
import time
from typing import List

from tortoise import Tortoise, transactions

from models import Source, RawData
from web_spider import BaseSpider, HttpxSpider
import web_spider.spiders as spiders


def initialize_spiders(package) -> List[BaseSpider]:
    objs = []
    for importer, modname, ispkg in pkgutil.iter_modules(package.__path__):
        module = importer.find_module(modname).load_module(modname)
        for name, obj in inspect.getmembers(module):
            if inspect.isclass(obj) and issubclass(obj, BaseSpider) and obj is not BaseSpider and obj is not HttpxSpider:
                objs.append(obj())
    return objs


async def init_database(spider_objs: List[BaseSpider]):
    await Tortoise.init(
        db_url="sqlite://db.sqlite3",
        modules={"models": ["models", ]},
    )
    await Tortoise.generate_schemas()
    for obj in spider_objs:
        await Source.get_or_create(name=obj.name, index_url=','.join(obj.index_url))


async def close_database():
    await Tortoise.close_connections()


def set_proxy(proxy_url: str, spider_objs: List[BaseSpider]):
    for spider in spider_objs:
        spider.set_proxy(proxy_url)


async def spider_exception_handler(spider_task_func, *args, max_retries: int = 3):
    retries = 0
    while retries < max_retries:
        try:
            result = await spider_task_func(*args)
            # If the spider returned "[ERROR] ..." strings, treat the batch as failed and retry.
            if result and isinstance(result[0], str):
                error_msgs = [x for x in result if x.startswith("[ERROR]")]
                if error_msgs:
                    raise Exception(" ".join(error_msgs))
            return result
        except Exception as e:
            retries += 1
            logging.error(f"{spider_task_func.__name__} raised an exception: {e}, retrying ({retries}/{max_retries})")
            if retries == max_retries:
                logging.error(f"{spider_task_func.__name__} failed after {max_retries} retries")
    return None


async def process_content_spider(obj, threads):
    source = await Source.get(name=obj.name)
    urls = [record.url for record in await RawData.filter(source=source, content=None)]
    batch_size = len(urls) // threads + 1
    coros = []
    for t in range(threads):
        start_index = t * batch_size
        end_index = min((t + 1) * batch_size, len(urls))
        thread_urls = urls[start_index:end_index]
        task = spider_exception_handler(obj.get_news_content, thread_urls)
        coros.append(task)
    return coros, urls


async def run_spiders(page_cnt: int = 2, use_proxy: bool = False, threads: int = 4):
    try:
        spider_objs: List[BaseSpider] = initialize_spiders(spiders)
        await init_database(spider_objs)

        # coros = [spider_exception_handler(spider.get_news_urls, page_cnt) for spider in spider_objs]
        # results = await asyncio.gather(*coros)
        #
        # for i, obj in enumerate(spider_objs):
        #     source = await Source.get(name=obj.name)
        #     for item in results[i]:
        #         if not await RawData.filter(url=item['url']).exists():
        #             await RawData.create(title=item['title'], url=item['url'], source=source)

        all_coros = []
        all_urls = []
        for obj in spider_objs:
            coros, urls = await process_content_spider(obj, threads)
            all_coros.extend(coros)
            all_urls.extend(urls)
        all_results = await asyncio.gather(*all_coros)
        # Flatten the per-batch results; skip batches that failed entirely (None) and empty items.
        all_results = [item for sublist in all_results if sublist is not None for item in sublist if item is not None]
        async with transactions.in_transaction() as conn:
            for j, content in enumerate(all_results):
                record = await RawData.get(url=all_urls[j])
                record.content = content
                await record.save(using_db=conn)
    finally:
        await close_database()


if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(run_spiders(page_cnt=5, use_proxy=False, threads=1))
    print(f"Total time elapsed: {time.time() - start_time}")

web_spider/spiders/__init__.py Normal file

@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
# @Time : 2024/11/15 7:32 PM
# @Author : 河瞬
# @FileName: __init__.py
# @Software: PyCharm
from .caltech import PsycologySpider

__all__ = [
    "PsycologySpider"
]

web_spider/spiders/caltech.py Normal file

@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
import re
import asyncio
from typing import List, Dict

from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler

from web_spider import HttpxSpider


class PsycologySpider(HttpxSpider):
    name = "Psycology"
    use_proxy_default = False
    index_url = ["https://www.xinli001.com/info/emot?page=1"]
    headers = {"Host": "www.xinli001.com",
               "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0",
               "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
               "Accept-Language": "en-US,en;q=0.5",
               "Accept-Encoding": "gzip",  # , deflate, br, zstd",
               "Referer": "https://www.xinli001.com",
               "DNT": "1",
               "Sec-GPC": "1",
               "Connection": "keep-alive",
               "Upgrade-Insecure-Requests": "1",
               "Sec-Fetch-Dest": "document",
               "Sec-Fetch-Mode": "navigate",
               "Sec-Fetch-Site": "same-origin",
               "Sec-Fetch-User": "?1",
               "Priority": "u=0, i"}

    async def get_news_urls(self, page_cnt: int = 8) -> List[Dict | None]:
        news: List[Dict] = []
        s = self.get_session()
        for i in range(page_cnt):
            response = await s.get(f"https://www.xinli001.com/info/emot?page={i + 1}")
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            article_list = soup.select_one("#articleListM")
            articles = article_list.find_all(class_='item')
            for article in articles:
                link_tag = article.find(class_="right").find('a', class_='title')
                title = link_tag.get_text().strip()
                if link_tag['href'].startswith("http"):
                    url = link_tag['href']
                else:
                    url = "https://" + self.headers['Host'] + link_tag['href']
                result_dict = {'title': title, 'url': url}
                news.append(result_dict)
        return news

    def filter_content(self, content: str):
        # content = content.replace("#", "")
        content = content.replace("*", "")
        content = content.replace("> \n", "")
        content = content.replace(" \n", "\n")
        content = content.replace("\n\n", "\n")
        content = content.replace("\n\n", "\n")
        # Strip inline markdown images, e.g. ![](https://...)
        pattern = r'!\[\]\(https?://[^\s)]+\)'
        cleaned_text = re.sub(pattern, '', content)
        # Strip markdown heading markers
        pattern2 = r'#+ '
        cleaned_text = re.sub(pattern2, '', cleaned_text)
        return cleaned_text

    async def get_news_content(self, urls: List[str]) -> List[str]:
        contents = []
        # s = self.get_session()
        # for url in urls:
        #     attempt = 0
        #     max_attempts = 3
        #     while attempt < max_attempts:
        #         try:
        #             response = await s.get(url)
        #             response.raise_for_status()
        #             contents.append(response.text)
        #             await asyncio.sleep(1)
        #             break  # break out of the loop on success
        #         except Exception as e:
        #             attempt += 1
        #             if attempt == max_attempts:
        #                 print(f"Failed to fetch {url} after {max_attempts} attempts: {e}")
        #             await asyncio.sleep(1)  # wait 1 second before retrying
        # await s.aclose()
        sessionid = "asdf"
        async with AsyncWebCrawler(proxy=self.proxy_url if self.use_proxy else None) as crawler:
            for url in urls:
                result = await crawler.arun(
                    url=url,
                    page_timeout=self.timeout * 1000,
                    magic=True,
                    # bypass_cache=True,
                    wait_for="css:.yxl-editor-article ",
                    css_selector=".yxl-editor-article",
                    exclude_external_links=True,
                    session_id=sessionid,
                )
                t = self.filter_content(result.markdown)
                # print(t)
                contents.append(t)
                await asyncio.sleep(2)
        return contents


async def main():
    s = PsycologySpider()
    # urls = await s.get_news_urls(2)
    # print(urls)
    # print(len(urls))
    contents = await s.get_news_content(["https://www.xinli001.com/info/100497752"])
    print(contents)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())