def make_rag_chunks(rows, max_chars=700):
    """Split crawl rows into sentence-aligned text chunks for RAG indexing.

    For each row, the first non-empty value among ``text_preview``,
    ``rendered_text`` and ``description`` is passed through
    ``normalize_text`` and split after sentence-ending punctuation
    (``.``, ``!`` or ``?`` followed by whitespace). Sentences are then
    packed greedily into chunks of at most ``max_chars`` characters.
    A single sentence longer than ``max_chars`` is kept whole, so such a
    chunk can exceed the limit.

    Args:
        rows: Iterable of dict-like crawl records (accessed via ``.get``).
        max_chars: Upper bound on the length of a packed chunk.

    Returns:
        A list of dicts with the keys ``chunk_id``, ``url``, ``source``,
        ``page_type``, ``title`` and ``text``. ``chunk_id`` is the first
        12 hex characters of the SHA-1 of the row URL concatenated with
        the chunk text.
    """
    # Compiled once so the pattern lookup is not repeated for every row.
    sentence_boundary = re.compile(r"(?<=[.!?])\s+")

    def build_chunk(row, chunk_text):
        """Return the chunk record for ``chunk_text`` taken from ``row``."""
        url = row.get("url")
        # ``url or ""`` also covers rows that carry an explicit ``None`` URL;
        # ``row.get("url", "")`` would return None there and break the
        # string concatenation below.
        digest = hashlib.sha1(((url or "") + chunk_text).encode()).hexdigest()
        return {
            "chunk_id": digest[:12],
            "url": url,
            "source": row.get("source"),
            "page_type": row.get("page_type"),
            "title": row.get("title") or row.get("name"),
            "text": chunk_text,
        }

    chunks = []
    for row in rows:
        text = normalize_text(
            row.get("text_preview")
            or row.get("rendered_text")
            or row.get("description")
            or ""
        )
        if not text:
            continue

        current = ""
        for sentence in sentence_boundary.split(text):
            # The "+ 1" accounts for the joining space.
            if len(current) + len(sentence) + 1 <= max_chars:
                current = (current + " " + sentence).strip()
                continue
            # The sentence does not fit: flush what has been collected so
            # far and start a new chunk with this sentence.
            if current:
                chunks.append(build_chunk(row, current))
            current = sentence

        # Flush the trailing, partially filled chunk of this row.
        if current:
            chunks.append(build_chunk(row, current))
    return chunks
