Build intelligent financial event detection systems with real-time sentiment analysis and topic clustering
FinTopic provides a powerful REST API for accessing real-time financial news (updated within minutes), sentiment analysis, and topic clustering. This guide will help you build event detection systems and trading bots.
Sign up on RapidAPI to get instant access. Free tier includes 100 requests/month.
Subscribe on RapidAPI

```bash
# Python
pip install requests pandas numpy scipy

# Or with conda
conda install requests pandas numpy scipy
```
All requests must include your RapidAPI key in the headers. Here's how to set it up:
```python
import requests

# Your RapidAPI credentials
API_KEY = "your-rapidapi-key-here"
BASE_URL = "https://finance-pulse.p.rapidapi.com"

headers = {
    "X-RapidAPI-Key": API_KEY,
    "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
}

# Test the connection
response = requests.get(f"{BASE_URL}/symbols", headers=headers)
print(response.json())
```
Basic: 100 req/month | Pro: 10,000 req/month | Ultra: 50,000 req/month. Requests are rate-limited per minute to ensure fair usage. Data is real-time with updates typically within minutes of news publication.
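Since requests are rate-limited per minute, it is worth handling throttled responses gracefully. Below is a minimal sketch, assuming throttled calls come back as HTTP 429 (typical RapidAPI behavior) and reusing the `BASE_URL` and `headers` from the authentication example above; adjust the backoff to fit your plan.

```python
import time
import requests

def get_with_throttle(endpoint, params=None, max_retries=3):
    """GET with a simple backoff when the per-minute limit is hit.

    Assumes throttled responses use HTTP 429; BASE_URL and headers are
    the ones defined in the authentication example above.
    """
    for attempt in range(1, max_retries + 1):
        response = requests.get(f"{BASE_URL}/{endpoint}", headers=headers, params=params)
        if response.status_code != 429:
            response.raise_for_status()  # surface other errors immediately
            return response.json()
        # Wait progressively longer before retrying within the minute window
        time.sleep(20 * attempt)
    raise RuntimeError("Rate limit still exceeded after retries")

# Example: fetch the first page of statements without tripping the limiter
statements = get_with_throttle("statements", {"symbol": "AAPL", "page": 1})
```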
Retrieve curated financial statements with AI-powered sentiment scoring and importance ranking. Data is updated in real-time, often within minutes of publication. Returns up to 50 statements per page.
Parameter | Type | Description | Example |
---|---|---|---|
symbol | string | Filter by stock ticker | AAPL, TSLA |
industry_group | string | Filter by sector | InformationTechnology |
source_type | string | news, reddit, speculative | news |
topic_id | UUID | Get statements from a specific topic | 456e7890-... |
page | integer | Page number (1-100000) | 1 |
Field | Type | Description |
---|---|---|
id | UUID | Unique statement identifier |
statement | string | The financial statement text (max 2000 chars) |
sentiment | string | bullish \| bearish \| neutral |
importance | string | low \| medium \| high \| ultra |
symbols | array | Related stock tickers |
topic_id | UUID \| null | Associated topic cluster |
created_at | ISO 8601 | Statement timestamp |
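As a quick illustration of the parameters and fields above, the sketch below requests one page of filtered statements and loads it into pandas. It reuses the `headers` and `BASE_URL` from the authentication section; the numeric sentiment and importance mappings are convenience choices for this example, not part of the API.

```python
import requests
import pandas as pd

# Filtered request using the query parameters documented above
params = {"symbol": "TSLA", "source_type": "news", "page": 1}
response = requests.get(f"{BASE_URL}/statements", headers=headers, params=params)
df = pd.DataFrame(response.json())

# Map the categorical fields to numbers so they are easy to rank and aggregate
df["created_at"] = pd.to_datetime(df["created_at"], utc=True)
df["sentiment_score"] = df["sentiment"].map({"bullish": 1, "neutral": 0, "bearish": -1})
df["importance_rank"] = df["importance"].map({"low": 0, "medium": 1, "high": 2, "ultra": 3})

# Show the most important, most recent items first
top = df[df["importance_rank"] >= 2].sort_values("created_at", ascending=False)
print(top[["created_at", "symbols", "sentiment", "statement"]].head())
```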
Retrieve AI-clustered financial topics representing trending narratives in real-time. Topics are updated continuously as new statements arrive. Perfect for understanding market themes and building topic-based alerts.
Same as the /statements endpoint (symbol, industry_group, source_type, page).
Field | Type | Description |
---|---|---|
cluster_id | UUID | Unique topic identifier |
name | string | AI-generated topic name |
member_count | integer | Number of statements in cluster |
symbols | array | Most mentioned tickers |
created_at | ISO 8601 | Topic creation time |
updated_at | ISO 8601 | Last update time |
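A common workflow is to list the current clusters and then pull the underlying statements for one of them via the topic_id filter on /statements. A brief sketch, reusing the `headers` and `BASE_URL` from the setup section; which cluster to drill into is of course up to you.

```python
import requests

# List current topic clusters (accepts the same filters as /statements)
topics = requests.get(
    f"{BASE_URL}/topics",
    headers=headers,
    params={"industry_group": "InformationTechnology"},
).json()

# Drill into the largest cluster
biggest = max(topics, key=lambda t: t["member_count"])
print(f"Top topic: {biggest['name']} ({biggest['member_count']} statements)")

statements = requests.get(
    f"{BASE_URL}/statements",
    headers=headers,
    params={"topic_id": biggest["cluster_id"], "page": 1},
).json()
for s in statements[:5]:
    print(s["sentiment"], "-", s["statement"][:80])
```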
Returns the complete list of stock tickers currently tracked by the system.
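For example, you might fetch the tracked universe once at startup and validate a watchlist against it before issuing per-symbol queries. A minimal sketch, assuming the endpoint returns a flat JSON array of ticker strings (adapt the parsing if it returns objects) and reusing the `headers` and `BASE_URL` defined earlier:

```python
import requests

symbols = requests.get(f"{BASE_URL}/symbols", headers=headers).json()
print(f"{len(symbols)} tickers currently tracked")

# Drop watchlist entries the API does not cover before querying them
watchlist = ["AAPL", "NVDA", "NOT-A-TICKER"]
tracked = [s for s in watchlist if s in symbols]
print("Tracked from watchlist:", tracked)
```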
Learn how to detect significant market events using statistical analysis and the FinTopic API.
Monitor specific stocks for unusual activity or sentiment changes.
```python
import requests
import pandas as pd

class SymbolEventDetector:
    def __init__(self, api_key):
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def get_symbol_statements(self, symbol, max_pages=5):
        """Fetch all recent statements for a symbol"""
        all_statements = []
        for page in range(1, max_pages + 1):
            response = requests.get(
                f"{self.base_url}/statements",
                headers=self.headers,
                params={"symbol": symbol, "page": page}
            )
            if response.status_code == 200:
                data = response.json()
                if not data:
                    break
                all_statements.extend(data)
            else:
                print(f"Error: {response.status_code}")
                break
        return pd.DataFrame(all_statements)

    def detect_sentiment_shift(self, df, window_hours=24):
        """Detect significant sentiment changes"""
        if len(df) < 10:
            return {"event_detected": False, "reason": "Insufficient data"}

        # Convert timestamps (ensure timezone-aware)
        df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
        df = df.sort_values('created_at')

        # Calculate sentiment scores
        sentiment_map = {'bullish': 1, 'neutral': 0, 'bearish': -1}
        df['sentiment_score'] = df['sentiment'].map(sentiment_map)

        # Split into recent vs historical (use UTC for comparison)
        cutoff = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=window_hours)
        recent = df[df['created_at'] >= cutoff]
        historical = df[df['created_at'] < cutoff]

        if len(recent) < 5 or len(historical) < 5:
            return {"event_detected": False, "reason": "Insufficient data in window"}

        recent_sentiment = recent['sentiment_score'].mean()
        historical_sentiment = historical['sentiment_score'].mean()
        sentiment_change = recent_sentiment - historical_sentiment

        # Detect significant shift (threshold: 0.5)
        if abs(sentiment_change) > 0.5:
            direction = "bullish" if sentiment_change > 0 else "bearish"
            return {
                "event_detected": True,
                "direction": direction,
                "magnitude": abs(sentiment_change),
                "recent_avg": recent_sentiment,
                "historical_avg": historical_sentiment,
                "recent_count": len(recent)
            }

        return {"event_detected": False, "reason": "No significant shift"}

    def detect_volume_spike(self, df, window_hours=24, threshold=2.0):
        """Detect unusual statement volume (potential breaking news)"""
        df['created_at'] = pd.to_datetime(df['created_at'], utc=True)

        # Count statements per hour
        df['hour'] = df['created_at'].dt.floor('H')
        hourly_counts = df.groupby('hour').size()

        if len(hourly_counts) < 24:
            return {"event_detected": False, "reason": "Insufficient time range"}

        # Compare recent volume to baseline
        recent_volume = hourly_counts.tail(window_hours).mean()
        baseline_volume = hourly_counts.head(-window_hours).mean()

        if baseline_volume == 0:
            return {"event_detected": False, "reason": "No baseline data"}

        volume_ratio = recent_volume / baseline_volume

        if volume_ratio > threshold:
            return {
                "event_detected": True,
                "volume_ratio": volume_ratio,
                "recent_volume": recent_volume,
                "baseline_volume": baseline_volume
            }

        return {"event_detected": False, "reason": "No volume spike"}


# Example usage
detector = SymbolEventDetector("your-api-key")
statements = detector.get_symbol_statements("NVDA")

sentiment_event = detector.detect_sentiment_shift(statements)
volume_event = detector.detect_volume_spike(statements)

if sentiment_event["event_detected"]:
    print(f"🚨 Sentiment shift detected: {sentiment_event}")
if volume_event["event_detected"]:
    print(f"📈 Volume spike detected: {volume_event}")
```
Monitor topic clusters for emerging trends and rapid growth.
```python
import requests
import pandas as pd

class TopicEventDetector:
    def __init__(self, api_key):
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def get_topics(self, **filters):
        """Fetch topics with optional filters"""
        response = requests.get(
            f"{self.base_url}/topics",
            headers=self.headers,
            params=filters
        )
        if response.status_code == 200:
            return pd.DataFrame(response.json())
        return pd.DataFrame()

    def get_topic_statements(self, topic_id, max_pages=10):
        """Get all statements for a specific topic"""
        all_statements = []
        for page in range(1, max_pages + 1):
            response = requests.get(
                f"{self.base_url}/statements",
                headers=self.headers,
                params={"topic_id": topic_id, "page": page}
            )
            if response.status_code == 200:
                data = response.json()
                if not data:
                    break
                all_statements.extend(data)
            else:
                break
        return pd.DataFrame(all_statements)

    def detect_rapid_growth(self, topic_id):
        """Detect if a topic is growing rapidly"""
        statements = self.get_topic_statements(topic_id)

        if len(statements) < 10:
            return {"event_detected": False, "reason": "Insufficient data"}

        statements['created_at'] = pd.to_datetime(statements['created_at'], utc=True)
        statements = statements.sort_values('created_at')

        # Calculate time between statements (in hours)
        statements['time_delta'] = statements['created_at'].diff().dt.total_seconds() / 3600

        # Get recent velocity vs overall velocity
        recent_deltas = statements['time_delta'].tail(10).dropna()
        overall_deltas = statements['time_delta'].dropna()

        if len(recent_deltas) < 5:
            return {"event_detected": False, "reason": "Not enough recent data"}

        recent_velocity = 1 / recent_deltas.mean()   # statements per hour
        overall_velocity = 1 / overall_deltas.mean()

        # Acceleration ratio
        acceleration = recent_velocity / overall_velocity

        if acceleration > 2.0:  # 2x faster than average
            return {
                "event_detected": True,
                "acceleration": acceleration,
                "recent_velocity": recent_velocity,
                "overall_velocity": overall_velocity,
                "total_statements": len(statements)
            }

        return {"event_detected": False, "reason": "No rapid growth detected"}

    def detect_sentiment_divergence(self, topic_id):
        """Detect conflicting sentiments within a topic (controversy indicator)"""
        statements = self.get_topic_statements(topic_id)

        if len(statements) < 15:
            return {"event_detected": False, "reason": "Insufficient data"}

        # Count sentiment distribution
        sentiment_counts = statements['sentiment'].value_counts()
        total = len(statements)

        bullish_pct = sentiment_counts.get('bullish', 0) / total
        bearish_pct = sentiment_counts.get('bearish', 0) / total

        # High divergence = both bullish and bearish > 30%
        if bullish_pct > 0.3 and bearish_pct > 0.3:
            return {
                "event_detected": True,
                "bullish_pct": bullish_pct,
                "bearish_pct": bearish_pct,
                "divergence_score": min(bullish_pct, bearish_pct),
                "interpretation": "High controversy/uncertainty"
            }

        return {"event_detected": False, "reason": "No divergence"}


# Example usage
detector = TopicEventDetector("your-api-key")

# Monitor all topics
topics = detector.get_topics()

for _, topic in topics.iterrows():
    topic_id = topic['cluster_id']
    topic_name = topic['name']

    # Check for rapid growth
    growth_event = detector.detect_rapid_growth(topic_id)
    if growth_event["event_detected"]:
        print(f"🚀 Topic '{topic_name}' is accelerating: {growth_event}")

    # Check for sentiment divergence
    divergence = detector.detect_sentiment_divergence(topic_id)
    if divergence["event_detected"]:
        print(f"⚠️ High controversy in '{topic_name}': {divergence}")
```
Detect when multiple symbols start moving together in news sentiment.
```python
import requests
import pandas as pd
from scipy.stats import pearsonr
from itertools import combinations

class CorrelationDetector:
    def __init__(self, api_key):
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def build_sentiment_timeline(self, symbol, hours=72):
        """Create hourly sentiment scores for a symbol"""
        response = requests.get(
            f"{self.base_url}/statements",
            headers=self.headers,
            params={"symbol": symbol}
        )
        if response.status_code != 200:
            return None

        df = pd.DataFrame(response.json())
        if len(df) == 0:
            return None

        # Process timestamps and sentiment
        df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
        sentiment_map = {'bullish': 1, 'neutral': 0, 'bearish': -1}
        df['score'] = df['sentiment'].map(sentiment_map)

        # Create hourly timeline
        df['hour'] = df['created_at'].dt.floor('H')
        timeline = df.groupby('hour')['score'].mean()

        return timeline

    def detect_correlation_changes(self, symbols):
        """Find symbols with newly emerging correlation"""
        timelines = {}

        # Build timelines for each symbol
        for symbol in symbols:
            timeline = self.build_sentiment_timeline(symbol)
            if timeline is not None and len(timeline) > 24:
                timelines[symbol] = timeline

        if len(timelines) < 2:
            return []

        # Find common time range
        common_hours = None
        for timeline in timelines.values():
            if common_hours is None:
                common_hours = set(timeline.index)
            else:
                common_hours = common_hours.intersection(timeline.index)

        common_hours = sorted(list(common_hours))
        if len(common_hours) < 24:
            return []

        # Calculate correlations
        correlations = []
        for sym1, sym2 in combinations(timelines.keys(), 2):
            series1 = timelines[sym1].loc[common_hours]
            series2 = timelines[sym2].loc[common_hours]

            # Recent correlation (last 24 hours)
            recent_hours = common_hours[-24:]
            recent_corr, recent_p = pearsonr(
                series1.loc[recent_hours],
                series2.loc[recent_hours]
            )

            # Historical correlation (older data)
            hist_hours = common_hours[:-24]
            if len(hist_hours) >= 24:
                hist_corr, hist_p = pearsonr(
                    series1.loc[hist_hours],
                    series2.loc[hist_hours]
                )

                # Significant correlation change
                if abs(recent_corr - hist_corr) > 0.4 and recent_p < 0.05:
                    correlations.append({
                        "symbol1": sym1,
                        "symbol2": sym2,
                        "recent_correlation": recent_corr,
                        "historical_correlation": hist_corr,
                        "change": recent_corr - hist_corr,
                        "p_value": recent_p
                    })

        return sorted(correlations, key=lambda x: abs(x['change']), reverse=True)


# Example: Monitor tech stocks
detector = CorrelationDetector("your-api-key")
symbols = ["AAPL", "MSFT", "GOOGL", "NVDA", "AMD"]

changes = detector.detect_correlation_changes(symbols)
for change in changes:
    print(f"🔗 {change['symbol1']} and {change['symbol2']} "
          f"correlation changed by {change['change']:.2f}")
```
Implement caching to avoid hitting rate limits:
```python
import time
import json
import hashlib
import requests
from pathlib import Path

class CachedAPIClient:
    def __init__(self, api_key, cache_dir="./cache", cache_ttl=300):
        self.api_key = api_key
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.cache_ttl = cache_ttl  # seconds
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def _get_cache_path(self, endpoint, params):
        # Use a stable hash so cache keys survive across processes
        # (Python's built-in hash() is salted per interpreter run)
        key_material = json.dumps(sorted(params.items())).encode()
        cache_key = f"{endpoint}_{hashlib.md5(key_material).hexdigest()}"
        return self.cache_dir / f"{cache_key}.json"

    def get(self, endpoint, params=None):
        params = params or {}
        cache_path = self._get_cache_path(endpoint, params)

        # Check cache
        if cache_path.exists():
            cache_age = time.time() - cache_path.stat().st_mtime
            if cache_age < self.cache_ttl:
                with open(cache_path) as f:
                    return json.load(f)

        # Fetch fresh data
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            headers=self.headers,
            params=params
        )

        if response.status_code == 200:
            data = response.json()
            with open(cache_path, 'w') as f:
                json.dump(data, f)
            return data

        raise Exception(f"API Error: {response.status_code}")


# Usage
client = CachedAPIClient("your-api-key", cache_ttl=300)  # 5 min cache
data = client.get("statements", {"symbol": "AAPL"})
```
```python
import time
import logging
from requests.exceptions import RequestException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_api_call(func, max_retries=3, backoff=2):
    """Wrapper for API calls with retry logic"""
    for attempt in range(max_retries):
        try:
            return func()
        except RequestException as e:
            if attempt == max_retries - 1:
                logger.error(f"Failed after {max_retries} attempts: {e}")
                raise
            wait_time = backoff ** attempt
            logger.warning(f"Request failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
```
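The retry wrapper composes naturally with the cached client above. In this usage sketch it only retries network-level RequestException failures, since the cached client raises a plain Exception for non-200 responses:

```python
client = CachedAPIClient("your-api-key", cache_ttl=300)

# Retries transient network failures while still benefiting from the cache
data = safe_api_call(lambda: client.get("statements", {"symbol": "AAPL"}))
print(f"Got {len(data)} statements")
```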
Market Hours: News volume is typically higher during US market hours (9:30 AM - 4:00 PM ET); see the filtering sketch after this list.
Data Freshness: Data is updated in real-time, typically within minutes of news publication. Statements appear as soon as they're processed by our AI pipeline.
Statistical Significance: Always check for sufficient sample sizes before drawing conclusions
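The market-hours sketch mentioned above might look like the following. It assumes a statements DataFrame with a timezone-aware created_at column (like the one returned by get_symbol_statements earlier); the 9:30 AM - 4:00 PM ET weekday window is simply the standard US cash session.

```python
import pandas as pd

def flag_market_hours(df):
    """Mark statements that arrived during the regular 9:30 AM - 4:00 PM ET session (Mon-Fri)."""
    et = df["created_at"].dt.tz_convert("America/New_York")
    minutes = et.dt.hour * 60 + et.dt.minute
    in_session = minutes.between(9 * 60 + 30, 16 * 60)
    is_weekday = et.dt.dayofweek < 5  # Monday=0 ... Friday=4
    out = df.copy()
    out["in_market_hours"] = in_session & is_weekday
    return out

# Compare in-session vs off-hours counts before treating a jump as a volume spike
flagged = flag_market_hours(statements)
print(flagged["in_market_hours"].value_counts())
```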
Here's a complete system that monitors for earnings-related events and sends alerts:
```python
import requests
import pandas as pd

class EarningsAlertSystem:
    def __init__(self, api_key, symbols_to_watch):
        self.api_key = api_key
        self.symbols = symbols_to_watch
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def scan_for_earnings(self):
        """Scan all watched symbols for earnings-related activity"""
        alerts = []

        for symbol in self.symbols:
            print(f"Scanning {symbol}...")

            # Get recent statements
            response = requests.get(
                f"{self.base_url}/statements",
                headers=self.headers,
                params={"symbol": symbol, "page": 1}
            )
            if response.status_code != 200:
                continue

            statements = pd.DataFrame(response.json())
            if len(statements) == 0:
                continue

            # Check for earnings keywords
            earnings_keywords = [
                'earnings', 'eps', 'revenue', 'guidance',
                'quarter', 'q1', 'q2', 'q3', 'q4'
            ]
            statements['is_earnings'] = statements['statement'].str.lower().apply(
                lambda x: any(kw in x for kw in earnings_keywords)
            )

            earnings_statements = statements[statements['is_earnings']].copy()

            if len(earnings_statements) > 0:
                # Check recency (last 6 hours), newest first
                earnings_statements['created_at'] = pd.to_datetime(
                    earnings_statements['created_at'], utc=True
                )
                recent_cutoff = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=6)
                recent_earnings = earnings_statements[
                    earnings_statements['created_at'] >= recent_cutoff
                ].sort_values('created_at', ascending=False)

                if len(recent_earnings) > 0:
                    # Calculate sentiment
                    sentiment_counts = recent_earnings['sentiment'].value_counts()
                    dominant_sentiment = sentiment_counts.idxmax()

                    alerts.append({
                        "symbol": symbol,
                        "event_type": "earnings",
                        "statement_count": len(recent_earnings),
                        "dominant_sentiment": dominant_sentiment,
                        "latest_statement": recent_earnings.iloc[0]['statement'],
                        "importance": recent_earnings.iloc[0]['importance'],
                        "timestamp": recent_earnings.iloc[0]['created_at']
                    })

        return alerts

    def send_alert(self, alert):
        """Format and display alert (customize for email/Slack/etc)"""
        print("=" * 60)
        print(f"🔔 EARNINGS ALERT: {alert['symbol']}")
        print(f"Sentiment: {alert['dominant_sentiment'].upper()}")
        print(f"Importance: {alert['importance']}")
        print(f"Statements: {alert['statement_count']}")
        print(f"Latest: {alert['latest_statement'][:100]}...")
        print(f"Time: {alert['timestamp']}")
        print("=" * 60)

    def run(self):
        """Run the alert system"""
        alerts = self.scan_for_earnings()

        if len(alerts) == 0:
            print("No earnings activity detected.")
        else:
            for alert in alerts:
                self.send_alert(alert)


# Usage
watchlist = ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "TSLA"]
system = EarningsAlertSystem("your-api-key", watchlist)
system.run()
```
Integrate this with a scheduler (e.g., cron, APScheduler) to run every hour, and connect to notification services like Slack, Discord, or email for real-time alerts.
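As one concrete option, a minimal APScheduler setup could look like the sketch below. It reuses the EarningsAlertSystem class defined above; the hourly interval is just an example cadence.

```python
from apscheduler.schedulers.blocking import BlockingScheduler

watchlist = ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "TSLA"]
system = EarningsAlertSystem("your-api-key", watchlist)

scheduler = BlockingScheduler()

# Run the earnings scan once an hour; switch to a 'cron' trigger for finer control
scheduler.add_job(system.run, "interval", hours=1)

scheduler.start()  # blocks the process; stop with Ctrl+C
```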
Get your API key and start detecting market events in minutes
Start Building with FinTopic API