X.com 게시물을 Notion에 스크랩하는 코드를 올려 놓습니다.
X 게시물을 내 Notion 데이터베이스에 자동으로 저장하는 파이썬 스크립트입니다.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
X (Twitter) → Notion automation pipeline (macOS).

Workflow:
  - Open a Chrome window, wait for manual login (console Enter to continue).
  - Auto-detect the macOS Chrome profile; if it is locked, clone an
    ephemeral copy of it (plan B).
  - Work around webdriver_manager's THIRD_PARTY_NOTICES.chromedriver path bug.
  - Infinite-scroll the account timeline until TARGET_COUNT tweets collected.
  - Download images/videos, compress each to <= 5 MiB, upload to Notion via
    the new File Upload API.
  - Adapt to the DB schema flexibly: auto-detect the Title property; map the
    other properties if present, otherwise skip them.
  - If no files-type "Media" property exists, try to create one; on failure
    attach uploads as file blocks (children) instead.
  - Deduplicate: reuse existing pages, never re-upload a file name already
    attached, and delete local files immediately after a successful upload.

NOTE(review): this file was recovered from a whitespace-mangled paste; the
reconstruction preserves the original statements and runtime strings.
"""
import os, re, csv, time, stat, json, logging, io, subprocess, shutil, tempfile
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.common.exceptions import SessionNotCreatedException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
import yt_dlp

# ===================== Configuration =====================
ACCOUNT = "수집대상"                  # target X account to scrape
TARGET_COUNT = 8000                   # desired number of tweets
MAX_SCROLL = 100000                   # maximum scroll attempts
SCROLL_PAUSE = 7.5                    # pause between scrolls (stability)
HEADLESS = False                      # keep a visible window (True disables login wait)
WAIT_FOR_LOGIN = True                 # after window opens, log in, then press Enter in console
OUT_DIR = Path("./NatureIsAmazing_v3")

# Notion (personal use; hard-coded by request)
NOTION_API_KEY = "노션API"
NOTION_DATABASE_ID_RAW = "노션DP"     # hyphens optional
NOTION_VERSION = "2022-06-28"

# Optional: X login cookies for yt-dlp (Netscape format)
COOKIES_FILE = ""                     # e.g. "/Users/아이디(영문이름)/cookies_twitter.txt"

UA = ("Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
MAX_UPLOAD_BYTES = 5 * 1024 * 1024    # 5 MiB Notion upload limit

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s | %(levelname)s | %(message)s",
                    datefmt="%H:%M:%S")


# ===================== Common utilities =====================
def ensure_executable(path: str) -> None:
    """Ensure *path* has execute permission (chmod 755-style) if missing."""
    if not os.access(path, os.X_OK):
        os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                 stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)


def build_http_session() -> requests.Session:
    """Return a requests.Session with retry/backoff mounted for http(s)."""
    s = requests.Session()
    retries = Retry(total=5, connect=5, read=5, backoff_factor=0.6,
                    status_forcelist=(429, 500, 502, 503, 504),
                    allowed_methods=["GET", "HEAD", "OPTIONS"])
    s.headers.update({"User-Agent": UA, "Accept": "*/*"})
    s.mount("http://", HTTPAdapter(max_retries=retries))
    s.mount("https://", HTTPAdapter(max_retries=retries))
    return s


def normalize_x_url(u: str) -> str:
    """Rewrite x.com URLs to twitter.com (yt-dlp compatibility)."""
    return u.replace("x.com", "twitter.com")


def force_orig(url: str) -> str:
    """Force a twimg media URL to its original-resolution variant (name=orig)."""
    if "twimg.com/media" not in url:
        return url
    parts = list(urlparse(url))
    qs = dict(parse_qsl(parts[4]))
    qs["name"] = "orig"
    parts[4] = urlencode(qs)
    return urlunparse(parts)


def hyphenate_uuid(s: str) -> str:
    """Insert hyphens into a 32-hex-char Notion ID; return input unchanged otherwise."""
    s = re.sub(r"[^0-9a-fA-F]", "", s)
    return f"{s[0:8]}-{s[8:12]}-{s[12:16]}-{s[16:20]}-{s[20:32]}" if len(s) == 32 else s


NOTION_DATABASE_ID = hyphenate_uuid(NOTION_DATABASE_ID_RAW)


# ===================== macOS Chrome profile detection/cloning =====================
def _profile_locked(profile_path: Path) -> bool:
    """True if Chrome's singleton lock files indicate the profile is in use."""
    for name in ("SingletonLock", "SingletonCookie", "SingletonSocket"):
        if (profile_path / name).exists():
            return True
    return False