def analyze_outputs(base_url, bs4_rows, parsel_rows, playwright_rows):
    """Merge the three crawl result sets, write analysis artifacts, and summarize.

    The function writes the following files below ``OUTPUT_DIR``:

    * ``site_link_graph.graphml`` (only when the link graph has nodes)
    * ``rag_chunks.jsonl``
    * ``combined_crawl_results.json``
    * ``normalized_product_catalog.csv`` (only when products were found)
    * ``product_price_chart.png`` (only when at least one price is numeric)
    * ``run_summary.md``

    It then prints the summary as JSON and, when IPython is importable,
    displays previews of the data and images.

    Args:
        base_url: Base URL of the crawled site; passed to
            ``build_link_graph`` and echoed in the summary.
        bs4_rows: Row list from the BeautifulSoup crawl; also the input for
            the link graph.
        parsel_rows: Row list from the Parsel crawl.
        playwright_rows: Row list from the Playwright crawl.

    Returns:
        The summary dict (row counts, graph statistics and output paths).
    """
    all_rows = bs4_rows + parsel_rows + playwright_rows
    products = flatten_products(all_rows)
    crawl_df = pd.DataFrame(all_rows)
    product_df = pd.DataFrame(products)

    has_products = not product_df.empty
    if has_products:
        # Non-numeric values become NaN instead of raising.
        for column in ("price", "stock", "rating"):
            product_df[column] = pd.to_numeric(product_df[column], errors="coerce")
        product_df["inventory_value"] = product_df["price"] * product_df["stock"]

    # --- Link graph -------------------------------------------------------
    graph = build_link_graph(base_url, bs4_rows)
    graph_path = OUTPUT_DIR / "site_link_graph.graphml"
    node_count = graph.number_of_nodes()
    # Track what THIS run wrote; checking path.exists() later would also
    # report stale files left behind by an earlier run.
    graph_written = node_count > 0
    if graph_written:
        nx.write_graphml(graph, graph_path)

    # --- RAG chunks and combined crawl dump --------------------------------
    chunks = make_rag_chunks(all_rows)
    rag_path = OUTPUT_DIR / "rag_chunks.jsonl"
    with rag_path.open("w", encoding="utf-8") as f:
        for chunk in chunks:
            f.write(json.dumps(chunk, ensure_ascii=False) + "\n")

    crawl_json_path = OUTPUT_DIR / "combined_crawl_results.json"
    crawl_json_path.write_text(
        json.dumps(all_rows, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    # --- Product catalog and price chart -----------------------------------
    product_csv_path = OUTPUT_DIR / "normalized_product_catalog.csv"
    if has_products:
        product_df.to_csv(product_csv_path, index=False)

    price_plot_path = OUTPUT_DIR / "product_price_chart.png"
    plot_written = False
    if has_products and product_df["price"].notna().any():
        plot_df = product_df.dropna(subset=["price"]).copy()
        plot_df["label"] = (
            plot_df["sku"].fillna("unknown") + "\n" + plot_df["source"].fillna("")
        )
        ax = plot_df.plot(
            kind="bar",
            x="label",
            y="price",
            legend=False,
            figsize=(11, 5),
            title="Extracted Product Prices by Source",
        )
        ax.set_xlabel("Product / extraction source")
        ax.set_ylabel("Price")
        plt.xticks(rotation=35, ha="right")
        plt.tight_layout()
        plt.savefig(price_plot_path, dpi=160)
        plot_written = True
        plt.show()
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(ax.figure)

    # --- Graph statistics ---------------------------------------------------
    graph_stats = {
        "nodes": node_count,
        "edges": graph.number_of_edges(),
        "weakly_connected_components": (
            nx.number_weakly_connected_components(graph) if node_count else 0
        ),
    }
    if node_count > 0:
        in_degrees = dict(graph.in_degree())
        out_degrees = dict(graph.out_degree())
        # Five nodes with the highest in-/out-degree, as (node, degree) pairs.
        graph_stats["top_in_degree"] = sorted(
            in_degrees.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:5]
        graph_stats["top_out_degree"] = sorted(
            out_degrees.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:5]

    # --- Summary -------------------------------------------------------------
    summary = {
        "base_url": base_url,
        "rows_total": len(all_rows),
        "beautifulsoup_rows": len(bs4_rows),
        "parsel_rows": len(parsel_rows),
        "playwright_rows": len(playwright_rows),
        "products_total": len(product_df),
        "rag_chunks_total": len(chunks),
        "graph": graph_stats,
        "outputs": {
            "beautifulsoup_json": str(OUTPUT_DIR / "beautifulsoup_crawl.json"),
            "beautifulsoup_csv": str(OUTPUT_DIR / "beautifulsoup_crawl.csv"),
            "parsel_json": str(OUTPUT_DIR / "parsel_products.json"),
            "parsel_csv": str(OUTPUT_DIR / "parsel_products.csv"),
            "playwright_json": str(OUTPUT_DIR / "playwright_dynamic.json"),
            "playwright_csv": str(OUTPUT_DIR / "playwright_dynamic.csv"),
            "combined_json": str(crawl_json_path),
            # Conditionally written artifacts are reported only when this
            # run produced them.
            "product_csv": str(product_csv_path) if has_products else None,
            "rag_jsonl": str(rag_path),
            "graphml": str(graph_path) if graph_written else None,
            "price_plot": str(price_plot_path) if plot_written else None,
            "screenshots_dir": str(SCREENSHOT_DIR),
        },
    }

    output_lines = "\n".join(
        f"- `{k}`: `{v}`" for k, v in summary["outputs"].items()
    )
    summary_path = OUTPUT_DIR / "run_summary.md"
    summary_path.write_text(
        "# Crawlee Python Advanced Tutorial Run Summary\n\n"
        f"- Local demo site: `{base_url}`\n"
        f"- Total extracted rows: `{summary['rows_total']}`\n"
        f"- BeautifulSoup rows: `{summary['beautifulsoup_rows']}`\n"
        f"- Parsel rows: `{summary['parsel_rows']}`\n"
        f"- Playwright rows: `{summary['playwright_rows']}`\n"
        f"- Normalized products: `{summary['products_total']}`\n"
        f"- RAG chunks: `{summary['rag_chunks_total']}`\n"
        f"- Link graph nodes: `{graph_stats['nodes']}`\n"
        f"- Link graph edges: `{graph_stats['edges']}`\n\n"
        "## Output files\n\n"
        f"{output_lines}\n",
        encoding="utf-8",
    )

    print("\n=== 4) Analysis summary ===")
    print(json.dumps(summary, indent=2, ensure_ascii=False))

    # Rich previews are best-effort: outside a notebook (or without IPython)
    # the failure is reported and the summary is still returned.
    try:
        from IPython.display import display, Markdown, Image as IPImage

        display(Markdown("## Crawlee crawl preview"))
        if not crawl_df.empty:
            preview_cols = [
                col
                for col in ["source", "page_type", "title", "url"]
                if col in crawl_df.columns
            ]
            display(crawl_df[preview_cols].head(12))
        display(Markdown("## Normalized product catalog"))
        if has_products:
            display(product_df.head(20))
        if plot_written:
            display(Markdown("## Product price chart"))
            display(IPImage(filename=str(price_plot_path)))
        # The screenshot is not written by this function, so its presence on
        # disk is the only available signal.
        screenshot_path = SCREENSHOT_DIR / "dynamic_catalog_full_page.png"
        if screenshot_path.exists():
            display(Markdown("## Playwright screenshot of JavaScript-rendered page"))
            display(IPImage(filename=str(screenshot_path)))
        display(Markdown(f"## Output directory\n`{OUTPUT_DIR}`"))
    except Exception as exc:
        print("Notebook display skipped:", repr(exc))
    return summary
async def main():
    """Serve the demo site, run the three crawls in sequence, and analyze them.

    The local server returned by ``start_local_server`` is shut down in a
    ``finally`` clause, so it is stopped whether the crawls succeed or raise.

    Returns:
        The summary produced by ``analyze_outputs``.
    """
    server, site_url = start_local_server(SITE_DIR)
    print(f"\nLocal demo website is running at: {site_url}/index.html")
    try:
        # The crawls are awaited one after another, not concurrently.
        soup_rows = await run_beautifulsoup_crawl(site_url)
        selector_rows = await run_parsel_precision_crawl(site_url)
        browser_rows = await run_playwright_dynamic_crawl(site_url)
        return analyze_outputs(site_url, soup_rows, selector_rows, browser_rows)
    finally:
        server.shutdown()
        print("\nLocal demo server shut down.")
# Script entry point: run the tutorial pipeline, then list every produced file.
#
# asyncio.get_event_loop() is deprecated when no loop is running or set and
# raises RuntimeError on Python 3.14+. Reuse the running loop when there is
# one (e.g. inside a notebook), otherwise create and register a fresh loop.
# NOTE(review): run_until_complete() on an already-running loop only works if
# nested loops were enabled elsewhere in this file (e.g. nest_asyncio) —
# confirm; this is unchanged from the previous behavior.
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
summary = loop.run_until_complete(main())

print("\nTutorial complete.")
print(f"All outputs are in: {OUTPUT_DIR}")
print("Key files:")
# rglob("*") also yields directories; only regular files are listed.
for file_path in sorted(OUTPUT_DIR.rglob("*")):
    if file_path.is_file():
        print(" -", file_path)