financial_news_bot / src /api /insiders /insider_trading_aggregator.py
Dmitry Beresnev
fix detailed report for the insider trades
014e90f
import asyncio
from collections import Counter
from datetime import datetime, timedelta
from typing import List, Optional, Dict
import aiohttp
import pandas as pd
from src.api.insiders.insider_trade import InsiderTrade
from src.api.insiders.trading_report import TradingReport
from src.api.insiders.transaction_types import TransactionType
from src.telegram_bot.config import Config
from src.telegram_bot.logger import main_logger as logger
class InsiderTradingAggregator:
"""Async aggregator for insider trading data from multiple APIs"""
def __init__(self, session_timeout: int = 30):
# API configurations
self.apis = {
'fmp': {
'base_url': 'https://financialmodelingprep.com/stable',
'api_key': Config.FMP_API_KEY,
'rate_limit': 250, # Daily limit
'requests_per_minute': 10
},
'sec_api': {
'base_url': 'https://api.sec-api.io',
'api_key': Config.SEC_API_KEY,
'rate_limit': 100,
'requests_per_minute': 5
},
'eod': {
'base_url': 'https://eodhistoricaldata.com/api',
'api_key': None,
'rate_limit': 1000,
'requests_per_minute': 20
},
'tradefeeds': {
'base_url': 'https://api.tradefeeds.com',
'api_key': None,
'rate_limit': 500,
'requests_per_minute': 15
}
}
# Rate limiting tracking
self.request_counts = {api: 0 for api in self.apis.keys()}
self.last_reset = datetime.now()
self.session_timeout = session_timeout
# Semaphores for rate limiting per minute
self.semaphores = {
api: asyncio.Semaphore(config['requests_per_minute'])
for api, config in self.apis.items()
}
def set_api_key(self, api_name: str, api_key: str) -> None:
"""Set API key for a specific service"""
if api_name in self.apis:
self.apis[api_name]['api_key'] = api_key
logger.info(f"API key set for {api_name}")
else:
logger.error(f"Unknown API: {api_name}")
def _check_rate_limit(self, api_name: str) -> bool:
"""Check if we're within daily rate limits"""
if datetime.now() - self.last_reset > timedelta(days=1):
self.request_counts = {api: 0 for api in self.apis.keys()}
self.last_reset = datetime.now()
return self.request_counts[api_name] < self.apis[api_name]['rate_limit']
async def _make_request(self, session: aiohttp.ClientSession, api_name: str,
endpoint: str, params: Dict = None) -> Optional[Dict]:
"""Make async rate-limited request to an API"""
if not self._check_rate_limit(api_name):
logger.warning(f"Daily rate limit reached for {api_name}")
return None
if not self.apis[api_name]['api_key']:
logger.warning(f"No API key set for {api_name}, skipping request.")
return None
# Rate limiting with semaphore
async with self.semaphores[api_name]:
try:
url = f"{self.apis[api_name]['base_url']}/{endpoint}"
if params is None:
params = {}
# Add API key to parameters
if api_name == 'fmp':
params['apikey'] = self.apis[api_name]['api_key']
elif api_name in ['sec_api', 'eod', 'tradefeeds']:
params['token'] = self.apis[api_name]['api_key']
async with session.get(url, params=params) as response:
response.raise_for_status()
self.request_counts[api_name] += 1
return await response.json()
except aiohttp.ClientError as e:
logger.error(f"Request failed for {api_name} ({url}): {e}")
return None
# BUG FIX: Removed the unnecessary sleep. The semaphore already handles rate limiting.
# This was a major performance bottleneck.
# await asyncio.sleep(60 / self.apis[api_name]['requests_per_minute'])
'''
async def _make_post_request(self, session: aiohttp.ClientSession, api_name: str,
endpoint: str, payload: Dict) -> Optional[Dict]:
"""Make an async rate-limited POST request to an API."""
if not self._check_rate_limit(api_name):
logger.warning(f"Daily rate limit reached for {api_name}")
return None
if not self.apis[api_name]['api_key']:
logger.warning(f"No API key set for {api_name}, skipping request.")
return None
async with self.semaphores[api_name]:
try:
url = f"{self.apis[api_name]['base_url']}/{endpoint}"
# Set Authorization header per SEC API docs (no "Bearer" prefix)
headers = {
'Authorization': self.apis[api_name]['api_key'],
'Content-Type': 'application/json'
}
async with session.post(url, json=payload, headers=headers) as response:
response.raise_for_status()
self.request_counts[api_name] += 1
return await response.json()
except aiohttp.ClientError as e:
logger.error(f"POST request failed for {api_name} ({url}): {e}")
return None
'''
async def _make_post_request(self, session: aiohttp.ClientSession, api_name: str,
endpoint: str, payload: Dict) -> Optional[Dict]:
"""Make an async rate-limited POST request to an API."""
if not self._check_rate_limit(api_name):
logger.warning(f"Daily rate limit reached for {api_name}")
return None
if not self.apis[api_name]['api_key']:
logger.warning(f"No API key set for {api_name}, skipping request.")
return None
async with self.semaphores[api_name]:
try:
# Build the full URL
url = f"{self.apis[api_name]['base_url']}/{endpoint}"
# Add API key as query parameter (SEC API supports this method)
url_with_token = f"{url}?token={self.apis[api_name]['api_key']}"
# Make POST request with JSON payload
async with session.post(url_with_token, json=payload) as response:
response.raise_for_status()
self.request_counts[api_name] += 1
return await response.json()
except aiohttp.ClientError as e:
logger.error(f"POST request failed for {api_name} ({url}): {e}")
return None
async def get_fmp_insider_trades(self, session: aiohttp.ClientSession,
symbol: str, limit: int = 100, filter_days: int = 30) -> List[InsiderTrade]:
"""
Get insider trades from Financial Modeling Prep API
Get insider trades from FMP by iterating day-by-day for a specific period.
NOTE: This method is inefficient and makes many API calls.
"""
if filter_days > 14:
logger.warning(f"FMP date range capped at 14 days. Reducing from {filter_days} to 14.")
filter_days = 14
all_trades = []
today = datetime.now()
date_range = [today - timedelta(days=i) for i in range(filter_days)]
# Outer Loop: Iterate through each day in the specified range.
for single_date in date_range:
page = 0
date_str = single_date.strftime('%Y-%m-%d')
# Inner Loop: Handle pagination for the current day.
while True:
endpoint = "insider-trading/latest"
params = {
'symbol': symbol,
'date': date_str,
'page': page,
'limit': limit
}
logger.info(f"Requesting FMP data for {symbol} on {date_str}, page {page}...")
data = await self._make_request(session, 'fmp', endpoint, params)
# If no data is returned for this page, we're done with this date.
if not data:
break
for trade in data:
try:
disposition = (trade.get('acquistionOrDisposition') or '').upper()
trans_type = TransactionType.BUY.value if disposition == 'A' else TransactionType.SELL.value
shares = int(trade.get('securitiesTransacted', 0) or 0)
price = float(trade.get('price', 0) or 0)
insider_trade = InsiderTrade(
symbol=trade.get('symbol', ''),
company_name=trade.get('companyName', ''),
insider_name=trade.get('reportingName', ''),
position=trade.get('typeOfOwner', ''),
transaction_date=trade.get('transactionDate', ''),
transaction_type=trans_type,
shares=shares,
price=price,
value=float(shares * price),
form_type=trade.get('formType', ''),
source='FMP',
filing_date=trade.get('filingDate', ''),
ownership_type=trade.get('directOrIndirectOwnership', '')
)
all_trades.append(insider_trade)
except (ValueError, TypeError) as e:
logger.warning(f"Error parsing FMP trade data for {symbol}: {e} -> {trade}")
continue
# Increment the page to fetch the next set of results for the same day.
page += 1
await asyncio.sleep(0.2) # Courteous delay
logger.info(
f"Retrieved {len(all_trades)} trades from FMP for {symbol} by iterating through {len(date_range)} days.")
return all_trades
'''
async def get_sec_api_insider_trades(self, session: aiohttp.ClientSession,
symbol: str, limit: int = 100) -> List[InsiderTrade]:
"""Get insider trades from SEC-API"""
trades = []
endpoint = "insider-transactions"
params = {'ticker': symbol, 'limit': limit}
data = await self._make_request(session, 'sec_api', endpoint, params)
if not data or 'transactions' not in data:
return trades
for trade in data['transactions']:
try:
insider_trade = InsiderTrade(
symbol=trade.get('ticker', ''),
company_name=trade.get('companyName', ''),
insider_name=trade.get('personName', ''),
position=trade.get('position', ''),
transaction_date=trade.get('transactionDate', ''),
transaction_type=trade.get('transactionType', ''),
shares=int(trade.get('sharesTraded', 0) or 0),
price=float(trade.get('pricePerShare', 0) or 0),
value=float(trade.get('transactionValue', 0) or 0),
form_type=trade.get('formType', ''),
source='SEC-API',
filing_date=trade.get('filingDate', '')
)
trades.append(insider_trade)
except (ValueError, TypeError) as e:
logger.warning(f"Error parsing SEC-API trade data for {symbol}: {e} -> {trade}")
continue
logger.info(f"Retrieved {len(trades)} trades from SEC-API for {symbol}")
return trades
async def get_sec_api_insider_trades(self, session: aiohttp.ClientSession,
symbol: str, limit: int = 50,
filter_days: int = 30) -> List[InsiderTrade]:
"""Get insider trades from SEC-API using a POST request with proper query structure."""
all_trades = []
# Ensure the limit does not exceed the API's maximum
if limit > 50:
limit = 50
# Date range for filtering
to_date = datetime.now()
from_date = to_date - timedelta(days=filter_days)
start_index = 0
while True:
# Construct query payload according to SEC API documentation
query_payload = {
"query": {
"query_string": {
"query": f'issuer.tradingSymbol:"{symbol}" AND periodOfReport:[{from_date.strftime("%Y-%m-%d")} TO {to_date.strftime("%Y-%m-%d")}]'
}
},
"from": str(start_index),
"size": str(limit),
"sort": [{"filedAt": {"order": "desc"}}]
}
endpoint = "insider-trading"
logger.info(f"Requesting SEC-API data for {symbol}, starting at index {start_index}...")
data = await self._make_post_request(session, 'sec_api', endpoint, query_payload)
if not data or 'transactions' not in data or not data['transactions']:
break
for filing in data['transactions']:
# Parse both non-derivative and derivative transactions
transactions_list = filing.get('nonDerivativeTable', {}).get('transactions', []) + \
filing.get('derivativeTable', {}).get('transactions', [])
for transaction in transactions_list:
try:
# Map transaction codes to buy/sell
trans_code = transaction.get('coding', {}).get('code', '')
if trans_code in ('A', 'P'): # Acquisition or Purchase
trans_type = TransactionType.BUY.value
elif trans_code in ('D', 'S'): # Disposition or Sale
trans_type = TransactionType.SELL.value
else:
continue # Skip other transaction types
shares = transaction.get('amounts', {}).get('shares')
price_per_share = transaction.get('amounts', {}).get('pricePerShare')
if shares is None or price_per_share is None:
continue
shares = int(shares)
price_per_share = float(price_per_share)
insider_trade = InsiderTrade(
symbol=filing.get('issuer', {}).get('tradingSymbol', symbol),
company_name=filing.get('issuer', {}).get('name', ''),
insider_name=filing.get('reportingOwner', {}).get('name', ''),
position=filing.get('reportingOwner', {}).get('relationship', {}).get('officerTitle',
'Insider'),
transaction_date=transaction.get('transactionDate', {}).get('value', ''),
transaction_type=trans_type,
shares=shares,
price=price_per_share,
value=float(shares * price_per_share),
form_type=filing.get('documentType', ''),
source='SEC-API',
filing_date=filing.get('filedAt', '')
)
all_trades.append(insider_trade)
except (ValueError, TypeError, AttributeError) as e:
logger.warning(f"Error parsing SEC-API transaction for {symbol}: {e}")
continue
start_index += limit
# Stop if we got fewer results than requested (last page)
if len(data['transactions']) < limit:
break
logger.info(f"Retrieved {len(all_trades)} trades from SEC-API for {symbol}.")
return all_trades
'''
async def get_sec_api_insider_trades(self, session: aiohttp.ClientSession,
symbol: str, limit: int = 50,
filter_days: int = 30) -> List[InsiderTrade]:
"""Get insider trades from SEC-API using a POST request with proper query structure."""
all_trades = []
if limit > 50:
limit = 50
to_date = datetime.now()
from_date = to_date - timedelta(days=filter_days)
start_index = 0
while True:
# Use simple query string format as shown in official Python SDK
query_payload = {
"query": f'issuer.tradingSymbol:"{symbol}" AND periodOfReport:[{from_date.strftime("%Y-%m-%d")} TO {to_date.strftime("%Y-%m-%d")}]',
"from": str(start_index),
"size": str(limit),
"sort": [{"filedAt": {"order": "desc"}}]
}
endpoint = "insider-trading"
logger.info(f"Requesting SEC-API data for {symbol}, starting at index {start_index}...")
data = await self._make_post_request(session, 'sec_api', endpoint, query_payload)
if not data or 'transactions' not in data or not data['transactions']:
break
for filing in data['transactions']:
# Check if nonDerivativeTable exists and has transactions
non_deriv_trans = filing.get('nonDerivativeTable', {}).get('transactions', [])
deriv_trans = filing.get('derivativeTable', {}).get('transactions', [])
transactions_list = non_deriv_trans + deriv_trans
for transaction in transactions_list:
try:
trans_code = transaction.get('coding', {}).get('code', '')
if trans_code in ('A', 'P'):
trans_type = TransactionType.BUY.value
elif trans_code in ('D', 'S'):
trans_type = TransactionType.SELL.value
else:
continue
shares = transaction.get('amounts', {}).get('shares')
price_per_share = transaction.get('amounts', {}).get('pricePerShare')
if shares is None or price_per_share is None:
continue
shares = float(shares)
price_per_share = float(price_per_share)
insider_trade = InsiderTrade(
symbol=filing.get('issuer', {}).get('tradingSymbol', symbol),
company_name=filing.get('issuer', {}).get('name', ''),
insider_name=filing.get('reportingOwner', {}).get('name', ''),
position=filing.get('reportingOwner', {}).get('relationship', {}).get('officerTitle',
'Insider'),
transaction_date=transaction.get('transactionDate', ''),
transaction_type=trans_type,
shares=int(shares),
price=price_per_share,
value=shares * price_per_share,
form_type=filing.get('documentType', ''),
source='SEC-API',
filing_date=filing.get('filedAt', '')
)
all_trades.append(insider_trade)
except (ValueError, TypeError, AttributeError) as e:
logger.warning(f"Error parsing SEC-API transaction for {symbol}: {e}")
continue
start_index += limit
if len(data['transactions']) < limit:
break
logger.info(f"Retrieved {len(all_trades)} trades from SEC-API for {symbol}.")
return all_trades
async def get_eod_insider_trades(self, session: aiohttp.ClientSession,
symbol: str) -> List[InsiderTrade]:
"""Get insider trades from EOD Historical Data"""
trades = []
# NOTE: This endpoint hardcodes the US exchange.
endpoint = f"insider-transactions/{symbol}.US"
data = await self._make_request(session, 'eod', endpoint)
if not data:
return trades
# EOD data can be a dictionary of lists, so we iterate through its values.
data_to_iterate = data.values() if isinstance(data, dict) else data
for trade in data_to_iterate:
try:
insider_trade = InsiderTrade(
symbol=symbol,
company_name='', # EOD does not provide company name in this endpoint
insider_name=trade.get('ownerName', ''),
position=trade.get('ownerPosition', ''),
transaction_date=trade.get('date', ''),
transaction_type=trade.get('transactionType', ''),
shares=int(trade.get('transactionAmount', 0) or 0),
price=float(trade.get('transactionPrice', 0) or 0),
value=float(trade.get('transactionAmount', 0) or 0) * float(trade.get('transactionPrice', 0) or 0),
form_type='Form 4',
source='EOD'
)
trades.append(insider_trade)
except (ValueError, TypeError) as e:
logger.warning(f"Error parsing EOD trade data for {symbol}: {e} -> {trade}")
continue
logger.info(f"Retrieved {len(trades)} trades from EOD for {symbol}")
return trades
async def get_tradefeeds_insider_trades(self, session: aiohttp.ClientSession,
symbol: str, limit: int = 100) -> List[InsiderTrade]:
"""Get insider trades from Tradefeeds API"""
trades = []
endpoint = "insider_transactions"
params = {'symbol': symbol, 'limit': limit}
data = await self._make_request(session, 'tradefeeds', endpoint, params)
if not data or 'data' not in data:
return trades
for trade in data['data']:
try:
shares = int(trade.get('sharesTraded', 0) or 0)
price = float(trade.get('averagePrice', 0) or 0)
insider_trade = InsiderTrade(
symbol=trade.get('symbol', ''),
company_name=trade.get('companyName', ''),
insider_name=trade.get('insiderName', ''),
position=trade.get('relationship', ''),
transaction_date=trade.get('transactionDate', ''),
transaction_type=trade.get('transactionCode', ''),
shares=shares,
price=price,
value=shares * price,
form_type='Form 4',
source='Tradefeeds'
)
trades.append(insider_trade)
except (ValueError, TypeError) as e:
logger.warning(f"Error parsing Tradefeeds trade data for {symbol}: {e} -> {trade}")
continue
logger.info(f"Retrieved {len(trades)} trades from Tradefeeds for {symbol}")
return trades
async def get_factored_ai_insider_trades(self, session: aiohttp.ClientSession,
symbol: str) -> List[InsiderTrade]:
"""Get insider trades from Factored.AI (S&P 500 companies only)"""
trades = []
try:
url = f"https://raw.githubusercontent.com/Factored-AI/insider-trading/main/data/{symbol}_insider_trades.json"
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
for trade in data:
try:
shares = int(trade.get('shares', 0) or 0)
price = float(trade.get('price', 0) or 0)
insider_trade = InsiderTrade(
symbol=symbol,
company_name=trade.get('company_name', ''),
insider_name=trade.get('insider_name', ''),
position=trade.get('position', ''),
transaction_date=trade.get('transaction_date', ''),
transaction_type=trade.get('transaction_type', ''),
shares=shares,
price=price,
value=shares * price,
form_type='Form 4',
source='Factored.AI'
)
trades.append(insider_trade)
except (ValueError, TypeError) as e:
logger.warning(f"Error parsing Factored.AI trade data for {symbol}: {e} -> {trade}")
continue
logger.info(f"Retrieved {len(trades)} trades from Factored.AI for {symbol}")
else:
logger.info(f"No Factored.AI data found for {symbol} (status: {response.status})")
except aiohttp.ClientError as e:
logger.error(f"Failed to retrieve data from Factored.AI for {symbol}: {e}")
return trades
def deduplicate_trades(self, trades: List[InsiderTrade]) -> List[InsiderTrade]:
"""Advanced deduplication based on multiple factors"""
if not trades:
return []
# First pass: remove exact hash duplicates
unique_trades_map = {trade.hash: trade for trade in trades}
unique_trades = list(unique_trades_map.values())
# Second pass: group similar trades (same person, date, similar values)
# BUG FIX: The original nested loop was inefficient and could miss duplicates.
# This approach of sorting and grouping is more robust.
unique_trades.sort(key=lambda t: (t.insider_name.lower(), t.transaction_date, t.value))
if not unique_trades:
return []
final_trades = [unique_trades[0]]
for i in range(1, len(unique_trades)):
prev_trade = final_trades[-1]
curr_trade = unique_trades[i]
# Check for similarity
if (prev_trade.insider_name.lower() == curr_trade.insider_name.lower() and
prev_trade.transaction_date == curr_trade.transaction_date and
abs(prev_trade.value - curr_trade.value) < 1000): # Within $1000 tolerance
# Keep the trade with more complete information or from a preferred source
if len(curr_trade.company_name) > len(prev_trade.company_name):
final_trades[-1] = curr_trade # Replace previous with current
else:
final_trades.append(curr_trade)
logger.info(f"Deduplication: {len(trades)} -> {len(final_trades)} trades")
return final_trades
def filter_trades_by_period(self, trades: List[InsiderTrade],
days: int = 7, end_date: Optional[datetime] = None) -> List[InsiderTrade]:
"""Filter trades by time period (default: last 7 days)"""
if end_date is None:
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
filtered_trades = []
for trade in trades:
trade_date = trade.transaction_date_dt
if trade_date and start_date <= trade_date <= end_date:
filtered_trades.append(trade)
logger.info(f"Filtered {len(filtered_trades)} trades from last {days} days")
return filtered_trades
async def get_all_insider_trades(self, symbol: str, limit_per_api: int = 100,
filter_days: int = 7) -> List[InsiderTrade]:
"""Aggregate insider trades from all available APIs with async processing"""
timeout = aiohttp.ClientTimeout(total=self.session_timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
# Create tasks for all API calls
tasks = [
#self.get_fmp_insider_trades(session, symbol, limit_per_api),
self.get_sec_api_insider_trades(session, symbol, limit_per_api),
#self.get_eod_insider_trades(session, symbol),
#self.get_tradefeeds_insider_trades(session, symbol, limit_per_api),
#self.get_factored_ai_insider_trades(session, symbol)
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Combine results, filtering out exceptions
all_trades = []
api_names = ['FMP', 'SEC-API', 'EOD', 'Tradefeeds', 'Factored.AI']
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Error fetching from {api_names[i]} for {symbol}: {result}")
elif result:
all_trades.extend(result)
# Deduplicate trades
unique_trades = self.deduplicate_trades(all_trades)
# Filter by period
if filter_days > 0:
unique_trades = self.filter_trades_by_period(unique_trades, filter_days)
logger.info(f"Total unique trades found for {symbol}: {len(unique_trades)}")
return unique_trades
def generate_trading_report(self, trades: List[InsiderTrade],
period_days: int = 7) -> Optional[TradingReport]:
"""Generate comprehensive trading report"""
if not trades:
return None
# Basic statistics
symbol = trades[0].symbol
# BUG FIX: Get the most common company name to avoid issues with inconsistent API data
company_name_counter = Counter(t.company_name for t in trades if t.company_name)
company_name = company_name_counter.most_common(1)[0][0] if company_name_counter else "N/A"
total_trades = len(trades)
# Separate buy/sell trades
buy_trades = [t for t in trades if 'buy' in t.transaction_type.lower() or t.transaction_type.lower() == 'a']
sell_trades = [t for t in trades if 'sell' in t.transaction_type.lower() or t.transaction_type.lower() == 'd']
total_value_bought = sum(abs(t.value) for t in buy_trades)
total_value_sold = sum(abs(t.value) for t in sell_trades)
net_value = total_value_bought - total_value_sold
# Top insiders by activity
insider_stats = {}
for trade in trades:
name = trade.insider_name
if name not in insider_stats:
insider_stats[name] = {'count': 0, 'value': 0}
insider_stats[name]['count'] += 1
insider_stats[name]['value'] += abs(trade.value)
top_insiders = sorted(
[(name, stats['count'], stats['value']) for name, stats in insider_stats.items()],
key=lambda x: x[2], reverse=True
)[:5]
# Period calculation
trade_dates = [t.transaction_date_dt for t in trades if t.transaction_date_dt]
period_start = min(trade_dates) if trade_dates else datetime.now() - timedelta(days=period_days)
period_end = max(trade_dates) if trade_dates else datetime.now()
return TradingReport(
symbol=symbol,
company_name=company_name,
total_trades=total_trades,
buy_trades=len(buy_trades),
sell_trades=len(sell_trades),
total_value_bought=total_value_bought,
total_value_sold=total_value_sold,
net_value=net_value,
top_insiders=top_insiders,
period_start=period_start,
period_end=period_end,
trades=trades
)
'''
def format_telegram_message(self, report: TradingReport) -> str:
"""Generate formatted Telegram message from trading report"""
if not report:
return "❌ No insider trading data found for the specified period."
trend_emoji = "πŸ“ˆ" if report.net_value > 0 else "πŸ“‰" if report.net_value < 0 else "➑️"
message = f"πŸ” **INSIDER TRADING REPORT** πŸ”\n\n"
message += f"🏒 **Company:** {report.company_name} (${report.symbol})\n"
message += f"πŸ“… **Period:** {report.period_start.strftime('%Y-%m-%d')} to {report.period_end.strftime('%Y-%m-%d')}\n\n"
message += "πŸ“Š **SUMMARY**\n"
message += f"β€’ Total Trades: {report.total_trades}\n"
message += f"β€’ Buy Trades: {report.buy_trades} 🟒\n"
message += f"β€’ Sell Trades: {report.sell_trades} πŸ”΄\n\n"
message += "πŸ’° **FINANCIAL IMPACT**\n"
message += f"β€’ Total Bought: ${report.total_value_bought:,.2f}\n"
message += f"β€’ Total Sold: ${report.total_value_sold:,.2f}\n"
message += f"β€’ Net Position: {trend_emoji} ${report.net_value:,.2f}\n\n"
message += "πŸ‘₯ **TOP INSIDERS BY ACTIVITY**\n"
message += "```\n"
message += "β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”\n"
message += "β”‚ Name β”‚ Trades β”‚ Total Value β”‚\n"
message += "β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€\n"
for name, count, value in report.top_insiders:
display_name = (name[:23] + '..') if len(name) > 25 else name
message += f"β”‚ {display_name:<24} β”‚ {count:<6} β”‚ ${value: >10,.0f} β”‚\n"
message += "β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜\n```\n"
if report.trades:
message += "\nπŸ“‹ **RECENT SIGNIFICANT TRADES**\n"
top_trades = sorted(report.trades, key=lambda x: abs(x.value), reverse=True)[:5]
message += "```\n"
message += "β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”\n"
message += "β”‚ Insider Name β”‚ Type β”‚ Date β”‚ Shares β”‚ Total Value β”‚\n"
message += "β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€\n"
for trade in top_trades:
insider_name = (trade.insider_name[:16] + '..') if len(trade.insider_name) > 18 else trade.insider_name
trans_type = "BUY" if "buy" in trade.transaction_type.lower() or "a" == trade.transaction_type.lower() else "SELL"
type_display = f"{'🟒' if trans_type == 'BUY' else 'πŸ”΄'}{trans_type}"
date_str = trade.transaction_date_dt.strftime(
'%Y-%m-%d') if trade.transaction_date_dt else trade.transaction_date[:10]
message += f"β”‚ {insider_name:<17} β”‚ {type_display:<4} β”‚ {date_str:<8} β”‚ {trade.shares:>8,d} β”‚ ${abs(trade.value):>10,.0f} β”‚\n"
message += "β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜\n```\n"
sources = sorted(list(set(t.source for t in report.trades)))
message += f"\nπŸ”— **Sources:** {', '.join(sources)}\n"
message += f"⏰ Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
return message
'''
def format_telegram_message(self, report: TradingReport) -> str:
"""Generate formatted Telegram message from trading report"""
if not report:
return "❌ No insider trading data found for the specified period."
trend_emoji = "πŸ“ˆ" if report.net_value > 0 else "πŸ“‰" if report.net_value < 0 else "➑️"
message = f"πŸ” *INSIDER TRADING REPORT* πŸ”\n\n"
message += f"🏒 *Company:* {report.company_name} (${report.symbol})\n"
message += f"πŸ“… *Period:* {report.period_start.strftime('%Y-%m-%d')} to {report.period_end.strftime('%Y-%m-%d')}\n\n"
message += "πŸ“Š *SUMMARY*\n"
message += f"β€’ Total Trades: {report.total_trades}\n"
message += f"β€’ Buy Trades: {report.buy_trades} 🟒\n"
message += f"β€’ Sell Trades: {report.sell_trades} πŸ”΄\n\n"
message += "πŸ’° *FINANCIAL IMPACT*\n"
message += f"β€’ Total Bought: ${report.total_value_bought:,.2f}\n"
message += f"β€’ Total Sold: ${report.total_value_sold:,.2f}\n"
message += f"β€’ Net Position: {trend_emoji} ${report.net_value:,.2f}\n\n"
message += "πŸ‘₯ *TOP INSIDERS BY ACTIVITY*\n"
for i, (name, count, value) in enumerate(report.top_insiders, 1):
display_name = (name[:35] + '...') if len(name) > 38 else name
message += f"`{i}.` {display_name}\n"
message += f" β”” Trades: {count} | Value: ${value:,.0f}\n"
if report.trades:
message += "\nπŸ“‹ *RECENT SIGNIFICANT TRADES*\n"
top_trades = sorted(report.trades, key=lambda x: abs(x.value), reverse=True)[:5]
for trade in top_trades:
insider_name = (trade.insider_name[:30] + '...') if len(trade.insider_name) > 33 else trade.insider_name
trans_type = "BUY" if "buy" in trade.transaction_type.lower() or "a" == trade.transaction_type.lower() else "SELL"
emoji = '🟒' if trans_type == 'BUY' else 'πŸ”΄'
date_str = trade.transaction_date_dt.strftime(
'%Y-%m-%d') if trade.transaction_date_dt else trade.transaction_date[:10]
message += f"\n{emoji} *{trans_type}* | {insider_name}\n"
message += f"`Date: {date_str}`\n"
message += f"`Shares: {trade.shares:,d}`\n"
message += f"`Value: ${abs(trade.value):,.0f}`\n"
sources = sorted(list(set(t.source for t in report.trades)))
message += f"\nπŸ”— *Sources:* {', '.join(sources)}\n"
message += f"⏰ Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
return message
# BUG FIX: This method was combined with the one below it, causing a SyntaxError.
# It has been properly separated now.
def format_telegram_message_ultra_compact(self, report: TradingReport) -> str:
"""Ultra compact format for high-frequency monitoring"""
if not report or not report.trades:
return f"▫️ ${report.symbol}: No activity"
trend = "πŸ“ˆ" if report.net_value > 0 else "πŸ“‰" if report.net_value < 0 else "➑️"
# Get biggest trade
biggest_trade = max(report.trades, key=lambda x: abs(x.value))
message = f"🚨 **${report.symbol}** | {trend} Net: ${report.net_value:,.0f} | {report.total_trades} trades"
trans_type = "BUY" if "buy" in biggest_trade.transaction_type.lower() else "SELL"
# Get last name for brevity
insider_short = biggest_trade.insider_name.split()[-1]
message += f"\n Lgst: {trans_type} ${abs(biggest_trade.value):,.0f} by {insider_short}"
return message
# BUG FIX: This method was missing its definition and was tangled with the above method.
def format_telegram_message_short(self, report: TradingReport) -> str:
"""Generate short Telegram message for quick updates"""
if not report:
return f"❌ No insider trading activity found."
trend_emoji = "πŸ“ˆ" if report.net_value > 0 else "πŸ“‰" if report.net_value < 0 else "➑️"
message = f"🚨 **${report.symbol} Insider Alert** 🚨\n\n"
message += f"{trend_emoji} Net Value: **${report.net_value:,.0f}**\n"
message += f"πŸ“Š Total Trades: {report.total_trades} ({report.buy_trades}B / {report.sell_trades}S)\n\n"
if report.top_insiders:
top_insider_name = report.top_insiders[0][0]
top_insider_value = report.top_insiders[0][2]
# Truncate long names
display_name = (top_insider_name[:25] + '...') if len(top_insider_name) > 28 else top_insider_name
message += f"πŸ‘€ Top Insider: {display_name}\n"
message += f"πŸ’° Total Activity: ${top_insider_value:,.0f}"
else:
message += "πŸ‘€ No top insider data available."
return message
async def get_multiple_symbols_report(self, symbols: List[str],
filter_days: int = 7) -> Dict[str, TradingReport]:
"""Get reports for multiple symbols concurrently"""
tasks = [self.get_all_insider_trades(symbol, filter_days=filter_days) for symbol in symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
reports = {}
for i, result in enumerate(results):
symbol = symbols[i]
if isinstance(result, Exception):
logger.error(f"Error processing {symbol}: {result}")
reports[symbol] = None
else:
reports[symbol] = self.generate_trading_report(result, filter_days)
return reports
def trades_to_dataframe(self, trades: List[InsiderTrade]) -> pd.DataFrame:
"""Convert list of InsiderTrade objects to pandas DataFrame"""
if not trades:
return pd.DataFrame()
data = [trade.__dict__ for trade in trades]
df = pd.DataFrame(data)
# Convert date column with error handling
df['Transaction Date'] = pd.to_datetime(df['Transaction Date'], errors='coerce')
return df
# Example usage functions
async def example_single_symbol():
"""Example: Get report for single symbol"""
aggregator = InsiderTradingAggregator()
# Set your API keys here (use environment variables in a real app)
# aggregator.set_api_key('fmp', 'your_fmp_api_key')
# aggregator.set_api_key('sec_api', 'your_sec_api_key')
symbol = 'AAPL'
logger.info(f"--- Running single symbol example for ${symbol} ---")
trades = await aggregator.get_all_insider_trades(symbol, filter_days=30)
report = aggregator.generate_trading_report(trades)
if report:
telegram_msg = aggregator.format_telegram_message(report)
print("\n--- Full Telegram Message ---")
print(telegram_msg)
else:
print(f"No recent trades found for {symbol}")
async def example_multiple_symbols():
"""Example: Get reports for multiple symbols"""
aggregator = InsiderTradingAggregator()
symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA']
logger.info(f"\n--- Running multiple symbols example for {', '.join(symbols)} ---")
reports = await aggregator.get_multiple_symbols_report(symbols, filter_days=30)
for symbol, report in reports.items():
print(f"\n--- Report for ${symbol} ---")
if report and report.total_trades > 0:
short_msg = aggregator.format_telegram_message_short(report)
print(short_msg)
else:
print(f"No recent trades found for {symbol}")
async def main():
await example_single_symbol()
print("\n" + "=" * 50)
await example_multiple_symbols()
if __name__ == "__main__":
# In some environments (like Windows), this policy is needed for aiohttp
# asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())