def _now_iso() -> str:
return datetime.utcnow().change(microsecond=0).isoformat() + "Z"
def _stable_id(prefix: str, seed: str) -> str:
h = hashlib.sha256(seed.encode("utf-8")).hexdigest()[:10]
return f"{prefix}_{h}"
class MockEHR:
def __init__(self):
self.orders_queue: Listing[SurgeryOrder] = []
self.patient_docs: Dict[str, List[ClinicalDocument]] = {}
def seed_data(self, n_orders: int = 5):
random.seed(7)
def make_patient(i: int) -> Affected person:
pid = f"PT{i:04d}"
plan = random.alternative(listing(InsurancePlan))
return Affected person(
patient_id=pid,
title=f"Affected person {i}",
dob="1980-01-01",
member_id=f"M{i:08d}",
plan=plan,
)
def docs_for_order(affected person: Affected person, surgical procedure: SurgeryType) -> Listing[ClinicalDocument]:
base = [
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "H&P"),
doc_type=DocType.H_AND_P,
created_at=_now_iso(),
content="H&P: Relevant history, exam findings, and surgical indication.",
source="EHR",
),
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "NOTE"),
doc_type=DocType.CLINICAL_NOTE,
created_at=_now_iso(),
content="Clinical note: Symptoms, conservative management attempted, clinician assessment.",
source="EHR",
),
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "MEDS"),
doc_type=DocType.MED_LIST,
created_at=_now_iso(),
content="Medication list: Current meds, allergies, contraindications.",
source="EHR",
),
]
possibly = []
if surgical procedure in [SurgeryType.KNEE_ARTHROPLASTY, SurgeryType.SPINE_FUSION, SurgeryType.BARIATRIC]:
possibly.append(
ClinicalDocument(
doc_id=_stable_id("DOC", affected person.patient_id + "LABS"),
doc_type=DocType.LABS,
created_at=_now_iso(),
content material="Labs: CBC/CMP inside final 30 days.",
supply="LabSystem",
)
)
if surgical procedure in [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]:
possibly.append(
ClinicalDocument(
doc_id=_stable_id("DOC", affected person.patient_id + "IMG"),
doc_type=DocType.IMAGING,
created_at=_now_iso(),
content material="Imaging: MRI/X-ray report supporting analysis and severity.",
supply="Radiology",
)
)
closing = base + [d for d in maybe if random.random() > 0.35]
if random.random() > 0.6:
closing.append(
ClinicalDocument(
doc_id=_stable_id("DOC", affected person.patient_id + "PRIOR_TX"),
doc_type=DocType.PRIOR_TX,
created_at=_now_iso(),
content material="Prior therapies: PT, meds, injections tried over 6+ weeks.",
supply="EHR",
)
)
if random.random() > 0.5:
closing.append(
ClinicalDocument(
doc_id=_stable_id("DOC", affected person.patient_id + "CONSENT"),
doc_type=DocType.CONSENT,
created_at=_now_iso(),
content material="Consent: Signed process consent and danger disclosure.",
supply="EHR",
)
)
return closing
for i in vary(1, n_orders + 1):
affected person = make_patient(i)
surgical procedure = random.alternative(listing(SurgeryType))
order = SurgeryOrder(
order_id=_stable_id("ORD", affected person.patient_id + surgical procedure.worth),
affected person=affected person,
surgery_type=surgical procedure,
scheduled_date=(datetime.utcnow().date() + timedelta(days=random.randint(3, 21))).isoformat(),
ordering_provider_npi=str(random.randint(1000000000, 1999999999)),
diagnosis_codes=["M17.11", "M54.5"] if surgical procedure != SurgeryType.CATARACT else ["H25.9"],
created_at=_now_iso(),
)
self.orders_queue.append(order)
self.patient_docs[patient.patient_id] = docs_for_order(affected person, surgical procedure)
def poll_new_surgery_orders(self, max_n: int = 1) -> Listing[SurgeryOrder]:
pulled = self.orders_queue[:max_n]
self.orders_queue = self.orders_queue[max_n:]
return pulled
def get_patient_documents(self, patient_id: str) -> Listing[ClinicalDocument]:
return listing(self.patient_docs.get(patient_id, []))
def fetch_additional_docs(self, patient_id: str, wanted: Listing[DocType]) -> Listing[ClinicalDocument]:
generated = []
for dt in wanted:
generated.append(
ClinicalDocument(
doc_id=_stable_id("DOC", patient_id + dt.worth + str(time.time())),
doc_type=dt,
created_at=_now_iso(),
content material=f"Auto-collected doc for {dt.worth}: extracted and formatted per payer coverage.",
supply="AutoCollector",
)
)
self.patient_docs.setdefault(patient_id, []).prolong(generated)
return generated
class MockPayerPortal:
def __init__(self):
self.db: Dict[str, Dict[str, Any]] = {}
random.seed(11)
def required_docs_policy(self, plan: InsurancePlan, surgical procedure: SurgeryType) -> Listing[DocType]:
base = [DocType.H_AND_P, DocType.CLINICAL_NOTE, DocType.MED_LIST]
if surgical procedure in [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]:
base += [DocType.IMAGING, DocType.LABS, DocType.PRIOR_TX]
if surgical procedure == SurgeryType.BARIATRIC:
base += [DocType.LABS, DocType.PRIOR_TX]
if plan in [InsurancePlan.PAYER_BETA, InsurancePlan.PAYER_GAMMA]:
base += [DocType.CONSENT]
return sorted(listing(set(base)), key=lambda x: x.worth)
def submit(self, pa: PriorAuthRequest) -> PayerResponse:
payer_ref = _stable_id("PAYREF", pa.request_id + _now_iso())
docs_present = {d.doc_type for d in pa.docs_attached}
required = self.required_docs_policy(pa.order.affected person.plan, pa.order.surgery_type)
lacking = [d for d in required if d not in docs_present]
self.db[payer_ref] = {
"standing": AuthStatus.SUBMITTED,
"order_id": pa.order.order_id,
"plan": pa.order.affected person.plan,
"surgical procedure": pa.order.surgery_type,
"lacking": lacking,
"polls": 0,
"submitted_at": _now_iso(),
"denial_reason": None,
}
msg = "Submission obtained. Case queued for overview."
if lacking:
msg += " Preliminary validation signifies incomplete documentation."
return PayerResponse(standing=AuthStatus.SUBMITTED, payer_ref=payer_ref, message=msg)
def check_status(self, payer_ref: str) -> PayerResponse:
if payer_ref not in self.db:
return PayerResponse(
standing=AuthStatus.DENIED,
payer_ref=payer_ref,
message="Case not discovered (potential payer system error).",
denial_reason=DenialReason.OTHER,
confidence=0.4,
)
case = self.db[payer_ref]
case["polls"] += 1
if case["status"] == AuthStatus.SUBMITTED and case["polls"] >= 1:
case["status"] = AuthStatus.IN_REVIEW
if case["status"] == AuthStatus.IN_REVIEW and case["polls"] >= 3:
if case["missing"]:
case["status"] = AuthStatus.DENIED
case["denial_reason"] = DenialReason.MISSING_DOCS
else:
roll = random.random()
if roll < 0.10:
case["status"] = AuthStatus.DENIED
case["denial_reason"] = DenialReason.CODING_ISSUE
elif roll < 0.18:
case["status"] = AuthStatus.DENIED
case["denial_reason"] = DenialReason.MEDICAL_NECESSITY
else:
case["status"] = AuthStatus.APPROVED
if case["status"] == AuthStatus.DENIED:
dr = case["denial_reason"] or DenialReason.OTHER
lacking = case["missing"] if dr == DenialReason.MISSING_DOCS else []
conf = 0.9 if dr != DenialReason.OTHER else 0.55
return PayerResponse(
standing=AuthStatus.DENIED,
payer_ref=payer_ref,
message=f"Denied. Cause={dr.worth}.",
denial_reason=dr,
missing_docs=lacking,
confidence=conf,
)
if case["status"] == AuthStatus.APPROVED:
return PayerResponse(
standing=AuthStatus.APPROVED,
payer_ref=payer_ref,
message="Permitted. Authorization issued.",
confidence=0.95,
)
return PayerResponse(
standing=case["status"],
payer_ref=payer_ref,
message=f"Standing={case['status'].worth}. Polls={case['polls']}.",
confidence=0.9,
)
def file_appeal(self, payer_ref: str, appeal_text: str, attached_docs: Listing[ClinicalDocument]) -> PayerResponse:
if payer_ref not in self.db:
return PayerResponse(
standing=AuthStatus.DENIED,
payer_ref=payer_ref,
message="Enchantment failed: case not discovered.",
denial_reason=DenialReason.OTHER,
confidence=0.4,
)
case = self.db[payer_ref]
docs_present = {d.doc_type for d in attached_docs}
still_missing = [d for d in case["missing"] if d not in docs_present]
case["missing"] = still_missing
case["status"] = AuthStatus.APPEALED
case["polls"] = 0
msg = "Enchantment submitted and queued for overview."
if still_missing:
msg += f" Warning: nonetheless lacking {', '.be part of([d.value for d in still_missing])}."
return PayerResponse(standing=AuthStatus.APPEALED, payer_ref=payer_ref, message=msg, confidence=0.9)Subscribe to Updates
Get the latest creative news from FooBar about art, design and business.
Friday, January 16
Trending
- What the Metropolis of Calgary’s property assessments imply to your taxes in 2026 – Calgary
- Google AI Releases TranslateGemma: A New Household of Open Translation Fashions Constructed on Gemma 3 with Help for 55 Languages
- SBP permits trade corporations to make use of Raast
- Will Markets React to $2.8B Crypto Choices Expiry Occasion?
- Khyber Pakhtunkhwa Public Service Fee KPPSC Jobs 2026 2026 Job Commercial Pakistan
- 9 JRPGs that Deserve Anime Diversifications
- YouTube launches instruments to manage teenagers’ display screen time
- Name of the Wilde: Montreal Canadiens bested by Sabres with 5-3 loss in Buffalo – Montreal
- Find out how to Construct a Secure, Autonomous Prior Authorization Agent for Healthcare Income Cycle Administration with Human-in-the-Loop Controls
- Businessmen warn NEPRA dropping autonomy
Previous ArticleBusinessmen warn NEPRA dropping autonomy
Related Posts
Add A Comment

