Check the status of my API here: https://crypto-predictor-app-0947ef5b3b47.herokuapp.com
This is a site that a friend and I built while I was working on this project. Take a look!
!pip install pandas requests
!pip install matplotlib seaborn
!pip install imbalanced-learn
!pip install ta
import pandas as pd
import requests
import matplotlib.pyplot as plt
import seaborn as sns
from imblearn.over_sampling import SMOTE
import numpy as np
import time
import ta
from ta import add_all_ta_features
from ta.utils import dropna
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score
def get_historical_data(crypto_id, days):
    """Download the market-chart history of one coin from the CoinGecko API.

    Args:
        crypto_id: CoinGecko coin id, interpolated into the request path.
        days: Look-back window, passed verbatim as the ``days`` query value.

    Returns:
        A list with one dict per sample, holding the keys ``"date"``,
        ``"price"``, ``"market_cap"`` and ``"24h_volume"``.  ``"date"`` is the
        raw first element of each API pair (the rest of this file parses it as
        epoch milliseconds).  An empty list is returned, and a message is
        printed, when the request fails or the payload lacks any of the
        expected series.
    """
    # Fixed pause before every request; presumably this keeps the caller under
    # the public API rate limit when looping over many coins.
    time.sleep(5)

    url = f"https://api.coingecko.com/api/v3/coins/{crypto_id}/market_chart?vs_currency=usd&days={days}"
    try:
        # A timeout prevents one stalled connection from hanging the whole run.
        response = requests.get(url, timeout=30)
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers bodies that are not valid JSON (e.g. HTML errors).
        print(f"Error fetching data for {crypto_id}. Response: {exc}")
        return []

    required_keys = ("prices", "market_caps", "total_volumes")
    if not isinstance(data, dict) or any(key not in data for key in required_keys):
        print(f"Error fetching data for {crypto_id}. Response: {data}")
        return []

    # Each series is a list of [timestamp, value] pairs; merge them row-wise.
    return [
        {
            "date": price[0],
            "price": price[1],
            "market_cap": market_cap[1],
            "24h_volume": volume[1],
        }
        for price, market_cap, volume in zip(
            data["prices"], data["market_caps"], data["total_volumes"]
        )
    ]
def get_top_n_coins(n):
    """Return the ids of the ``n`` largest coins by market cap from CoinGecko.

    Args:
        n: Number of coin ids wanted.  The API documents a maximum page size
            (250 at the time of writing), so larger values yield fewer ids.

    Returns:
        A list of at most ``n`` coin id strings, or an empty list (after
        printing a message) when the request fails or the payload has an
        unexpected shape.
    """
    url = "https://api.coingecko.com/api/v3/coins/markets"
    params = {
        "vs_currency": "usd",
        "order": "market_cap_desc",
        # The endpoint paginates with per_page/page; a "limit" parameter is
        # not part of its interface, so the page size must be set here.
        "per_page": n,
        "page": 1,
        "sparkline": "false",
        "price_change_percentage": "false",
    }

    try:
        response = requests.get(url, params=params, timeout=30)
    except requests.RequestException as exc:
        print(f"Error fetching top coins: {exc}")
        return []

    # Check if the response is successful
    if response.status_code != 200:
        print(f"Error {response.status_code}: {response.text}")
        return []

    try:
        data = response.json()
        # Slice defensively in case the server returns more rows than asked.
        return [coin["id"] for coin in data][:n]
    except (TypeError, KeyError, ValueError):
        # ValueError: body was not JSON; TypeError/KeyError: unexpected shape.
        print(f"Unexpected response format: {response.text}")
        return []
def get_historical_data_for_coins(coins, days="90"):
    """Fetch the market-chart history for every coin in ``coins``.

    Args:
        coins: Iterable of CoinGecko coin ids.
        days: Look-back window forwarded to ``get_historical_data``
            (defaults to ``"90"``).

    Returns:
        A dict mapping each coin id to whatever ``get_historical_data``
        returned for it (an empty list for coins whose download failed).
    """
    return {coin: get_historical_data(coin, days) for coin in coins}
def add_indicators(prices, periods_per_day=24):
    """Build a date-indexed DataFrame of prices plus technical indicators.

    Args:
        prices: Sequence of records with the keys ``"date"`` (epoch
            milliseconds), ``"price"``, ``"market_cap"`` and ``"24h_volume"``.
        periods_per_day: Number of samples that make up one day.  The default
            of 24 matches the hourly data this file was written for.

    Returns:
        A DataFrame indexed by timestamp containing the input columns
        (``market_cap`` and ``24h_volume`` pass through unchanged) plus
        ``1_day_MA``, ``7_day_MA``, ``RSI``, ``MACD``, ``MACD_signal``,
        ``Bollinger_high`` and ``Bollinger_low``.  An empty DataFrame is
        returned when ``prices`` is empty or ``None``.
    """
    # A failed download produces an empty list; without this guard the
    # "date" lookup below raises KeyError on the column-less frame.
    if prices is None or len(prices) == 0:
        return pd.DataFrame()

    df = pd.DataFrame(prices)
    df["date"] = pd.to_datetime(df["date"], unit="ms")  # epoch ms -> timestamp
    df = df.set_index("date")

    # Moving averages over one day and seven days of samples.
    df["1_day_MA"] = df["price"].rolling(window=periods_per_day).mean()
    df["7_day_MA"] = df["price"].rolling(window=7 * periods_per_day).mean()

    # ta.utils.dropna removes incomplete rows; this discards the warm-up
    # period in which the 7-day average is still NaN.
    df = dropna(df)

    # Momentum: relative strength index.
    df["RSI"] = ta.momentum.RSIIndicator(df["price"]).rsi()

    # Trend: MACD line and its signal line.
    macd = ta.trend.MACD(df["price"])
    df["MACD"] = macd.macd()
    df["MACD_signal"] = macd.macd_signal()

    # Volatility: upper and lower Bollinger bands.
    bollinger = ta.volatility.BollingerBands(df["price"])
    df["Bollinger_high"] = bollinger.bollinger_hband()
    df["Bollinger_low"] = bollinger.bollinger_lband()

    return df
