Parsing the 1688 Keyword Search API (Hands-On Examples)
What follows is a practical walkthrough of the 1688 keyword search API, combining the official documentation with real-world use cases:
I. Core Request Parameters
| Parameter | Type | Required | Description | Example |
|---|---|---|---|---|
| q | String | Yes | Search keyword (multi-word queries supported) | "女装连衣裙2025新款" |
| start_price | Float | No | Lower bound of the price range (CNY) | 50.0 |
| end_price | Float | No | Upper bound of the price range (CNY) | 200.0 |
| page | Integer | No | Current page number (default 1) | 3 |
| page_size | Integer | No | Items per page (default 40, max 200) | 100 |
| sort | String | No | Sort order (bid: by price, _sale: by sales volume) | "_sale" |
| filter | String | No | Advanced filters (JSON format) | {"48h_ship":"true"} |
| fields | String | No | Fields to return | "title,price,sales" |
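To see how these parameters combine in practice, here is a minimal sketch that assembles a price-bounded, filtered, sales-sorted query. It only builds the parameter dict; authentication (key/secret/sign) and the actual request are handled by the client code in the next section. Passing `filter` as a JSON-encoded string is an assumption based on the table above, and `48h_ship` is simply the illustrative value from the example column.

```python
import json

# Illustrative parameter set built from the table above (no request is sent here).
params = {
    "q": "女装连衣裙2025新款",                     # search keyword, multi-word supported
    "start_price": 50.0,                         # lower price bound (CNY)
    "end_price": 200.0,                          # upper price bound (CNY)
    "page": 1,                                   # first page
    "page_size": 100,                            # default 40, max 200
    "sort": "_sale",                             # sort by sales volume
    "filter": json.dumps({"48h_ship": "true"}),  # advanced filter as a JSON string (assumed encoding)
    "fields": "title,price,sales",               # restrict the returned fields
}
```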
II. Hands-On Code Example (Python)
```python
import hashlib
import hmac

import requests


def generate_sign(params, secret):
    """Generate the 1688 API request signature."""
    sorted_params = sorted(params.items())
    sign_str = "&".join(f"{k}={v}" for k, v in sorted_params)
    sign_str += f"&secret={secret}"
    return hmac.new(secret.encode(), sign_str.encode(), hashlib.md5).hexdigest().upper()


def search_products(app_key, app_secret, keyword, page=1):
    """Run a keyword search."""
    url = "https://api-gw.onebound.cn/1688/item_search"
    params = {
        "key": app_key,
        "secret": app_secret,
        "q": keyword,
        "page": page,
        "page_size": 100,
        "sort": "_sale",
        "fields": "itemId,title,price,sales,picUrl",
    }
    params["sign"] = generate_sign(params, app_secret)
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    raise Exception(f"Request failed with status code {response.status_code}")


# Usage example
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"

try:
    results = search_products(APP_KEY, APP_SECRET, "夏季男士T恤")
    total = results["total_results"]
    print(f"Found {total} products")
    for item in results["items"]["item"][:5]:  # first 5 results
        print(f"""
        Item ID: {item['itemId']}
        Title:   {item['title']}
        Price:   {item['price']} CNY
        Sales:   {item['sales']} units
        Image:   {item['picUrl']}
        """)
except Exception as e:
    print(f"Error: {e}")
```
III. Advanced Use Cases
1. Competitor Price Monitoring
```python
def monitor_competitors(keyword, threshold=5):
    """Flag competitor items whose price moved by more than `threshold` CNY."""
    results = search_products(APP_KEY, APP_SECRET, keyword)
    price_changes = []
    for item in results["items"]["item"]:
        # Assumes historical prices are already stored in a database
        current_price = item["price"]
        historical_price = get_historical_price(item["itemId"])
        if not historical_price:
            continue  # no history recorded yet for this item
        if abs(current_price - historical_price) > threshold:
            price_changes.append({
                "itemId": item["itemId"],
                "title": item["title"],
                "old_price": historical_price,
                "new_price": current_price,
                "change_rate": (current_price - historical_price) / historical_price,
            })
    if price_changes:
        send_alert_email(price_changes)
    return price_changes
```
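The monitor above leans on two helpers the API does not provide: `get_historical_price` and `send_alert_email`. Below is a minimal sketch of what they might look like, assuming price history lives in a local SQLite table (`price_history`) and alerts go out over plain SMTP; the table name, host, and addresses are all placeholders.

```python
import json
import smtplib
import sqlite3
from email.mime.text import MIMEText


def get_historical_price(item_id, db_path="prices.db"):
    """Return the most recently stored price for an item, or None if unknown."""
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT price FROM price_history "
            "WHERE item_id = ? ORDER BY recorded_at DESC LIMIT 1",
            (item_id,),
        ).fetchone()
    return row[0] if row else None


def send_alert_email(price_changes, smtp_host="localhost"):
    """Email the detected price changes as a plain-text JSON dump."""
    msg = MIMEText(json.dumps(price_changes, ensure_ascii=False, indent=2))
    msg["Subject"] = "1688 competitor price alert"
    msg["From"] = "monitor@example.com"
    msg["To"] = "ops@example.com"
    with smtplib.SMTP(smtp_host) as server:
        server.send_message(msg)
```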
2. Smart Replenishment System
```python
def generate_replenishment_plan(keyword, min_stock=50):
    """Build a replenishment plan for items whose stock falls below `min_stock`."""
    results = search_products(APP_KEY, APP_SECRET, keyword)
    replenishment_list = []
    for item in results["items"]["item"]:
        # Real-time stock comes from a separate API (placeholder helper)
        current_stock = get_real_time_stock(item["itemId"])
        if current_stock < min_stock:
            replenishment_list.append({
                "itemId": item["itemId"],
                "title": item["title"],
                "current_stock": current_stock,
                "suggested_replenish": min_stock - current_stock,
                "sales_velocity": calculate_sales_velocity(item["itemId"]),
            })
    # Restock the fastest-selling items first
    return sorted(replenishment_list, key=lambda x: x["sales_velocity"], reverse=True)
```
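Here too, `get_real_time_stock` and `calculate_sales_velocity` are placeholders for data the search endpoint does not return. As one possible reading of the velocity metric, the sketch below averages units sold per day over a recent window, assuming daily sales are already recorded in a local `daily_sales` table (a hypothetical schema):

```python
import sqlite3


def calculate_sales_velocity(item_id, days=7, db_path="sales.db"):
    """Average units sold per day over the last `days` days (0.0 if no data)."""
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT SUM(units_sold) FROM daily_sales "
            "WHERE item_id = ? AND sale_date >= date('now', ?)",
            (item_id, f"-{days} day"),
        ).fetchone()
    total = row[0] or 0
    return total / days
```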
3. SEO Keyword Optimization
```python
def analyze_search_terms(keyword):
    """Aggregate per-term statistics from the titles returned for a search."""
    results = search_products(APP_KEY, APP_SECRET, keyword)
    term_stats = {}
    for item in results["items"]["item"]:
        for term in extract_keywords(item["title"]):
            if term in term_stats:
                term_stats[term]["count"] += 1
                term_stats[term]["sales"] += item["sales"]
                term_stats[term]["price_sum"] += item["price"]
            else:
                term_stats[term] = {
                    "count": 1,
                    "sales": item["sales"],
                    "price_sum": item["price"],
                }
    # Derive per-term metrics: average price and average sales per listing
    # (the latter is used here as a rough proxy for conversion)
    for stats in term_stats.values():
        stats["avg_price"] = stats["price_sum"] / stats["count"]
        stats["conversion_rate"] = stats["sales"] / stats["count"]
    return sorted(term_stats.items(), key=lambda x: x[1]["sales"], reverse=True)
```
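`extract_keywords` is another placeholder. For Chinese product titles, one simple option is word segmentation with the third-party jieba library, as sketched below; this is an assumption of this example rather than anything the API requires, and any tokenizer that splits titles into candidate terms would do.

```python
import jieba  # third-party Chinese word segmentation library: pip install jieba


def extract_keywords(title, min_length=2):
    """Split a product title into candidate terms, dropping very short tokens."""
    return [term for term in jieba.lcut(title) if len(term.strip()) >= min_length]
```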
IV. Performance Optimization Tips
- Batch query optimization:

```python
# Run multiple keyword searches concurrently with a thread pool
from concurrent.futures import ThreadPoolExecutor, as_completed


def batch_search(keywords):
    results = {}
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {
            executor.submit(search_products, APP_KEY, APP_SECRET, keyword): keyword
            for keyword in keywords
        }
        for future in as_completed(futures):
            keyword = futures[future]
            try:
                results[keyword] = future.result()
            except Exception as e:
                results[keyword] = {"error": str(e)}
    return results
```
- Caching:

```python
import json

import redis


def cached_search(keyword, expire=300):
    """Search with a Redis cache; results expire after `expire` seconds."""
    r = redis.Redis()
    cache_key = f"1688_search_{keyword}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    results = search_products(APP_KEY, APP_SECRET, keyword)
    r.setex(cache_key, expire, json.dumps(results))
    return results
```
- Request rate control:

```python
from time import sleep


def safe_search(keyword, max_retries=3):
    """Search with retries, backing off progressively when rate-limited."""
    for attempt in range(max_retries):
        try:
            return search_products(APP_KEY, APP_SECRET, keyword)
        except Exception as e:
            if "rate limit" in str(e).lower():
                sleep(60 * (attempt + 1))  # wait longer after each failed attempt
            else:
                raise
    raise Exception("Maximum number of retries reached")
```
For the latest interface specifications and feature updates, check the Alibaba Open Platform and the OneBound (万邦) open platform documentation regularly. For large-scale data collection, consider a distributed crawling framework such as Scrapy-Redis for task distribution and result storage, as sketched below.
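As a rough sketch of that last suggestion, a Scrapy-Redis spider can pull pre-built search-request URLs from a shared Redis list so that multiple workers divide the keyword space; the spider name, Redis key, and parsing logic below are placeholders, not part of the 1688 API.

```python
# settings.py (excerpt): route scheduling and de-duplication through Redis
# SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# REDIS_URL = "redis://localhost:6379/0"

from scrapy_redis.spiders import RedisSpider


class KeywordSearchSpider(RedisSpider):
    """Pulls ready-made API request URLs from a shared Redis list."""
    name = "item_search"
    redis_key = "item_search:start_urls"  # workers pop their tasks from this list

    def parse(self, response):
        # Each response is assumed to be the JSON payload of an item_search call
        data = response.json()
        for item in data.get("items", {}).get("item", []):
            yield {
                "itemId": item.get("itemId"),
                "title": item.get("title"),
                "price": item.get("price"),
                "sales": item.get("sales"),
            }
```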