穿越来找茬
35.39MB · 2025-09-13
item_search_shop
接口是获取特定店铺所有商品数据的核心接口,能够批量获取店铺内的商品列表、基础信息、价格、销量等关键数据。对于店铺分析、竞品监控、商品归类研究等场景具有重要价值,尤其适合需要全面了解某店铺经营状况的业务需求。
一、接口核心特性分析
核心功能:批量获取指定店铺内的商品数据,支持分页获取和条件筛选
数据维度:
应用场景:
主流电商平台的店铺商品搜索接口通常采用以下认证方式:
appkey + appsecret
的身份验证access_token
进行授权访问请求参数
参数名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
shop_id | String | 是 | 店铺 ID |
page | Integer | 否 | 页码,默认 1 |
page_size | Integer | 否 | 每页条数,默认 20,最大通常为 100 |
sort | String | 否 | 排序方式:price_asc (价格升序)、price_desc (价格降序)、sales_desc (销量降序) |
category_id | String | 否 | 店铺内分类 ID,用于筛选特定分类商品 |
min_price /max_price | Float | 否 | 价格区间筛选 |
is_promotion | Boolean | 否 | 是否只显示促销商品 |
appkey | String | 是 | 应用密钥 |
sign | String | 是 | 请求签名 |
timestamp | Integer | 是 | 时间戳 |
响应核心字段
分页信息:总商品数、总页数、当前页码
商品列表:每个商品包含
二、Python 脚本实现
以下是通用的店铺商品搜索接口调用实现,适用于主流电商平台的 item_search_shop
接口,包含完整的认证、请求、数据解析和分析功能:
import requests
import time
import json
import logging
import hashlib
import re
from typing import Dict, Optional, List, Tuple
from requests.exceptions import RequestException
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict
logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" )
class ShopItemSearchAPI: def init(self, appkey: str, appsecret: str, platform: str = "general"): """ 初始化店铺商品搜索API客户端 :param appkey: 开放平台appkey :param appsecret: 开放平台appsecret :param platform: 平台标识,用于适配不同平台的接口差异 """ self.appkey = appkey self.appsecret = appsecret self.platform = platform self.base_url = self._get_base_url() self.session = requests.Session() self.session.headers.update({ "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36" })
def _get_base_url(self) -> str:
"""根据平台获取基础URL"""
# 可根据实际平台扩展
platform_urls = {
"general": "https://apiplatform.com/api",
"taobao": "https://eco.taobao.com/router/rest",
"jd": "https://api.jd.com/routerjson",
"1688": "https://gw.open.1688.com/openapi/gateway.do"
}
return platform_urls.get(self.platform, platform_urls["general"])
def _generate_sign(self, params: Dict) -> str:
"""生成请求签名,不同平台可能有差异"""
if self.platform == "taobao":
# 淘宝签名方式
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = ""
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
return hashlib.md5(sign_str.encode()).hexdigest().upper()
elif self.platform == "jd":
# 京东签名方式
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = self.appsecret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
return hashlib.md5(sign_str.encode()).hexdigest().upper()
else:
# 通用签名方式
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = f"{self.appsecret}"
for k, v in sorted_params:
sign_str += f"{k}={v}&"
sign_str = sign_str.rstrip('&') + f"{self.appsecret}"
return hashlib.md5(sign_str.encode()).hexdigest().upper()
def search_shop_items(self,
shop_id: str,
page: int = 1,
page_size: int = 20,
sort: str = "default",
category_id: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
is_promotion: Optional[bool] = None) -> Optional[Dict]:
"""
搜索店铺商品
:param shop_id: 店铺ID
:param page: 页码
:param page_size: 每页条数
:param sort: 排序方式
:param category_id: 店铺内分类ID
:param min_price: 最低价格
:param max_price: 最高价格
:param is_promotion: 是否只看促销商品
:return: 商品列表数据
"""
# 验证排序方式
valid_sorts = ["default", "price_asc", "price_desc", "sales_desc", "new_desc"]
if sort not in valid_sorts:
logging.error(f"无效的排序方式: {sort},支持: {valid_sorts}")
return None
# 验证分页大小
if page_size < 1 or page_size > 100:
logging.error(f"每页条数必须在1-100之间,当前为: {page_size}")
return page_size > 100 and 100 or 1
# 构建基础参数
timestamp = int(time.time())
params = self._build_platform_params(shop_id, page, page_size, sort, timestamp)
# 添加可选参数
if category_id:
params["category_id"] = category_id
if min_price is not None:
params["min_price"] = min_price
if max_price is not None:
params["max_price"] = max_price
if is_promotion is not None:
params["is_promotion"] = "true" if is_promotion else "false"
# 生成签名
params["sign"] = self._generate_sign(params)
try:
response = self.session.get(self.base_url, params=params, timeout=15)
response.raise_for_status()
# 不同平台可能有不同的响应格式
if self.platform in ["jd", "1688"]:
result = response.json()
else:
# 淘宝等平台可能返回JSONP格式
json_str = re.findall(r'({.*})', response.text)[0]
result = json.loads(json_str)
# 处理不同平台的响应结构
if self.platform == "taobao":
# 淘宝API响应结构
if "error_response" in result:
logging.error(f"获取商品失败: {result['error_response']['msg']} (错误码: {result['error_response']['code']})")
return None
items_data = result.get("tbk_shop_item_get_response", {}).get("results", {}).get("n_tbk_item", [])
total_count = result.get("tbk_shop_item_get_response", {}).get("total_results", 0)
elif self.platform == "jd":
# 京东API响应结构
items_data = result.get("jingdong_shop_item_search_response", {}).get("result", {}).get("items", [])
total_count = result.get("jingdong_shop_item_search_response", {}).get("result", {}).get("total_count", 0)
else:
# 通用响应结构
if result.get("code") != 0:
logging.error(f"获取商品失败: {result.get('msg', '未知错误')} (错误码: {result.get('code')})")
return None
items_data = result.get("data", {}).get("items", [])
total_count = result.get("data", {}).get("total_count", 0)
# 格式化商品数据
return self._format_items_data(items_data, total_count, page, page_size)
except RequestException as e:
logging.error(f"请求异常: {str(e)}")
return None
except (json.JSONDecodeError, IndexError) as e:
logging.error(f"解析响应失败: {str(e)}, 响应内容: {response.text[:200]}...")
return None
def _build_platform_params(self, shop_id: str, page: int, page_size: int, sort: str, timestamp: int) -> Dict:
"""构建不同平台的特有参数"""
if self.platform == "taobao":
return {
"method": "taobao.tbk.shop.item.get",
"app_key": self.appkey,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",
"fields": "num_iid,title,pict_url,price,orginal_price,sales,comment_count,shop_title,category_name",
"shop_id": shop_id,
"page_no": page,
"page_size": page_size,
"sort": sort
}
elif self.platform == "jd":
return {
"method": "jingdong.shop.item.search",
"app_key": self.appkey,
"timestamp": timestamp,
"360buy_param_json": json.dumps({
"shop_id": shop_id,
"page": page,
"page_size": page_size,
"sort": sort
})
}
else:
return {
"method": "item.search.shop",
"appkey": self.appkey,
"timestamp": timestamp,
"shop_id": shop_id,
"page": page,
"page_size": page_size,
"sort": sort
}
def _format_items_data(self, items_data: List[Dict], total_count: int, page: int, page_size: int) -> Dict:
"""格式化商品数据"""
# 分页信息
pagination = {
"total_items": total_count,
"total_pages": (total_count + page_size - 1) // page_size,
"current_page": page,
"page_size": page_size
}
# 格式化商品列表
items = []
for item in items_data:
# 适配不同平台的字段差异
if self.platform == "taobao":
item_id = item.get("num_iid")
price = self._safe_float(item.get("price"))
original_price = self._safe_float(item.get("orginal_price"))
sales = self._safe_int(item.get("sales"))
comment_count = self._safe_int(item.get("comment_count"))
category_name = item.get("category_name")
image_url = item.get("pict_url")
title = item.get("title")
elif self.platform == "jd":
item_id = item.get("sku_id")
price = self._safe_float(item.get("jd_price"))
original_price = self._safe_float(item.get("market_price"))
sales = self._safe_int(item.get("sales_count"))
comment_count = self._safe_int(item.get("comment_count"))
category_name = item.get("category_name")
image_url = item.get("image_url")
title = item.get("name")
else:
item_id = item.get("item_id")
price = self._safe_float(item.get("price"))
original_price = self._safe_float(item.get("original_price"))
sales = self._safe_int(item.get("sales_count"))
comment_count = self._safe_int(item.get("comment_count"))
category_name = item.get("category_name")
image_url = item.get("image_url")
title = item.get("title")
# 计算折扣
discount = 0
if original_price > 0 and price > 0:
discount = round((price / original_price) * 10, 1)
# 判断是否促销
is_promotion = original_price > price and original_price - price > 0.01
items.append({
"item_id": item_id,
"title": title,
"image_url": image_url,
"price": price,
"original_price": original_price,
"discount": discount,
"is_promotion": is_promotion,
"sales_count": sales,
"comment_count": comment_count,
"category_name": category_name,
"category_id": item.get("category_id"),
"url": item.get("url"),
"tags": item.get("tags", "").split(",") if item.get("tags") else []
})
return {
"pagination": pagination,
"items": items,
"raw_data": items_data # 保留原始数据
}
def get_shop_all_items(self, shop_id: str, max_pages: int = 20, **kwargs) -> List[Dict]:
"""
获取店铺所有商品
:param shop_id: 店铺ID
:param max_pages: 最大页数限制
:param**kwargs: 其他搜索参数
:return: 所有商品列表
"""
all_items = []
page = 1
while page <= max_pages:
logging.info(f"获取店铺第 {page} 页商品")
result = self.search_shop_items(
shop_id=shop_id,
page=page,
page_size=100, # 使用最大页大小减少请求次数
**kwargs
)
if not result or not result["items"]:
break
all_items.extend(result["items"])
# 检查是否已到最后一页
if page >= result["pagination"]["total_pages"]:
break
page += 1
# 控制请求频率,遵守平台API的QPS限制
time.sleep(2)
logging.info(f"共获取到 {len(all_items)} 件商品")
return all_items
def analyze_shop_items(self, items: List[Dict]) -> Dict:
"""分析店铺商品数据,生成店铺经营分析报告"""
if not items:
return {}
total = len(items)
# 价格分析
price_analysis = self._analyze_prices(items)
# 分类分析
category_analysis = self._analyze_categories(items)
# 销售分析
sales_analysis = self._analyze_sales(items)
# 促销分析
promotion_analysis = self._analyze_promotions(items)
# 提取热销商品
top_sales = sorted(items, key=lambda x: x["sales_count"], reverse=True)[:10]
# 提取高折扣商品
top_discounts = sorted([i for i in items if i["is_promotion"]], key=lambda x: x["discount"], reverse=False)[:10]
return {
"total_items": total,
"price_analysis": price_analysis,
"category_analysis": category_analysis,
"sales_analysis": sales_analysis,
"promotion_analysis": promotion_analysis,
"top_sales": top_sales,
"top_discounts": top_discounts
}
def _analyze_prices(self, items: List[Dict]) -> Dict:
"""分析价格分布"""
prices = [item["price"] for item in items if item["price"] > 0]
if not prices:
return {}
min_price = min(prices)
max_price = max(prices)
avg_price = round(sum(prices) / len(prices), 2)
# 价格区间分布
price_ranges = self._get_price_ranges(min_price, max_price)
range_counts = defaultdict(int)
for price in prices:
for r in price_ranges:
if r[0] <= price < r[1]:
range_counts[f"{r[0]}-{r[1]}"] += 1
break
else:
range_counts[f"{price_ranges[-1][1]}+"] += 1
return {
"min_price": min_price,
"max_price": max_price,
"avg_price": avg_price,
"median_price": self._calculate_median(prices),
"range_distribution": dict(range_counts)
}
def _analyze_categories(self, items: List[Dict]) -> Dict:
"""分析分类分布"""
category_counts = defaultdict(int)
category_sales = defaultdict(int)
category_prices = defaultdict(list)
for item in items:
cat_name = item["category_name"] or "未分类"
category_counts[cat_name] += 1
category_sales[cat_name] += item["sales_count"]
if item["price"] > 0:
category_prices[cat_name].append(item["price"])
# 计算每个分类的平均价格
category_avg_prices = {
cat: round(sum(prices)/len(prices), 2)
for cat, prices in category_prices.items() if prices
}
# 按商品数量排序
sorted_categories = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)
return {
"total_categories": len(category_counts),
"distribution": dict(category_counts),
"sales_by_category": dict(category_sales),
"avg_price_by_category": category_avg_prices,
"top_categories": sorted_categories[:5]
}
def _analyze_sales(self, items: List[Dict]) -> Dict:
"""分析销售情况"""
total_sales = sum(item["sales_count"] for item in items)
avg_sales = round(total_sales / len(items), 1) if items else 0
# 销量分布
sales_ranges = [(0, 1), (1, 10), (10, 100), (100, 1000), (1000, float('inf'))]
sales_distribution = defaultdict(int)
for item in items:
sales = item["sales_count"]
for r in sales_ranges:
if r[0] <= sales < r[1]:
sales_distribution[f"{r[0]}-{r[1] if r[1] != float('inf') else '+'}"] += 1
break
# 销量与价格相关性
price_sales_correlation = self._calculate_correlation(
[item["price"] for item in items],
[item["sales_count"] for item in items]
)
return {
"total_sales": total_sales,
"avg_sales_per_item": avg_sales,
"distribution": dict(sales_distribution),
"price_sales_correlation": round(price_sales_correlation, 4)
}
def _analyze_promotions(self, items: List[Dict]) -> Dict:
"""分析促销情况"""
promotion_items = [item for item in items if item["is_promotion"]]
promotion_count = len(promotion_items)
promotion_ratio = round(promotion_count / len(items) * 100, 1) if items else 0
# 平均折扣力度
avg_discount = 0
if promotion_items:
avg_discount = round(sum(item["discount"] for item in promotion_items) / len(promotion_items), 1)
# 促销商品销量占比
promotion_sales = sum(item["sales_count"] for item in promotion_items)
total_sales = sum(item["sales_count"] for item in items)
promotion_sales_ratio = round(promotion_sales / total_sales * 100, 1) if total_sales > 0 else 0
return {
"promotion_count": promotion_count,
"promotion_ratio": promotion_ratio,
"avg_discount": avg_discount,
"sales_ratio": promotion_sales_ratio
}
# 工具方法
def _safe_float(self, value) -> float:
"""安全转换为float"""
try:
return float(value) if value is not None else 0.0
except (ValueError, TypeError):
return 0.0
def _safe_int(self, value) -> int:
"""安全转换为int"""
try:
return int(value) if value is not None else 0
except (ValueError, TypeError):
return 0
def _get_price_ranges(self, min_price: float, max_price: float) -> List[Tuple[float, float]]:
"""生成合理的价格区间"""
if min_price >= max_price:
return [(min_price - 1, max_price + 1)]
# 计算区间数量,最多10个区间
range_count = min(10, int(max_price - min_price) // 5 + 1)
step = (max_price - min_price) / range_count
ranges = []
for i in range(range_count):
start = min_price + i * step
end = min_price + (i + 1) * step
ranges.append((round(start, 1), round(end, 1)))
return ranges
def _calculate_median(self, data: List[float]) -> float:
"""计算中位数"""
sorted_data = sorted(data)
n = len(sorted_data)
if n % 2 == 1:
return round(sorted_data[n//2], 2)
else:
return round((sorted_data[n//2 - 1] + sorted_data[n//2]) / 2, 2)
def _calculate_correlation(self, x: List[float], y: List[float]) -> float:
"""计算皮尔逊相关系数"""
if len(x) != len(y) or len(x) < 2:
return 0.0
n = len(x)
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(xi * yi for xi, yi in zip(x, y))
sum_x2 = sum(xi **2 for xi in x)
sum_y2 = sum(yi** 2 for yi in y)
numerator = n * sum_xy - sum_x * sum_y
denominator = ((n * sum_x2 - sum_x **2) * (n * sum_y2 - sum_y** 2)) **0.5
return numerator / denominator if denominator != 0 else 0.0
def visualize_analysis(self, analysis: Dict, output_dir: str = ".") -> None:
"""可视化分析结果"""
# 设置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
# 1. 价格分布直方图
plt.figure(figsize=(10, 6))
ranges = list(analysis["price_analysis"]["range_distribution"].keys())
counts = list(analysis["price_analysis"]["range_distribution"].values())
plt.bar(ranges, counts, color='skyblue')
plt.title("商品价格区间分布")
plt.xlabel("价格区间")
plt.ylabel("商品数量")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(f"{output_dir}/price_distribution.png")
plt.close()
# 2. 分类分布饼图
plt.figure(figsize=(10, 6))
top_cats = analysis["category_analysis"]["top_categories"]
if top_cats:
labels = [cat[0] for cat in top_cats]
sizes = [cat[1] for cat in top_cats]
# 其他分类合并
if len(analysis["category_analysis"]["distribution"]) > 5:
other_count = sum(analysis["category_analysis"]["distribution"].values()) - sum(sizes)
labels.append("其他")
sizes.append(other_count)
plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
plt.title("商品分类分布")
plt.axis('equal')
plt.tight_layout()
plt.savefig(f"{output_dir}/category_distribution.png")
plt.close()
# 3. 销量分布直方图
plt.figure(figsize=(10, 6))
sales_ranges = list(analysis["sales_analysis"]["distribution"].keys())
sales_counts = list(analysis["sales_analysis"]["distribution"].values())
plt.bar(sales_ranges, sales_counts, color='lightgreen')
plt.title("商品销量区间分布")
plt.xlabel("销量区间")
plt.ylabel("商品数量")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(f"{output_dir}/sales_distribution.png")
plt.close()
# 4. 促销商品占比饼图
plt.figure(figsize=(8, 8))
promotion_counts = [
analysis["promotion_analysis"]["promotion_count"],
analysis["total_items"] - analysis["promotion_analysis"]["promotion_count"]
]
plt.pie(promotion_counts, labels=['促销商品', '非促销商品'],
autopct='%1.1f%%', startangle=90, colors=['orange', 'lightgray'])
plt.title("促销商品占比")
plt.axis('equal')
plt.tight_layout()
plt.savefig(f"{output_dir}/promotion_ratio.png")
plt.close()
logging.info(f"分析图表已保存至 {output_dir} 目录")
if name == "main": # 替换为实际的appkey和appsecret(从对应平台开放平台获取) APPKEY = "your_appkey" APPSECRET = "your_appsecret" # 替换为目标店铺ID SHOP_ID = "123456" # 平台选择:taobao, jd, 1688 或 general PLATFORM = "general"
# 初始化API客户端
api = ShopItemSearchAPI(APPKEY, APPSECRET, platform=PLATFORM)
# 获取店铺所有商品
all_items = api.get_shop_all_items(
shop_id=SHOP_ID,
max_pages=5, # 最多获取5页
sort="sales_desc" # 按销量排序
)
if all_items:
# 分析店铺商品
analysis = api.analyze_shop_items(all_items)
print(f"=== 店铺商品分析报告 (店铺ID: {SHOP_ID}) ===")
print(f"店铺总商品数: {analysis['total_items']}件")
# 价格分析
print("n价格分析:")
print(f" 价格范围: {analysis['price_analysis']['min_price']}-{analysis['price_analysis']['max_price']}元")
print(f" 平均价格: {analysis['price_analysis']['avg_price']}元")
print(f" 中位数价格: {analysis['price_analysis']['median_price']}元")
print(" 价格区间分布:")
for range_str, count in analysis['price_analysis']['range_distribution'].items():
print(f" {range_str}元: {count}件")
# 分类分析
print("n分类分析:")
print(f" 总分类数: {analysis['category_analysis']['total_categories']}个")
print(" 商品数量最多的5个分类:")
for cat, count in analysis['category_analysis']['top_categories']:
print(f" {cat}: {count}件 ({round(count/analysis['total_items']*100, 1)}%)")
# 销售分析
print("n销售分析:")
print(f" 总销量: {analysis['sales_analysis']['total_sales']}件")
print(f" 平均单品销量: {analysis['sales_analysis']['avg_sales_per_item']}件")
print(f" 价格与销量相关性: {analysis['sales_analysis']['price_sales_correlation']}")
# 促销分析
print("n促销分析:")
print(f" 促销商品数: {analysis['promotion_analysis']['promotion_count']}件 ({analysis['promotion_analysis']['promotion_ratio']}%)")
print(f" 平均折扣: {analysis['promotion_analysis']['avg_discount']}折")
print(f" 促销商品销量占比: {analysis['promotion_analysis']['sales_ratio']}%")
# 热销商品
print("n热销商品TOP5:")
for i, item in enumerate(analysis['top_sales'][:5], 1):
print(f" {i}. {item['title'][:30]}...")
print(f" 价格: {item['price']}元, 销量: {item['sales_count']}件")
# 生成可视化图表
api.visualize_analysis(analysis)
三、接口调用注意事项
错误码 | 说明 | 解决方案 |
---|---|---|
401 | 认证失败 | 检查 appkey 和 appsecret 是否正确,重新生成签名 |
403 | 权限不足 | 申请更高权限,或检查店铺是否设置了访问限制 |
404 | 店铺不存在 | 确认 shop_id 是否正确有效 |
429 | 调用频率超限 | 降低调用频率,实现请求限流,增加请求间隔 |
500 | 服务器错误 | 实现重试机制,最多 3 次,采用指数退避策略 |
10001 | 参数错误 | 检查请求参数格式,特别是 shop_id 和分页参数 |
10002 | 访问受限 | 该店铺可能设置了数据访问限制,无法获取 |
四、应用场景与扩展建议
典型应用场景
扩展建议
通过合理使用 item_search_shop
接口,开发者可以构建功能强大的店铺分析系统,深入了解目标店铺的商品结构、价格策略和销售表现,为商业决策提供数据支持。在实际应用中,需根据具体平台的接口特性进行适配和调整,并严格遵守平台的使用规范和限制。