def _make_ephemeral_profile(base_dir: Path, profile_dir: str) -> Tuple[str, str]:
    """
    Clone only the essential data from the original profile into a temporary
    user-data-dir. Large cache directories are excluded to minimize conflicts.

    Returns (temp user-data-dir, profile directory name).
    """
    src = base_dir / profile_dir
    tmp_root = Path(tempfile.mkdtemp(prefix="chrome-epi-"))
    dst = tmp_root / "Default"
    dst.mkdir(parents=True, exist_ok=True)
    whitelist = [
        "Cookies", "Cookies-journal", "Network", "Preferences", "Secure Preferences",
        "Local Storage", "History", "Favicons", "Visited Links", "Bookmarks", "Session Storage"
    ]
    blacklist = {"Cache", "Code Cache", "GPUCache", "Service Worker", "ShaderCache", "GrShaderCache"}

    def _copy_entry(name: str):
        # Best-effort copy of one entry; files in use may fail and are skipped.
        s = src / name
        d = dst / name
        if not s.exists():
            return
        if s.is_dir():
            if name in blacklist:
                return
            shutil.copytree(s, d, dirs_exist_ok=True)
        else:
            try:
                shutil.copy2(s, d)
            except Exception:
                pass

    for entry in whitelist:
        _copy_entry(entry)
    # Also copy any small (<=5 MiB) loose files not covered above.
    for name in os.listdir(src):
        if name in whitelist or name in blacklist:
            continue
        sp = src / name
        dp = dst / name
        try:
            if sp.is_file() and sp.stat().st_size <= 5 * 1024 * 1024:
                shutil.copy2(sp, dp)
        except Exception:
            pass
    return str(tmp_root), "Default"


def find_chrome_profile_on_macos() -> Tuple[Optional[str], Optional[str]]:
    """Return (chrome user-data base dir, profile name) or (None, None)."""
    home = Path.home()
    base = home / "Library" / "Application Support" / "Google" / "Chrome"
    local_state = base / "Local State"
    try:
        # Prefer the last-used profile recorded in Local State.
        if local_state.is_file():
            data = json.loads(local_state.read_text(encoding="utf-8"))
            last_used = data.get("profile", {}).get("last_used")
            if last_used and (base / last_used).is_dir():
                return str(base), last_used
    except Exception:
        pass
    # Fallback: scan the conventional profile directory names.
    for name in ["Default"] + [f"Profile {i}" for i in range(1, 12)]:
        if (base / name).is_dir():
            return str(base), name
    return None, None


# ===================== Compressors =====================
def compress_image_to_limit(src: Path, dst: Path, max_bytes: int = MAX_UPLOAD_BYTES) -> bool:
    """
    Re-encode *src* as progressive JPEG into *dst*, stepping down size and
    quality until the result fits within *max_bytes*. Returns success flag.
    """
    try:
        img = Image.open(src).convert("RGB")
    except Exception as e:
        logging.error(f"이미지 열기 실패: {src} -> {e}")
        return False

    def resize(img, max_side):
        w, h = img.size
        if max(w, h) <= max_side:
            return img
        ratio = max_side / max(w, h)
        return img.resize((int(w * ratio), int(h * ratio)), Image.LANCZOS)

    for ms in [1600, 1400, 1200, 1000, 800, 640, 560, 480]:
        test = resize(img, ms)
        for q in [85, 80, 72, 65, 57, 50, 42, 35, 30, 25]:
            buf = io.BytesIO()
            test.save(buf, format="JPEG", quality=q, optimize=True, progressive=True)
            data = buf.getvalue()
            if len(data) <= max_bytes:
                dst.write_bytes(data)
                return True
    # Last resort: very small, very low quality.
    buf = io.BytesIO()
    resize(img, 432).save(buf, format="JPEG", quality=24, optimize=True, progressive=True)
    dst.write_bytes(buf.getvalue())
    return dst.stat().st_size <= max_bytes


def _run_ffmpeg(args: List[str]) -> bool:
    """Run an ffmpeg command silently; True iff it exited with code 0."""
    try:
        res = subprocess.run(args, stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL, check=False)
        return res.returncode == 0
    except Exception:
        return False


