| |
| """ |
| Alignment and Divergence Analyzer |
| Analyzes long-term alignment/divergence between economic indicators using Spearman correlation |
| and detects sudden deviations using Z-score analysis. |
| """ |
|
|
| import logging |
| import numpy as np |
| import pandas as pd |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| from scipy import stats |
| from typing import Dict, List, Optional, Tuple, Union |
| from datetime import datetime, timedelta |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class AlignmentDivergenceAnalyzer: |
| """ |
| Analyzes long-term alignment/divergence patterns and sudden deviations in economic indicators |
| """ |
| |
| def __init__(self, data: pd.DataFrame): |
| """ |
| Initialize analyzer with economic data |
| |
| Args: |
| data: DataFrame with economic indicators (time series) |
| """ |
| self.data = data.copy() |
| self.results = {} |
| |
| def analyze_long_term_alignment(self, |
| indicators: List[str] = None, |
| window_sizes: List[int] = [12, 24, 48], |
| min_periods: int = 8) -> Dict: |
| """ |
| Analyze long-term alignment/divergence using rolling Spearman correlation |
| |
| Args: |
| indicators: List of indicators to analyze. If None, use all numeric columns |
| window_sizes: List of rolling window sizes (in periods) |
| min_periods: Minimum periods required for correlation calculation |
| |
| Returns: |
| Dictionary with alignment analysis results |
| """ |
| if indicators is None: |
| indicators = self.data.select_dtypes(include=[np.number]).columns.tolist() |
| |
| logger.info(f"Analyzing long-term alignment for {len(indicators)} indicators") |
| |
| |
| growth_data = self.data[indicators].pct_change().dropna() |
| |
| |
| alignment_results = { |
| 'rolling_correlations': {}, |
| 'alignment_summary': {}, |
| 'divergence_periods': {}, |
| 'trend_analysis': {} |
| } |
| |
| |
| for i, indicator1 in enumerate(indicators): |
| for j, indicator2 in enumerate(indicators): |
| if i >= j: |
| continue |
| |
| pair_name = f"{indicator1}_vs_{indicator2}" |
| logger.info(f"Analyzing alignment: {pair_name}") |
| |
| |
| pair_data = growth_data[[indicator1, indicator2]].dropna() |
| |
| if len(pair_data) < min_periods: |
| logger.warning(f"Insufficient data for {pair_name}") |
| continue |
| |
| |
| rolling_corrs = {} |
| alignment_trends = {} |
| |
| for window in window_sizes: |
| if window <= len(pair_data): |
| |
| |
| |
| corr_values = [] |
| for start_idx in range(len(pair_data) - window + 1): |
| window_data = pair_data.iloc[start_idx:start_idx + window] |
| if len(window_data.dropna()) >= min_periods: |
| corr_val = window_data.corr(method='spearman').iloc[0, 1] |
| if not pd.isna(corr_val): |
| corr_values.append(corr_val) |
| |
| if corr_values: |
| rolling_corrs[f"window_{window}"] = corr_values |
| |
| |
| alignment_trends[f"window_{window}"] = self._analyze_correlation_trend( |
| corr_values, pair_name, window |
| ) |
| |
| |
| alignment_results['rolling_correlations'][pair_name] = rolling_corrs |
| alignment_results['trend_analysis'][pair_name] = alignment_trends |
| |
| |
| alignment_results['divergence_periods'][pair_name] = self._identify_divergence_periods( |
| pair_data, rolling_corrs, pair_name |
| ) |
| |
| |
| alignment_results['alignment_summary'] = self._generate_alignment_summary( |
| alignment_results['trend_analysis'] |
| ) |
| |
| self.results['alignment'] = alignment_results |
| return alignment_results |
| |
| def detect_sudden_deviations(self, |
| indicators: List[str] = None, |
| z_threshold: float = 2.0, |
| window_size: int = 12, |
| min_periods: int = 6) -> Dict: |
| """ |
| Detect sudden deviations using Z-score analysis |
| |
| Args: |
| indicators: List of indicators to analyze. If None, use all numeric columns |
| z_threshold: Z-score threshold for flagging deviations |
| window_size: Rolling window size for Z-score calculation |
| min_periods: Minimum periods required for Z-score calculation |
| |
| Returns: |
| Dictionary with deviation detection results |
| """ |
| if indicators is None: |
| indicators = self.data.select_dtypes(include=[np.number]).columns.tolist() |
| |
| logger.info(f"Detecting sudden deviations for {len(indicators)} indicators") |
| |
| |
| growth_data = self.data[indicators].pct_change().dropna() |
| |
| deviation_results = { |
| 'z_scores': {}, |
| 'deviations': {}, |
| 'deviation_summary': {}, |
| 'extreme_events': {} |
| } |
| |
| for indicator in indicators: |
| if indicator not in growth_data.columns: |
| continue |
| |
| series = growth_data[indicator].dropna() |
| |
| if len(series) < min_periods: |
| logger.warning(f"Insufficient data for {indicator}") |
| continue |
| |
| |
| rolling_mean = series.rolling(window=window_size, min_periods=min_periods).mean() |
| rolling_std = series.rolling(window=window_size, min_periods=min_periods).std() |
| |
| |
| z_scores = (series - rolling_mean) / rolling_std |
| |
| |
| deviations = z_scores[abs(z_scores) > z_threshold] |
| |
| |
| deviation_results['z_scores'][indicator] = z_scores |
| deviation_results['deviations'][indicator] = deviations |
| |
| |
| deviation_results['extreme_events'][indicator] = self._analyze_extreme_events( |
| series, z_scores, deviations, indicator |
| ) |
| |
| |
| deviation_results['deviation_summary'] = self._generate_deviation_summary( |
| deviation_results['deviations'], deviation_results['extreme_events'] |
| ) |
| |
| self.results['deviations'] = deviation_results |
| return deviation_results |
| |
| def _analyze_correlation_trend(self, corr_values: List[float], |
| pair_name: str, window: int) -> Dict: |
| """Analyze trend in correlation values""" |
| if len(corr_values) < 2: |
| return {'trend': 'insufficient_data', 'direction': 'unknown'} |
| |
| |
| x = np.arange(len(corr_values)) |
| y = np.array(corr_values) |
| |
| slope, intercept, r_value, p_value, std_err = stats.linregress(x, y) |
| |
| |
| if abs(slope) < 0.001: |
| trend_direction = 'stable' |
| elif slope > 0: |
| trend_direction = 'increasing_alignment' |
| else: |
| trend_direction = 'decreasing_alignment' |
| |
| |
| if abs(r_value) > 0.7: |
| trend_strength = 'strong' |
| elif abs(r_value) > 0.4: |
| trend_strength = 'moderate' |
| else: |
| trend_strength = 'weak' |
| |
| return { |
| 'trend': trend_direction, |
| 'strength': trend_strength, |
| 'slope': slope, |
| 'r_squared': r_value**2, |
| 'p_value': p_value, |
| 'mean_correlation': np.mean(corr_values), |
| 'correlation_volatility': np.std(corr_values) |
| } |
| |
| def _identify_divergence_periods(self, pair_data: pd.DataFrame, |
| rolling_corrs: Dict, pair_name: str) -> Dict: |
| """Identify periods of significant divergence""" |
| divergence_periods = [] |
| |
| for window_name, corr_values in rolling_corrs.items(): |
| if len(corr_values) < 4: |
| continue |
| |
| |
| corr_series = pd.Series(corr_values) |
| divergence_mask = corr_series < 0.1 |
| |
| if divergence_mask.any(): |
| divergence_periods.append({ |
| 'window': window_name, |
| 'divergence_count': divergence_mask.sum(), |
| 'divergence_percentage': (divergence_mask.sum() / len(corr_series)) * 100, |
| 'min_correlation': corr_series.min(), |
| 'max_correlation': corr_series.max() |
| }) |
| |
| return divergence_periods |
| |
| def _analyze_extreme_events(self, series: pd.Series, z_scores: pd.Series, |
| deviations: pd.Series, indicator: str) -> Dict: |
| """Analyze extreme events for an indicator""" |
| if deviations.empty: |
| return {'count': 0, 'events': []} |
| |
| events = [] |
| for date, z_score in deviations.items(): |
| events.append({ |
| 'date': date, |
| 'z_score': z_score, |
| 'growth_rate': series.loc[date], |
| 'severity': 'extreme' if abs(z_score) > 3.0 else 'moderate' |
| }) |
| |
| |
| events.sort(key=lambda x: abs(x['z_score']), reverse=True) |
| |
| return { |
| 'count': len(events), |
| 'events': events[:10], |
| 'max_z_score': max(abs(d['z_score']) for d in events), |
| 'mean_z_score': np.mean([abs(d['z_score']) for d in events]) |
| } |
| |
| def _generate_alignment_summary(self, trend_analysis: Dict) -> Dict: |
| """Generate summary of alignment trends""" |
| summary = { |
| 'increasing_alignment': [], |
| 'decreasing_alignment': [], |
| 'stable_alignment': [], |
| 'strong_trends': [], |
| 'moderate_trends': [], |
| 'weak_trends': [] |
| } |
| |
| for pair_name, trends in trend_analysis.items(): |
| for window_name, trend_info in trends.items(): |
| trend = trend_info['trend'] |
| strength = trend_info['strength'] |
| |
| if trend == 'increasing_alignment': |
| summary['increasing_alignment'].append(pair_name) |
| elif trend == 'decreasing_alignment': |
| summary['decreasing_alignment'].append(pair_name) |
| elif trend == 'stable': |
| summary['stable_alignment'].append(pair_name) |
| |
| if strength == 'strong': |
| summary['strong_trends'].append(f"{pair_name}_{window_name}") |
| elif strength == 'moderate': |
| summary['moderate_trends'].append(f"{pair_name}_{window_name}") |
| else: |
| summary['weak_trends'].append(f"{pair_name}_{window_name}") |
| |
| return summary |
| |
| def _generate_deviation_summary(self, deviations: Dict, extreme_events: Dict) -> Dict: |
| """Generate summary of deviation analysis""" |
| summary = { |
| 'total_deviations': 0, |
| 'indicators_with_deviations': [], |
| 'most_volatile_indicators': [], |
| 'extreme_events_count': 0 |
| } |
| |
| for indicator, dev_series in deviations.items(): |
| if not dev_series.empty: |
| summary['total_deviations'] += len(dev_series) |
| summary['indicators_with_deviations'].append(indicator) |
| |
| |
| growth_series = self.data[indicator].pct_change().dropna() |
| volatility = growth_series.std() |
| |
| summary['most_volatile_indicators'].append({ |
| 'indicator': indicator, |
| 'volatility': volatility, |
| 'deviation_count': len(dev_series) |
| }) |
| |
| |
| summary['most_volatile_indicators'].sort( |
| key=lambda x: x['volatility'], reverse=True |
| ) |
| |
| |
| for indicator, events in extreme_events.items(): |
| summary['extreme_events_count'] += events['count'] |
| |
| return summary |
| |
    def plot_alignment_analysis(self, save_path: Optional[str] = None) -> None:
        """
        Plot alignment/deviation results as a 2x2 matplotlib dashboard.

        Panels: (0,0) heatmap of the latest 12-period rolling Spearman
        correlations; (0,1) pie chart of trend-direction counts; (1,0) bar
        chart of the 5 most volatile indicators; (1,1) z-score timelines for
        up to 3 indicators. Requires analyze_long_term_alignment() to have
        run (warns and returns otherwise); the two bottom panels are drawn
        only if detect_sudden_deviations() results are also present.

        Args:
            save_path: Optional file path; if given the figure is also saved
                there (300 dpi, tight bounding box) before being shown.
        """
        if 'alignment' not in self.results:
            logger.warning("No alignment analysis results to plot")
            return

        alignment_results = self.results['alignment']

        # One shared figure with four panels.
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('Economic Indicators Alignment Analysis', fontsize=16)

        # Panel (0,0): heatmap of the most recent 12-period rolling correlations.
        if alignment_results['rolling_correlations']:
            # Take the last value of each pair's 12-period window series.
            latest_correlations = {}
            for pair_name, windows in alignment_results['rolling_correlations'].items():
                if 'window_12' in windows and windows['window_12']:
                    latest_correlations[pair_name] = windows['window_12'][-1]

            if latest_correlations:
                # Recover indicator names from the "<a>_vs_<b>" pair keys.
                indicators = list(set([pair.split('_vs_')[0] for pair in latest_correlations.keys()] +
                                      [pair.split('_vs_')[1] for pair in latest_correlations.keys()]))

                corr_matrix = pd.DataFrame(index=indicators, columns=indicators, dtype=float)
                for pair, corr in latest_correlations.items():
                    ind1, ind2 = pair.split('_vs_')
                    corr_matrix.loc[ind1, ind2] = float(corr)
                    corr_matrix.loc[ind2, ind1] = float(corr)  # keep the matrix symmetric

                # Self-correlation is 1 by definition.
                np.fill_diagonal(corr_matrix.values, 1.0)

                # Ensure float dtype for the heatmap annotations.
                corr_matrix = corr_matrix.astype(float)

                sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0,
                            ax=axes[0,0], cbar_kws={'label': 'Spearman Correlation'})
                axes[0,0].set_title('Latest Rolling Correlations (12-period window)')

        # Panel (0,1): distribution of trend directions across all pairs/windows.
        if alignment_results['trend_analysis']:
            trend_data = []
            for pair_name, trends in alignment_results['trend_analysis'].items():
                for window_name, trend_info in trends.items():
                    # NOTE(review): assumes every trend_info has 'strength' and
                    # 'slope' keys; 'insufficient_data' results lack them — verify.
                    trend_data.append({
                        'Pair': pair_name,
                        'Window': window_name,
                        'Trend': trend_info['trend'],
                        'Strength': trend_info['strength'],
                        'Slope': trend_info['slope']
                    })

            if trend_data:
                trend_df = pd.DataFrame(trend_data)
                trend_counts = trend_df['Trend'].value_counts()

                axes[0,1].pie(trend_counts.values, labels=trend_counts.index, autopct='%1.1f%%')
                axes[0,1].set_title('Alignment Trend Distribution')

        # Panel (1,0): top-5 indicators by growth-rate volatility.
        if 'deviations' in self.results:
            deviation_results = self.results['deviations']
            if deviation_results['deviation_summary']['most_volatile_indicators']:
                vol_data = deviation_results['deviation_summary']['most_volatile_indicators']
                indicators = [d['indicator'] for d in vol_data[:5]]
                volatilities = [d['volatility'] for d in vol_data[:5]]

                axes[1,0].bar(indicators, volatilities)
                axes[1,0].set_title('Most Volatile Indicators')
                axes[1,0].set_ylabel('Volatility (Std Dev of Growth Rates)')
                axes[1,0].tick_params(axis='x', rotation=45)

        # Panel (1,1): z-score timelines (first 3 indicators to stay readable).
        if 'deviations' in self.results:
            deviation_results = self.results['deviations']
            if deviation_results['z_scores']:
                indicators_to_plot = list(deviation_results['z_scores'].keys())[:3]

                for indicator in indicators_to_plot:
                    z_scores = deviation_results['z_scores'][indicator]
                    axes[1,1].plot(z_scores.index, z_scores.values, label=indicator, alpha=0.7)

                # NOTE(review): threshold lines are hard-coded at +/-2 and may not
                # match a custom z_threshold passed to detect_sudden_deviations().
                axes[1,1].axhline(y=2, color='red', linestyle='--', alpha=0.5, label='Threshold')
                axes[1,1].axhline(y=-2, color='red', linestyle='--', alpha=0.5)
                axes[1,1].set_title('Z-Score Timeline')
                axes[1,1].set_ylabel('Z-Score')
                axes[1,1].legend()
                axes[1,1].grid(True, alpha=0.3)

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')

        plt.show()