# Download the history of the 50 largest coins and derive their indicators.
coins = get_top_n_coins(50)
historical_data = get_historical_data_for_coins(coins)

# Coins whose download failed come back as an empty list; skip them so the
# indicator step is never asked to process a frame without columns.
data_with_indicators = {
    coin: add_indicators(records)
    for coin, records in historical_data.items()
    if records
}

# Create a list to store individual dataframes
df_list = []
for coin, df in data_with_indicators.items():
    df["coin"] = coin  # Tag every row with the coin it belongs to
    df_list.append(df)

if not df_list:
    # pd.concat would fail with an opaque message on an empty list.
    raise RuntimeError("No historical data could be downloaded; cannot build the dataset.")

# Concatenate all the dataframes in the list into a single dataframe
combined_df = pd.concat(df_list, axis=0)
# Indicator warm-up periods leave NaN values in the first rows of each coin.
data = combined_df.dropna()
# Work on a copy so the assignments below do not trigger SettingWithCopyWarning.
data_copy = data.copy()

# Look up, per coin, the price 24 rows ahead (24 hours with hourly data).
data_copy["price_next_day"] = data_copy.groupby("coin")["price"].shift(-24)

# The last 24 rows of every coin have no future price.  Comparing NaN with a
# number is False, which would silently label them 0, so drop them instead.
data_copy = data_copy.dropna(subset=["price_next_day"])

# Target: 1 if the price is higher 24 hours later, 0 otherwise.
data_copy["target"] = (data_copy["price_next_day"] > data_copy["price"]).astype(int)

# The helper column must not remain as a feature (it would leak the answer).
data_copy = data_copy.drop(columns="price_next_day")
from sklearn.model_selection import StratifiedShuffleSplit, train_test_split

# 1. One-Hot Encoding
# Encode once, before splitting, so every split has exactly the same columns
# even if some coin happens to be absent from one of them.
encoded_data = pd.get_dummies(data_copy, columns=["coin"])

# 2. Stratified Split: 70% training, 30% held out for validation + test,
# preserving the per-coin proportions.
# NOTE(review): rows are shuffled at random although they come from
# overlapping hourly windows of the same series; a chronological split would
# avoid look-ahead leakage between training and evaluation rows.
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.3, random_state=42)
train_idx, temp_idx = next(sss.split(data_copy, data_copy["coin"]))
train_data = encoded_data.iloc[train_idx]
temp_data = encoded_data.iloc[temp_idx]

# Split the held-out rows 50/50 into validation and test sets.  The split is
# positional: the timestamp index is not unique across coins, so removing the
# validation rows by index label could also remove unrelated test rows.
valid_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

# 3. Data Preparation
X_train = train_data.drop("target", axis=1)
y_train = train_data["target"]
X_valid = valid_data.drop("target", axis=1)
y_valid = valid_data["target"]
from sklearn.ensemble import RandomForestClassifier

# 4. Model Initialization
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)

# 5. Training
rf_model.fit(X_train, y_train)

# 6. Validation
y_pred_rf = rf_model.predict(X_valid)
# ROC-AUC needs a continuous score; hard 0/1 predictions collapse the curve
# to a single operating point.  Use the probability of the positive class.
positive_col_rf = list(rf_model.classes_).index(1)
y_score_rf = rf_model.predict_proba(X_valid)[:, positive_col_rf]

accuracy_rf = accuracy_score(y_valid, y_pred_rf)
roc_auc_rf = roc_auc_score(y_valid, y_score_rf)
report_rf = classification_report(y_valid, y_pred_rf)

print(f"Accuracy (Random Forest): {accuracy_rf}")
print(f"ROC-AUC (Random Forest): {roc_auc_rf}")
print("Classification Report (Random Forest):")
print(report_rf)
# 7. Preparation
X_test = test_data.drop("target", axis=1)
y_test = test_data["target"]

# 8. Prediction
y_pred_test = rf_model.predict(X_test)
# ROC-AUC needs a continuous score; hard 0/1 predictions collapse the curve
# to a single operating point.  Use the probability of the positive class.
positive_col_test = list(rf_model.classes_).index(1)
y_score_test = rf_model.predict_proba(X_test)[:, positive_col_test]

# 9. Evaluation
accuracy_test = accuracy_score(y_test, y_pred_test)
roc_auc_test = roc_auc_score(y_test, y_score_test)
report_test = classification_report(y_test, y_pred_test)

print(f"Accuracy (Test Set – Random Forest): {accuracy_test*100:.2f}%")
print(f"ROC-AUC (Test Set – Random Forest): {roc_auc_test*100:.2f}%")
print("Classification Report (Test Set – Random Forest):")
print(report_test)