def compress_video_to_limit(src: Path, dst: Path, max_bytes: int = MAX_UPLOAD_BYTES) -> bool:
    """
    Transcode *src* with ffmpeg/libx264 into *dst*, stepping down resolution,
    CRF and audio bitrate until it fits within *max_bytes*. Requires ffmpeg
    on PATH. Returns success flag.
    """
    steps = [
        ("scale='min(720,iw)':-2", 28, 64),
        ("scale='min(640,iw)':-2", 30, 56),
        ("scale='min(540,iw)':-2", 32, 48),
        ("scale='min(480,iw)':-2", 35, 40),
    ]
    for vf, crf, a_kbps in steps:
        args = ["ffmpeg", "-y", "-i", str(src),
                "-vf", vf, "-c:v", "libx264", "-preset", "veryfast", "-crf", str(crf),
                "-c:a", "aac", "-b:a", f"{a_kbps}k", "-movflags", "+faststart",
                "-fs", str(max_bytes - 1024), str(dst)]
        if _run_ffmpeg(args) and dst.exists() and dst.stat().st_size <= max_bytes:
            return True
    # Last resort: 432px wide, aggressive CRF.
    args = ["ffmpeg", "-y", "-i", str(src),
            "-vf", "scale='min(432,iw)':-2", "-c:v", "libx264", "-preset", "veryfast", "-crf", "38",
            "-c:a", "aac", "-b:a", "40k", "-movflags", "+faststart",
            "-fs", str(max_bytes - 1024), str(dst)]
    return _run_ffmpeg(args) and dst.exists() and dst.stat().st_size <= max_bytes


# ===================== Selenium / Driver (stabilized retries) =====================
def _system_chrome_binary() -> Optional[str]:
    """Locate the installed macOS Chrome binary, if any."""
    candidates = [
        "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
        str(Path.home() / "Applications/Google Chrome.app/Contents/MacOS/Google Chrome"),
    ]
    for c in candidates:
        if os.path.exists(c):
            return c
    return None


def _resolve_chromedriver_path(raw_path: str) -> str:
    """
    Work around webdriver_manager sometimes returning the
    THIRD_PARTY_NOTICES.chromedriver file: pick the real executable
    chromedriver binary from the same directory and make it executable.
    """
    path = raw_path
    if os.path.basename(path) == "THIRD_PARTY_NOTICES.chromedriver":
        d = os.path.dirname(path)
        for f in os.listdir(d):
            p = os.path.join(d, f)
            if f.startswith("chromedriver") and os.path.isfile(p):
                path = p
                break
    if not os.path.isfile(path) or not os.access(path, os.X_OK):
        d = os.path.dirname(path)
        for f in os.listdir(d):
            p = os.path.join(d, f)
            if f.startswith("chromedriver") and os.path.isfile(p) and os.access(p, os.X_OK):
                path = p
                break
    ensure_executable(path)
    return path


def _base_options(headless: bool) -> Options:
    """Build the common ChromeOptions, pointing at the system Chrome if found."""
    opts = Options()
    # Common options
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    opts.add_argument("--disable-gpu")
    opts.add_argument("--window-size=1280,2400")
    opts.add_argument("--disable-notifications")
    opts.add_argument("--disable-extensions")
    opts.add_argument("--mute-audio")
    opts.add_argument("--log-level=3")
    opts.add_argument(f"--user-agent={UA}")
    if headless:
        opts.add_argument("--headless=new")
        opts.add_argument("--disable-software-rasterizer")
        opts.add_argument("--disable-features=VizDisplayCompositor")
    # Point at the system Chrome binary when available
    sys_bin = _system_chrome_binary()
    if sys_bin:
        opts.binary_location = sys_bin
        logging.info(f"Using system Chrome binary: {sys_bin}")
    else:
        logging.warning("System Chrome binary not found. Relying on default.")
    return opts


def _start_driver_with(opts: Options) -> webdriver.Chrome:
    """Install/resolve chromedriver and start a Chrome session with *opts*."""
    raw = ChromeDriverManager().install()
    path = _resolve_chromedriver_path(raw)
    return webdriver.Chrome(service=Service(path), options=opts)