| |
| def generate_insights_report(self) -> str: |
| """Generate a comprehensive insights report""" |
| if not self.results: |
| return "No analysis results available. Please run alignment and deviation analysis first." |
| |
| report = [] |
| report.append("=" * 80) |
| report.append("ECONOMIC INDICATORS ALIGNMENT & DEVIATION ANALYSIS REPORT") |
| report.append("=" * 80) |
| report.append("") |
| |
| |
| if 'alignment' in self.results: |
| alignment_results = self.results['alignment'] |
| summary = alignment_results['alignment_summary'] |
| |
| report.append("📊 LONG-TERM ALIGNMENT ANALYSIS") |
| report.append("-" * 40) |
| |
| report.append(f"• Increasing Alignment Pairs: {len(summary['increasing_alignment'])}") |
| report.append(f"• Decreasing Alignment Pairs: {len(summary['decreasing_alignment'])}") |
| report.append(f"• Stable Alignment Pairs: {len(summary['stable_alignment'])}") |
| report.append(f"• Strong Trends: {len(summary['strong_trends'])}") |
| report.append("") |
| |
| if summary['increasing_alignment']: |
| report.append("🔺 Pairs with Increasing Alignment:") |
| for pair in summary['increasing_alignment'][:5]: |
| report.append(f" - {pair}") |
| report.append("") |
| |
| if summary['decreasing_alignment']: |
| report.append("🔻 Pairs with Decreasing Alignment:") |
| for pair in summary['decreasing_alignment'][:5]: |
| report.append(f" - {pair}") |
| report.append("") |
| |
| |
| if 'deviations' in self.results: |
| deviation_results = self.results['deviations'] |
| summary = deviation_results['deviation_summary'] |
| |
| report.append("⚠️ SUDDEN DEVIATION ANALYSIS") |
| report.append("-" * 35) |
| |
| report.append(f"• Total Deviations Detected: {summary['total_deviations']}") |
| report.append(f"• Indicators with Deviations: {len(summary['indicators_with_deviations'])}") |
| report.append(f"• Extreme Events: {summary['extreme_events_count']}") |
| report.append("") |
| |
| if summary['most_volatile_indicators']: |
| report.append("📈 Most Volatile Indicators:") |
| for item in summary['most_volatile_indicators'][:5]: |
| report.append(f" - {item['indicator']}: {item['volatility']:.4f} volatility") |
| report.append("") |
| |
| |
| extreme_events = deviation_results['extreme_events'] |
| if extreme_events: |
| report.append("🚨 Recent Extreme Events:") |
| for indicator, events in extreme_events.items(): |
| if events['events']: |
| latest_event = events['events'][0] |
| report.append(f" - {indicator}: {latest_event['date'].strftime('%Y-%m-%d')} " |
| f"(Z-score: {latest_event['z_score']:.2f})") |
| report.append("") |
| |
| report.append("=" * 80) |
| report.append("Analysis completed successfully.") |
| |
| return "\n".join(report) |