def normalize(knowledge):
return {"textual content": (knowledge.get("textual content") or "").strip().decrease()}
def tokenize(knowledge):
textual content = knowledge.get("textual content", "")
cleaned = "".be a part of(c if (c.isalnum() or c.isspace()) else " " for c in textual content)
tokens = [t for t in cleaned.split() if t]
return {"tokens": tokens, "depend": len(tokens)}
def sentiment(knowledge):
toks = knowledge.get("tokens", [])
pos = sum(t in POSITIVE for t in toks)
neg = sum(t in NEGATIVE for t in toks)
rating = pos - neg
label = "optimistic" if rating > 0 else "unfavourable" if rating < 0 else "impartial"
return {"label": label, "rating": rating, "pos": pos, "neg": neg}
def key phrases(knowledge):
toks = knowledge.get("tokens", [])
cease = {"the","a","an","is","it","to","of","and","in","for","on","how"}
freq = Counter(t for t in toks if t not in cease and len(t) > 2)
return {"key phrases": freq.most_common(knowledge.get("top_n", 5))}
def analyze(knowledge):
norm = employee.set off({"function_id": "textual content::normalize", "payload": {"textual content": knowledge.get("textual content","")}})
toks = employee.set off({"function_id": "textual content::tokenize", "payload": norm})
despatched = employee.set off({"function_id": "textual content::sentiment", "payload": toks})
keys = employee.set off({"function_id": "textual content::key phrases", "payload": {**toks, "top_n": knowledge.get("top_n", 5)}})
with _LOCK:
_STATE["docs_analyzed"] += 1
for okay, c in keys["keywords"]:
_STATE["keyword_totals"][k] += c
n = _STATE["docs_analyzed"]
return {"tokens": toks["count"], "sentiment": despatched, "key phrases": keys["keywords"], "docs_analyzed": n}
def report(knowledge):
with _LOCK:
return {"docs_analyzed": _STATE["docs_analyzed"],
"heartbeats": _STATE["heartbeats"],
"top_keywords_all_docs": _STATE["keyword_totals"].most_common(5)}
def http_analyze(knowledge):
physique = knowledge.get("physique") or {}
outcome = employee.set off({"function_id": "pipeline::analyze", "payload": physique})
return {"status_code": 200, "physique": outcome, "headers": {"Content material-Sort": "software/json"}}
def heartbeat(knowledge):
with _LOCK:
_STATE["heartbeats"] += 1
return {"okay": True}
for fid, fn in [
("text::normalize", normalize), ("text::tokenize", tokenize),
("text::sentiment", sentiment), ("text::keywords", keywords),
("pipeline::analyze", analyze), ("stats::report", report),
("http::analyze", http_analyze), ("cron::heartbeat", heartbeat),
]:
employee.register_function(fid, fn)