A Coding Implementation to Explore and Analyze the TaskTrove Dataset with Streaming Parsing, Visualization, and Verifier Detection


# Aggregate filename and top-level JSON-key frequencies over the first
# 200 tasks, keeping a couple of parsed samples for later display.
filename_counter: Counter = Counter()
all_json_keys: Counter = Counter()
samples_for_show: List = []

for idx, row in enumerate(tqdm(ds_test, desc="inspecting structure", total=200)):
    if idx >= 200:
        break
    parsed = parse_task(row["task_binary"])
    if parsed["format"] not in ("tar", "zip"):
        continue
    for fname, body in parsed["files"].items():
        filename_counter[fname] += 1
        if not (fname.endswith(".json") and isinstance(body, str)):
            continue
        try:
            decoded = json.loads(body)
        except Exception:
            continue  # best-effort: skip unparseable JSON bodies
        if isinstance(decoded, dict):
            # Counter.update over the keys view bumps each key by one.
            all_json_keys.update(decoded.keys())
    if len(samples_for_show) < 2:
        samples_for_show.append((row["path"], parsed))


# Print the aggregated structure statistics gathered above.
print("\nMost common filenames inside task archives:")
for fname, count in filename_counter.most_common(15):
    print(f"  {count:>4}  {fname}")

print("\nMost common top-level JSON keys (across any *.json):")
for key, count in all_json_keys.most_common(20):
    print(f"  {count:>4}  {key}")

if samples_for_show:
    sample_path, sample_parsed = samples_for_show[0]
    print(f"\nFull file listing for one sample task ({sample_path}):")
    for fname, body in sample_parsed["files"].items():
        size = len(body) if isinstance(body, (str, bytes)) else 0
        print(f"  {fname}  ({size:,} B)")




# Heuristic signals that a task ships its own verifier/grader.
VERIFIER_FILE_PATTERNS = (
    "verifier", "verify", "grader", "judge", "score", "eval",
)
VERIFIER_JSON_KEYS = (
    "verifier", "verifier_config", "judge", "grader",
    "rubric", "test_patch", "FAIL_TO_PASS", "tests",
)


def has_verifier(parsed: Dict[str, Any]) -> bool:
    """Detect verifiers via filename, JSON content, or both."""
    # Non-archive tasks: only a dict payload can carry verifier keys.
    if parsed["format"] not in ("tar", "zip"):
        content = parsed.get("content")
        return isinstance(content, dict) and any(
            key in content for key in VERIFIER_JSON_KEYS
        )

    files = parsed["files"]

    # Pass 1: filename heuristics (case-insensitive substring match).
    if any(
        pattern in name.lower()
        for name in files
        for pattern in VERIFIER_FILE_PATTERNS
    ):
        return True

    # Pass 2: inspect structured text bodies for verifier-ish content.
    for name, body in files.items():
        if not (name.endswith((".json", ".yaml", ".yml")) and isinstance(body, str)):
            continue
        try:
            decoded = json.loads(body)
        except Exception:
            decoded = None  # YAML (or malformed JSON) falls through to substring scan
        if isinstance(decoded, dict) and any(k in decoded for k in VERIFIER_JSON_KEYS):
            return True
        lowered = body.lower()
        if "verifier" in lowered or "test_patch" in lowered:
            return True

    return False




