Add a new oi_scraper directory for collecting open interest data, and update the main EA to integrate with the scraper.
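
"""Scrape CME QuikStrike open interest (OI) levels for Gold options and export them to CSV.

The scraper logs in to cmegroup.com with Playwright (reusing cookies when a previous
session is still valid), reads the Integrated Open Interest tool's matrix table, keeps
the top-N CALL and PUT strikes by OI, grabs the current gold price from investing.com,
and writes everything to CSV_OUTPUT_PATH for the main EA to consume.

Configuration is read from a .env file; see the constants below.
"""
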
import json
import logging
import os

import pandas as pd
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright

load_dotenv()

# Configuration
CME_USERNAME = os.getenv("CME_USERNAME")
CME_PASSWORD = os.getenv("CME_PASSWORD")
PRODUCT_URL = os.getenv(
    "PRODUCT_URL",
    "https://cmegroup.quikstrike.net/User/QuikStrikeView.aspx?pid=40&viewitemid=IntegratedOpenInterestTool",
)
INVESTING_URL = os.getenv("INVESTING_URL", "https://www.investing.com/commodities/gold")
CSV_OUTPUT_PATH = os.getenv("CSV_OUTPUT_PATH", "./oi_data.csv")
TOP_N_STRIKES = int(os.getenv("TOP_N_STRIKES", "3"))
HEADLESS = os.getenv("HEADLESS", "false").lower() == "true"
TIMEOUT_SECONDS = int(os.getenv("TIMEOUT_SECONDS", "30"))
RETRY_ATTEMPTS = int(os.getenv("RETRY_ATTEMPTS", "3"))
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
COOKIE_FILE = "./cookies.json"
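
# Example .env (illustrative values only: the CME credentials are required, the other
# keys simply override the defaults above, including PRODUCT_URL and INVESTING_URL):
#   CME_USERNAME=your_cme_username
#   CME_PASSWORD=your_cme_password
#   CSV_OUTPUT_PATH=./oi_data.csv
#   TOP_N_STRIKES=3
#   HEADLESS=false
#   TIMEOUT_SECONDS=30
#   RETRY_ATTEMPTS=3
#   LOG_LEVEL=INFO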

# Fall back to INFO if LOG_LEVEL is not a recognised logging level name
logging.basicConfig(level=getattr(logging, LOG_LEVEL.upper(), logging.INFO))
logger = logging.getLogger(__name__)


def save_cookies(context):
    """Persist the browser context's cookies so later runs can reuse the CME session."""
    cookies = context.cookies()
    with open(COOKIE_FILE, "w") as f:
        json.dump(cookies, f)
    logger.info("Cookies saved to file")


def load_cookies(context):
    """Load previously saved cookies into the context; return True if a cookie file existed."""
    if os.path.exists(COOKIE_FILE):
        with open(COOKIE_FILE, "r") as f:
            cookies = json.load(f)
        context.add_cookies(cookies)
        logger.info("Cookies loaded from file")
        return True
    return False


def is_logged_in(page):
    """Heuristic session check: open the product page and see whether we get bounced to a login URL."""
    page.goto(PRODUCT_URL, timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_load_state("networkidle", timeout=TIMEOUT_SECONDS * 1000)
    return "login" not in page.url.lower()


def login_to_cme(page):
    """Log in to cmegroup.com with the credentials from .env; return True on success."""
    logger.info("Attempting to login to CME QuikStrike...")

    page.goto(
        "https://www.cmegroup.com/account/login.html", timeout=TIMEOUT_SECONDS * 1000
    )

    try:
        page.fill('input[name="username"]', CME_USERNAME)
        page.fill('input[name="password"]', CME_PASSWORD)
        page.click('button[type="submit"]')

        page.wait_for_load_state("networkidle", timeout=TIMEOUT_SECONDS * 1000)

        # Screenshots are kept for debugging either outcome
        if "login" in page.url.lower():
            logger.error("Login failed - still on login page")
            page.screenshot(path="login_failed.png")
            return False

        logger.info("Login successful")
        page.screenshot(path="login_success.png")
        return True

    except Exception as e:
        logger.error(f"Login error: {e}")
        page.screenshot(path="login_error.png")
        return False


def navigate_to_oi_heatmap(page):
    logger.info(f"Navigating to OI Heatmap: {PRODUCT_URL}")
    page.goto(PRODUCT_URL, timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_load_state("networkidle", timeout=TIMEOUT_SECONDS * 1000)


def extract_oi_data(page):
    """Parse the Gold OI matrix table and return the top-N CALL and PUT strikes by open interest."""
    logger.info("Extracting OI data from Gold matrix table...")

    call_levels = []
    put_levels = []

    table = page.locator("table.grid-thm").first
    rows = table.locator("tbody tr").all()

    for row in rows:
        try:
            cells = row.locator("td").all()
            if len(cells) < 3:
                continue

            # First cell is the strike; skip header/summary rows that are not numeric
            strike_cell = cells[0].text_content().strip()
            if not strike_cell or not strike_cell.replace(".", "").isdigit():
                continue

            strike = float(strike_cell)

            # Remaining cells are read as (call OI, put OI) pairs
            cells_with_data = cells[2:]

            for i in range(0, len(cells_with_data), 2):
                if i + 1 >= len(cells_with_data):
                    break

                call_cell = cells_with_data[i]
                put_cell = cells_with_data[i + 1]

                call_text = call_cell.text_content().strip()
                put_text = put_cell.text_content().strip()

                if call_text and call_text.replace(",", "").isdigit():
                    call_oi = int(call_text.replace(",", ""))
                    call_levels.append(
                        {"Type": "CALL", "Strike": strike, "OI": call_oi}
                    )

                if put_text and put_text.replace(",", "").isdigit():
                    put_oi = int(put_text.replace(",", ""))
                    put_levels.append({"Type": "PUT", "Strike": strike, "OI": put_oi})

        except Exception as e:
            logger.warning(f"Error parsing row: {e}")
            continue

    if not call_levels:
        logger.warning("No CALL OI data extracted")
    if not put_levels:
        logger.warning("No PUT OI data extracted")

    # Keep only the strikes with the largest open interest on each side
    call_df = (
        pd.DataFrame(call_levels).nlargest(TOP_N_STRIKES, "OI")
        if call_levels
        else pd.DataFrame()
    )
    put_df = (
        pd.DataFrame(put_levels).nlargest(TOP_N_STRIKES, "OI")
        if put_levels
        else pd.DataFrame()
    )

    result_df = pd.concat([call_df, put_df], ignore_index=True)

    logger.info(f"Extracted {len(result_df)} OI levels")
    return result_df

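
# For illustration only (all numbers below are hypothetical), the frame returned by
# extract_oi_data() has the form:
#
#     Type  Strike     OI
#     CALL  2650.0  18432
#     CALL  2700.0  15210
#     PUT   2500.0  21054
#     PUT   2450.0  16990
#
# i.e. at most TOP_N_STRIKES CALL rows followed by at most TOP_N_STRIKES PUT rows.
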
def scrape_investing_gold_price(page):
    """Scrape the current gold price from investing.com; return 0.0 if it cannot be parsed."""
    logger.info(f"Scraping gold price from: {INVESTING_URL}")

    try:
        page.goto(INVESTING_URL, timeout=TIMEOUT_SECONDS * 1000)
        page.wait_for_load_state("domcontentloaded", timeout=TIMEOUT_SECONDS * 1000)

        # Selectors are site-specific and may break if investing.com changes its markup
        price_locator = page.locator('div[data-test="instrument-price-last"]')

        if price_locator.count() > 0:
            # .first avoids a strict-mode error if the selector matches more than one node
            price_text = price_locator.first.text_content().strip()
            price_text = price_text.replace(",", "")
            price = float(price_text)
            logger.info(f"Extracted gold price: {price}")
            return price
        else:
            logger.warning("Price element not found, trying alternative selector")
            alt_locator = page.locator(".text-5xl\\/9")
            if alt_locator.count() > 0:
                price_text = alt_locator.first.text_content().strip()
                price_text = price_text.replace(",", "")
                price = float(price_text)
                logger.info(f"Extracted gold price (alt): {price}")
                return price

        logger.warning("Could not extract gold price")
        return 0.0

    except Exception as e:
        logger.error(f"Error scraping gold price: {e}")
        return 0.0


def export_to_csv(df, future_price=0.0):
    """Write the OI levels plus a trailing [Price] section to CSV_OUTPUT_PATH."""
    output_path = CSV_OUTPUT_PATH

    with open(output_path, "w") as f:
        df.to_csv(f, index=False)
        f.write("\n[Price]\n")
        f.write(f"FuturePrice,{future_price}\n")

    logger.info(f"Exported OI data and price to {output_path}")

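
# The exported file looks roughly like this (values are placeholders):
#
#   Type,Strike,OI
#   CALL,2650.0,18432
#   ...
#   PUT,2500.0,21054
#
#   [Price]
#   FuturePrice,2655.3
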
def run_scraper():
    """Run one scrape cycle: reuse or establish a CME session, extract OI data, and export it."""
    if not CME_USERNAME or not CME_PASSWORD:
        logger.error("Missing CME_USERNAME or CME_PASSWORD in .env file")
        return

    future_price = 0.0

    for attempt in range(RETRY_ATTEMPTS):
        try:
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=HEADLESS)
                context = browser.new_context()
                page = context.new_page()

                # Try to reuse a saved session; the cookie check runs on a separate page
                loaded_cookies = load_cookies(context)
                page2 = context.new_page()

                if loaded_cookies and is_logged_in(page2):
                    logger.info("Using existing session (cookies)")
                else:
                    logger.info("No valid session found, logging in...")
                    if not login_to_cme(page):
                        browser.close()
                        if attempt < RETRY_ATTEMPTS - 1:
                            logger.info(
                                f"Retrying... Attempt {attempt + 2}/{RETRY_ATTEMPTS}"
                            )
                            continue
                        else:
                            logger.error("All login attempts failed")
                            return
                    save_cookies(context)

                navigate_to_oi_heatmap(page)
                oi_data = extract_oi_data(page)

                if not oi_data.empty:
                    logger.info("Extracting gold price from investing.com...")
                    future_price = scrape_investing_gold_price(page)

                    export_to_csv(oi_data, future_price)
                else:
                    logger.warning("No OI data extracted")

                browser.close()
                break

        except Exception as e:
            logger.error(f"Scraper error (attempt {attempt + 1}): {e}")
            if attempt < RETRY_ATTEMPTS - 1:
                logger.info(f"Retrying... Attempt {attempt + 2}/{RETRY_ATTEMPTS}")
            else:
                logger.error("All attempts failed")


if __name__ == "__main__":
    run_scraper()
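
# Running this module directly (python <this file>) performs one full scrape-and-export
# cycle, retrying up to RETRY_ATTEMPTS times on failure. The main EA is expected to read
# the CSV written to CSV_OUTPUT_PATH; the EA-side parsing lives outside this file.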