diff --git a/plex/run_automation.py b/plex/run_automation.py deleted file mode 100644 index a45378e..0000000 --- a/plex/run_automation.py +++ /dev/null @@ -1,317 +0,0 @@ -import os -import sys -import json -import time -import re -import ssl -import urllib.request -import urllib.parse -import urllib.error - -# Locate config relative to this script file so it works regardless of -# which directory the script is launched from on the NAS. -SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -CONFIG_FILE = os.path.join(SCRIPT_DIR, "automation_config.json") -IPTV_CONFIG_FILE = os.path.join(SCRIPT_DIR, ".iptv_config.json") - -# SSL context that skips certificate verification (same as httpx verify=False) -_SSL_CTX = ssl.create_default_context() -_SSL_CTX.check_hostname = False -_SSL_CTX.verify_mode = ssl.CERT_NONE - - -def clean_movie_title(title: str) -> str: - cleaned = re.sub(r'^[A-Z0-9\-\s]{2,15}\s*[-\:]\s*', '', title) - cleaned = re.sub(r'[\[\(](malayalam|tamil|hindi|english|telugu|kannada|eng|tam|mal|hin|fhd|hd|4k|1080p|720p|hevc|web-dl|bluray|rip|dual|h264|h265|multi)[\]\)]', '', cleaned, flags=re.IGNORECASE) - cleaned = re.sub(r'[\\/*?:"<>|]', '', cleaned) - cleaned = re.sub(r'\s+', ' ', cleaned).strip() - return cleaned.rstrip('. ') - -def load_settings(): - if os.path.exists(CONFIG_FILE): - with open(CONFIG_FILE, "r") as f: - return json.load(f) - print(f"[-] Error: {CONFIG_FILE} is missing. Please set it up.") - sys.exit(1) - -def load_iptv_config(): - if os.path.exists(IPTV_CONFIG_FILE): - with open(IPTV_CONFIG_FILE, "r") as f: - return json.load(f) - print(f"[-] Error: {IPTV_CONFIG_FILE} missing. Run downloader.py to create credentials.") - sys.exit(1) - -def http_get(url: str, params: dict = None, headers: dict = None, timeout: int = 30) -> bytes: - """ - Simple HTTP GET using stdlib urllib. Returns response body as bytes. - Follows redirects automatically. Skips SSL verification. - Tries with IPTV Smarters UA first, falls back to VLC UA on failure. - """ - if params: - url = url + "?" + urllib.parse.urlencode(params) - - user_agents = ["IPTV Smarters", "VLC/3.0.18 LibVLC/3.0.18"] - last_err = None - for ua in user_agents: - req_headers = {"User-Agent": ua} - if headers: - req_headers.update(headers) - req = urllib.request.Request(url, headers=req_headers) - try: - with urllib.request.urlopen(req, timeout=timeout, context=_SSL_CTX) as resp: - return resp.read() - except urllib.error.HTTPError as e: - last_err = e - continue - except Exception as e: - last_err = e - continue - raise last_err or RuntimeError(f"All user-agent attempts failed for: {url}") - - -def request_api_json(url: str, params: dict) -> dict: - data = http_get(url, params=params) - return json.loads(data.decode("utf-8")) - - -def download_playlist(settings, iptv): - playlist_path = settings["playlist_file"] - if os.path.exists(playlist_path): - age = time.time() - os.path.getmtime(playlist_path) - if age < 6 * 3600: - print("[*] Local playlist.m3u is fresh (less than 6 hours old). Skipping download.") - return - - print("[*] Downloading latest playlist from IPTV provider...") - server = iptv["server"].rstrip('/') - username = iptv["username"] - password = iptv["password"] - get_url = f"{server}/get.php" - params = {"username": username, "password": password, "type": "m3u_plus", "output": "ts"} - - try: - content = http_get(get_url, params=params, timeout=120) - if len(content) > 1000: - os.makedirs(os.path.dirname(playlist_path) if os.path.dirname(playlist_path) else ".", exist_ok=True) - with open(playlist_path, "wb") as f: - f.write(content) - print("[+] Playlist downloaded successfully.") - return - except Exception as e: - print(f"[-] Direct playlist link failed or was blocked: {e}") - - - print("[*] Falling back to reconstruction mode via API endpoints...") - api_url = f"{server}/player_api.php" - try: - live_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_live_categories"}) - live_cat_dict = {str(c["category_id"]): c["category_name"] for c in live_cat} if isinstance(live_cat, list) else {} - - vod_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_vod_categories"}) - vod_cat_dict = {str(c["category_id"]): c["category_name"] for c in vod_cat} if isinstance(vod_cat, list) else {} - - series_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_series_categories"}) - series_cat_dict = {str(c["category_id"]): c["category_name"] for c in series_cat} if isinstance(series_cat, list) else {} - - print("[*] Fetching live, movie, and series streams...") - live_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_live_streams"}) - vod_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_vod_streams"}) - series_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_series"}) - - with open(playlist_path, "w", encoding="utf-8") as f: - f.write("#EXTM3U\n") - - # Live TV - if isinstance(live_streams, list): - for s in live_streams: - f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("stream_icon", "")}" group-title="{live_cat_dict.get(str(s.get("category_id")), "Live TV")}",{s.get("name")}\n{server}/live/{username}/{password}/{s.get("stream_id")}.ts\n') - - # Movies - if isinstance(vod_streams, list): - for s in vod_streams: - ext = s.get("container_extension", "mp4") - f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("stream_icon", "")}" group-title="{vod_cat_dict.get(str(s.get("category_id")), "Movies")}",{s.get("name")}\n{server}/movie/{username}/{password}/{s.get("stream_id")}.{ext}\n') - - # Series - if isinstance(series_streams, list): - for s in series_streams: - f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("cover", "")}" group-title="{series_cat_dict.get(str(s.get("category_id")), "Series")}",{s.get("name")}\n{server}/series/{username}/{password}/{s.get("series_id")}.mp4\n') - - print("[+] Custom M3U playlist generated successfully via API.") - except Exception as api_err: - print(f"[-] Failed to reconstruct playlist: {api_err}") - - -def split_and_create_strms(settings): - playlist_path = os.path.abspath(settings["playlist_file"]) - split_dir = os.path.abspath(settings["split_dir"]) - vod_dir = os.path.abspath(settings["output_dir"]) - languages = settings["languages"] - - # NOTE: .strm files now contain the raw IPTV stream URL directly. - # The proxy URL (http://127.0.0.1:8086/...) is stored only in the Plex - # database via SQLite triggers installed by nas_trigger_installer.py. - # The NAS proxy reads the raw URL from the .strm file and 302-redirects - # Plex to it server-side, bypassing the browser mixed-content block. - - os.makedirs(split_dir, exist_ok=True) - os.makedirs(vod_dir, exist_ok=True) - - handlers = {} - counts = {} - strm_counts = {"movies": 0, "series": 0} - - all_langs = languages + ["others"] - types = ["live", "movies", "series"] - for lang in all_langs: - handlers[lang] = {} - counts[lang] = {} - for t in types: - filename = os.path.join(split_dir, f"{lang}_{t}.m3u") - handlers[lang][t] = open(filename, "w", encoding="utf-8") - handlers[lang][t].write("#EXTM3U\n") - counts[lang][t] = 0 - - # Track all newly generated STRM file paths to find and prune orphan ones later - active_strm_paths = set() - - print("[*] Splitting playlist and generating STRM files...") - with open(playlist_path, "r", encoding="utf-8", errors="ignore") as f: - current_inf = None - for line in f: - line = line.strip() - if line.startswith("#EXTM3U"): - continue - elif line.startswith("#EXTINF:"): - current_inf = line - elif line.startswith("http://") or line.startswith("https://"): - if current_inf: - url_type = "live" - if "/movie/" in line: - url_type = "movies" - elif "/series/" in line: - url_type = "series" - - target_lang = "others" - lower_inf = current_inf.lower() - for lang in languages: - if lang in lower_inf: - target_lang = lang - break - - handlers[target_lang][url_type].write(f"{current_inf}\n{line}\n") - counts[target_lang][url_type] += 1 - - # Only generate STRM files if the target language is matched (not "others") - if url_type in ["movies", "series"] and target_lang != "others": - title = "Unknown" - comma_idx = current_inf.rfind(",") - if comma_idx != -1: - title = current_inf[comma_idx+1:].strip() - clean_title = clean_movie_title(title) - - lang_subdir = target_lang.capitalize() - - # The raw IPTV stream URL — this is what goes into the .strm file. - # The NAS proxy (nas_strm_proxy.py) reads this and 302-redirects Plex to it. - raw_stream_url = line - - if url_type == "movies": - dest_dir = os.path.join(vod_dir, "Movies", lang_subdir) - os.makedirs(dest_dir, exist_ok=True) - strm_path = os.path.join(dest_dir, f"{clean_title}.strm") - strm_counts["movies"] += 1 - else: - # Series subfolder structure - dest_dir = os.path.join(vod_dir, "Series", lang_subdir, clean_title) - os.makedirs(dest_dir, exist_ok=True) - url_parts = line.split("/") - stream_id = url_parts[-1].split(".")[0] if url_parts else "0" - strm_path = os.path.join(dest_dir, f"{clean_title}_{stream_id}.strm") - strm_counts["series"] += 1 - - active_strm_paths.add(strm_path) - - # Only rewrite the file if the URL has changed (avoids bumping mtime - # unnecessarily, which would trigger a Plex rescan for every run) - should_write = True - if os.path.exists(strm_path): - try: - with open(strm_path, "r", encoding="utf-8") as strm_f: - if strm_f.read().strip() == raw_stream_url: - should_write = False - except Exception: - pass - - if should_write: - with open(strm_path, "w", encoding="utf-8") as strm_f: - strm_f.write(raw_stream_url + "\n") - - current_inf = None - - # Close split file handlers - for lang in handlers: - for t in types: - handlers[lang][t].close() - filepath = os.path.join(split_dir, f"{lang}_{t}.m3u") - if counts[lang][t] == 0 and os.path.exists(filepath): - os.remove(filepath) - - # Clean up (Delete) obsolete .strm files that are no longer in the IPTV playlist - print("[*] Checking for obsolete STRM files to prune...") - removed_count = 0 - for root_dir, _, files in os.walk(vod_dir): - for file in files: - if file.endswith(".strm"): - full_path = os.path.join(root_dir, file) - if full_path not in active_strm_paths: - try: - os.remove(full_path) - removed_count += 1 - # If a directory becomes empty, remove it as well - parent = os.path.dirname(full_path) - if not os.listdir(parent): - os.rmdir(parent) - except Exception as err: - print(f"[-] Error removing obsolete STRM {file}: {err}") - - if removed_count > 0: - print(f"[+] Pruned {removed_count} obsolete STRM files.") - - print(f"[+] STRM Generation Complete: {strm_counts['movies']} Movies, {strm_counts['series']} Series.") - -def trigger_plex_scan(settings): - server_url = settings.get("plex_server_url", "").rstrip('/') - token = settings.get("plex_token", "") - movie_section = settings.get("plex_movie_library_id", "") - series_section = settings.get("plex_series_library_id", "") - - if not token or token == "YOUR_PLEX_TOKEN": - print("[*] Skipping Plex Scan: plex_token not configured.") - return - - for section in [movie_section, series_section]: - if section: - url = f"{server_url}/library/sections/{section}/refresh?X-Plex-Token={token}" - try: - data = http_get(url, headers={"Accept": "application/json"}, timeout=15) - print(f"[+] Triggered Plex library scan for section {section}.") - except urllib.error.HTTPError as e: - print(f"[-] Failed to trigger Plex scan for section {section} (HTTP {e.code}).") - except Exception as e: - print(f"[-] Error connecting to Plex Server: {e}") - -def main(): - print("=" * 60) - print(" IPTV NAS AUTOMATION PIPELINE") - print("=" * 60) - settings = load_settings() - iptv = load_iptv_config() - - download_playlist(settings, iptv) - split_and_create_strms(settings) - trigger_plex_scan(settings) - print("[+] Done!") - -if __name__ == "__main__": - main()