class TaskTroveExplorer:
    """High-level interface to the open-thoughts/TaskTrove dataset."""

    def __init__(self, split: str = "test", dataset_id: str = DATASET_ID):
        """Open *dataset_id* in streaming mode (no full download)."""
        self.dataset_id = dataset_id
        self.split = split
        self._ds = load_dataset(dataset_id, split=split, streaming=True)

    def iter(self, limit: Optional[int] = None,
             source_filter: Optional[str] = None) -> Iterator[Dict[str, Any]]:
        """Yield raw dataset rows, optionally filtered.

        Args:
            limit: stop after this many *matching* rows (None = unbounded).
            source_filter: regex applied via ``re.search`` to the source
                derived from each row's ``path``.
        """
        rx = re.compile(source_filter) if source_filter else None
        n = 0
        for row in self._ds:
            if rx and not rx.search(source_of(row["path"])):
                continue
            yield row
            n += 1
            if limit is not None and n >= limit:
                return

    def sample(self, n: int = 5,
               source_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return up to *n* parsed tasks, annotated with path and source."""
        out = []
        for row in self.iter(limit=n, source_filter=source_filter):
            parsed = parse_task(row["task_binary"])
            parsed["path"] = row["path"]
            parsed["source"] = source_of(row["path"])
            out.append(parsed)
        return out

    def summary(self, limit: int = 1000,
                source_filter: Optional[str] = None) -> pd.DataFrame:
        """Aggregate per-source stats (sizes, file counts, verifier rate).

        Returns an empty DataFrame when no rows match the filter.
        """
        rows = []
        for row in self.iter(limit=limit, source_filter=source_filter):
            parsed = parse_task(row["task_binary"])
            rows.append({
                "source": source_of(row["path"]),
                "compressed": parsed["compressed_size"],
                "raw": parsed["raw_size"],
                "format": parsed["format"],
                "n_files": len(parsed.get("files", {})),
                "has_verifier": has_verifier(parsed),
            })
        df = pd.DataFrame(rows)
        if df.empty:
            return df
        return (df.groupby("source")
                  .agg(n=("compressed", "count"),
                       mean_compressed_kb=("compressed", lambda s: s.mean()/1024),
                       mean_raw_kb=("raw",                lambda s: s.mean()/1024),
                       mean_n_files=("n_files", "mean"),
                       verifier_rate=("has_verifier", "mean"))
                  .round(2)
                  .sort_values("n", ascending=False))

    @staticmethod
    def has_verifier(parsed: Dict[str, Any]) -> bool:
        """Convenience wrapper around the module-level ``has_verifier``."""
        return has_verifier(parsed)

    def export(self, output_dir: Union[str, Path], n: int = 10,
               source_filter: Optional[str] = None) -> Path:
        """Write up to *n* sampled tasks to disk, one directory per task.

        Archive member names come from external dataset content, so any
        name that would resolve outside its task directory (``../x``,
        absolute paths) is skipped — this prevents zip-slip path
        traversal when exporting untrusted archives.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        for parsed in self.sample(n=n, source_filter=source_filter):
            slug = parsed["path"].replace("/", "_")
            tdir = output_dir / slug
            tdir.mkdir(exist_ok=True)
            if parsed["format"] in ("tar", "zip"):
                root = tdir.resolve()
                for name, body in parsed["files"].items():
                    out = (tdir / name).resolve()
                    try:
                        out.relative_to(root)  # raises if member escapes tdir
                    except ValueError:
                        continue  # zip-slip guard: drop malicious member names
                    out.parent.mkdir(parents=True, exist_ok=True)
                    if isinstance(body, str):
                        out.write_text(body, encoding="utf-8")
                    else:
                        out.write_bytes(body)
            else:
                content = parsed.get("content", b"")
                if isinstance(content, (dict, list)):
                    (tdir / "task.json").write_text(json.dumps(content, indent=2))
                elif isinstance(content, str):
                    (tdir / "task.txt").write_text(content)
                else:
                    (tdir / "task.bin").write_bytes(content)
        print(f"✓ exported tasks to {output_dir.resolve()}")
        return output_dir




# Quick demo: stream three tasks and summarize each on one line.
explorer = TaskTroveExplorer(split="test")

print("\nSample of 3 parsed tasks:")
for task in explorer.sample(n=3):
    n_files = len(task.get("files", {}))
    print(
        f"path: {task['path']} | source: {task['source']} | "
        f"format: {task['format']} | files: {n_files} | "
        f"verifier: {has_verifier(task)}"
    )



Source link

Leave a Reply

Your email address will not be published. Required fields are marked *