def setup_driver(headless: bool = False):
    """
    Boot strategy (tried in order):
      1) system profile
      2) profile locked/conflicting → ephemeral clone of it
      3) no profile at all
      4) headless with minimal options (last resort)
    Raises if every strategy fails.
    """
    # 1) Try the system profile
    base, prof = find_chrome_profile_on_macos()
    if base and prof:
        profile_path = Path(base) / prof
        opts = _base_options(headless=headless)
        if _profile_locked(profile_path):
            logging.info("Profile locked → try ephemeral clone")
        else:
            opts.add_argument(f"--user-data-dir={base}")
            opts.add_argument(f"--profile-directory={prof}")
            try:
                logging.info(f"Launching Chrome with profile: {prof}")
                return _start_driver_with(opts)
            except (SessionNotCreatedException, WebDriverException) as e:
                logging.warning(f"Profile launch failed: {e}")
        # 2) Ephemeral cloned profile
        try:
            epi_dir, epi_prof = _make_ephemeral_profile(Path(base), prof)
            opts = _base_options(headless=headless)
            opts.add_argument(f"--user-data-dir={epi_dir}")
            opts.add_argument(f"--profile-directory={epi_prof}")
            logging.info("Launching Chrome with ephemeral profile")
            return _start_driver_with(opts)
        except (SessionNotCreatedException, WebDriverException) as e:
            logging.warning(f"Ephemeral profile launch failed: {e}")
    # 3) No profile
    try:
        opts = _base_options(headless=headless)
        logging.info("Launching Chrome without user profile")
        return _start_driver_with(opts)
    except (SessionNotCreatedException, WebDriverException) as e:
        logging.warning(f"No-profile launch failed: {e}")
    # 4) Last resort: minimal headless
    try:
        opts = _base_options(headless=True)
        logging.info("Launching Chrome headless (fallback)")
        return _start_driver_with(opts)
    except Exception as e:
        logging.error(f"All launch strategies failed: {e}")
        raise


# ===================== Login wait =====================
def wait_for_login_and_keypress(driver, account: str) -> bool:
    """Navigate to the account page, then block until the user presses Enter."""
    try:
        # Focus the target profile page
        target = f"https://x.com/{account}"
        if "x.com" not in driver.current_url and "twitter.com" not in driver.current_url:
            driver.get(target)
            time.sleep(2)
        print("\n==============================================")
        print(" 로그인 후 계속 진행 안내")
        print("----------------------------------------------")
        print(" 1) 열린 Chrome 창에서 X(Twitter)에 로그인하세요.")
        print(" 2) 프로필/타임라인이 정상 로드되는지 확인하세요.")
        print(" 3) 여기 콘솔에서 Enter를 누르면 스크랩을 시작합니다.")
        print("    (s 입력 후 Enter: 즉시 시작)")
        print("==============================================\n")
        _ = input("로그인 완료 후 Enter(또는 s 입력 후 Enter)를 눌러 계속: ").strip().lower()
        return True
    except Exception as e:
        logging.warning(f"로그인 대기 중 경고: {e}")
        return True


# ===================== Scraping =====================
def download_file(session: requests.Session, url: str, dest: Path) -> bool:
    """Stream *url* to *dest* atomically (via .part temp file). Returns success."""
    try:
        tmp = dest.with_suffix(dest.suffix + ".part")
        with session.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(8192):
                    if chunk:
                        f.write(chunk)
        os.replace(tmp, dest)
        return True
    except Exception as e:
        logging.warning(f"이미지 다운로드 실패: {url} -> {e}")
        return False


def find_video_links(card) -> Optional[str]:
    """Return the first status/video permalink found inside a tweet card."""
    links = [a.get_attribute("href") for a in card.find_elements(By.TAG_NAME, "a")]
    for l in links:
        if l and ("/video/" in l or "/i/status/" in l or "/status/" in l):
            return l
    return None


def download_video(tweet_url: str, dest: Path) -> bool:
    """Download the tweet's video to *dest* with yt-dlp. Returns success."""
    try:
        url = normalize_x_url(tweet_url)
        ydl_opts = {
            "quiet": True,
            "no_warnings": True,
            "format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4]/bv*+ba/b",
            "merge_output_format": "mp4",
            "outtmpl": str(dest),
            "retries": 5,
            "fragment_retries": 10,
            "socket_timeout": 20,
            "user_agent": UA,
            "geo_bypass": True,
            "concurrent_fragment_downloads": 3,
        }
        if COOKIES_FILE and Path(COOKIES_FILE).exists():
            ydl_opts["cookiefile"] = COOKIES_FILE
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return True
    except Exception as e:
        logging.warning(f"영상 다운로드 실패: {url} -> {e}")
        return False


