Spaces:
Sleeping
Sleeping
Update app/gemini_analyzer.py
Browse files- app/gemini_analyzer.py +34 -56
app/gemini_analyzer.py
CHANGED
|
@@ -6,7 +6,6 @@ This module provides structured analysis of financial text, including:
|
|
| 6 |
- Key entity extraction (e.g., cryptocurrencies).
|
| 7 |
- Topic classification.
|
| 8 |
- Potential market impact assessment.
|
| 9 |
-
- Synthesis of multiple news items into a daily briefing.
|
| 10 |
"""
|
| 11 |
import os
|
| 12 |
import logging
|
|
@@ -17,7 +16,7 @@ from typing import Optional, TypedDict, List, Union
|
|
| 17 |
# Configure logging
|
| 18 |
logger = logging.getLogger(__name__)
|
| 19 |
|
| 20 |
-
# ---
|
| 21 |
class AnalysisResult(TypedDict):
|
| 22 |
sentiment: str
|
| 23 |
sentiment_score: float
|
|
@@ -27,7 +26,7 @@ class AnalysisResult(TypedDict):
|
|
| 27 |
impact: str
|
| 28 |
summary: str
|
| 29 |
error: Optional[str]
|
| 30 |
-
|
| 31 |
|
| 32 |
class GeminiAnalyzer:
|
| 33 |
"""Manages interaction with the Google Gemini API for deep text analysis."""
|
|
@@ -42,8 +41,8 @@ class GeminiAnalyzer:
|
|
| 42 |
self.params = {"key": self.api_key}
|
| 43 |
self.headers = {"Content-Type": "application/json"}
|
| 44 |
|
| 45 |
-
def
|
| 46 |
-
"""Creates the structured JSON prompt for
|
| 47 |
return {
|
| 48 |
"contents": [{
|
| 49 |
"parts": [{
|
|
@@ -67,66 +66,45 @@ class GeminiAnalyzer:
|
|
| 67 |
}]
|
| 68 |
}
|
| 69 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 70 |
async def analyze_text(self, text: str) -> AnalysisResult:
|
| 71 |
"""Sends text to Gemini and returns a structured analysis."""
|
| 72 |
-
prompt = self.
|
| 73 |
try:
|
| 74 |
-
response = await self.client.post(
|
|
|
|
|
|
|
| 75 |
response.raise_for_status()
|
| 76 |
|
| 77 |
full_response = response.json()
|
| 78 |
-
|
| 79 |
|
| 80 |
-
|
| 81 |
-
analysis
|
| 82 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 83 |
|
| 84 |
except Exception as e:
|
| 85 |
logger.error(f"❌ Gemini Analysis Error: {e}")
|
| 86 |
return {
|
| 87 |
-
"sentiment": "ERROR", "sentiment_score": 0, "reason": str(e),
|
| 88 |
"entities": [], "topic": "Unknown", "impact": "Unknown",
|
| 89 |
-
"summary": "Failed to
|
| 90 |
-
}
|
| 91 |
-
|
| 92 |
-
async def generate_daily_briefing(self, analysis_items: List[dict]) -> str:
|
| 93 |
-
"""Generates a high-level market briefing from a list of analyzed news items."""
|
| 94 |
-
if not analysis_items:
|
| 95 |
-
return "### Briefing Unavailable\nNo news items were analyzed in the last period."
|
| 96 |
-
|
| 97 |
-
context = "\n".join([f"- {item.get('summary')} (Impact: {item.get('impact')}, Topic: {item.get('topic')})" for item in analysis_items])
|
| 98 |
-
|
| 99 |
-
briefing_prompt = {
|
| 100 |
-
"contents": [{
|
| 101 |
-
"parts": [{
|
| 102 |
-
"text": f"""
|
| 103 |
-
You are a senior crypto market analyst named 'Sentinel'. Your tone is professional, concise, and insightful.
|
| 104 |
-
Based on the following list of analyzed news items from the last 24 hours, write a "Daily Market Briefing".
|
| 105 |
-
|
| 106 |
-
The briefing must have three sections using markdown:
|
| 107 |
-
1. "### Executive Summary": A single, impactful paragraph summarizing the overall market mood and key events.
|
| 108 |
-
2. "### Top Bullish Signals": 2-3 bullet points on the most positive developments.
|
| 109 |
-
3. "### Top Bearish Signals": 2-3 bullet points on the most significant risks or negative news.
|
| 110 |
-
|
| 111 |
-
Here is the data to analyze:
|
| 112 |
-
{context}
|
| 113 |
-
"""
|
| 114 |
-
}]
|
| 115 |
-
}],
|
| 116 |
-
"safetySettings": [
|
| 117 |
-
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
|
| 118 |
-
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
|
| 119 |
-
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
|
| 120 |
-
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
|
| 121 |
-
]
|
| 122 |
-
}
|
| 123 |
-
|
| 124 |
-
try:
|
| 125 |
-
response = await self.client.post(self.API_URL, headers=self.headers, params=self.params, json=briefing_prompt, timeout=120.0)
|
| 126 |
-
response.raise_for_status()
|
| 127 |
-
full_response = response.json()
|
| 128 |
-
briefing_text = full_response["candidates"][0]["content"]["parts"][0]["text"]
|
| 129 |
-
return briefing_text
|
| 130 |
-
except Exception as e:
|
| 131 |
-
logger.error(f"❌ Gemini Briefing Error: {e}")
|
| 132 |
-
return "### Briefing Unavailable\nCould not generate the daily market briefing due to a Gemini API error."
|
|
|
|
| 6 |
- Key entity extraction (e.g., cryptocurrencies).
|
| 7 |
- Topic classification.
|
| 8 |
- Potential market impact assessment.
|
|
|
|
| 9 |
"""
|
| 10 |
import os
|
| 11 |
import logging
|
|
|
|
| 16 |
# Configure logging
|
| 17 |
logger = logging.getLogger(__name__)
|
| 18 |
|
| 19 |
+
# --- Pydantic-like models for structured output ---
|
| 20 |
class AnalysisResult(TypedDict):
|
| 21 |
sentiment: str
|
| 22 |
sentiment_score: float
|
|
|
|
| 26 |
impact: str
|
| 27 |
summary: str
|
| 28 |
error: Optional[str]
|
| 29 |
+
|
| 30 |
|
| 31 |
class GeminiAnalyzer:
|
| 32 |
"""Manages interaction with the Google Gemini API for deep text analysis."""
|
|
|
|
| 41 |
self.params = {"key": self.api_key}
|
| 42 |
self.headers = {"Content-Type": "application/json"}
|
| 43 |
|
| 44 |
+
def _build_prompt(self, text: str) -> dict:
|
| 45 |
+
"""Creates the structured JSON prompt for the Gemini API."""
|
| 46 |
return {
|
| 47 |
"contents": [{
|
| 48 |
"parts": [{
|
|
|
|
| 66 |
}]
|
| 67 |
}
|
| 68 |
|
| 69 |
+
def _extract_json(self, text: str) -> Optional[dict]:
|
| 70 |
+
"""Finds and parses the first valid JSON object in a string."""
|
| 71 |
+
try:
|
| 72 |
+
# Find the first '{' and the last '}' to isolate the JSON blob
|
| 73 |
+
start_index = text.find('{')
|
| 74 |
+
end_index = text.rfind('}')
|
| 75 |
+
if start_index != -1 and end_index != -1 and end_index > start_index:
|
| 76 |
+
json_str = text[start_index:end_index+1]
|
| 77 |
+
return json.loads(json_str)
|
| 78 |
+
except json.JSONDecodeError as e:
|
| 79 |
+
logger.error(f"Failed to decode JSON from extracted text: {text} | Error: {e}")
|
| 80 |
+
return None
|
| 81 |
+
|
| 82 |
async def analyze_text(self, text: str) -> AnalysisResult:
|
| 83 |
"""Sends text to Gemini and returns a structured analysis."""
|
| 84 |
+
prompt = self._build_prompt(text)
|
| 85 |
try:
|
| 86 |
+
response = await self.client.post(
|
| 87 |
+
self.API_URL, headers=self.headers, params=self.params, json=prompt, timeout=60.0
|
| 88 |
+
)
|
| 89 |
response.raise_for_status()
|
| 90 |
|
| 91 |
full_response = response.json()
|
| 92 |
+
response_text = full_response["candidates"][0]["content"]["parts"][0]["text"]
|
| 93 |
|
| 94 |
+
# Use the new robust JSON extractor
|
| 95 |
+
analysis = self._extract_json(response_text)
|
| 96 |
+
|
| 97 |
+
if analysis:
|
| 98 |
+
analysis["error"] = None
|
| 99 |
+
return analysis
|
| 100 |
+
else:
|
| 101 |
+
# This will be logged if the helper function fails
|
| 102 |
+
raise ValueError(f"Could not extract valid JSON from Gemini response: {response_text}")
|
| 103 |
|
| 104 |
except Exception as e:
|
| 105 |
logger.error(f"❌ Gemini Analysis Error: {e}")
|
| 106 |
return {
|
| 107 |
+
"sentiment": "ERROR", "sentiment_score": 0.0, "reason": str(e),
|
| 108 |
"entities": [], "topic": "Unknown", "impact": "Unknown",
|
| 109 |
+
"summary": "Failed to perform analysis.", "error": str(e)
|
| 110 |
+
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|