
FinTopic API Developer Guide

Build intelligent financial event detection systems with real-time sentiment analysis and topic clustering

Quick Start

FinTopic provides a powerful REST API for accessing real-time financial news (updated within minutes), sentiment analysis, and topic clustering. This guide will help you build event detection systems and trading bots.

Get Your API Key

Sign up on RapidAPI to get instant access. Free tier includes 100 requests/month.


Installation

# Python
pip install requests pandas numpy scipy

# Or with conda
conda install requests pandas numpy scipy

Authentication

All requests must include your RapidAPI key in the headers. Here's how to set it up:

import requests

# Your RapidAPI credentials
API_KEY = "your-rapidapi-key-here"
BASE_URL = "https://finance-pulse.p.rapidapi.com"

headers = {
    "X-RapidAPI-Key": API_KEY,
    "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
}

# Test the connection
response = requests.get(f"{BASE_URL}/symbols", headers=headers)
print(response.json())

Rate Limits

Basic: 100 req/month | Pro: 10,000 req/month | Ultra: 50,000 req/month. In addition to these monthly quotas, requests are rate-limited per minute to ensure fair usage. Data is real-time, with updates typically arriving within minutes of news publication.
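
RapidAPI-hosted APIs typically return HTTP 429 (Too Many Requests) when a rate limit is exceeded. A minimal retry helper might look like the sketch below; the fixed wait time is an assumption, not a documented value.

import time
import requests

def get_with_backoff(url, headers, params=None, max_retries=3, wait_seconds=5):
    """Retry a GET request when the per-minute rate limit (HTTP 429) is hit."""
    response = None
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers, params=params)
        if response.status_code != 429:
            return response
        time.sleep(wait_seconds)  # assumed fixed wait; tune to your plan's per-minute limit
    return response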

API Endpoints

Get Financial Statements

GET /statements

Retrieve curated financial statements with AI-powered sentiment scoring and importance ranking. Data is updated in real-time, often within minutes of publication. Returns up to 50 statements per page.

Query Parameters

Parameter        Type      Description                            Example
symbol           string    Filter by stock ticker                 AAPL, TSLA
industry_group   string    Filter by sector                       InformationTechnology
source_type      string    Filter by source type                  news, reddit, speculative news
topic_id         UUID      Get statements from a specific topic   456e7890-...
page             integer   Page number (1-100000)                 1

Response Fields

Field        Type          Description
id           UUID          Unique statement identifier
statement    string        The financial statement text (max 2000 chars)
sentiment    string        bullish | bearish | neutral
importance   string        low | medium | high | ultra
symbols      array         Related stock tickers
topic_id     UUID | null   Associated topic cluster
created_at   ISO 8601      Statement timestamp
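
A minimal request against this endpoint, reusing BASE_URL and headers from the Authentication section and assuming the response is a JSON array of statement objects with the fields above:

import requests
import pandas as pd

response = requests.get(
    f"{BASE_URL}/statements",
    headers=headers,
    params={"symbol": "AAPL", "page": 1}
)
response.raise_for_status()

df = pd.DataFrame(response.json())
print(df[["sentiment", "importance", "created_at"]].head())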

Get Topic Clusters

GET /topics

Retrieve AI-clustered financial topics representing trending narratives in real-time. Topics are updated continuously as new statements arrive. Perfect for understanding market themes and building topic-based alerts.

Query Parameters

Same as /statements endpoint (symbol, industry_group, source_type, page)

Response Fields

Field          Type       Description
cluster_id     UUID       Unique topic identifier
name           string     AI-generated topic name
member_count   integer    Number of statements in cluster
symbols        array      Most mentioned tickers
created_at     ISO 8601   Topic creation time
updated_at     ISO 8601   Last update time
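
A quick way to see which narratives currently involve a given ticker. This sketch reuses BASE_URL and headers from the Authentication section and assumes the response is a JSON array of topic objects with the fields above:

import requests

response = requests.get(
    f"{BASE_URL}/topics",
    headers=headers,
    params={"symbol": "NVDA"}
)
response.raise_for_status()

for topic in response.json():
    print(f"{topic['name']}: {topic['member_count']} statements, tickers {topic['symbols']}")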

Get Tracked Symbols

GET /symbols

Returns the complete list of stock tickers currently tracked by the system.
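
Since the other endpoints filter by ticker, it is worth confirming a symbol is tracked before spending requests on it. A small sketch, assuming the endpoint returns a JSON array of ticker strings:

import requests

response = requests.get(f"{BASE_URL}/symbols", headers=headers)
tracked = set(response.json())  # assumed shape: ["AAPL", "MSFT", ...]

watchlist = ["AAPL", "NVDA", "PLTR"]
missing = [s for s in watchlist if s not in tracked]
if missing:
    print(f"Not tracked by FinTopic: {missing}")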

Building Event Detection Systems

Learn how to detect significant market events using statistical analysis and the FinTopic API.

1. Symbol-Based Event Detection

Monitor specific stocks for unusual activity or sentiment changes.

import requests
import pandas as pd

class SymbolEventDetector:
    def __init__(self, api_key):
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def get_symbol_statements(self, symbol, max_pages=5):
        """Fetch all recent statements for a symbol"""
        all_statements = []

        for page in range(1, max_pages + 1):
            response = requests.get(
                f"{self.base_url}/statements",
                headers=self.headers,
                params={"symbol": symbol, "page": page}
            )

            if response.status_code == 200:
                data = response.json()
                if not data:
                    break
                all_statements.extend(data)
            else:
                print(f"Error: {response.status_code}")
                break

        return pd.DataFrame(all_statements)

    def detect_sentiment_shift(self, df, window_hours=24):
        """Detect significant sentiment changes"""
        if len(df) < 10:
            return {"event_detected": False, "reason": "Insufficient data"}

        # Convert timestamps (ensure timezone-aware)
        df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
        df = df.sort_values('created_at')

        # Calculate sentiment scores
        sentiment_map = {'bullish': 1, 'neutral': 0, 'bearish': -1}
        df['sentiment_score'] = df['sentiment'].map(sentiment_map)

        # Split into recent vs historical (use UTC for comparison)
        cutoff = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=window_hours)
        recent = df[df['created_at'] >= cutoff]
        historical = df[df['created_at'] < cutoff]

        if len(recent) < 5 or len(historical) < 5:
            return {"event_detected": False, "reason": "Insufficient data in window"}

        recent_sentiment = recent['sentiment_score'].mean()
        historical_sentiment = historical['sentiment_score'].mean()

        sentiment_change = recent_sentiment - historical_sentiment

        # Detect significant shift (threshold: 0.5)
        if abs(sentiment_change) > 0.5:
            direction = "bullish" if sentiment_change > 0 else "bearish"
            return {
                "event_detected": True,
                "direction": direction,
                "magnitude": abs(sentiment_change),
                "recent_avg": recent_sentiment,
                "historical_avg": historical_sentiment,
                "recent_count": len(recent)
            }

        return {"event_detected": False, "reason": "No significant shift"}

    def detect_volume_spike(self, df, window_hours=24, threshold=2.0):
        """Detect unusual statement volume (potential breaking news)"""
        df['created_at'] = pd.to_datetime(df['created_at'], utc=True)

        # Count statements per hour
        df['hour'] = df['created_at'].dt.floor('H')
        hourly_counts = df.groupby('hour').size()

        if len(hourly_counts) < 24:
            return {"event_detected": False, "reason": "Insufficient time range"}

        # Fill hours with no statements so quiet periods count as zero volume
        full_range = pd.date_range(hourly_counts.index.min(), hourly_counts.index.max(), freq='H')
        hourly_counts = hourly_counts.reindex(full_range, fill_value=0)

        # Compare the most recent window to everything before it
        recent_volume = hourly_counts.tail(window_hours).mean()
        baseline_volume = hourly_counts.head(-window_hours).mean()

        if baseline_volume == 0:
            return {"event_detected": False, "reason": "No baseline data"}

        volume_ratio = recent_volume / baseline_volume

        if volume_ratio > threshold:
            return {
                "event_detected": True,
                "volume_ratio": volume_ratio,
                "recent_volume": recent_volume,
                "baseline_volume": baseline_volume
            }

        return {"event_detected": False, "reason": "No volume spike"}

# Example usage
detector = SymbolEventDetector("your-api-key")
statements = detector.get_symbol_statements("NVDA")

sentiment_event = detector.detect_sentiment_shift(statements)
volume_event = detector.detect_volume_spike(statements)

if sentiment_event["event_detected"]:
    print(f"🚨 Sentiment shift detected: {sentiment_event}")

if volume_event["event_detected"]:
    print(f"📈 Volume spike detected: {volume_event}")

2. Topic-Based Event Detection

Monitor topic clusters for emerging trends and rapid growth.

import requests
import pandas as pd

class TopicEventDetector:
    def __init__(self, api_key):
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def get_topics(self, **filters):
        """Fetch topics with optional filters"""
        response = requests.get(
            f"{self.base_url}/topics",
            headers=self.headers,
            params=filters
        )

        if response.status_code == 200:
            return pd.DataFrame(response.json())
        return pd.DataFrame()

    def get_topic_statements(self, topic_id, max_pages=10):
        """Get all statements for a specific topic"""
        all_statements = []

        for page in range(1, max_pages + 1):
            response = requests.get(
                f"{self.base_url}/statements",
                headers=self.headers,
                params={"topic_id": topic_id, "page": page}
            )

            if response.status_code == 200:
                data = response.json()
                if not data:
                    break
                all_statements.extend(data)
            else:
                break

        return pd.DataFrame(all_statements)

    def detect_rapid_growth(self, topic_id):
        """Detect if a topic is growing rapidly"""
        statements = self.get_topic_statements(topic_id)

        if len(statements) < 10:
            return {"event_detected": False, "reason": "Insufficient data"}

        statements['created_at'] = pd.to_datetime(statements['created_at'], utc=True)
        statements = statements.sort_values('created_at')

        # Calculate time between statements (in hours)
        statements['time_delta'] = statements['created_at'].diff().dt.total_seconds() / 3600

        # Get recent velocity vs overall velocity
        recent_deltas = statements['time_delta'].tail(10).dropna()
        overall_deltas = statements['time_delta'].dropna()

        if len(recent_deltas) < 5:
            return {"event_detected": False, "reason": "Not enough recent data"}

        # Guard against zero mean deltas (many statements sharing the same timestamp)
        if recent_deltas.mean() == 0 or overall_deltas.mean() == 0:
            return {"event_detected": False, "reason": "Timestamps too dense to compute velocity"}

        recent_velocity = 1 / recent_deltas.mean()  # statements per hour
        overall_velocity = 1 / overall_deltas.mean()

        # Acceleration ratio
        acceleration = recent_velocity / overall_velocity

        if acceleration > 2.0:  # 2x faster than average
            return {
                "event_detected": True,
                "acceleration": acceleration,
                "recent_velocity": recent_velocity,
                "overall_velocity": overall_velocity,
                "total_statements": len(statements)
            }

        return {"event_detected": False, "reason": "No rapid growth detected"}

    def detect_sentiment_divergence(self, topic_id):
        """Detect conflicting sentiments within a topic (controversy indicator)"""
        statements = self.get_topic_statements(topic_id)

        if len(statements) < 15:
            return {"event_detected": False, "reason": "Insufficient data"}

        # Count sentiment distribution
        sentiment_counts = statements['sentiment'].value_counts()
        total = len(statements)

        bullish_pct = sentiment_counts.get('bullish', 0) / total
        bearish_pct = sentiment_counts.get('bearish', 0) / total

        # High divergence = both bullish and bearish > 30%
        if bullish_pct > 0.3 and bearish_pct > 0.3:
            return {
                "event_detected": True,
                "bullish_pct": bullish_pct,
                "bearish_pct": bearish_pct,
                "divergence_score": min(bullish_pct, bearish_pct),
                "interpretation": "High controversy/uncertainty"
            }

        return {"event_detected": False, "reason": "No divergence"}

# Example usage
detector = TopicEventDetector("your-api-key")

# Monitor all topics
topics = detector.get_topics()

for _, topic in topics.iterrows():
    topic_id = topic['cluster_id']
    topic_name = topic['name']

    # Check for rapid growth
    growth_event = detector.detect_rapid_growth(topic_id)
    if growth_event["event_detected"]:
        print(f"🚀 Topic '{topic_name}' is accelerating: {growth_event}")

    # Check for sentiment divergence
    divergence = detector.detect_sentiment_divergence(topic_id)
    if divergence["event_detected"]:
        print(f"⚠️ High controversy in '{topic_name}': {divergence}")

3. Cross-Symbol Correlation Analysis

Detect when multiple symbols start moving together in news sentiment.

import requests
import pandas as pd
from scipy.stats import pearsonr
from itertools import combinations

class CorrelationDetector:
    def __init__(self, api_key):
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def build_sentiment_timeline(self, symbol, hours=72):
        """Create hourly sentiment scores for a symbol"""
        response = requests.get(
            f"{self.base_url}/statements",
            headers=self.headers,
            params={"symbol": symbol}
        )

        if response.status_code != 200:
            return None

        df = pd.DataFrame(response.json())
        if len(df) == 0:
            return None

        # Process timestamps and keep only the requested lookback window
        df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
        cutoff = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=hours)
        df = df[df['created_at'] >= cutoff].copy()
        if len(df) == 0:
            return None

        sentiment_map = {'bullish': 1, 'neutral': 0, 'bearish': -1}
        df['score'] = df['sentiment'].map(sentiment_map)

        # Create hourly timeline
        df['hour'] = df['created_at'].dt.floor('H')
        timeline = df.groupby('hour')['score'].mean()

        return timeline

    def detect_correlation_changes(self, symbols):
        """Find symbols with newly emerging correlation"""
        timelines = {}

        # Build timelines for each symbol
        for symbol in symbols:
            timeline = self.build_sentiment_timeline(symbol)
            if timeline is not None and len(timeline) > 24:
                timelines[symbol] = timeline

        if len(timelines) < 2:
            return []

        # Find common time range
        common_hours = None
        for timeline in timelines.values():
            if common_hours is None:
                common_hours = set(timeline.index)
            else:
                common_hours = common_hours.intersection(timeline.index)

        common_hours = sorted(list(common_hours))

        if len(common_hours) < 24:
            return []

        # Calculate correlations
        correlations = []
        for sym1, sym2 in combinations(timelines.keys(), 2):
            series1 = timelines[sym1].loc[common_hours]
            series2 = timelines[sym2].loc[common_hours]

            # Recent correlation (last 24 hours)
            recent_hours = common_hours[-24:]
            recent_corr, recent_p = pearsonr(
                series1.loc[recent_hours],
                series2.loc[recent_hours]
            )

            # Historical correlation (older data)
            hist_hours = common_hours[:-24]
            if len(hist_hours) >= 24:
                hist_corr, hist_p = pearsonr(
                    series1.loc[hist_hours],
                    series2.loc[hist_hours]
                )

                # Significant correlation change
                if abs(recent_corr - hist_corr) > 0.4 and recent_p < 0.05:
                    correlations.append({
                        "symbol1": sym1,
                        "symbol2": sym2,
                        "recent_correlation": recent_corr,
                        "historical_correlation": hist_corr,
                        "change": recent_corr - hist_corr,
                        "p_value": recent_p
                    })

        return sorted(correlations, key=lambda x: abs(x['change']), reverse=True)

# Example: Monitor tech stocks
detector = CorrelationDetector("your-api-key")
symbols = ["AAPL", "MSFT", "GOOGL", "NVDA", "AMD"]

changes = detector.detect_correlation_changes(symbols)

for change in changes:
    print(f"🔗 {change['symbol1']} and {change['symbol2']} "
          f"correlation changed by {change['change']:.2f}")

Best Practices

Rate Limiting & Caching

Implement caching to avoid hitting rate limits:

import time
import json
import hashlib
import requests
from pathlib import Path

class CachedAPIClient:
    def __init__(self, api_key, cache_dir="./cache", cache_ttl=300):
        self.api_key = api_key
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.cache_ttl = cache_ttl  # seconds
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def _get_cache_path(self, endpoint, params):
        # Built-in hash() changes between runs; use a stable digest so the cache survives restarts
        key_material = f"{endpoint}_{sorted(params.items())}"
        cache_key = hashlib.md5(key_material.encode()).hexdigest()
        return self.cache_dir / f"{cache_key}.json"

    def get(self, endpoint, params=None):
        params = params or {}
        cache_path = self._get_cache_path(endpoint, params)

        # Check cache
        if cache_path.exists():
            cache_age = time.time() - cache_path.stat().st_mtime
            if cache_age < self.cache_ttl:
                with open(cache_path) as f:
                    return json.load(f)

        # Fetch fresh data
        response = requests.get(
            f"{self.base_url}/{endpoint}",
            headers=self.headers,
            params=params
        )

        if response.status_code == 200:
            data = response.json()
            with open(cache_path, 'w') as f:
                json.dump(data, f)
            return data

        raise Exception(f"API Error: {response.status_code}")

# Usage
client = CachedAPIClient("your-api-key", cache_ttl=300)  # 5 min cache
data = client.get("statements", {"symbol": "AAPL"})

Error Handling

import time
import logging
import requests
from requests.exceptions import RequestException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_api_call(func, max_retries=3, backoff=2):
    """Wrapper for API calls with retry logic"""
    for attempt in range(max_retries):
        try:
            return func()
        except RequestException as e:
            if attempt == max_retries - 1:
                logger.error(f"Failed after {max_retries} attempts: {e}")
                raise

            wait_time = backoff ** attempt
            logger.warning(f"Request failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
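
A usage sketch wrapping one of the earlier calls (BASE_URL and headers come from the Authentication section). Calling raise_for_status() turns HTTP error codes into RequestException subclasses, so they are retried as well:

def fetch_statements():
    response = requests.get(
        f"{BASE_URL}/statements",
        headers=headers,
        params={"symbol": "AAPL"}
    )
    response.raise_for_status()
    return response.json()

data = safe_api_call(fetch_statements)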

Important Considerations

Market Hours: News volume is typically higher during market hours (9:30 AM - 4:00 PM ET)

Data Freshness: Data is updated in real-time, typically within minutes of news publication. Statements appear as soon as they're processed by our AI pipeline.

Statistical Significance: Always check for sufficient sample sizes before drawing conclusions
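
To make that last point concrete, here is a sketch that checks whether a shift like the one computed in detect_sentiment_shift is statistically distinguishable, using Welch's t-test on the -1/0/+1 sentiment scores (the 0.05 threshold is a conventional choice, not something the API prescribes):

from scipy.stats import ttest_ind

def shift_is_significant(recent_scores, historical_scores, alpha=0.05):
    """Return True only if the recent and historical sentiment samples differ significantly."""
    if len(recent_scores) < 5 or len(historical_scores) < 5:
        return False  # too few observations to conclude anything
    _, p_value = ttest_ind(recent_scores, historical_scores, equal_var=False)
    return p_value < alpha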

Complete Example: Earnings Alert System

Here's a complete system that monitors for earnings-related events and sends alerts:

import requests
import pandas as pd

class EarningsAlertSystem:
    def __init__(self, api_key, symbols_to_watch):
        self.api_key = api_key
        self.symbols = symbols_to_watch
        self.base_url = "https://finance-pulse.p.rapidapi.com"
        self.headers = {
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": "finance-pulse.p.rapidapi.com"
        }

    def scan_for_earnings(self):
        """Scan all watched symbols for earnings-related activity"""
        alerts = []

        for symbol in self.symbols:
            print(f"Scanning {symbol}...")

            # Get recent statements
            response = requests.get(
                f"{self.base_url}/statements",
                headers=self.headers,
                params={"symbol": symbol, "page": 1}
            )

            if response.status_code != 200:
                continue

            statements = pd.DataFrame(response.json())
            if len(statements) == 0:
                continue

            # Check for earnings keywords
            earnings_keywords = [
                'earnings', 'eps', 'revenue', 'guidance',
                'quarter', 'q1', 'q2', 'q3', 'q4'
            ]

            statements['is_earnings'] = statements['statement'].str.lower().apply(
                lambda x: any(kw in x for kw in earnings_keywords)
            )

            # .copy() avoids pandas' SettingWithCopyWarning when columns are added below
            earnings_statements = statements[statements['is_earnings']].copy()

            if len(earnings_statements) > 0:
                # Check recency (last 6 hours)
                earnings_statements['created_at'] = pd.to_datetime(
                    earnings_statements['created_at'], utc=True
                )
                recent_cutoff = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=6)
                recent_earnings = earnings_statements[
                    earnings_statements['created_at'] >= recent_cutoff
                ]

                if len(recent_earnings) > 0:
                    # Sort newest first so iloc[0] really is the latest statement
                    recent_earnings = recent_earnings.sort_values(
                        'created_at', ascending=False
                    )

                    # Calculate sentiment
                    sentiment_counts = recent_earnings['sentiment'].value_counts()
                    dominant_sentiment = sentiment_counts.idxmax()

                    alerts.append({
                        "symbol": symbol,
                        "event_type": "earnings",
                        "statement_count": len(recent_earnings),
                        "dominant_sentiment": dominant_sentiment,
                        "latest_statement": recent_earnings.iloc[0]['statement'],
                        "importance": recent_earnings.iloc[0]['importance'],
                        "timestamp": recent_earnings.iloc[0]['created_at']
                    })

        return alerts

    def send_alert(self, alert):
        """Format and display alert (customize for email/Slack/etc)"""
        print("=" * 60)
        print(f"🔔 EARNINGS ALERT: {alert['symbol']}")
        print(f"Sentiment: {alert['dominant_sentiment'].upper()}")
        print(f"Importance: {alert['importance']}")
        print(f"Statements: {alert['statement_count']}")
        print(f"Latest: {alert['latest_statement'][:100]}...")
        print(f"Time: {alert['timestamp']}")
        print("=" * 60)

    def run(self):
        """Run the alert system"""
        alerts = self.scan_for_earnings()

        if len(alerts) == 0:
            print("No earnings activity detected.")
        else:
            for alert in alerts:
                self.send_alert(alert)

# Usage
watchlist = ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "TSLA"]
system = EarningsAlertSystem("your-api-key", watchlist)
system.run()

Next Steps

Integrate this with a scheduler (e.g., cron, APScheduler) to run every hour, and connect to notification services like Slack, Discord, or email for real-time alerts.
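
For example, a minimal sketch using APScheduler (requires pip install apscheduler); the hourly interval is just a starting point and should fit within your plan's monthly quota:

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

# Re-scan the watchlist once an hour using the EarningsAlertSystem defined above
scheduler.add_job(system.run, 'interval', hours=1)
scheduler.start()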

Ready to Build?

Get your API key and start detecting market events in minutes