def scrape(account: str, out_dir: Path, max_scroll: int, scroll_pause: float,
           headless: bool, target_count: int = 100) -> Dict[str, Any]:
    """
    Infinite-scroll @account's timeline, collect up to *target_count* tweets,
    write tweets.csv and download media into out_dir/media.

    Returns {"tweets": {id: row}, "csv": path, "media_dir": path, "account": str}.
    """
    out_dir.mkdir(exist_ok=True)
    media_dir = out_dir / "media"
    media_dir.mkdir(exist_ok=True)
    csv_path = out_dir / "tweets.csv"
    driver = setup_driver(headless=headless)
    driver.get(f"https://x.com/{account}")
    time.sleep(5)
    # Wait for manual login when a visible window is used
    if WAIT_FOR_LOGIN and not headless:
        if not wait_for_login_and_keypress(driver, account):
            logging.error("로그인 대기 실패")
            driver.quit()
            return {"tweets": {}, "csv": csv_path, "media_dir": media_dir, "account": account}
    tweets: Dict[str, Dict[str, Any]] = {}
    last_height = driver.execute_script("return document.body.scrollHeight")
    stalls, stall_limit = 0, 12
    session = build_http_session()
    logging.info(f"@{account} 수집 시작… (목표 {target_count}개)")
    for _ in range(max_scroll):
        cards = driver.find_elements(By.XPATH, "//article[@data-testid='tweet']")
        new_count = 0
        for c in cards:
            try:
                link = c.find_element(By.XPATH, ".//a[contains(@href,'status')]")
                tid = link.get_attribute("href").split("/")[-1]
                if not tid or not tid.isdigit() or tid in tweets:
                    continue
                text = c.text.replace("\n", " ")
                imgs = [force_orig(i.get_attribute("src"))
                        for i in c.find_elements(By.XPATH, ".//img[contains(@src,'twimg.com/media')]")]
                vids = [v.get_attribute("src")
                        for v in c.find_elements(By.XPATH, ".//video")
                        if v.get_attribute("src")]
                orig_video = find_video_links(c)
                tweets[tid] = {"id": tid, "text": text, "images": imgs,
                               "videos": vids, "orig_video": orig_video}
                new_count += 1
            except Exception:
                pass
        if len(tweets) >= target_count:
            logging.info(f"목표 {target_count}개 달성, 스크랩 종료")
            break
        # Zig-zag scroll to coax lazy loading
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(scroll_pause)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight*0.85);")
        time.sleep(scroll_pause)
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height and new_count == 0:
            stalls += 1
            if stalls >= stall_limit:
                logging.info("더 이상 컨텐츠 로드 없음 → 종료")
                break
        else:
            stalls = 0
        last_height = new_height
    driver.quit()
    logging.info(f"트윗 {len(tweets)}개 수집 완료")
    # CSV
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["id", "text", "image_count", "video_count", "orig_video"])
        for t in tweets.values():
            w.writerow([t["id"], t["text"], len(t["images"]), len(t["videos"]), t["orig_video"]])
    logging.info(f"CSV 저장: {csv_path}")
    # Media download
    logging.info("미디어 다운로드…")
    for tid, t in tweets.items():
        tweet_url = normalize_x_url(t.get("orig_video") or f"https://x.com/{account}/status/{tid}")
        for idx, m in enumerate(t["images"], 1):
            dest = media_dir / f"{tid}_img{idx}.jpg"
            if not dest.exists():
                download_file(session, m, dest)
        if bool(t.get("videos")) or bool(t.get("orig_video")):
            dest = media_dir / f"{tid}_video.mp4"
            if not dest.exists():
                download_video(tweet_url, dest)
    logging.info(f"미디어 저장 완료: {media_dir}")
    return {"tweets": tweets, "csv": csv_path, "media_dir": media_dir, "account": account}


# ===================== Notion (flexible property mapping) =====================
def notion_headers() -> Dict[str, str]:
    """Standard Notion REST headers (auth + optional version pin)."""
    h = {"Authorization": f"Bearer {NOTION_API_KEY}", "Content-Type": "application/json"}
    if NOTION_VERSION:
        h["Notion-Version"] = NOTION_VERSION
    return h


def notion_get_db() -> Dict[str, Any]:
    """Fetch the database object (schema) for NOTION_DATABASE_ID."""
    r = requests.get(f"https://api.notion.com/v1/databases/{NOTION_DATABASE_ID}",
                     headers=notion_headers(), timeout=30)
    r.raise_for_status()
    return r.json()


