Delete plex/run_automation.py
This commit is contained in:
parent
263ca189eb
commit
609d8e936d
@ -1,317 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
import re
|
||||
import ssl
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
|
||||
# Locate config relative to this script file so it works regardless of
|
||||
# which directory the script is launched from on the NAS.
|
||||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
CONFIG_FILE = os.path.join(SCRIPT_DIR, "automation_config.json")
|
||||
IPTV_CONFIG_FILE = os.path.join(SCRIPT_DIR, ".iptv_config.json")
|
||||
|
||||
# SSL context that skips certificate verification (same as httpx verify=False)
|
||||
_SSL_CTX = ssl.create_default_context()
|
||||
_SSL_CTX.check_hostname = False
|
||||
_SSL_CTX.verify_mode = ssl.CERT_NONE
|
||||
|
||||
|
||||
def clean_movie_title(title: str) -> str:
|
||||
cleaned = re.sub(r'^[A-Z0-9\-\s]{2,15}\s*[-\:]\s*', '', title)
|
||||
cleaned = re.sub(r'[\[\(](malayalam|tamil|hindi|english|telugu|kannada|eng|tam|mal|hin|fhd|hd|4k|1080p|720p|hevc|web-dl|bluray|rip|dual|h264|h265|multi)[\]\)]', '', cleaned, flags=re.IGNORECASE)
|
||||
cleaned = re.sub(r'[\\/*?:"<>|]', '', cleaned)
|
||||
cleaned = re.sub(r'\s+', ' ', cleaned).strip()
|
||||
return cleaned.rstrip('. ')
|
||||
|
||||
def load_settings():
|
||||
if os.path.exists(CONFIG_FILE):
|
||||
with open(CONFIG_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
print(f"[-] Error: {CONFIG_FILE} is missing. Please set it up.")
|
||||
sys.exit(1)
|
||||
|
||||
def load_iptv_config():
|
||||
if os.path.exists(IPTV_CONFIG_FILE):
|
||||
with open(IPTV_CONFIG_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
print(f"[-] Error: {IPTV_CONFIG_FILE} missing. Run downloader.py to create credentials.")
|
||||
sys.exit(1)
|
||||
|
||||
def http_get(url: str, params: dict = None, headers: dict = None, timeout: int = 30) -> bytes:
|
||||
"""
|
||||
Simple HTTP GET using stdlib urllib. Returns response body as bytes.
|
||||
Follows redirects automatically. Skips SSL verification.
|
||||
Tries with IPTV Smarters UA first, falls back to VLC UA on failure.
|
||||
"""
|
||||
if params:
|
||||
url = url + "?" + urllib.parse.urlencode(params)
|
||||
|
||||
user_agents = ["IPTV Smarters", "VLC/3.0.18 LibVLC/3.0.18"]
|
||||
last_err = None
|
||||
for ua in user_agents:
|
||||
req_headers = {"User-Agent": ua}
|
||||
if headers:
|
||||
req_headers.update(headers)
|
||||
req = urllib.request.Request(url, headers=req_headers)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=timeout, context=_SSL_CTX) as resp:
|
||||
return resp.read()
|
||||
except urllib.error.HTTPError as e:
|
||||
last_err = e
|
||||
continue
|
||||
except Exception as e:
|
||||
last_err = e
|
||||
continue
|
||||
raise last_err or RuntimeError(f"All user-agent attempts failed for: {url}")
|
||||
|
||||
|
||||
def request_api_json(url: str, params: dict) -> dict:
|
||||
data = http_get(url, params=params)
|
||||
return json.loads(data.decode("utf-8"))
|
||||
|
||||
|
||||
def download_playlist(settings, iptv):
|
||||
playlist_path = settings["playlist_file"]
|
||||
if os.path.exists(playlist_path):
|
||||
age = time.time() - os.path.getmtime(playlist_path)
|
||||
if age < 6 * 3600:
|
||||
print("[*] Local playlist.m3u is fresh (less than 6 hours old). Skipping download.")
|
||||
return
|
||||
|
||||
print("[*] Downloading latest playlist from IPTV provider...")
|
||||
server = iptv["server"].rstrip('/')
|
||||
username = iptv["username"]
|
||||
password = iptv["password"]
|
||||
get_url = f"{server}/get.php"
|
||||
params = {"username": username, "password": password, "type": "m3u_plus", "output": "ts"}
|
||||
|
||||
try:
|
||||
content = http_get(get_url, params=params, timeout=120)
|
||||
if len(content) > 1000:
|
||||
os.makedirs(os.path.dirname(playlist_path) if os.path.dirname(playlist_path) else ".", exist_ok=True)
|
||||
with open(playlist_path, "wb") as f:
|
||||
f.write(content)
|
||||
print("[+] Playlist downloaded successfully.")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"[-] Direct playlist link failed or was blocked: {e}")
|
||||
|
||||
|
||||
print("[*] Falling back to reconstruction mode via API endpoints...")
|
||||
api_url = f"{server}/player_api.php"
|
||||
try:
|
||||
live_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_live_categories"})
|
||||
live_cat_dict = {str(c["category_id"]): c["category_name"] for c in live_cat} if isinstance(live_cat, list) else {}
|
||||
|
||||
vod_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_vod_categories"})
|
||||
vod_cat_dict = {str(c["category_id"]): c["category_name"] for c in vod_cat} if isinstance(vod_cat, list) else {}
|
||||
|
||||
series_cat = request_api_json(api_url, {"username": username, "password": password, "action": "get_series_categories"})
|
||||
series_cat_dict = {str(c["category_id"]): c["category_name"] for c in series_cat} if isinstance(series_cat, list) else {}
|
||||
|
||||
print("[*] Fetching live, movie, and series streams...")
|
||||
live_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_live_streams"})
|
||||
vod_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_vod_streams"})
|
||||
series_streams = request_api_json(api_url, {"username": username, "password": password, "action": "get_series"})
|
||||
|
||||
with open(playlist_path, "w", encoding="utf-8") as f:
|
||||
f.write("#EXTM3U\n")
|
||||
|
||||
# Live TV
|
||||
if isinstance(live_streams, list):
|
||||
for s in live_streams:
|
||||
f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("stream_icon", "")}" group-title="{live_cat_dict.get(str(s.get("category_id")), "Live TV")}",{s.get("name")}\n{server}/live/{username}/{password}/{s.get("stream_id")}.ts\n')
|
||||
|
||||
# Movies
|
||||
if isinstance(vod_streams, list):
|
||||
for s in vod_streams:
|
||||
ext = s.get("container_extension", "mp4")
|
||||
f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("stream_icon", "")}" group-title="{vod_cat_dict.get(str(s.get("category_id")), "Movies")}",{s.get("name")}\n{server}/movie/{username}/{password}/{s.get("stream_id")}.{ext}\n')
|
||||
|
||||
# Series
|
||||
if isinstance(series_streams, list):
|
||||
for s in series_streams:
|
||||
f.write(f'#EXTINF:-1 tvg-id="" tvg-name="{s.get("name")}" tvg-logo="{s.get("cover", "")}" group-title="{series_cat_dict.get(str(s.get("category_id")), "Series")}",{s.get("name")}\n{server}/series/{username}/{password}/{s.get("series_id")}.mp4\n')
|
||||
|
||||
print("[+] Custom M3U playlist generated successfully via API.")
|
||||
except Exception as api_err:
|
||||
print(f"[-] Failed to reconstruct playlist: {api_err}")
|
||||
|
||||
|
||||
def split_and_create_strms(settings):
|
||||
playlist_path = os.path.abspath(settings["playlist_file"])
|
||||
split_dir = os.path.abspath(settings["split_dir"])
|
||||
vod_dir = os.path.abspath(settings["output_dir"])
|
||||
languages = settings["languages"]
|
||||
|
||||
# NOTE: .strm files now contain the raw IPTV stream URL directly.
|
||||
# The proxy URL (http://127.0.0.1:8086/...) is stored only in the Plex
|
||||
# database via SQLite triggers installed by nas_trigger_installer.py.
|
||||
# The NAS proxy reads the raw URL from the .strm file and 302-redirects
|
||||
# Plex to it server-side, bypassing the browser mixed-content block.
|
||||
|
||||
os.makedirs(split_dir, exist_ok=True)
|
||||
os.makedirs(vod_dir, exist_ok=True)
|
||||
|
||||
handlers = {}
|
||||
counts = {}
|
||||
strm_counts = {"movies": 0, "series": 0}
|
||||
|
||||
all_langs = languages + ["others"]
|
||||
types = ["live", "movies", "series"]
|
||||
for lang in all_langs:
|
||||
handlers[lang] = {}
|
||||
counts[lang] = {}
|
||||
for t in types:
|
||||
filename = os.path.join(split_dir, f"{lang}_{t}.m3u")
|
||||
handlers[lang][t] = open(filename, "w", encoding="utf-8")
|
||||
handlers[lang][t].write("#EXTM3U\n")
|
||||
counts[lang][t] = 0
|
||||
|
||||
# Track all newly generated STRM file paths to find and prune orphan ones later
|
||||
active_strm_paths = set()
|
||||
|
||||
print("[*] Splitting playlist and generating STRM files...")
|
||||
with open(playlist_path, "r", encoding="utf-8", errors="ignore") as f:
|
||||
current_inf = None
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line.startswith("#EXTM3U"):
|
||||
continue
|
||||
elif line.startswith("#EXTINF:"):
|
||||
current_inf = line
|
||||
elif line.startswith("http://") or line.startswith("https://"):
|
||||
if current_inf:
|
||||
url_type = "live"
|
||||
if "/movie/" in line:
|
||||
url_type = "movies"
|
||||
elif "/series/" in line:
|
||||
url_type = "series"
|
||||
|
||||
target_lang = "others"
|
||||
lower_inf = current_inf.lower()
|
||||
for lang in languages:
|
||||
if lang in lower_inf:
|
||||
target_lang = lang
|
||||
break
|
||||
|
||||
handlers[target_lang][url_type].write(f"{current_inf}\n{line}\n")
|
||||
counts[target_lang][url_type] += 1
|
||||
|
||||
# Only generate STRM files if the target language is matched (not "others")
|
||||
if url_type in ["movies", "series"] and target_lang != "others":
|
||||
title = "Unknown"
|
||||
comma_idx = current_inf.rfind(",")
|
||||
if comma_idx != -1:
|
||||
title = current_inf[comma_idx+1:].strip()
|
||||
clean_title = clean_movie_title(title)
|
||||
|
||||
lang_subdir = target_lang.capitalize()
|
||||
|
||||
# The raw IPTV stream URL — this is what goes into the .strm file.
|
||||
# The NAS proxy (nas_strm_proxy.py) reads this and 302-redirects Plex to it.
|
||||
raw_stream_url = line
|
||||
|
||||
if url_type == "movies":
|
||||
dest_dir = os.path.join(vod_dir, "Movies", lang_subdir)
|
||||
os.makedirs(dest_dir, exist_ok=True)
|
||||
strm_path = os.path.join(dest_dir, f"{clean_title}.strm")
|
||||
strm_counts["movies"] += 1
|
||||
else:
|
||||
# Series subfolder structure
|
||||
dest_dir = os.path.join(vod_dir, "Series", lang_subdir, clean_title)
|
||||
os.makedirs(dest_dir, exist_ok=True)
|
||||
url_parts = line.split("/")
|
||||
stream_id = url_parts[-1].split(".")[0] if url_parts else "0"
|
||||
strm_path = os.path.join(dest_dir, f"{clean_title}_{stream_id}.strm")
|
||||
strm_counts["series"] += 1
|
||||
|
||||
active_strm_paths.add(strm_path)
|
||||
|
||||
# Only rewrite the file if the URL has changed (avoids bumping mtime
|
||||
# unnecessarily, which would trigger a Plex rescan for every run)
|
||||
should_write = True
|
||||
if os.path.exists(strm_path):
|
||||
try:
|
||||
with open(strm_path, "r", encoding="utf-8") as strm_f:
|
||||
if strm_f.read().strip() == raw_stream_url:
|
||||
should_write = False
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if should_write:
|
||||
with open(strm_path, "w", encoding="utf-8") as strm_f:
|
||||
strm_f.write(raw_stream_url + "\n")
|
||||
|
||||
current_inf = None
|
||||
|
||||
# Close split file handlers
|
||||
for lang in handlers:
|
||||
for t in types:
|
||||
handlers[lang][t].close()
|
||||
filepath = os.path.join(split_dir, f"{lang}_{t}.m3u")
|
||||
if counts[lang][t] == 0 and os.path.exists(filepath):
|
||||
os.remove(filepath)
|
||||
|
||||
# Clean up (Delete) obsolete .strm files that are no longer in the IPTV playlist
|
||||
print("[*] Checking for obsolete STRM files to prune...")
|
||||
removed_count = 0
|
||||
for root_dir, _, files in os.walk(vod_dir):
|
||||
for file in files:
|
||||
if file.endswith(".strm"):
|
||||
full_path = os.path.join(root_dir, file)
|
||||
if full_path not in active_strm_paths:
|
||||
try:
|
||||
os.remove(full_path)
|
||||
removed_count += 1
|
||||
# If a directory becomes empty, remove it as well
|
||||
parent = os.path.dirname(full_path)
|
||||
if not os.listdir(parent):
|
||||
os.rmdir(parent)
|
||||
except Exception as err:
|
||||
print(f"[-] Error removing obsolete STRM {file}: {err}")
|
||||
|
||||
if removed_count > 0:
|
||||
print(f"[+] Pruned {removed_count} obsolete STRM files.")
|
||||
|
||||
print(f"[+] STRM Generation Complete: {strm_counts['movies']} Movies, {strm_counts['series']} Series.")
|
||||
|
||||
def trigger_plex_scan(settings):
|
||||
server_url = settings.get("plex_server_url", "").rstrip('/')
|
||||
token = settings.get("plex_token", "")
|
||||
movie_section = settings.get("plex_movie_library_id", "")
|
||||
series_section = settings.get("plex_series_library_id", "")
|
||||
|
||||
if not token or token == "YOUR_PLEX_TOKEN":
|
||||
print("[*] Skipping Plex Scan: plex_token not configured.")
|
||||
return
|
||||
|
||||
for section in [movie_section, series_section]:
|
||||
if section:
|
||||
url = f"{server_url}/library/sections/{section}/refresh?X-Plex-Token={token}"
|
||||
try:
|
||||
data = http_get(url, headers={"Accept": "application/json"}, timeout=15)
|
||||
print(f"[+] Triggered Plex library scan for section {section}.")
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"[-] Failed to trigger Plex scan for section {section} (HTTP {e.code}).")
|
||||
except Exception as e:
|
||||
print(f"[-] Error connecting to Plex Server: {e}")
|
||||
|
||||
def main():
|
||||
print("=" * 60)
|
||||
print(" IPTV NAS AUTOMATION PIPELINE")
|
||||
print("=" * 60)
|
||||
settings = load_settings()
|
||||
iptv = load_iptv_config()
|
||||
|
||||
download_playlist(settings, iptv)
|
||||
split_and_create_strms(settings)
|
||||
trigger_plex_scan(settings)
|
||||
print("[+] Done!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
x
Reference in New Issue
Block a user