|
|
import asyncio |
|
|
from collections import Counter |
|
|
from datetime import datetime, timedelta |
|
|
from typing import List, Optional, Dict |
|
|
import aiohttp |
|
|
|
|
|
import pandas as pd |
|
|
|
|
|
from src.api.insiders.insider_trade import InsiderTrade |
|
|
from src.api.insiders.trading_report import TradingReport |
|
|
from src.api.insiders.transaction_types import TransactionType |
|
|
from src.telegram_bot.config import Config |
|
|
from src.telegram_bot.logger import main_logger as logger |
|
|
|
|
|
|
|
|
class InsiderTradingAggregator:
    """Async aggregator for insider trading data from multiple APIs.

    Every configured provider has a daily request budget (``rate_limit``) and
    an ``asyncio.Semaphore`` sized by ``requests_per_minute``.  The semaphore
    only bounds how many requests to a provider are in flight at once; nothing
    in this class enforces a true per-minute rate.
    """

    # Page sizes above this are clamped before querying SEC-API.
    SEC_API_MAX_PAGE_SIZE = 50

    # Minimum look-back (days) used when querying SEC-API.
    SEC_API_DEFAULT_LOOKBACK_DAYS = 30

    def __init__(self, session_timeout: int = 30):
        """Set up provider configuration, request counters and semaphores.

        Args:
            session_timeout: Total timeout, in seconds, of the aiohttp session
                created by :meth:`get_all_insider_trades`.
        """
        self.apis = {
            'fmp': {
                'base_url': 'https://financialmodelingprep.com/stable',
                'api_key': Config.FMP_API_KEY,
                'rate_limit': 250,
                'requests_per_minute': 10,
            },
            'sec_api': {
                'base_url': 'https://api.sec-api.io',
                'api_key': Config.SEC_API_KEY,
                'rate_limit': 100,
                'requests_per_minute': 5,
            },
            'eod': {
                'base_url': 'https://eodhistoricaldata.com/api',
                'api_key': None,
                'rate_limit': 1000,
                'requests_per_minute': 20,
            },
            'tradefeeds': {
                'base_url': 'https://api.tradefeeds.com',
                'api_key': None,
                'rate_limit': 500,
                'requests_per_minute': 15,
            },
        }

        self.request_counts = {api: 0 for api in self.apis}
        self.last_reset = datetime.now()
        self.session_timeout = session_timeout

        self.semaphores = {
            api: asyncio.Semaphore(config['requests_per_minute'])
            for api, config in self.apis.items()
        }

    def set_api_key(self, api_name: str, api_key: str) -> None:
        """Set the API key for a configured service; log an error if unknown."""
        if api_name in self.apis:
            self.apis[api_name]['api_key'] = api_key
            logger.info(f"API key set for {api_name}")
        else:
            logger.error(f"Unknown API: {api_name}")

    def _check_rate_limit(self, api_name: str) -> bool:
        """Return True while ``api_name`` is below its daily request budget.

        All counters are reset once more than a day has passed since the last
        reset.  Raises ``KeyError`` for an unconfigured ``api_name``.
        """
        if datetime.now() - self.last_reset > timedelta(days=1):
            self.request_counts = {api: 0 for api in self.apis}
            self.last_reset = datetime.now()

        return self.request_counts[api_name] < self.apis[api_name]['rate_limit']

    def _request_allowed(self, api_name: str) -> bool:
        """Check budget and credentials for ``api_name``, logging why not."""
        if not self._check_rate_limit(api_name):
            logger.warning(f"Daily rate limit reached for {api_name}")
            return False

        if not self.apis[api_name]['api_key']:
            logger.warning(f"No API key set for {api_name}, skipping request.")
            return False

        return True

    async def _make_request(self, session: aiohttp.ClientSession, api_name: str,
                            endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """Make an async, budget-checked GET request to a configured API.

        Args:
            session: Open aiohttp session used for the request.
            api_name: Key into ``self.apis``.
            endpoint: Path appended to the provider's ``base_url``.
            params: Optional query parameters; the dict is not modified.

        Returns:
            The decoded JSON body, or ``None`` when the request is skipped
            (budget exhausted, no key) or fails (HTTP/client error, timeout).
        """
        if not self._request_allowed(api_name):
            return None

        api = self.apis[api_name]
        url = f"{api['base_url']}/{endpoint}"

        # Work on a copy so the credential never leaks into the caller's dict.
        query = dict(params or {})
        if api_name == 'fmp':
            query['apikey'] = api['api_key']
        elif api_name in ('sec_api', 'eod', 'tradefeeds'):
            query['token'] = api['api_key']

        async with self.semaphores[api_name]:
            try:
                async with session.get(url, params=query) as response:
                    response.raise_for_status()
                    # Only successful responses count against the budget.
                    self.request_counts[api_name] += 1
                    return await response.json()
            # A session-level total timeout surfaces as asyncio.TimeoutError,
            # which is not an aiohttp.ClientError.
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.error(f"Request failed for {api_name} ({url}): {e}")
                return None

    async def _make_post_request(self, session: aiohttp.ClientSession, api_name: str,
                                 endpoint: str, payload: Dict) -> Optional[Dict]:
        """Make an async, budget-checked POST request to a configured API.

        The API key is sent as the ``token`` query parameter and ``payload``
        as the JSON body.

        Returns:
            The decoded JSON body, or ``None`` when the request is skipped or
            fails (HTTP/client error, timeout).
        """
        if not self._request_allowed(api_name):
            return None

        api = self.apis[api_name]
        url = f"{api['base_url']}/{endpoint}"

        async with self.semaphores[api_name]:
            try:
                async with session.post(url, params={'token': api['api_key']},
                                        json=payload) as response:
                    response.raise_for_status()
                    self.request_counts[api_name] += 1
                    return await response.json()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.error(f"POST request failed for {api_name} ({url}): {e}")
                return None

    async def get_fmp_insider_trades(self, session: aiohttp.ClientSession,
                                     symbol: str, limit: int = 100,
                                     filter_days: int = 30) -> List[InsiderTrade]:
        """Get insider trades from Financial Modeling Prep, one day at a time.

        Iterates day-by-day over the requested period and pages through each
        day until an empty page comes back.
        NOTE: This method is inefficient and makes many API calls.

        Args:
            session: Open aiohttp session.
            symbol: Ticker to query.
            limit: Page size sent to the API.
            filter_days: Number of days to look back; capped at 14.

        Returns:
            Parsed trades; records that fail to parse are logged and skipped.
        """
        if filter_days > 14:
            logger.warning(f"FMP date range capped at 14 days. Reducing from {filter_days} to 14.")
            filter_days = 14

        all_trades = []
        today = datetime.now()
        date_range = [today - timedelta(days=i) for i in range(filter_days)]
        endpoint = "insider-trading/latest"

        for single_date in date_range:
            date_str = single_date.strftime('%Y-%m-%d')
            page = 0

            while True:
                params = {
                    'symbol': symbol,
                    'date': date_str,
                    'page': page,
                    'limit': limit,
                }

                logger.info(f"Requesting FMP data for {symbol} on {date_str}, page {page}...")
                data = await self._make_request(session, 'fmp', endpoint, params)

                if not data:
                    break

                # Anything but a list of records (e.g. an error object) cannot
                # be parsed; stop paging this day instead of crashing.
                if not isinstance(data, list):
                    logger.warning(f"Unexpected FMP response for {symbol} on {date_str}: {data}")
                    break

                for trade in data:
                    try:
                        # Accept both the correctly spelled key and the
                        # misspelled variant this code originally read.
                        disposition = (
                            trade.get('acquisitionOrDisposition')
                            or trade.get('acquistionOrDisposition')
                            or ''
                        ).upper()
                        # Anything other than 'A' is treated as a sale.
                        trans_type = (TransactionType.BUY.value if disposition == 'A'
                                      else TransactionType.SELL.value)

                        shares = int(trade.get('securitiesTransacted', 0) or 0)
                        price = float(trade.get('price', 0) or 0)

                        all_trades.append(InsiderTrade(
                            symbol=trade.get('symbol', ''),
                            company_name=trade.get('companyName', ''),
                            insider_name=trade.get('reportingName', ''),
                            position=trade.get('typeOfOwner', ''),
                            transaction_date=trade.get('transactionDate', ''),
                            transaction_type=trans_type,
                            shares=shares,
                            price=price,
                            value=float(shares * price),
                            form_type=trade.get('formType', ''),
                            source='FMP',
                            filing_date=trade.get('filingDate', ''),
                            ownership_type=trade.get('directOrIndirectOwnership', ''),
                        ))
                    except (ValueError, TypeError, AttributeError) as e:
                        logger.warning(f"Error parsing FMP trade data for {symbol}: {e} -> {trade}")
                        continue

                page += 1
                # Brief pause between pages to go easy on the API.
                await asyncio.sleep(0.2)

        logger.info(
            f"Retrieved {len(all_trades)} trades from FMP for {symbol} by iterating through {len(date_range)} days.")
        return all_trades

    def _parse_sec_filing(self, filing: Dict, symbol: str) -> List[InsiderTrade]:
        """Convert one SEC-API filing into trades (buy/sell codes only).

        Transactions whose code is not A/P (buy) or D/S (sell), or that lack
        a share count or price, are skipped.  Parse failures are logged.
        """
        # ``or {}`` / ``or []`` also cover keys that are present but null.
        issuer = filing.get('issuer') or {}
        owner = filing.get('reportingOwner') or {}
        relationship = owner.get('relationship') or {}
        transactions_list = (
            ((filing.get('nonDerivativeTable') or {}).get('transactions') or [])
            + ((filing.get('derivativeTable') or {}).get('transactions') or [])
        )

        parsed = []
        for transaction in transactions_list:
            try:
                trans_code = transaction.get('coding', {}).get('code', '')
                if trans_code in ('A', 'P'):
                    trans_type = TransactionType.BUY.value
                elif trans_code in ('D', 'S'):
                    trans_type = TransactionType.SELL.value
                else:
                    continue

                amounts = transaction.get('amounts', {})
                shares = amounts.get('shares')
                price_per_share = amounts.get('pricePerShare')
                if shares is None or price_per_share is None:
                    continue

                shares = float(shares)
                price_per_share = float(price_per_share)

                parsed.append(InsiderTrade(
                    symbol=issuer.get('tradingSymbol', symbol),
                    company_name=issuer.get('name', ''),
                    insider_name=owner.get('name', ''),
                    position=relationship.get('officerTitle', 'Insider'),
                    transaction_date=transaction.get('transactionDate', ''),
                    transaction_type=trans_type,
                    shares=int(shares),
                    price=price_per_share,
                    # Uses the un-truncated share count.
                    value=shares * price_per_share,
                    form_type=filing.get('documentType', ''),
                    source='SEC-API',
                    filing_date=filing.get('filedAt', ''),
                ))
            except (ValueError, TypeError, AttributeError) as e:
                logger.warning(f"Error parsing SEC-API transaction for {symbol}: {e}")
                continue

        return parsed

    async def get_sec_api_insider_trades(self, session: aiohttp.ClientSession,
                                         symbol: str, limit: int = 50,
                                         filter_days: int = 30) -> List[InsiderTrade]:
        """Get insider trades from SEC-API using paged POST queries.

        Args:
            session: Open aiohttp session.
            symbol: Ticker matched against ``issuer.tradingSymbol``.
            limit: Page size; clamped to 1..``SEC_API_MAX_PAGE_SIZE``.
            filter_days: Look-back window applied to ``periodOfReport``.

        Returns:
            All buy/sell transactions found across the returned filings.
        """
        all_trades = []

        # Clamp to a positive page size so paging always advances.
        limit = max(1, min(limit, self.SEC_API_MAX_PAGE_SIZE))

        to_date = datetime.now()
        from_date = to_date - timedelta(days=filter_days)
        date_window = f"[{from_date.strftime('%Y-%m-%d')} TO {to_date.strftime('%Y-%m-%d')}]"
        query = f'issuer.tradingSymbol:"{symbol}" AND periodOfReport:{date_window}'

        endpoint = "insider-trading"
        start_index = 0

        while True:
            query_payload = {
                "query": query,
                "from": str(start_index),
                "size": str(limit),
                "sort": [{"filedAt": {"order": "desc"}}],
            }

            logger.info(f"Requesting SEC-API data for {symbol}, starting at index {start_index}...")
            data = await self._make_post_request(session, 'sec_api', endpoint, query_payload)

            if not data or 'transactions' not in data or not data['transactions']:
                break

            filings = data['transactions']
            for filing in filings:
                all_trades.extend(self._parse_sec_filing(filing, symbol))

            start_index += limit

            # A short page means there is nothing left to fetch.
            if len(filings) < limit:
                break

        logger.info(f"Retrieved {len(all_trades)} trades from SEC-API for {symbol}.")
        return all_trades

    async def get_eod_insider_trades(self, session: aiohttp.ClientSession,
                                     symbol: str) -> List[InsiderTrade]:
        """Get insider trades from EOD Historical Data for ``<symbol>.US``."""
        trades = []

        endpoint = f"insider-transactions/{symbol}.US"

        data = await self._make_request(session, 'eod', endpoint)
        if not data:
            return trades

        # The payload is handled both as a mapping of records and as a list.
        data_to_iterate = data.values() if isinstance(data, dict) else data

        for trade in data_to_iterate:
            try:
                amount = trade.get('transactionAmount', 0) or 0
                shares = int(amount)
                price = float(trade.get('transactionPrice', 0) or 0)

                trades.append(InsiderTrade(
                    symbol=symbol,
                    company_name='',
                    insider_name=trade.get('ownerName', ''),
                    position=trade.get('ownerPosition', ''),
                    transaction_date=trade.get('date', ''),
                    transaction_type=trade.get('transactionType', ''),
                    shares=shares,
                    price=price,
                    value=float(amount) * price,
                    form_type='Form 4',
                    source='EOD',
                ))
            except (ValueError, TypeError, AttributeError) as e:
                logger.warning(f"Error parsing EOD trade data for {symbol}: {e} -> {trade}")
                continue

        logger.info(f"Retrieved {len(trades)} trades from EOD for {symbol}")
        return trades

    async def get_tradefeeds_insider_trades(self, session: aiohttp.ClientSession,
                                            symbol: str, limit: int = 100) -> List[InsiderTrade]:
        """Get insider trades from the Tradefeeds API (records under ``data``)."""
        trades = []
        endpoint = "insider_transactions"
        params = {'symbol': symbol, 'limit': limit}

        data = await self._make_request(session, 'tradefeeds', endpoint, params)
        if not data or 'data' not in data:
            return trades

        for trade in data['data']:
            try:
                shares = int(trade.get('sharesTraded', 0) or 0)
                price = float(trade.get('averagePrice', 0) or 0)
                trades.append(InsiderTrade(
                    symbol=trade.get('symbol', ''),
                    company_name=trade.get('companyName', ''),
                    insider_name=trade.get('insiderName', ''),
                    position=trade.get('relationship', ''),
                    transaction_date=trade.get('transactionDate', ''),
                    transaction_type=trade.get('transactionCode', ''),
                    shares=shares,
                    price=price,
                    value=shares * price,
                    form_type='Form 4',
                    source='Tradefeeds',
                ))
            except (ValueError, TypeError, AttributeError) as e:
                logger.warning(f"Error parsing Tradefeeds trade data for {symbol}: {e} -> {trade}")
                continue

        logger.info(f"Retrieved {len(trades)} trades from Tradefeeds for {symbol}")
        return trades

    async def get_factored_ai_insider_trades(self, session: aiohttp.ClientSession,
                                             symbol: str) -> List[InsiderTrade]:
        """Get insider trades from Factored.AI (S&P 500 companies only).

        Reads a per-symbol JSON file from raw.githubusercontent.com; no API
        key or request budget applies.  Returns an empty list on any failure.
        """
        trades = []
        url = f"https://raw.githubusercontent.com/Factored-AI/insider-trading/main/data/{symbol}_insider_trades.json"

        try:
            async with session.get(url) as response:
                if response.status != 200:
                    logger.info(f"No Factored.AI data found for {symbol} (status: {response.status})")
                    return trades

                # content_type=None disables aiohttp's MIME check, which would
                # otherwise reject a JSON file not served as application/json.
                data = await response.json(content_type=None)
        # ValueError covers a body that is not valid JSON.
        except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
            logger.error(f"Failed to retrieve data from Factored.AI for {symbol}: {e}")
            return trades

        for trade in data or []:
            try:
                shares = int(trade.get('shares', 0) or 0)
                price = float(trade.get('price', 0) or 0)
                trades.append(InsiderTrade(
                    symbol=symbol,
                    company_name=trade.get('company_name', ''),
                    insider_name=trade.get('insider_name', ''),
                    position=trade.get('position', ''),
                    transaction_date=trade.get('transaction_date', ''),
                    transaction_type=trade.get('transaction_type', ''),
                    shares=shares,
                    price=price,
                    value=shares * price,
                    form_type='Form 4',
                    source='Factored.AI',
                ))
            except (ValueError, TypeError, AttributeError) as e:
                logger.warning(f"Error parsing Factored.AI trade data for {symbol}: {e} -> {trade}")
                continue

        logger.info(f"Retrieved {len(trades)} trades from Factored.AI for {symbol}")
        return trades

    def deduplicate_trades(self, trades: List[InsiderTrade]) -> List[InsiderTrade]:
        """Remove exact and near-duplicate trades.

        Exact duplicates share ``trade.hash``.  Near duplicates are adjacent
        trades (after sorting by insider, date and value) with the same
        insider name (case-insensitive), the same transaction date and values
        less than 1000 apart; the one with the longer company name is kept.

        NOTE(review): the near-duplicate rule ignores transaction type, so a
        buy and a sell of similar value on the same day collapse into one.
        """
        if not trades:
            return []

        unique_trades = list({trade.hash: trade for trade in trades}.values())
        unique_trades.sort(key=lambda t: (t.insider_name.lower(), t.transaction_date, t.value))

        final_trades = [unique_trades[0]]
        for curr_trade in unique_trades[1:]:
            prev_trade = final_trades[-1]

            is_near_duplicate = (
                prev_trade.insider_name.lower() == curr_trade.insider_name.lower()
                and prev_trade.transaction_date == curr_trade.transaction_date
                and abs(prev_trade.value - curr_trade.value) < 1000
            )

            if not is_near_duplicate:
                final_trades.append(curr_trade)
            elif len(curr_trade.company_name) > len(prev_trade.company_name):
                # Prefer the record carrying the more complete company name.
                final_trades[-1] = curr_trade

        logger.info(f"Deduplication: {len(trades)} -> {len(final_trades)} trades")
        return final_trades

    def filter_trades_by_period(self, trades: List[InsiderTrade],
                                days: int = 7,
                                end_date: Optional[datetime] = None) -> List[InsiderTrade]:
        """Keep trades dated within ``days`` before ``end_date`` (default: now).

        Trades whose ``transaction_date_dt`` is falsy are dropped.
        """
        if end_date is None:
            end_date = datetime.now()

        start_date = end_date - timedelta(days=days)

        filtered_trades = []
        for trade in trades:
            trade_date = trade.transaction_date_dt
            if trade_date and start_date <= trade_date <= end_date:
                filtered_trades.append(trade)

        logger.info(f"Filtered {len(filtered_trades)} trades from last {days} days")
        return filtered_trades

    async def get_all_insider_trades(self, symbol: str, limit_per_api: int = 100,
                                     filter_days: int = 7) -> List[InsiderTrade]:
        """Aggregate insider trades from the enabled providers.

        Only SEC-API is currently enabled.  Results are de-duplicated and,
        when ``filter_days`` is positive, restricted to that many recent days.

        Args:
            symbol: Ticker to query.
            limit_per_api: Page size handed to each provider.
            filter_days: Recent-days filter; ``0`` or less disables it.
        """
        timeout = aiohttp.ClientTimeout(total=self.session_timeout)

        # Never narrow the fetch below the default window, but widen it so a
        # filter longer than the default is not silently truncated.
        fetch_days = max(filter_days, self.SEC_API_DEFAULT_LOOKBACK_DAYS)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            # Each coroutine is paired with its label so error logs name the
            # right provider regardless of which sources are enabled.
            sources = [
                ('SEC-API',
                 self.get_sec_api_insider_trades(session, symbol, limit_per_api, fetch_days)),
            ]

            results = await asyncio.gather(
                *(coro for _, coro in sources), return_exceptions=True)

        all_trades = []
        for (api_name, _), result in zip(sources, results):
            if isinstance(result, Exception):
                logger.error(f"Error fetching from {api_name} for {symbol}: {result}")
            elif result:
                all_trades.extend(result)

        unique_trades = self.deduplicate_trades(all_trades)

        if filter_days > 0:
            unique_trades = self.filter_trades_by_period(unique_trades, filter_days)

        logger.info(f"Total unique trades found for {symbol}: {len(unique_trades)}")
        return unique_trades

    @staticmethod
    def _is_buy(transaction_type: str) -> bool:
        """Return True for types containing 'buy' or equal to code 'a'."""
        kind = transaction_type.lower()
        return 'buy' in kind or kind == 'a'

    @staticmethod
    def _is_sell(transaction_type: str) -> bool:
        """Return True for types containing 'sell' or equal to code 'd'."""
        kind = transaction_type.lower()
        return 'sell' in kind or kind == 'd'

    def generate_trading_report(self, trades: List[InsiderTrade],
                                period_days: int = 7) -> Optional[TradingReport]:
        """Summarise ``trades`` into a :class:`TradingReport`.

        Args:
            trades: Trades to summarise; the symbol is taken from the first.
            period_days: Fallback period length, used only when no trade has
                a parseable date.

        Returns:
            The report, or ``None`` when ``trades`` is empty.
        """
        if not trades:
            return None

        symbol = trades[0].symbol

        # Use the most frequently reported non-empty company name.
        company_name_counter = Counter(t.company_name for t in trades if t.company_name)
        company_name = company_name_counter.most_common(1)[0][0] if company_name_counter else "N/A"

        buy_trades = [t for t in trades if self._is_buy(t.transaction_type)]
        sell_trades = [t for t in trades if self._is_sell(t.transaction_type)]

        total_value_bought = sum(abs(t.value) for t in buy_trades)
        total_value_sold = sum(abs(t.value) for t in sell_trades)

        insider_stats = {}
        for trade in trades:
            stats = insider_stats.setdefault(trade.insider_name, {'count': 0, 'value': 0})
            stats['count'] += 1
            stats['value'] += abs(trade.value)

        # Top five insiders by absolute traded value.
        top_insiders = sorted(
            ((name, stats['count'], stats['value']) for name, stats in insider_stats.items()),
            key=lambda x: x[2], reverse=True
        )[:5]

        trade_dates = [t.transaction_date_dt for t in trades if t.transaction_date_dt]
        period_start = min(trade_dates) if trade_dates else datetime.now() - timedelta(days=period_days)
        period_end = max(trade_dates) if trade_dates else datetime.now()

        return TradingReport(
            symbol=symbol,
            company_name=company_name,
            total_trades=len(trades),
            buy_trades=len(buy_trades),
            sell_trades=len(sell_trades),
            total_value_bought=total_value_bought,
            total_value_sold=total_value_sold,
            net_value=total_value_bought - total_value_sold,
            top_insiders=top_insiders,
            period_start=period_start,
            period_end=period_end,
            trades=trades,
        )

    def format_telegram_message(self, report: TradingReport) -> str:
        """Generate the full Telegram (Markdown) message for a report.

        Returns a fixed "no data" message when ``report`` is falsy.

        NOTE(review): the emoji in this file had been damaged by an encoding
        error.  Most were recoverable; the report-header, "recent trades" and
        "sources" glyphs and the tree marker under each insider are
        best-guess restorations - confirm against the intended design.
        """
        if not report:
            return "❌ No insider trading data found for the specified period."

        trend_emoji = "📈" if report.net_value > 0 else "📉" if report.net_value < 0 else "➡️"

        parts = [
            "🔍 *INSIDER TRADING REPORT* 🔍\n\n",
            f"🏢 *Company:* {report.company_name} (${report.symbol})\n",
            f"📅 *Period:* {report.period_start.strftime('%Y-%m-%d')} to "
            f"{report.period_end.strftime('%Y-%m-%d')}\n\n",
            "📊 *SUMMARY*\n",
            f"• Total Trades: {report.total_trades}\n",
            f"• Buy Trades: {report.buy_trades} 🟢\n",
            f"• Sell Trades: {report.sell_trades} 🔴\n\n",
            "💰 *FINANCIAL IMPACT*\n",
            f"• Total Bought: ${report.total_value_bought:,.2f}\n",
            f"• Total Sold: ${report.total_value_sold:,.2f}\n",
            f"• Net Position: {trend_emoji} ${report.net_value:,.2f}\n\n",
            "👥 *TOP INSIDERS BY ACTIVITY*\n",
        ]

        for i, (name, count, value) in enumerate(report.top_insiders, 1):
            display_name = (name[:35] + '...') if len(name) > 38 else name
            parts.append(f"`{i}.` {display_name}\n")
            parts.append(f" └ Trades: {count} | Value: ${value:,.0f}\n")

        if report.trades:
            parts.append("\n📋 *RECENT SIGNIFICANT TRADES*\n")
            # The five largest trades by absolute value.
            top_trades = sorted(report.trades, key=lambda x: abs(x.value), reverse=True)[:5]

            for trade in top_trades:
                insider_name = ((trade.insider_name[:30] + '...')
                                if len(trade.insider_name) > 33 else trade.insider_name)
                # Anything not recognised as a buy is shown as a sell.
                trans_type = "BUY" if self._is_buy(trade.transaction_type) else "SELL"
                emoji = '🟢' if trans_type == 'BUY' else '🔴'

                # Fall back to the raw date string when it could not be parsed.
                date_str = (trade.transaction_date_dt.strftime('%Y-%m-%d')
                            if trade.transaction_date_dt else trade.transaction_date[:10])

                parts.append(f"\n{emoji} *{trans_type}* | {insider_name}\n")
                parts.append(f"`Date: {date_str}`\n")
                parts.append(f"`Shares: {trade.shares:,d}`\n")
                parts.append(f"`Value: ${abs(trade.value):,.0f}`\n")

        sources = sorted({t.source for t in report.trades})
        parts.append(f"\n📚 *Sources:* {', '.join(sources)}\n")
        parts.append(f"⏰ Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        return "".join(parts)

    def format_telegram_message_ultra_compact(self, report: TradingReport) -> str:
        """Ultra compact format for high-frequency monitoring.

        Returns a "No activity" line when there is no report or it has no
        trades; otherwise the net value, trade count and the largest trade.
        """
        if not report:
            # No report means no symbol to show either.
            return "⚫️ No activity"
        if not report.trades:
            return f"⚫️ ${report.symbol}: No activity"

        trend = "📈" if report.net_value > 0 else "📉" if report.net_value < 0 else "➡️"

        biggest_trade = max(report.trades, key=lambda x: abs(x.value))

        message = f"🚨 **${report.symbol}** | {trend} Net: ${report.net_value:,.0f} | {report.total_trades} trades"

        trans_type = "BUY" if self._is_buy(biggest_trade.transaction_type) else "SELL"

        # Show only the last word of the name; guard against blank names.
        name_parts = biggest_trade.insider_name.split()
        insider_short = name_parts[-1] if name_parts else "N/A"
        message += f"\n Lgst: {trans_type} ${abs(biggest_trade.value):,.0f} by {insider_short}"

        return message

    def format_telegram_message_short(self, report: TradingReport) -> str:
        """Generate a short Telegram message for quick updates.

        Shows the net value, trade counts and the top insider when present.
        """
        if not report:
            return "❌ No insider trading activity found."

        trend_emoji = "📈" if report.net_value > 0 else "📉" if report.net_value < 0 else "➡️"

        message = f"🚨 **${report.symbol} Insider Alert** 🚨\n\n"
        message += f"{trend_emoji} Net Value: **${report.net_value:,.0f}**\n"
        message += f"📊 Total Trades: {report.total_trades} ({report.buy_trades}B / {report.sell_trades}S)\n\n"

        if report.top_insiders:
            top_insider_name = report.top_insiders[0][0]
            top_insider_value = report.top_insiders[0][2]

            display_name = (top_insider_name[:25] + '...') if len(top_insider_name) > 28 else top_insider_name
            message += f"👤 Top Insider: {display_name}\n"
            message += f"💰 Total Activity: ${top_insider_value:,.0f}"
        else:
            message += "👤 No top insider data available."

        return message

    async def get_multiple_symbols_report(self, symbols: List[str],
                                          filter_days: int = 7) -> Dict[str, Optional[TradingReport]]:
        """Build reports for several symbols concurrently.

        Returns:
            Mapping of symbol to its report; the value is ``None`` when the
            fetch failed or no trades were found.
        """
        tasks = [self.get_all_insider_trades(symbol, filter_days=filter_days) for symbol in symbols]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        reports = {}
        for symbol, result in zip(symbols, results):
            if isinstance(result, Exception):
                logger.error(f"Error processing {symbol}: {result}")
                reports[symbol] = None
            else:
                reports[symbol] = self.generate_trading_report(result, filter_days)

        return reports

    def trades_to_dataframe(self, trades: List[InsiderTrade]) -> pd.DataFrame:
        """Convert a list of trade objects to a pandas DataFrame.

        Columns mirror each trade's instance attributes.  The transaction
        date column is converted to datetimes, with unparseable values
        becoming ``NaT``.  An empty input yields an empty DataFrame.
        """
        if not trades:
            return pd.DataFrame()

        df = pd.DataFrame([vars(trade) for trade in trades])

        # Columns are named after the attributes, so the date lives under
        # 'transaction_date'; the display-style name is handled as well in
        # case the records carry it.
        for column in ('transaction_date', 'Transaction Date'):
            if column in df.columns:
                df[column] = pd.to_datetime(df[column], errors='coerce')

        return df
|
|
|
|
|
|
|
|
|
|
|
async def example_single_symbol():
    """Example: fetch 30 days of insider trades for one symbol and print the full report."""
    symbol = 'AAPL'
    aggregator = InsiderTradingAggregator()

    logger.info(f"--- Running single symbol example for ${symbol} ---")
    trades = await aggregator.get_all_insider_trades(symbol, filter_days=30)
    report = aggregator.generate_trading_report(trades)

    # Guard clause: nothing to format when no report could be built.
    if not report:
        print(f"No recent trades found for {symbol}")
        return

    telegram_msg = aggregator.format_telegram_message(report)
    print("\n--- Full Telegram Message ---")
    print(telegram_msg)
|
|
|
|
|
|
|
|
async def example_multiple_symbols():
    """Example: build 30-day reports for several symbols and print a short summary of each."""
    symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA']
    aggregator = InsiderTradingAggregator()

    logger.info(f"\n--- Running multiple symbols example for {', '.join(symbols)} ---")
    reports = await aggregator.get_multiple_symbols_report(symbols, filter_days=30)

    for symbol, report in reports.items():
        print(f"\n--- Report for ${symbol} ---")

        # A missing report and a report with zero trades are treated alike.
        has_activity = bool(report) and report.total_trades > 0
        if not has_activity:
            print(f"No recent trades found for {symbol}")
            continue

        short_msg = aggregator.format_telegram_message_short(report)
        print(short_msg)
|
|
|
|
|
|
|
|
async def main():
    """Run the single-symbol example, print a separator line, then run the multi-symbol example."""
    await example_single_symbol()

    separator = "=" * 50
    print("\n" + separator)

    await example_multiple_symbols()
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the example coroutines when executed directly.
    asyncio.run(main())
|
|
|