Add price caching to prevent repeated file reads and improve performance. Implement multi-path search for CSV files with fallback options. Add comprehensive logging for CSV load success/failure states. Update dashboard to display CSV loading status. Simplify scraper CSV output format and automate file transfer to terminal MQL5 Files directory.
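The price caching and multi-path CSV lookup called out above belong to the consumer side (the dashboard/EA reading the exported file) rather than to the scraper reproduced below. As a rough illustration only, with the candidate paths, function names, and mtime-based caching rule all assumed rather than taken from the project, a reader of oi_data.csv might search a few locations with a fallback and avoid repeated file reads like this:

import os

# Illustrative only: candidate locations where the exported CSV might land.
CANDIDATE_PATHS = [
    "./oi_data.csv",
    os.path.expanduser("~/oi_data.csv"),
    os.path.join(os.getenv("MT5_FILES_DIR", ""), "oi_data.csv"),
]

_cache = {"mtime": None, "price": None}  # simple in-process cache

def find_csv():
    # Return the first candidate path that exists; None signals the fallback case.
    for path in CANDIDATE_PATHS:
        if path and os.path.isfile(path):
            return path
    return None

def get_cached_price():
    # Re-read the file only when its modification time changes.
    path = find_csv()
    if path is None:
        return None
    mtime = os.path.getmtime(path)
    if _cache["mtime"] != mtime:
        with open(path) as f:
            f.readline()  # skip the "date,future_price" header
            _, price = f.readline().strip().split(",")
        _cache["mtime"] = mtime
        _cache["price"] = float(price)
    return _cache["price"]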
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CME OI Scraper - Extracts Open Interest data from CME QuikStrike and gold price from investing.com
Usage: python main.py
Requires: pip install -r requirements.txt
"""

import os
import logging
import json
from datetime import datetime
from playwright.sync_api import sync_playwright
from dotenv import load_dotenv
import pandas as pd

load_dotenv()

# Configuration
CME_USERNAME = os.getenv("CME_USERNAME")
CME_PASSWORD = os.getenv("CME_PASSWORD")
CME_LOGIN_URL = os.getenv(
    "CME_LOGIN_URL", "https://login.cmegroup.com/sso/accountstatus/showAuth.action"
)
QUIKSTRIKE_URL = (
    "https://www.cmegroup.com/tools-information/quikstrike/open-interest-heatmap.html"
)
INVESTING_URL = os.getenv("INVESTING_URL", "https://www.investing.com/commodities/gold")
CSV_OUTPUT_PATH = os.getenv("CSV_OUTPUT_PATH", "./oi_data.csv")
TOP_N_STRIKES = int(os.getenv("TOP_N_STRIKES", "3"))
HEADLESS = os.getenv("HEADLESS", "false").lower() == "true"
TIMEOUT_SECONDS = int(os.getenv("TIMEOUT_SECONDS", "30"))
RETRY_ATTEMPTS = int(os.getenv("RETRY_ATTEMPTS", "3"))
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
COOKIE_FILE = "./cookies.json"

logging.basicConfig(level=getattr(logging, LOG_LEVEL))
logger = logging.getLogger(__name__)


def save_cookies(context):
    """Persist the browser context's cookies so later runs can reuse the CME session."""
    cookies = context.cookies()
    with open(COOKIE_FILE, "w") as f:
        json.dump(cookies, f)
    logger.info("Cookies saved to file")


def load_cookies(context):
    """Load previously saved cookies into the context; return True if a cookie file existed."""
    if os.path.exists(COOKIE_FILE):
        with open(COOKIE_FILE, "r") as f:
            cookies = json.load(f)
        context.add_cookies(cookies)
        logger.info("Cookies loaded from file")
        return True
    return False


def delete_cookies():
    """Remove the cookie file so the next run performs a fresh login."""
    if os.path.exists(COOKIE_FILE):
        os.remove(COOKIE_FILE)
        logger.info("Cookies deleted")


def are_cookies_valid(page):
    """Open the QuikStrike page and check whether the OI table renders, i.e. the session is still valid."""
    logger.info("Checking if cookies are valid...")
    page.goto(QUIKSTRIKE_URL, timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_load_state("domcontentloaded", timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_timeout(3000)

    try:
        frame = page.frame_locator("iframe.cmeIframe").first
        page.wait_for_timeout(5000)

        table_exists = frame.locator("table.grid-thm").count() > 0
        if table_exists:
            logger.info("Cookies are valid - OI table found in iframe")
        else:
            logger.info("Cookies may be expired - no OI table found in iframe")
        return table_exists
    except Exception as e:
        logger.info(f"Cookies expired - error checking iframe: {e}")
        return False


def login_to_cme(page):
    """Log in to CME SSO with the credentials from .env; return True on apparent success."""
    logger.info("Attempting to login to CME QuikStrike...")

    page.goto(CME_LOGIN_URL, timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_load_state("domcontentloaded", timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_timeout(1000)

    try:
        page.fill("#user", CME_USERNAME)
        page.fill("#pwd", CME_PASSWORD)
        page.click("#loginBtn")

        logger.info("Waiting for login redirect...")
        page.wait_for_timeout(30000)

        current_url = page.url.lower()
        logger.info(f"Current URL after login attempt: {current_url}")

        if "login" in current_url or "sso" in current_url:
            logger.error("Login may have failed - still on SSO/login page")
            page.screenshot(path="login_failed.png")
            return False

        logger.info("Login successful")
        page.screenshot(path="login_success.png")
        return True

    except Exception as e:
        logger.error(f"Login error: {e}")
        page.screenshot(path="login_error.png")
        return False


def select_gold_product(page):
    """Drive the QuikStrike product picker inside the iframe: Metals -> Precious Metals -> Gold."""
    logger.info("Selecting Gold product...")

    logger.info("Switching to iframe context...")
    frame = page.frame_locator("iframe.cmeIframe").first
    page.wait_for_timeout(5000)

    logger.info("Step 1: Clicking dropdown arrow...")
    frame.locator("#ctl11_hlProductArrow").click()
    page.wait_for_timeout(1000)

    logger.info("Step 2: Clicking Metals...")
    frame.locator('a[groupid="6"]:has-text("Metals")').click()
    page.wait_for_timeout(500)

    logger.info("Step 3: Clicking Precious Metals...")
    frame.locator('a[familyid="6"]:has-text("Precious Metals")').click()
    page.wait_for_timeout(500)

    logger.info("Step 4: Clicking Gold...")
    frame.locator('a[title="Gold"]').click()

    logger.info("Waiting for Gold data to load...")
    page.wait_for_timeout(10000)
    logger.info("Gold product selected")


def navigate_to_oi_heatmap(page):
    """Open the Open Interest Heatmap page and switch the product to Gold."""
    logger.info(f"Navigating to QuikStrike: {QUIKSTRIKE_URL}")
    page.goto(QUIKSTRIKE_URL, timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_load_state("domcontentloaded", timeout=TIMEOUT_SECONDS * 1000)
    page.wait_for_timeout(5000)

    select_gold_product(page)


def extract_oi_data(page):
    """Parse the Gold OI matrix table inside the iframe and return the top strikes by OI for calls and puts."""
    logger.info("Extracting OI data from Gold matrix table...")

    logger.info("Switching to iframe context...")
    frame = page.frame_locator("iframe.cmeIframe").first
    page.wait_for_timeout(8000)

    logger.info("Looking for table.grid-thm...")
    call_levels = []
    put_levels = []

    table = frame.locator("table.grid-thm").first
    table.wait_for(state="visible", timeout=10000)
    logger.info("Table found, waiting for data...")

    rows = table.locator("tbody tr").all()
    logger.info(f"Found {len(rows)} rows in table")

    for row in rows:
        try:
            cells = row.locator("td").all()
            if len(cells) < 3:
                continue

            # The strike is the first purely numeric cell in the row
            strike = None
            for cell in cells:
                text = cell.text_content().strip()
                if text and text.replace(".", "").isdigit():
                    strike = float(text)
                    break

            if strike is None:
                continue

            # OI cells come in call/put pairs, one pair per expiry column
            number_cells = row.locator("td.number").all()
            logger.debug(f"Strike {strike}: found {len(number_cells)} number cells")

            for i in range(0, len(number_cells), 2):
                if i + 1 >= len(number_cells):
                    break

                call_cell = number_cells[i]
                put_cell = number_cells[i + 1]

                call_text = call_cell.text_content().strip()
                put_text = put_cell.text_content().strip()

                if call_text and call_text != "-":
                    call_oi = int(call_text.replace(",", ""))
                    call_levels.append(
                        {"Type": "CALL", "Strike": strike, "OI": call_oi}
                    )

                if put_text and put_text != "-":
                    put_oi = int(put_text.replace(",", ""))
                    put_levels.append({"Type": "PUT", "Strike": strike, "OI": put_oi})

        except Exception as e:
            logger.warning(f"Error parsing row: {e}")
            continue

    logger.info(
        f"Extracted {len(call_levels)} CALL levels, {len(put_levels)} PUT levels"
    )

    if call_levels:
        call_df = pd.DataFrame(call_levels)
        call_df = call_df.drop_duplicates(subset="Strike", keep="first")
        call_df = call_df.sort_values("OI")
        call_df = call_df.tail(TOP_N_STRIKES)
        call_df["Type"] = "CALL"
    else:
        call_df = pd.DataFrame()

    if put_levels:
        put_df = pd.DataFrame(put_levels)
        put_df = put_df.drop_duplicates(subset="Strike", keep="first")
        put_df = put_df.sort_values("OI")
        put_df = put_df.tail(TOP_N_STRIKES)
        put_df["Type"] = "PUT"
    else:
        put_df = pd.DataFrame()

    if call_df.empty and put_df.empty:
        # Guard: selecting columns on an all-empty concat would raise a KeyError
        logger.warning("No OI levels parsed from the table")
        return pd.DataFrame(columns=["Type", "Strike", "OI"])

    result_df = pd.concat([call_df, put_df])
    result_df = result_df[["Type", "Strike", "OI"]]

    logger.info(f"Final top {TOP_N_STRIKES} unique strikes for CALL and PUT extracted")
    return result_df


def scrape_investing_gold_price(page):
    """Scrape the spot gold price from investing.com, trying several selectors; return 0.0 on failure."""
    logger.info(f"Scraping gold price from: {INVESTING_URL}")

    try:
        page.goto(INVESTING_URL, timeout=60000, wait_until="domcontentloaded")
        logger.info(f"Page loaded, title: {page.title()}")

        page.wait_for_timeout(5000)
        logger.info("Waited for JavaScript to render")

        selectors = [
            'div[data-test="instrument-price-last"]',
            ".text-5xl\\/9.font-bold.text-\\[#232526\\]",
            '[data-test="instrument-price-last"]',
            ".text-5xl\\/9",
        ]

        price = 0.0
        for selector in selectors:
            try:
                locator = page.locator(selector)
                if locator.count() > 0:
                    locator.first.wait_for(state="visible", timeout=10000)
                    price_text = locator.first.text_content().strip()
                    if price_text:
                        price_text = price_text.replace(",", "")
                        price = float(price_text)
                        logger.info(f"Extracted gold price ({selector}): {price}")
                        break
            except Exception as e:
                logger.debug(f"Selector {selector} failed: {e}")
                continue

        if price == 0.0:
            logger.warning("Could not extract gold price, all selectors failed")

        return price

    except Exception as e:
        logger.error(f"Error scraping gold price: {e}")
        return 0.0


def export_to_csv(df, future_price=0.0):
    """Write the simplified CSV (date and gold price only); the OI DataFrame is accepted for compatibility but not written."""
    output_path = CSV_OUTPUT_PATH

    with open(output_path, "w") as f:
        f.write("date,future_price\n")
        f.write(f"{datetime.now().strftime('%Y-%m-%d')},{future_price}\n")

    logger.info(f"Exported gold price to {output_path}")
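

# --- Illustrative sketch (not part of the original script) ---
# The change description mentions automating transfer of the exported CSV into
# the terminal's MQL5 Files directory, but that step does not appear here. A
# minimal sketch of what it could look like; MT5_FILES_DIR is an assumed
# environment variable, not one defined above.
def copy_csv_to_mql5_files(source_path=CSV_OUTPUT_PATH):
    import shutil

    target_dir = os.getenv("MT5_FILES_DIR", "")
    if not target_dir or not os.path.isdir(target_dir):
        logger.warning("MT5_FILES_DIR not set or not a directory, skipping CSV transfer")
        return False
    destination = os.path.join(target_dir, os.path.basename(source_path))
    shutil.copy2(source_path, destination)
    logger.info(f"Copied {source_path} to {destination}")
    return True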


def run_scraper():
    """Main entry point: reuse or refresh the CME session, scrape OI and price, and export the CSV."""
    if not CME_USERNAME or not CME_PASSWORD:
        logger.error("Missing CME_USERNAME or CME_PASSWORD in .env file")
        return

    future_price = 0.0

    for attempt in range(RETRY_ATTEMPTS):
        try:
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=HEADLESS)
                context = browser.new_context()
                page = context.new_page()

                cookies_loaded = load_cookies(context)
                cookies_valid = False

                if cookies_loaded:
                    cookies_valid = are_cookies_valid(page)

                if cookies_valid:
                    logger.info("Using cached session")
                else:
                    if cookies_loaded:
                        logger.info("Cookies expired, deleting and re-logging in...")
                        delete_cookies()

                    logger.info("Logging in to CME...")
                    if not login_to_cme(page):
                        browser.close()
                        if attempt < RETRY_ATTEMPTS - 1:
                            logger.info(
                                f"Retrying... Attempt {attempt + 2}/{RETRY_ATTEMPTS}"
                            )
                            continue
                        else:
                            logger.error("All login attempts failed")
                            return

                navigate_to_oi_heatmap(page)
                oi_data = extract_oi_data(page)
                save_cookies(context)

                if len(oi_data) > 0:
                    logger.info("Extracting gold price from investing.com...")
                    future_price = scrape_investing_gold_price(page)
                    logger.info(f"Gold price extracted: {future_price}")

                    export_to_csv(oi_data, future_price)
                else:
                    logger.warning("No OI data extracted")

                browser.close()
                break

        except Exception as e:
            logger.error(f"Scraper error (attempt {attempt + 1}): {e}")
            if attempt < RETRY_ATTEMPTS - 1:
                logger.info(f"Retrying... Attempt {attempt + 2}/{RETRY_ATTEMPTS}")
            else:
                logger.error("All attempts failed")


if __name__ == "__main__":
    run_scraper()