def find_title_prop(db: Dict[str, Any]) -> str:
    """Return the name of the DB's title property; raise if there is none."""
    for name, meta in db.get("properties", {}).items():
        if meta.get("type") == "title":
            return name
    raise RuntimeError("이 DB에는 Title 속성이 없습니다. (제목 속성 1개 필요)")


def map_optional_props(db: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """
    Map the optional logical fields (text / image_count / video_count /
    tweet_url / media) to actual DB property names, first by name candidates,
    then by type-based fallback. Missing fields stay None.
    """
    props = db.get("properties", {})

    def norm(s):
        return re.sub(r"[\s_]+", "", s.lower())

    candidates = {
        "text": ["text", "tweettext", "본문", "내용"],
        "image_count": ["imagecount", "images", "imgcount", "이미지개수", "이미지수"],
        "video_count": ["videocount", "videos", "vidcount", "동영상개수", "비디오개수"],
        "tweet_url": ["tweeturl", "url", "링크", "원문", "원본url"],
        "media": ["media", "files", "attachments", "첨부", "파일"],
    }
    found = {"text": None, "image_count": None, "video_count": None,
             "tweet_url": None, "media": None}
    for name, meta in props.items():
        t = meta.get("type")
        n = norm(name)
        if t == "rich_text" and found["text"] is None and (n in candidates["text"]):
            found["text"] = name
        if t == "number" and found["image_count"] is None and (n in candidates["image_count"]):
            found["image_count"] = name
        if t == "number" and found["video_count"] is None and (n in candidates["video_count"]):
            found["video_count"] = name
        if t == "url" and found["tweet_url"] is None and (n in candidates["tweet_url"]):
            found["tweet_url"] = name
        if t == "files" and found["media"] is None and (n in candidates["media"]):
            found["media"] = name
    # Type-based fallback
    if not found["text"]:
        for n, m in props.items():
            if m.get("type") == "rich_text":
                found["text"] = n
                break
    if not found["tweet_url"]:
        for n, m in props.items():
            if m.get("type") == "url":
                found["tweet_url"] = n
                break
    if not found["media"]:
        for n, m in props.items():
            if m.get("type") == "files":
                found["media"] = n
                break
    if not found["image_count"]:
        for n, m in props.items():
            if m.get("type") == "number":
                found["image_count"] = n
                break
    if not found["video_count"]:
        for n, m in props.items():
            if m.get("type") == "number" and n != found["image_count"]:
                found["video_count"] = n
                break
    return found


def ensure_media_property(db: Dict[str, Any]) -> Optional[str]:
    """Return an existing files-type property name, or try to create 'Media'."""
    for name, meta in db.get("properties", {}).items():
        if meta.get("type") == "files":
            return name
    patch = {"properties": {"Media": {"name": "Media", "type": "files", "files": {}}}}
    r = requests.patch(f"https://api.notion.com/v1/databases/{NOTION_DATABASE_ID}",
                       headers=notion_headers(), json=patch, timeout=30)
    if r.status_code in (200, 202):
        return "Media"
    logging.warning(f"DB files 속성 생성 실패: {r.status_code} {r.text}")
    return None


def notion_query_by_title(title_prop: str, value: str) -> Optional[str]:
    """Return the page id whose title equals *value*, or None (dedup check)."""
    payload = {"filter": {"property": title_prop, "title": {"equals": value}}, "page_size": 1}
    r = requests.post(f"https://api.notion.com/v1/databases/{NOTION_DATABASE_ID}/query",
                      headers=notion_headers(), json=payload, timeout=30)
    r.raise_for_status()
    res = r.json().get("results", [])
    return res[0]["id"] if res else None


def notion_create_page(row: Dict[str, Any], title_prop: str,
                       opt: Dict[str, Optional[str]]) -> str:
    """Create a DB page for *row* (title = tweet id) with mapped properties
    and a bookmark block pointing at the tweet. Returns the new page id."""
    props: Dict[str, Any] = {title_prop: {"title": [{"text": {"content": row["id"]}}]}}
    if opt["text"]:
        txt = (row.get("text") or "")[:2000]  # Notion rich_text content limit
        props[opt["text"]] = {"rich_text": [{"text": {"content": txt}}]} if txt else {"rich_text": []}
    if opt["image_count"]:
        props[opt["image_count"]] = {"number": len(row.get("images", []))}
    if opt["video_count"]:
        props[opt["video_count"]] = {"number": len(row.get("videos", []))}
    if opt["tweet_url"]:
        props[opt["tweet_url"]] = {"url": row["tweet_url"]}
    payload = {"parent": {"database_id": NOTION_DATABASE_ID},
               "properties": props,
               "children": [{"object": "block", "type": "bookmark",
                             "bookmark": {"url": row["tweet_url"]}}]}
    r = requests.post("https://api.notion.com/v1/pages",
                      headers=notion_headers(), json=payload, timeout=30)
    if r.status_code >= 400:
        logging.error(f"페이지 생성 실패: {r.status_code} {r.text}\nPayload: {payload}")
    r.raise_for_status()
    return r.json()["id"]


def notion_get_page_media_names(page_id: str, media_prop: Optional[str]) -> List[str]:
    """List file names already attached to the page's files property."""
    if not media_prop:
        return []
    r = requests.get(f"https://api.notion.com/v1/pages/{page_id}",
                     headers=notion_headers(), timeout=30)
    r.raise_for_status()
    items = r.json().get("properties", {}).get(media_prop, {}).get("files", [])
    return [i.get("name") for i in items if i.get("name")]


# --- File Upload API ---
def mime_from_suffix(p: Path) -> str:
    """Minimal suffix → MIME type mapping for the upload request."""
    ext = p.suffix.lower()
    if ext in [".jpg", ".jpeg"]:
        return "image/jpeg"
    if ext == ".png":
        return "image/png"
    if ext == ".gif":
        return "image/gif"
    if ext == ".mp4":
        return "video/mp4"
    return "application/octet-stream"


def notion_create_file_upload(name: str, mime: str) -> Dict[str, Any]:
    """Create a file-upload object; returns its JSON (contains the upload id)."""
    r = requests.post("https://api.notion.com/v1/file_uploads",
                      headers=notion_headers(),
                      json={"file_name": name, "file_mime_type": mime}, timeout=60)
    r.raise_for_status()
    return r.json()


def notion_send_file_upload(file_upload_id: str, filepath: Path):
    """Send the file bytes for a previously created file-upload object."""
    # multipart/form-data: requests sets the Content-Type, so drop the JSON one
    headers = {k: v for k, v in notion_headers().items() if k != "Content-Type"}
    with open(filepath, "rb") as fh:
        r = requests.post(f"https://api.notion.com/v1/file_uploads/{file_upload_id}/send",
                          headers=headers, files={"file": (filepath.name, fh)}, timeout=600)
    r.raise_for_status()
    return r.json()


def as_uploaded_file_obj(file_upload_id: str, name: str) -> Dict[str, Any]:
    """Build the files-property entry referencing an uploaded file."""
    return {"name": name, "type": "file_upload", "file_upload": {"id": file_upload_id}}


def notion_update_media_property(page_id: str, media_prop: str, files: List[Dict[str, Any]]):
    """Set the page's files property to *files* (replaces existing value)."""
    payload = {"properties": {media_prop: {"files": files}}}
    r = requests.patch(f"https://api.notion.com/v1/pages/{page_id}",
                       headers=notion_headers(), json=payload, timeout=60)
    if r.status_code >= 400:
        logging.error(f"Media 속성 업데이트 실패: {r.status_code} {r.text}\nPayload: {payload}")
    r.raise_for_status()


def notion_append_file_blocks(page_id: str, files: List[Dict[str, Any]]):
    """Fallback: attach the uploads as file blocks in the page body."""
    children = []
    for f in files:
        # NOTE(review): the original uses key "file_upload_id" here while the
        # files property uses "id" — verify against current Notion API docs.
        children.append({"object": "block", "type": "file",
                         "file": {"type": "file_upload",
                                  "file_upload": {"file_upload_id": f["file_upload"]["id"]}}})
    r = requests.patch(f"https://api.notion.com/v1/blocks/{page_id}/children",
                       headers=notion_headers(), json={"children": children}, timeout=60)
    if r.status_code >= 400:
        logging.error(f"파일 블록 첨부 실패: {r.status_code} {r.text}")
    r.raise_for_status()


# ===================== Pipeline =====================
def run_pipeline(account: str, out_dir: Path, target_count: int,
                 max_scroll: int, headless: bool):
    """Scrape → create/reuse Notion pages → compress + upload media,
    deleting local files after successful upload."""
    result = scrape(account, out_dir, max_scroll, SCROLL_PAUSE, headless,
                    target_count=target_count)
    tweets = result["tweets"]
    media_dir: Path = result["media_dir"]
    if not (NOTION_API_KEY and NOTION_DATABASE_ID):
        logging.error("NOTION_API_KEY/NOTION_DATABASE_ID 필요")
        return
    db = notion_get_db()
    title_prop = find_title_prop(db)
    opt_props = map_optional_props(db)
    if not opt_props["media"]:
        created = ensure_media_property(db)
        if created:
            opt_props["media"] = created
    for tid, t in tweets.items():
        row = {
            "id": tid,
            "text": t.get("text", ""),
            "images": t.get("images", []),
            "videos": t.get("videos", []),
            "tweet_url": normalize_x_url(t.get("orig_video") or f"https://x.com/{account}/status/{tid}"),
        }
        # Reuse an existing page (dedup by title == tweet id)
        page_id = notion_query_by_title(title_prop, row["id"])
        if not page_id:
            try:
                page_id = notion_create_page(row, title_prop, opt_props)
            except requests.HTTPError as e:
                logging.error(f"[{tid}] 페이지 생성 실패 → 스킵: {e}")
                continue
            time.sleep(0.25)  # mild rate limiting
        existing_names = set()
        if opt_props["media"]:
            try:
                existing_names = set(notion_get_page_media_names(page_id, opt_props["media"]))
            except Exception as e:
                logging.warning(f"[{tid}] 기존 Media 조회 실패(무시): {e}")
        uploaded_files: List[Dict[str, Any]] = []
        # Images
        for p in sorted(media_dir.glob(f"{tid}_img*.jpg")):
            if not p.exists() or p.name in existing_names:
                continue
            temp = p.with_suffix(".upload.jpg")
            if not compress_image_to_limit(p, temp, MAX_UPLOAD_BYTES):
                logging.error(f"[{tid}] 이미지 5MiB 이하 압축 실패: {p.name}")
                if temp.exists():
                    temp.unlink(missing_ok=True)
                continue
            try:
                meta = notion_create_file_upload(p.name, mime_from_suffix(temp))
                notion_send_file_upload(meta["id"], temp)
                uploaded_files.append(as_uploaded_file_obj(meta["id"], p.name))
                temp.unlink(missing_ok=True)
                p.unlink(missing_ok=True)  # delete local copy on success
                logging.info(f"[{tid}] 이미지 업로드 성공 → {p.name}")
            except Exception as e:
                logging.error(f"[{tid}] 이미지 업로드 실패({p.name}): {e}")
                if temp.exists():
                    temp.unlink(missing_ok=True)
            time.sleep(0.12)
        # Video
        v = media_dir / f"{tid}_video.mp4"
        if v.exists() and v.name not in existing_names:
            tempv = v.with_suffix(".upload.mp4")
            if compress_video_to_limit(v, tempv, MAX_UPLOAD_BYTES):
                try:
                    meta = notion_create_file_upload(v.name, mime_from_suffix(tempv))
                    notion_send_file_upload(meta["id"], tempv)
                    uploaded_files.append(as_uploaded_file_obj(meta["id"], v.name))
                    tempv.unlink(missing_ok=True)
                    v.unlink(missing_ok=True)  # delete local copy on success
                    logging.info(f"[{tid}] 동영상 업로드 성공 → {v.name}")
                except Exception as e:
                    logging.error(f"[{tid}] 동영상 업로드 실패({v.name}): {e}")
                    if tempv.exists():
                        tempv.unlink(missing_ok=True)
            else:
                logging.error(f"[{tid}] 동영상 5MiB 이하 압축 실패: {v.name}")
                if tempv.exists():
                    tempv.unlink(missing_ok=True)
        # Apply uploads to the page
        if uploaded_files:
            try:
                if opt_props["media"]:
                    notion_update_media_property(page_id, opt_props["media"], uploaded_files)
                    logging.info(f"[{tid}] Media 속성에 {len(uploaded_files)}개 설정")
                else:
                    notion_append_file_blocks(page_id, uploaded_files)
                    logging.info(f"[{tid}] 파일 블록으로 {len(uploaded_files)}개 첨부")
            except Exception as e:
                logging.error(f"[{tid}] 파일 반영 실패: {e}")
        else:
            logging.info(f"[{tid}] 신규 업로드 없음")


# ===================== Entrypoint =====================
def main():
    run_pipeline(ACCOUNT, OUT_DIR, TARGET_COUNT, MAX_SCROLL, HEADLESS)


if __name__ == "__main__":
    main()
