import os import sys import json import time import re import ssl import urllib.request import urllib.parse import urllib.error # Locate config relative to this script file so it works regardless of # which directory the script is launched from on the NAS. SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) CONFIG_FILE = os.path.join(SCRIPT_DIR, "automation_config.json") IPTV_CONFIG_FILE = os.path.join(SCRIPT_DIR, ".iptv_config.json") # SSL context that skips certificate verification (same as httpx verify=False) _SSL_CTX = ssl.create_default_context() _SSL_CTX.check_hostname = False _SSL_CTX.verify_mode = ssl.CERT_NONE def clean_movie_title(title: str) -> str: cleaned = re.sub(r'^[A-Z0-9\-\s]{2,15}\s*[-\:]\s*', '', title) cleaned = re.sub(r'[\[\(](malayalam|tamil|hindi|english|telugu|kannada|eng|tam|mal|hin|fhd|hd|4k|1080p|720p|hevc|web-dl|bluray|rip|dual|h264|h265|multi)[\]\)]', '', cleaned, flags=re.IGNORECASE) cleaned = re.sub(r'[\\/*?:"<>|]', '', cleaned) cleaned = re.sub(r'\s+', ' ', cleaned).strip() return cleaned.rstrip('. ') def load_settings(): if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, "r") as f: return json.load(f) print(f"[-] Error: {CONFIG_FILE} is missing. Please set it up.") sys.exit(1) def load_iptv_config(): if os.path.exists(IPTV_CONFIG_FILE): with open(IPTV_CONFIG_FILE, "r") as f: return json.load(f) print(f"[-] Error: {IPTV_CONFIG_FILE} missing. Run downloader.py to create credentials.") sys.exit(1) def http_get(url: str, params: dict = None, headers: dict = None, timeout: int = 30) -> bytes: """ Simple HTTP GET using stdlib urllib. Returns response body as bytes. Follows redirects automatically. Skips SSL verification. Tries with IPTV Smarters UA first, falls back to VLC UA on failure. """ if params: url = url + "?" + urllib.parse.urlencode(params) user_agents = ["IPTV Smarters", "VLC/3.0.18 LibVLC/3.0.18"] last_err = None for ua in user_agents: req_headers = {"User-Agent": ua} if headers: req_headers.update(headers) req = urllib.request.Request(url, headers=req_headers) try: with urllib.request.urlopen(req, timeout=timeout, context=_SSL_CTX) as resp: return resp.read() except urllib.error.HTTPError as e: last_err = e continue except Exception as e: last_err = e continue raise last_err or RuntimeError(f"All user-agent attempts failed for: {url}") def request_api_json(url: str, params: dict) -> dict: data = http_get(url, params=params) return json.loads(data.decode("utf-8")) def download_playlist(settings, iptv): playlist_path = settings["playlist_file"] if os.path.exists(playlist_path): age = time.time() - os.path.getmtime(playlist_path) if age < 6 * 3600: print("[*] Local playlist.m3u is fresh (less than 6 hours old). Skipping download.") return print("[*] Downloading latest playlist from IPTV provider...") server = iptv["server"].rstrip('/') username = iptv["username"] password = iptv["password"] get_url = f"{server}/get.php" params = {"username": username, "password": password, "type": "m3u_plus", "output": "ts"} try: content = http_get(get_url, params=params, timeout=120) if len(content) > 1000: os.makedirs(os.path.dirname(playlist_path) if os.path.dirname(playlist_path) else ".", exist_ok=True) with open(playlist_path, "wb") as f: f.write(content) print("[+] Playlist downloaded successfully.") return except Exception as e: print(f"[-] Direct playlist link failed or was blocked: {e}") print("[*] Falling back to reconstruction mode via API endpoints...") api_url = f"{server}/player_api.php" try: live_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_live_categories"}) live_cat_dict = {str(c["category_id"]): c["category_name"] for c in live_cat} if isinstance(live_cat, list) else {} vod_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_vod_categories"}) vod_cat_dict = {str(c["category_id"]): c["category_name"] for c in vod_cat} if isinstance(vod_cat, list) else {} series_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_series_categories"}) series_cat_dict = {str(c["category_id"]): c["category_name"] for c in series_cat} if isinstance(series_cat, list) else {} print("[*] Fetching live, movie, and series streams...") live_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_live_streams"}) vod_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_vod_streams"}) series_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_series"}) with open(playlist_path, "w", encoding="utf-8") as f: f.write("#EXTM3U\n") # Live TV if isinstance(live_streams, list): for s in live_streams: f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("stream_icon", "")}" group-title="{live_cat_dict.get(str(s.get("category_id")), "Live TV")}",{s.get("name")}\n{server}/live/{username}/{password}/{s.get("stream_id")}.ts\n') # Movies if isinstance(vod_streams, list): for s in vod_streams: ext = s.get("container_extension", "mp4") f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("stream_icon", "")}" group-title="{vod_cat_dict.get(str(s.get("category_id")), "Movies")}",{s.get("name")}\n{server}/movie/{username}/{password}/{s.get("stream_id")}.{ext}\n') # Series if isinstance(series_streams, list): for s in series_streams: f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("cover", "")}" group-title="{series_cat_dict.get(str(s.get("category_id")), "Series")}",{s.get("name")}\n{server}/series/{username}/{password}/{s.get("series_id")}.mp4\n') print("[+] Custom M3U playlist generated successfully via API.") except Exception as api_err: print(f"[-] Failed to reconstruct playlist: {api_err}") def split_and_create_strms(settings): playlist_path = os.path.abspath(settings["playlist_file"]) split_dir = os.path.abspath(settings["split_dir"]) vod_dir = os.path.abspath(settings["output_dir"]) languages = settings["languages"] # NOTE: .strm files now contain the raw IPTV stream URL directly. # The proxy URL (http://127.0.0.1:8086/...) is stored only in the Plex # database via SQLite triggers installed by nas_trigger_installer.py. # The NAS proxy reads the raw URL from the .strm file and 302-redirects # Plex to it server-side, bypassing the browser mixed-content block. os.makedirs(split_dir, exist_ok=True) os.makedirs(vod_dir, exist_ok=True) handlers = {} counts = {} strm_counts = {"movies": 0, "series": 0} all_langs = languages + ["others"] types = ["live", "movies", "series"] for lang in all_langs: handlers[lang] = {} counts[lang] = {} for t in types: filename = os.path.join(split_dir, f"{lang}_{t}.m3u") handlers[lang][t] = open(filename, "w", encoding="utf-8") handlers[lang][t].write("#EXTM3U\n") counts[lang][t] = 0 # Track all newly generated STRM file paths to find and prune orphan ones later active_strm_paths = set() print("[*] Splitting playlist and generating STRM files...") with open(playlist_path, "r", encoding="utf-8", errors="ignore") as f: current_inf = None for line in f: line = line.strip() if line.startswith("#EXTM3U"): continue elif line.startswith("#EXTINF:"): current_inf = line elif line.startswith("http://") or line.startswith("https://"): if current_inf: url_type = "live" if "/movie/" in line: url_type = "movies" elif "/series/" in line: url_type = "series" target_lang = "others" lower_inf = current_inf.lower() for lang in languages: if lang in lower_inf: target_lang = lang break handlers[target_lang][url_type].write(f"{current_inf}\n{line}\n") counts[target_lang][url_type] += 1 # Only generate STRM files if the target language is matched (not "others") if url_type in ["movies", "series"] and target_lang != "others": title = "Unknown" comma_idx = current_inf.rfind(",") if comma_idx != -1: title = current_inf[comma_idx+1:].strip() clean_title = clean_movie_title(title) lang_subdir = target_lang.capitalize() # The raw IPTV stream URL — this is what goes into the .strm file. # The NAS proxy (nas_strm_proxy.py) reads this and 302-redirects Plex to it. raw_stream_url = line if url_type == "movies": dest_dir = os.path.join(vod_dir, "Movies", lang_subdir) os.makedirs(dest_dir, exist_ok=True) strm_path = os.path.join(dest_dir, f"{clean_title}.strm") strm_counts["movies"] += 1 else: # Series subfolder structure dest_dir = os.path.join(vod_dir, "Series", lang_subdir, clean_title) os.makedirs(dest_dir, exist_ok=True) url_parts = line.split("/") stream_id = url_parts[-1].split(".")[0] if url_parts else "0" strm_path = os.path.join(dest_dir, f"{clean_title}_{stream_id}.strm") strm_counts["series"] += 1 active_strm_paths.add(strm_path) # Only rewrite the file if the URL has changed (avoids bumping mtime # unnecessarily, which would trigger a Plex rescan for every run) should_write = True if os.path.exists(strm_path): try: with open(strm_path, "r", encoding="utf-8") as strm_f: if strm_f.read().strip() == raw_stream_url: should_write = False except Exception: pass if should_write: with open(strm_path, "w", encoding="utf-8") as strm_f: strm_f.write(raw_stream_url + "\n") current_inf = None # Close split file handlers for lang in handlers: for t in types: handlers[lang][t].close() filepath = os.path.join(split_dir, f"{lang}_{t}.m3u") if counts[lang][t] == 0 and os.path.exists(filepath): os.remove(filepath) # Clean up (Delete) obsolete .strm files that are no longer in the IPTV playlist print("[*] Checking for obsolete STRM files to prune...") removed_count = 0 for root_dir, _, files in os.walk(vod_dir): for file in files: if file.endswith(".strm"): full_path = os.path.join(root_dir, file) if full_path not in active_strm_paths: try: os.remove(full_path) removed_count += 1 # If a directory becomes empty, remove it as well parent = os.path.dirname(full_path) if not os.listdir(parent): os.rmdir(parent) except Exception as err: print(f"[-] Error removing obsolete STRM {file}: {err}") if removed_count > 0: print(f"[+] Pruned {removed_count} obsolete STRM files.") print(f"[+] STRM Generation Complete: {strm_counts['movies']} Movies, {strm_counts['series']} Series.") def trigger_plex_scan(settings): server_url = settings.get("plex_server_url", "").rstrip('/') token = settings.get("plex_token", "") movie_section = settings.get("plex_movie_library_id", "") series_section = settings.get("plex_series_library_id", "") if not token or token == "YOUR_PLEX_TOKEN": print("[*] Skipping Plex Scan: plex_token not configured.") return for section in [movie_section, series_section]: if section: url = f"{server_url}/library/sections/{section}/refresh?X-Plex-Token={token}" try: data = http_get(url, headers={"Accept": "application/json"}, timeout=15) print(f"[+] Triggered Plex library scan for section {section}.") except urllib.error.HTTPError as e: print(f"[-] Failed to trigger Plex scan for section {section} (HTTP {e.code}).") except Exception as e: print(f"[-] Error connecting to Plex Server: {e}") def main(): print("=" * 60) print(" IPTV NAS AUTOMATION PIPELINE") print("=" * 60) settings = load_settings() iptv = load_iptv_config() download_playlist(settings, iptv) split_and_create_strms(settings) trigger_plex_scan(settings) print("[+] Done!") if __name__ == "__main__": main()