A Coding Implementation Showcasing ClawTeam’s Multi-Agent Swarm Orchestration with OpenAI Operate Calling

A Coding Implementation Showcasing ClawTeam’s Multi-Agent Swarm Orchestration with OpenAI Operate Calling


SWARM_TOOLS = [
   {
       "type": "function",
       "function": {
           "name": "task_update",
           "description": "Update the status of a task. Use 'in_progress' when starting, 'completed' when done.",
           "parameters": {
               "type": "object",
               "properties": {
                   "task_id": {"type": "string", "description": "The task ID"},
                   "status": {"type": "string", "enum": ["in_progress", "completed", "failed"]},
                   "end result": {"sort": "string", "description": "Outcome or output of the duty"},
               },
               "required": ["task_id", "status"],
           },
       },
   },
   {
       "sort": "perform",
       "perform": {
           "title": "inbox_send",
           "description": "Ship a message to a different agent (e.g., 'chief' or a employee title).",
           "parameters": {
               "sort": "object",
               "properties": {
                   "to": {"sort": "string", "description": "Recipient agent title"},
                   "message": {"sort": "string", "description": "Message content material"},
               },
               "required": ["to", "message"],
           },
       },
   },
   {
       "sort": "perform",
       "perform": {
           "title": "inbox_receive",
           "description": "Test and devour all messages in your inbox.",
           "parameters": {
               "sort": "object",
               "properties": {},
           },
       },
   },
   {
       "sort": "perform",
       "perform": {
           "title": "task_list",
           "description": "Checklist duties assigned to you or all staff duties.",
           "parameters": {
               "sort": "object",
               "properties": {
                   "proprietor": {"sort": "string", "description": "Filter by proprietor title (non-obligatory)"},
               },
           },
       },
   },
]




class SwarmAgent:


   def __init__(
       self,
       title: str,
       position: str,
       system_prompt: str,
       task_board: TaskBoard,
       inbox: InboxSystem,
       registry: TeamRegistry,
   ):
       self.title = title
       self.position = position
       self.system_prompt = system_prompt
       self.task_board = task_board
       self.inbox = inbox
       self.registry = registry
       self.conversation_history: listing[dict] = []
       self.inbox.register(title)
       self.registry.register(title, position)


   def _build_system_prompt(self) -> str:
       coord_protocol = f"""
## Coordination Protocol (auto-injected — you might be agent '{self.title}')


You're a part of an AI agent swarm. Your position: {self.position}
Your title: {self.title}


Obtainable instruments (equal to ClawTeam CLI):
- task_list: Test your assigned duties (like `clawteam activity listing`)
- task_update: Replace activity standing to in_progress/accomplished/failed (like `clawteam activity replace`)
- inbox_send: Ship messages to different brokers (like `clawteam inbox ship`)
- inbox_receive: Test your inbox for messages (like `clawteam inbox obtain`)


WORKFLOW:
1. Test your duties with task_list
2. Mark a activity as in_progress while you begin
3. Do the work (assume, analyze, produce output)
4. Mark the duty as accomplished together with your end result
5. Ship a abstract message to 'chief' when finished
"""
       return self.system_prompt + "n" + coord_protocol


   def _handle_tool_call(self, tool_name: str, args: dict) -> str:
       if tool_name == "task_update":
           standing = TaskStatus(args["status"])
           end result = args.get("end result", "")
           self.task_board.update_status(args["task_id"], standing, end result)
           if standing == TaskStatus.COMPLETED:
               self.registry.increment_completed(self.title)
           return json.dumps({"okay": True, "task_id": args["task_id"], "new_status": args["status"]})


       elif tool_name == "inbox_send":
           self.inbox.ship(self.title, args["to"], args["message"])
           return json.dumps({"okay": True, "sent_to": args["to"]})


       elif tool_name == "inbox_receive":
           msgs = self.inbox.obtain(self.title)
           if not msgs:
               return json.dumps({"messages": [], "be aware": "No new messages"})
           return json.dumps({
               "messages": [
                   {"from": m.sender, "content": m.content, "time": m.timestamp}
                   for m in msgs
               ]
           })


       elif tool_name == "task_list":
           proprietor = args.get("proprietor", self.title)
           duties = self.task_board.get_tasks(proprietor=proprietor)
           return json.dumps({"duties": [t.to_dict() for t in tasks]})


       return json.dumps({"error": f"Unknown device: {tool_name}"})


   def run(self, user_message: str, max_iterations: int = 6) -> str:
       self.conversation_history.append({"position": "consumer", "content material": user_message})


       for iteration in vary(max_iterations):
           strive:
               response = consumer.chat.completions.create(
                   mannequin=MODEL,
                   messages=[
                       {"role": "system", "content": self._build_system_prompt()},
                       *self.conversation_history,
                   ],
                   instruments=SWARM_TOOLS,
                   tool_choice="auto",
                   temperature=0.4,
               )
           besides Exception as e:
               return f"[API Error] {e}"


           alternative = response.decisions[0]
           msg = alternative.message


           assistant_msg = {"position": "assistant", "content material": msg.content material or ""}
           if msg.tool_calls:
               assistant_msg["tool_calls"] = [
                   {
                       "id": tc.id,
                       "type": "function",
                       "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                   }
                   for tc in msg.tool_calls
               ]
           self.conversation_history.append(assistant_msg)


           if not msg.tool_calls:
               return msg.content material or "(No response)"


           for tc in msg.tool_calls:
               fn_name = tc.perform.title
               fn_args = json.masses(tc.perform.arguments)
               end result = self._handle_tool_call(fn_name, fn_args)
               self.conversation_history.append({
                   "position": "device",
                   "tool_call_id": tc.id,
                   "content material": end result,
               })


       return "(Agent reached max iterations)"



Source link

Leave a Reply

Your email address will not be published. Required fields are marked *