守护者传奇折相思
697.12 MB · 2025-11-10
作为爬虫工程师,我们经常面临这样的挑战:
分布式爬虫解决方案:
[主节点] (任务调度)
│
├──[爬虫节点1] → 爬取URL1 → 存储数据
├──[爬虫节点2] → 爬取URL2 → 存储数据
└──[爬虫节点N] → 爬取URLN → 存储数据
关键组件:
# 伪代码展示核心逻辑
while True:
# 1. 从分布式队列获取任务
url = redis.lpop('task_queue')
# 2. 检查是否已爬取(去重)
if not redis.sismember('visited_urls', url):
# 3. 执行爬取
html = download(url)
data = parse(html)
# 4. 存储结果
mongo.insert(data)
redis.sadd('visited_urls', url) # 标记已爬
pip install scrapy scrapy-redis redis pymongo
distributed_spider/
├── spiders/
│ └── example_spider.py # 爬虫核心逻辑
├── settings.py # 分布式配置
└── requirements.txt
# 启用Scrapy-Redis调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 确保所有爬虫共享相同的去重过滤
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# Redis连接配置
REDIS_HOST = 'your_redis_server' # 可以是云Redis或本地
REDIS_PORT = 6379
REDIS_PASSWORD = 'your_password' # 如果有密码
# 保持爬取状态(可选)
SCHEDULER_PERSIST = True
import scrapy
from scrapy_redis.spiders import RedisSpider
class ExampleSpider(RedisSpider):
name = 'example'
# 注意:这里不定义start_urls,而是从Redis获取
redis_key = 'example:start_urls' # Redis中的起始URL键名
def parse(self, response):
# 示例:提取页面标题和链接
yield {
'url': response.url,
'title': response.css('title::text').get(),
'content_length': len(response.text)
}
# 提取新链接并加入队列(分布式自动处理)
for link in response.css('a::attr(href)').getall():
yield response.follow(link, callback=self.parse)
# 本地启动(测试用)
redis-server
# 或使用云Redis(如阿里云Redis)
import redis
r = redis.Redis(host='localhost', port=6379)
r.lpush('example:start_urls', 'https://example.com') # 添加种子URL
# 在多台机器/终端执行(相同代码)
scrapy crawl example
关键说明:
# 在settings.py中调整并发参数
CONCURRENT_REQUESTS = 32 # 每个节点并发数
DOWNLOAD_DELAY = 0.5 # 下载延迟(秒)
# 按域名限制并发
AUTOTHROTTLE_ENABLED = True
# SCHEDULER_PERSIST = True # 已在配置中启用
# 重启爬虫时会自动继续未完成的任务
# pipelines.py
import pymongo
class MongoPipeline:
def __init__(self, mongo_uri):
self.mongo_uri = mongo_uri
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client['crawler_db']
def process_item(self, item, spider):
self.db['pages'].insert_one(dict(item))
return item
distributed_spider/
├── scrapy.cfg
├── distributed_spider/
│ ├── __init__.py
│ ├── items.py # 数据结构定义
│ ├── middlewares.py # 中间件配置
│ ├── pipelines.py # 数据存储管道
│ ├── settings.py # 核心配置
│ └── spiders/
│ ├── __init__.py
│ └── example_spider.py # 主爬虫逻辑
└── requirements.txt
requirements.txt内容:
scrapy==2.11.0
scrapy-redis==0.7.2
redis==4.5.5
pymongo==4.3.3
# 在middlewares.py中添加
class RandomUserAgentMiddleware:
def process_request(self, request, spider):
request.headers['User-Agent'] = random.choice(USER_AGENTS)
# 使用Redis CLI监控任务队列
redis-cli llen example:start_urls # 查看剩余任务数
redis-cli scard example:dupefilter # 查看过滤的URL数
DOWNLOAD_DELAY| 核心要素 | 实现方案 | 注意事项 |
|---|---|---|
| 任务分发 | Redis队列 | 确保所有节点连接同一Redis |
| 去重控制 | Redis Set/Bloom Filter | 大规模爬取考虑Bloom Filter |
| 容错处理 | 持久化调度状态 | SCHEDULER_PERSIST=True |
| 扩展性 | 动态添加爬虫节点 | 无状态设计,随时扩容 |
下一步学习建议:
通过这套方案,你可以轻松构建一个每小时处理数万页面的分布式爬虫系统!