""" core.py - Simple Game Generator Core 3 USE CASES: 1. run_multi() - Analyze + generate nhiều games phù hợp 2. run_single() - Analyze + generate 1 game tốt nhất (1 API call) 3. generate() - Generate 1 game cụ thể (không analyze) """ import os import json import time from typing import List, Dict, Any, Optional, Union from dataclasses import dataclass from langchain_core.prompts import ChatPromptTemplate from .llm_config import ModelConfig, get_llm, get_default_config from .game_registry import get_registry, get_game, get_analyzer_context from .validator import QuoteValidator from .logger import logger @dataclass class TokenUsage: prompt_tokens: int = 0 completion_tokens: int = 0 total_tokens: int = 0 input_chars: int = 0 # Character count sent to LLM output_chars: int = 0 # Character count received from LLM def add(self, usage: Dict[str, int]): self.prompt_tokens += usage.get("prompt_tokens", 0) or usage.get( "input_tokens", 0 ) self.completion_tokens += usage.get("completion_tokens", 0) or usage.get( "output_tokens", 0 ) self.total_tokens = self.prompt_tokens + self.completion_tokens def add_chars(self, input_text: str, output_text: str): """Track character counts for LLM input/output""" self.input_chars += len(input_text) if input_text else 0 self.output_chars += len(output_text) if output_text else 0 def to_dict(self) -> Dict[str, int]: return { "prompt_tokens": self.prompt_tokens, "completion_tokens": self.completion_tokens, "total_tokens": self.total_tokens, "input_chars": self.input_chars, "output_chars": self.output_chars, } class GameCore: """ Simple Game Generator. Usage: core = GameCore() # 1. Generate nhiều games (analyze first) result = core.run_multi(text) # 2. Generate 1 game tốt nhất (1 API call) result = core.run_single(text) # 3. Generate 1 game cụ thể result = core.generate("quiz", text) """ def __init__(self, llm_config: Optional[Union[ModelConfig, Dict, str]] = None): self.llm_config = self._parse_config(llm_config) self.llm = get_llm(self.llm_config) self.validator = QuoteValidator() self.registry = get_registry() print(f"🤖 LLM: {self.llm_config.provider}/{self.llm_config.model_name}") def _parse_config(self, config) -> ModelConfig: if config is None: if os.getenv("GOOGLE_API_KEY"): return get_default_config("gemini") elif os.getenv("OPENAI_API_KEY"): return get_default_config("openai") return get_default_config("ollama") if isinstance(config, ModelConfig): return config if isinstance(config, str): return get_default_config(config) if isinstance(config, dict): return ModelConfig(**config) raise ValueError(f"Invalid config: {type(config)}") # ============== 1. RUN MULTI (Analyze + Generate nhiều games) ============== def run_multi( self, text: str, enabled_games: Optional[List[str]] = None, max_items: int = 3, min_score: int = 20, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """ Analyze text + Generate nhiều games phù hợp. Returns: {success, games, results, errors, token_usage, llm} """ tracker = TokenUsage() errors = [] # 1. Analyze (also returns metadata) available = enabled_games or self.registry.get_game_types() logger.info(f"Analyzing text for multi-gen. Available games: {available}") games, scores, metadata, err = self._analyze( text, available, min_score, tracker, debug ) errors.extend(err) if not games: logger.warning("Analyzer found no suitable games matches.") return self._result(False, [], {}, errors, tracker, metadata=metadata) logger.info(f"Analyzer selected: {games}") # 2. Generate results, err = self._generate_multi(games, text, max_items, tracker, debug) errors.extend(err) # 3. Validate if validate: results = self._validate(results, text) # Check if any game has items has_items = any( data.get("items", []) for data in results.values() if isinstance(data, dict) ) return self._result( has_items, games, results, errors, tracker, scores, metadata ) # ============== 1.5. RUN FAST (1 API call: Analyze + Generate ALL suitable games) ============== def run_fast( self, text: str, enabled_games: Optional[List[str]] = None, max_items: int = 3, min_score: int = 50, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """ OPTIMIZED: 1 API call để analyze + generate TẤT CẢ games phù hợp. Output format GIỐNG HỆT run_multi(): - 1 call duy nhất thay vì 2 (analyze + generate) - Trả về nhiều games với items Returns: {success, games, game_scores, metadata, results, token_usage, llm} """ tracker = TokenUsage() available = enabled_games or self.registry.get_game_types() logger.info(f"[run_fast] Starting with games: {available}") # Build FULL game schemas (giống _generate_multi) games_schema = [] for gt in available: game = get_game(gt) if game: games_schema.append(f"""### {gt.upper()} {game.generated_system_prompt} REQUIRED OUTPUT FORMAT: {game.format_instructions}""") # Format cho response fmt = ", ".join( [ f'"{gt}": {{"score": 0-100, "reason": "...", "items": [...]}}' for gt in available ] ) prompt = ChatPromptTemplate.from_messages( [ ( "system", """You are an educational game analyzer AND generator. In ONE response: 1. SCORE each game type (0-100) based on how well the text matches game requirements 2. GENERATE items for games with score >= {min_score} SCORING GUIDE: - 70-100: Text matches game requirements well → GENERATE items - 40-69: Partial match → GENERATE items if >= min_score - 0-39: Does not match → DO NOT generate items GENERATION RULES: - KEEP original language from text - original_quote = EXACT copy from source text - ALL content must come from source text only - Include ALL required fields (image_description, image_keywords, etc.) - Generate max {max_items} items per game - STRICTLY FOLLOW each game's GENERATION RULES defined in their schema below""", ), ( "human", """GAMES AND THEIR SCHEMAS: {schemas} SOURCE TEXT: {text} RESPOND with this EXACT JSON structure: {{ "metadata": {{ "title": "short title from source", "description": "one sentence summary", "grade": 1-5, "difficulty": 1-5 }}, {format} }}""", ), ] ) if debug: print(f"\n{'=' * 50}\n✨ RUN FAST (1 call)\n{'=' * 50}") print(f"Text length: {len(text)}") try: # Build input for tracking invoke_params = { "schemas": "\n\n".join(games_schema), "text": text, "format": fmt, "min_score": min_score, "max_items": max_items, } resp = (prompt | self.llm).invoke(invoke_params) tracker.add(self._get_usage(resp)) # Track character counts: input = all params joined, output = response content input_text = " ".join(str(v) for v in invoke_params.values()) tracker.add_chars(input_text, resp.content) if debug: print(f"📝 Response: {resp.content[:500]}...") data = self._parse_json(resp.content) metadata = data.get("metadata", {}) # Process results - format giống _generate_multi results = {} scores = [] selected_games = [] errors = [] for gt in available: game_data = data.get(gt, {}) if not isinstance(game_data, dict): continue score = game_data.get("score", 0) reason = game_data.get("reason", "") items = game_data.get("items", []) scores.append({"type": gt, "score": score, "reason": reason}) if score >= min_score and items: # Post-process items (giống _generate_multi) processed_items = self._post_process(items, gt) # Validate if needed if validate: processed_items = [ i for i in processed_items if self.validator.validate_quote( i.get("original_quote", ""), text ).is_valid ] # Thống nhất structure: {items: [...], metadata: {...}} - giống run_multi results[gt] = { "items": processed_items, "metadata": game_data.get("metadata"), } if processed_items: selected_games.append(gt) else: errors.append(f"No valid items for {gt}") elif score >= min_score: errors.append(f"No items generated for {gt}") # Sort scores scores.sort(key=lambda x: x.get("score", 0), reverse=True) return self._result( success=len(selected_games) > 0, games=selected_games, results=results, errors=errors, tracker=tracker, scores=scores, metadata=metadata, ) except Exception as e: logger.error(f"[run_fast] Error: {e}") return self._result(False, [], {}, [str(e)], tracker) # ============== 2. RUN SINGLE (1 API call: Analyze + Generate 1 game) ============== def run_single( self, text: str, enabled_games: Optional[List[str]] = None, max_items: int = 3, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """ 1 API call: Analyze + Generate game tốt nhất. Returns: {success, game_type, reason, items, errors, token_usage, llm} """ tracker = TokenUsage() available = enabled_games or self.registry.get_game_types() logger.info(f"Starting run_single for available games: {available}") # Build games info games_info = [] for gt in available: game = get_game(gt) if game: example = ( json.dumps( game.examples[0].get("output", {}), ensure_ascii=False, indent=2 ) if game.examples else "{}" ) games_info.append( f"### {gt}\n{game.description}\nExample output:\n{example}" ) prompt = ChatPromptTemplate.from_messages( [ ( "system", """You are an educational game generator. 1. ANALYZE text and CHOOSE the BEST game type 2. GENERATE items for that game RULES: - KEEP original language - original_quote = EXACT copy from source - ALL content from source only""", ), ( "human", """GAMES: {games_info} TEXT: {text} Choose BEST game from: {types} Generate max {max_items} items. Return JSON: {{"game_type": "chosen", "reason": "why", "items": [...]}}""", ), ] ) content = { "games_info": "\n\n".join(games_info), "text": text[:2000], "types": ", ".join(available), "max_items": max_items, } if debug: print(f"\n{'=' * 50}\n🎯 RUN SINGLE\n{'=' * 50}") try: resp = (prompt | self.llm).invoke(content) tracker.add(self._get_usage(resp)) # Track character counts input_text = " ".join(str(v) for v in content.values()) tracker.add_chars(input_text, resp.content) data = self._parse_json(resp.content) game_type = data.get("game_type") items = self._post_process(data.get("items", []), game_type) if validate and items: items = [ i for i in items if self.validator.validate_quote( i.get("original_quote", ""), text ).is_valid ] return { "success": len(items) > 0, "game_type": game_type, "reason": data.get("reason", ""), "items": items, "errors": [], "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } except Exception as e: return { "success": False, "game_type": None, "items": [], "errors": [str(e)], "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } # ============== 3. GENERATE (1 game cụ thể, không analyze) ============== def generate( self, game_type: str, text: str, max_items: int = 3, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """Generate 1 game cụ thể""" tracker = TokenUsage() logger.info(f"Generating single game content: {game_type}") game = get_game(game_type) if not game: return { "success": False, "game_type": game_type, "items": [], "errors": [f"Game not found: {game_type}"], "token_usage": {}, "llm": "", } # Build Format Rules Section format_rules_section = "" if game.input_format_rules: rules_str = "\n".join(f"- {r}" for r in game.input_format_rules) format_rules_section = f""" CRITICAL: FIRST, VALIDATE THE INPUT TEXT. Format Rules: {rules_str} If the text is completely UNSUITABLE for this game type, you MUST output strictly this JSON and nothing else: {{{{ "format_error": "Input text incompatible with game requirements." }}}} """ prompt = ChatPromptTemplate.from_messages( [ ( "system", f"""{game.generated_system_prompt} {format_rules_section}""", ), ( "human", """TEXT TO PROCESS: {text} Generate content in JSON format: {format_instructions}""", ), ] ) if debug: print(f"\n{'=' * 50}\n🎮 GENERATE: {game_type}\n{'=' * 50}") try: invoke_params = { "text": text, "format_instructions": game.format_instructions, } resp = (prompt | self.llm).invoke(invoke_params) tracker.add(self._get_usage(resp)) # Track character counts input_text = " ".join(str(v) for v in invoke_params.values()) tracker.add_chars(input_text, resp.content) # 1. Parse as raw JSON first to check for format_error raw_data = None try: raw_data = self._parse_json(resp.content) except: pass # 2. Check if it's a format_error immediately if raw_data and raw_data.get("format_error"): return { "success": False, "game_type": game_type, "data": None, "format_error": raw_data["format_error"], "errors": [raw_data["format_error"]], "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } parsed_data = raw_data # 3. Try output_parser for structured validation if present if game.output_parser: try: parsed = game.output_parser.parse(resp.content) parsed_data = parsed.model_dump() except Exception as pe: if debug: print(f"⚠️ output_parser failed: {pe}") # Keep raw_data if parser fails but we have JSON # Check format error if parsed_data and parsed_data.get("format_error"): return { "success": False, "game_type": game_type, "data": None, "format_error": parsed_data["format_error"], "errors": [parsed_data["format_error"]], "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } # Post-process items = parsed_data.get("items", []) if parsed_data else [] items = self._post_process(items, game_type) if validate and items: items = [ i for i in items if self.validator.validate_quote( i.get("original_quote", ""), text ).is_valid ] if not items: return { "success": False, "game_type": game_type, "data": None, "format_error": "No items extracted", "errors": [], "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } if parsed_data: parsed_data["items"] = items return { "success": True, "game_type": game_type, "data": parsed_data, "errors": [], "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } except Exception as e: return { "success": False, "game_type": game_type, "data": None, "errors": [str(e)], "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } # ============== PRIVATE METHODS ============== def _analyze( self, text: str, available: List[str], min_score: int, tracker: TokenUsage, debug: bool, ) -> tuple: """Analyze text để suggest games - với retry""" # Lấy context từ game configs context = get_analyzer_context() prompt = ChatPromptTemplate.from_messages( [ ( "system", """You are a game type analyzer. Score each game 0-100 based on how well the text matches the game requirements. GAME REQUIREMENTS: {context} SCORING: - 70-100: Text matches game requirements well - 40-69: Partial match - 0-39: Does not match requirements IMPORTANT: You MUST use the exact game type name (e.g. 'quiz', 'sequence') in the "type" field. Return valid JSON with scores AND metadata about the content: {{ "scores": [ {{ "type": "NAME_OF_GAME_TYPE", "score": 80, "reason": "..." }} ], "metadata": {{ "title": "Title from source or create short title", "description": "One sentence summary", "grade": 1-5, "difficulty": 1-5 }} }}""", ), ( "human", """TEXT TO ANALYZE: {text} Analyze for games: {types} Return JSON:""", ), ] ) max_retries = 2 for attempt in range(max_retries): try: invoke_params = { "context": context, "text": text, "types": ", ".join(available), } resp = (prompt | self.llm).invoke(invoke_params) tracker.add(self._get_usage(resp)) # Track character counts input_text = " ".join(str(v) for v in invoke_params.values()) tracker.add_chars(input_text, resp.content) if debug: print(f"📝 Analyzer raw: {resp.content}") # Parse JSON với fallback content = resp.content.strip() if not content: if debug: print(f"⚠️ Empty response, retry {attempt + 1}") continue data = self._parse_json(content) scores = [ s for s in data.get("scores", []) if s.get("type") in available and s.get("score", 0) >= min_score ] scores.sort(key=lambda x: x.get("score", 0), reverse=True) # Extract metadata from response metadata = data.get("metadata", {}) if debug: print(f"🔍 Scores: {scores}") print(f"📋 Metadata: {metadata}") return [s["type"] for s in scores], scores, metadata, [] except Exception as e: if debug: print(f"⚠️ Analyze attempt {attempt + 1} failed: {e}") if attempt == max_retries - 1: # Final fallback: return all games với low score return available, [], {}, [f"Analyze error: {e}"] return available, [], {}, ["Analyze failed after retries"] def _generate_multi( self, games: List[str], text: str, max_items: int, tracker: TokenUsage, debug: bool, ) -> tuple: """Generate nhiều games""" if len(games) == 1: result = self.generate( games[0], text, max_items, validate=False, debug=debug ) tracker.add(result.get("token_usage", {})) # Fix: generate returns {data: {items: [...]}} not {items: [...]} data = result.get("data") or {} items = data.get("items", []) if isinstance(data, dict) else [] return { games[0]: {"items": items, "metadata": data.get("metadata")} }, result.get("errors", []) # Multi-game: Build schema info for each game games_schema = [] for gt in games: game = get_game(gt) if game: games_schema.append(f"""### {gt.upper()} {game.generated_system_prompt} REQUIRED OUTPUT FORMAT: {game.format_instructions}""") prompt = ChatPromptTemplate.from_messages( [ ( "system", """You are a multi-game content generator. In ONE response: 1. Generate items for EACH game type following their EXACT schema GENERATION RULES: - KEEP original language from text - original_quote = EXACT copy from source text - ALL content must come from source text only - Include ALL required fields (image_description, image_keywords, etc.) - STRICTLY FOLLOW each game's GENERATION RULES defined in their schema below""", ), ( "human", """GAMES AND THEIR SCHEMAS: {schemas} SOURCE TEXT: {text} Generate items for: {types} Return valid JSON: {{{format}}}""", ), ] ) fmt = ", ".join( [f'"{gt}": {{"items": [...], "metadata": {{...}}}}' for gt in games] ) try: invoke_params = { "schemas": "\n\n".join(games_schema), "text": text, "types": ", ".join(games), "format": fmt, } resp = (prompt | self.llm).invoke(invoke_params) tracker.add(self._get_usage(resp)) # Track character counts input_text = " ".join(str(v) for v in invoke_params.values()) tracker.add_chars(input_text, resp.content) data = self._parse_json(resp.content) results = {} errors = [] for gt in games: game_data = data.get(gt, {}) if isinstance(data.get(gt), dict) else {} items = game_data.get("items", []) items = self._post_process(items, gt) # Thống nhất structure: {items: [...], metadata: {...}} results[gt] = {"items": items, "metadata": game_data.get("metadata")} if not items: errors.append(f"No items for {gt}") return results, errors except Exception as e: return {gt: {"items": [], "metadata": None} for gt in games}, [ f"Generate error: {e}" ] def _validate(self, results: Dict[str, dict], text: str) -> Dict[str, dict]: """Validate items trong results""" validated = {} for gt, data in results.items(): items = data.get("items", []) if isinstance(data, dict) else [] valid_items = [ i for i in items if self.validator.validate_quote( i.get("original_quote", ""), text ).is_valid ] validated[gt] = { "items": valid_items, "metadata": data.get("metadata") if isinstance(data, dict) else None, } return validated def _post_process(self, items: List, game_type: str) -> List[Dict]: ms = int(time.time() * 1000) result = [] for i, item in enumerate(items): d = ( item if isinstance(item, dict) else (item.model_dump() if hasattr(item, "model_dump") else {}) ) d["id"] = f"{game_type[:2].upper()}-{ms}-{i}" d["game_type"] = game_type result.append(d) return result def _parse_json(self, content: str) -> Dict: if "```" in content: content = content.split("```")[1].replace("json", "").strip() return json.loads(content) def _get_usage(self, resp) -> Dict: if hasattr(resp, "response_metadata"): meta = resp.response_metadata return meta.get( "usage", meta.get("usage_metadata", meta.get("token_usage", {})) ) return getattr(resp, "usage_metadata", {}) def _result( self, success: bool, games: List, results: Dict, errors: List, tracker: TokenUsage, scores: List = None, metadata: Dict = None, ) -> Dict: return { "success": success, "games": games, "game_scores": scores or [], "metadata": metadata or {}, "results": results, "errors": errors, "token_usage": tracker.to_dict(), "llm": f"{self.llm_config.provider}/{self.llm_config.model_name}", } # ============== ASYNC WRAPPERS (for concurrent FastAPI handling) ============== # These methods run the blocking LLM calls in a thread pool async def run_fast_async( self, text: str, enabled_games: Optional[List[str]] = None, max_items: int = 3, min_score: int = 50, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """Async wrapper for run_fast - runs in thread pool to not block event loop""" import asyncio return await asyncio.to_thread( self.run_fast, text, enabled_games, max_items, min_score, validate, debug ) async def run_single_async( self, text: str, enabled_games: Optional[List[str]] = None, max_items: int = 3, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """Async wrapper for run_single - runs in thread pool to not block event loop""" import asyncio return await asyncio.to_thread( self.run_single, text, enabled_games, max_items, validate, debug ) async def run_multi_async( self, text: str, enabled_games: Optional[List[str]] = None, max_items: int = 3, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """Async wrapper for run_multi - runs in thread pool to not block event loop""" import asyncio return await asyncio.to_thread( self.run_multi, text, enabled_games, max_items, validate, debug ) async def generate_async( self, text: str, game_types: Union[List[str], str], max_items: int = 10, validate: bool = True, debug: bool = False, ) -> Dict[str, Any]: """Async wrapper for generate - runs in thread pool to not block event loop""" import asyncio return await asyncio.to_thread( self.generate, text, game_types, max_items, validate, debug )