# 월요일, 11월 10, 2025 — Home › Coding › Python (blog page header residue)
# X(Twitter) 게시물을 내 노션에 스크랩하는 코드
# X.com에 노션 스크랩하는 코드를 올려 놓습니다.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
X(Twitter) → Notion 자동 파이프라인 (macOS, 최종판)
- 창 열림 + 로그인 대기(콘솔 Enter) 후 수집 시작
- macOS Chrome 프로필 자동 감지 + 잠금 시 임시 프로필 복제(B 방안)
- webdriver_manager의 THIRD_PARTY_NOTICES.chromedriver 경로 보정
- 무한 스크롤로 TARGET_COUNT 까지 수집
- 이미지/동영상 다운로드 → 5MiB 이하 자동 압축 → Notion 업로드(새 File Upload API)
- DB 스키마 유연 적응: Title 자동 탐색, 기타 속성은 있으면 매핑/없으면 생략
- Media(files) 없으면 생성 시도 → 실패 시 file 블록(children)로 첨부
- 중복 방지: 페이지 재활용, 동일 파일명 재업로드 방지
- 업로드 성공 파일은 즉시 로컬 삭제
"""

import os, re, csv, time, stat, json, logging, io, subprocess, shutil, tempfile
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse

from PIL import Image

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.common.exceptions import SessionNotCreatedException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
import yt_dlp

# ===================== Configuration =====================
ACCOUNT       = "수집대상"      # target X (Twitter) handle to scrape
TARGET_COUNT  = 8000                  # desired number of tweets to collect
MAX_SCROLL    = 100000                 # hard cap on scroll iterations
SCROLL_PAUSE  = 7.5                  # pause between scrolls (stability over speed)
HEADLESS      = False                # visible window recommended (True disables the login wait)
WAIT_FOR_LOGIN= True                 # after the window opens, log in and press Enter in the console

OUT_DIR       = Path("./NatureIsAmazing_v3")  # output root for CSV + downloaded media

# Notion credentials (hard-coded by request; personal use only)
NOTION_API_KEY         = "노션API"
NOTION_DATABASE_ID_RAW = "노션DP"  # hyphens optional — normalized via hyphenate_uuid() below
NOTION_VERSION         = "2022-06-28"

# Optional: X login cookies (Netscape format, consumed by yt-dlp)
COOKIES_FILE = ""  # e.g. "/Users/아이디(영문이름)/cookies_twitter.txt"

# Desktop Chrome UA shared by Selenium and raw HTTP downloads
UA = ("Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")

MAX_UPLOAD_BYTES = 5 * 1024 * 1024  # 5 MiB — upload size ceiling enforced by the compressors

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s | %(levelname)s | %(message)s",
                    datefmt="%H:%M:%S")

# ===================== 공통 유틸 =====================
def ensure_executable(path: str):
    """Grant execute permission on *path* (rwxr-xr-x) if it lacks it."""
    if os.access(path, os.X_OK):
        return
    mode = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
            | stat.S_IRGRP | stat.S_IXGRP
            | stat.S_IROTH | stat.S_IXOTH)
    os.chmod(path, mode)

def build_http_session() -> requests.Session:
    """Return a requests session with retry/backoff and the shared UA header."""
    retry_policy = Retry(
        total=5, connect=5, read=5,
        backoff_factor=0.6,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=["GET", "HEAD", "OPTIONS"],
    )
    adapter = HTTPAdapter(max_retries=retry_policy)
    session = requests.Session()
    session.headers.update({"User-Agent": UA, "Accept": "*/*"})
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

def normalize_x_url(u: str) -> str:
    """Rewrite an x.com URL to its twitter.com equivalent.

    Only the host component is rewritten; the old blanket ``str.replace``
    corrupted URLs whose path/query merely contained the substring "x.com"
    (e.g. "https://boxx.com/…" became "https://boxtwitter.com/…").
    Non-absolute strings (no netloc) keep the legacy substring behaviour.
    """
    parts = urlparse(u)
    host = parts.netloc.lower()
    if host == "x.com" or host.endswith(".x.com"):
        # Preserve any "www."/"mobile." prefix while swapping the domain.
        new_host = host[: -len("x.com")] + "twitter.com"
        return urlunparse(parts._replace(netloc=new_host))
    if not parts.netloc:
        # Not an absolute URL — fall back to the original behaviour.
        return u.replace("x.com", "twitter.com")
    return u

def force_orig(url: str) -> str:
    """For twimg media URLs, set ``name=orig`` in the query to get the full-size image."""
    if "twimg.com/media" not in url:
        return url
    scheme, netloc, path, params, query, fragment = urlparse(url)
    query_map = dict(parse_qsl(query))
    query_map["name"] = "orig"
    return urlunparse((scheme, netloc, path, params, urlencode(query_map), fragment))

def hyphenate_uuid(s: str) -> str:
    """Insert standard UUID hyphens into a 32-hex-digit string.

    Non-hex characters are stripped first; anything that is not exactly
    32 hex digits afterwards is returned stripped but unhyphenated.
    """
    hex_only = re.sub(r"[^0-9a-fA-F]", "", s)
    if len(hex_only) != 32:
        return hex_only
    groups = (hex_only[:8], hex_only[8:12], hex_only[12:16],
              hex_only[16:20], hex_only[20:])
    return "-".join(groups)

# Normalized (hyphenated) database id used by every Notion API call below.
NOTION_DATABASE_ID = hyphenate_uuid(NOTION_DATABASE_ID_RAW)

# ===================== macOS Chrome 프로필 감지/복제 =====================
def _profile_locked(profile_path: Path) -> bool:
    for name in ("SingletonLock","SingletonCookie","SingletonSocket"):
        if (profile_path / name).exists():
            return True
    return False

def _make_ephemeral_profile(base_dir: Path, profile_dir: str) -> Tuple[str,str]:
    """
    Build a temporary user-data-dir by copying only the essential data
    (cookies, prefs, history…) from the original Chrome profile. Bulky
    cache directories are skipped to minimise copy time and lock conflicts.

    Returns (temp_user_data_dir, "Default") suitable for Chrome's
    --user-data-dir / --profile-directory flags.
    """
    src = base_dir / profile_dir
    tmp_root = Path(tempfile.mkdtemp(prefix="chrome-epi-"))
    dst = tmp_root / "Default"
    dst.mkdir(parents=True, exist_ok=True)

    # Entries carrying login state and settings — always worth copying.
    whitelist = [
        "Cookies","Cookies-journal","Network","Preferences","Secure Preferences",
        "Local Storage","History","Favicons","Visited Links","Bookmarks","Session Storage"
    ]
    # Regenerable cache directories — never copied.
    blacklist = {"Cache","Code Cache","GPUCache","Service Worker","ShaderCache","GrShaderCache"}

    def _copy_entry(name: str):
        # Copy one whitelist entry (file or directory); failures are best-effort.
        s = src / name
        d = dst / name
        if not s.exists(): return
        if s.is_dir():
            if name in blacklist: return
            shutil.copytree(s, d, dirs_exist_ok=True)
        else:
            try: shutil.copy2(s, d)
            except Exception: pass

    for entry in whitelist: _copy_entry(entry)
    # Also copy any other small (<=5 MiB) loose files — likely config, not cache.
    for name in os.listdir(src):
        if name in whitelist or name in blacklist: continue
        sp = src / name
        dp = dst / name
        try:
            if sp.is_file() and sp.stat().st_size <= 5*1024*1024:
                shutil.copy2(sp, dp)
        except Exception:
            pass

    return str(tmp_root), "Default"

def find_chrome_profile_on_macos() -> Tuple[Optional[str], Optional[str]]:
    """Locate Chrome's user-data dir and best profile on macOS.

    Prefers the profile recorded as last used in "Local State"; otherwise
    scans Default / Profile 1..11. Returns (base_dir, profile_name), or
    (None, None) when no profile directory exists.
    """
    base = Path.home() / "Library" / "Application Support" / "Google" / "Chrome"
    state_file = base / "Local State"
    try:
        if state_file.is_file():
            state = json.loads(state_file.read_text(encoding="utf-8"))
            preferred = state.get("profile", {}).get("last_used")
            if preferred and (base / preferred).is_dir():
                return str(base), preferred
    except Exception:
        pass  # corrupt/unreadable Local State — fall through to the scan
    candidates = ["Default"] + [f"Profile {i}" for i in range(1, 12)]
    for candidate in candidates:
        if (base / candidate).is_dir():
            return str(base), candidate
    return None, None

# ===================== 압축기 =====================
def compress_image_to_limit(src: Path, dst: Path, max_bytes: int = MAX_UPLOAD_BYTES) -> bool:
    """Re-encode *src* as a progressive JPEG no larger than *max_bytes* at *dst*.

    Walks a ladder of decreasing long-side sizes and JPEG qualities until the
    encoded payload fits. Returns True when *dst* ends up within the limit.
    """
    try:
        # Context manager releases the source file handle promptly —
        # the original Image.open() leaked it until garbage collection.
        with Image.open(src) as opened:
            img = opened.convert("RGB")
    except Exception as e:
        logging.error(f"이미지 열기 실패: {src} -> {e}")
        return False

    def resize(im, max_side):
        # Downscale so the longer side is at most max_side, keeping aspect ratio.
        w, h = im.size
        if max(w, h) <= max_side:
            return im
        ratio = max_side / max(w, h)
        return im.resize((int(w * ratio), int(h * ratio)), Image.LANCZOS)

    for ms in [1600, 1400, 1200, 1000, 800, 640, 560, 480]:
        test = resize(img, ms)
        for q in [85, 80, 72, 65, 57, 50, 42, 35, 30, 25]:
            buf = io.BytesIO()
            test.save(buf, format="JPEG", quality=q, optimize=True, progressive=True)
            data = buf.getvalue()
            if len(data) <= max_bytes:
                dst.write_bytes(data)
                return True

    # Last resort: tiny 432px render; report whether it actually fits.
    buf = io.BytesIO()
    resize(img, 432).save(buf, format="JPEG", quality=24, optimize=True, progressive=True)
    dst.write_bytes(buf.getvalue())
    return dst.stat().st_size <= max_bytes

def _run_ffmpeg(args: List[str]) -> bool:
    try:
        res = subprocess.run(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
        return res.returncode == 0
    except Exception:
        return False

def compress_video_to_limit(src: Path, dst: Path, max_bytes: int = MAX_UPLOAD_BYTES) -> bool:
    """Transcode *src* to H.264/AAC mp4 under *max_bytes*, trying a quality ladder."""
    # (scale filter, x264 CRF, audio kbps) — each rung coarser than the last;
    # the final rung is the last-resort attempt.
    ladder = [
        ("scale='min(720,iw)':-2", 28, 64),
        ("scale='min(640,iw)':-2", 30, 56),
        ("scale='min(540,iw)':-2", 32, 48),
        ("scale='min(480,iw)':-2", 35, 40),
        ("scale='min(432,iw)':-2", 38, 40),
    ]
    fits = False
    for vf, crf, audio_kbps in ladder:
        cmd = [
            "ffmpeg", "-y", "-i", str(src),
            "-vf", vf, "-c:v", "libx264", "-preset", "veryfast", "-crf", str(crf),
            "-c:a", "aac", "-b:a", f"{audio_kbps}k", "-movflags", "+faststart",
            "-fs", str(max_bytes - 1024), str(dst),
        ]
        fits = _run_ffmpeg(cmd) and dst.exists() and dst.stat().st_size <= max_bytes
        if fits:
            break
    return fits

# ===================== Selenium / Driver (안정화 리트라이) =====================
def _system_chrome_binary() -> Optional[str]:
    """Path of an installed Google Chrome binary on macOS, or None."""
    locations = (
        "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
        str(Path.home() / "Applications/Google Chrome.app/Contents/MacOS/Google Chrome"),
    )
    return next((loc for loc in locations if os.path.exists(loc)), None)

def _resolve_chromedriver_path(raw_path: str) -> str:
    """Repair the path returned by webdriver_manager.

    Some webdriver_manager releases point at THIRD_PARTY_NOTICES.chromedriver
    rather than the driver binary. Scan the same directory for the real
    "chromedriver" file, then make sure it is executable before returning.
    """
    path = raw_path
    if os.path.basename(path) == "THIRD_PARTY_NOTICES.chromedriver":
        d = os.path.dirname(path)
        for f in os.listdir(d):
            p = os.path.join(d, f)
            if f.startswith("chromedriver") and os.path.isfile(p):
                path = p; break
    # Second pass: if still missing or not executable, accept any executable
    # chromedriver* sibling in the same directory.
    if not os.path.isfile(path) or not os.access(path, os.X_OK):
        d = os.path.dirname(path)
        for f in os.listdir(d):
            p = os.path.join(d, f)
            if f.startswith("chromedriver") and os.path.isfile(p) and os.access(p, os.X_OK):
                path = p; break
    ensure_executable(path)
    return path

def _base_options(headless: bool) -> Options:
    """Assemble the Chrome options shared by every launch strategy."""
    opts = Options()
    common_flags = [
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1280,2400",
        "--disable-notifications",
        "--disable-extensions",
        "--mute-audio",
        "--log-level=3",
        f"--user-agent={UA}",
    ]
    for flag in common_flags:
        opts.add_argument(flag)
    if headless:
        for flag in ("--headless=new",
                     "--disable-software-rasterizer",
                     "--disable-features=VizDisplayCompositor"):
            opts.add_argument(flag)
    # Point Selenium at the system Chrome binary when one is installed.
    sys_bin = _system_chrome_binary()
    if sys_bin:
        opts.binary_location = sys_bin
        logging.info(f"Using system Chrome binary: {sys_bin}")
    else:
        logging.warning("System Chrome binary not found. Relying on default.")
    return opts

def _start_driver_with(opts: Options) -> webdriver.Chrome:
    """Download/locate a matching chromedriver, repair its path and
    permissions, then boot Chrome with the supplied options."""
    raw = ChromeDriverManager().install()
    path = _resolve_chromedriver_path(raw)
    return webdriver.Chrome(service=Service(path), options=opts)

def setup_driver(headless: bool = False):
    """
    Boot Chrome, trying progressively safer strategies in order:
    1) the system Chrome profile (keeps any existing X login)
    2) if that profile is locked/conflicting, an ephemeral clone of it
    3) no profile at all
    4) last resort: minimal headless options
    Raises only when every strategy fails.
    """
    # 1) try the system profile
    base, prof = find_chrome_profile_on_macos()
    if base and prof:
        profile_path = Path(base) / prof
        opts = _base_options(headless=headless)
        if _profile_locked(profile_path):
            # A running Chrome owns this profile — skip straight to the clone.
            logging.info("Profile locked → try ephemeral clone")
        else:
            opts.add_argument(f"--user-data-dir={base}")
            opts.add_argument(f"--profile-directory={prof}")
            try:
                logging.info(f"Launching Chrome with profile: {prof}")
                return _start_driver_with(opts)
            except (SessionNotCreatedException, WebDriverException) as e:
                logging.warning(f"Profile launch failed: {e}")

        # 2) ephemeral clone of the system profile
        try:
            epi_dir, epi_prof = _make_ephemeral_profile(Path(base), prof)
            opts = _base_options(headless=headless)
            opts.add_argument(f"--user-data-dir={epi_dir}")
            opts.add_argument(f"--profile-directory={epi_prof}")
            logging.info("Launching Chrome with ephemeral profile")
            return _start_driver_with(opts)
        except (SessionNotCreatedException, WebDriverException) as e:
            logging.warning(f"Ephemeral profile launch failed: {e}")

    # 3) no profile at all
    try:
        opts = _base_options(headless=headless)
        logging.info("Launching Chrome without user profile")
        return _start_driver_with(opts)
    except (SessionNotCreatedException, WebDriverException) as e:
        logging.warning(f"No-profile launch failed: {e}")

    # 4) last resort: minimal headless options
    try:
        opts = _base_options(headless=True)
        logging.info("Launching Chrome headless (fallback)")
        return _start_driver_with(opts)
    except Exception as e:
        logging.error(f"All launch strategies failed: {e}")
        raise

# ===================== 로그인 대기 =====================
def wait_for_login_and_keypress(driver, account: str) -> bool:
    """Navigate to the target profile and block until the user confirms login.

    Prints (Korean) instructions and waits for Enter on the console.
    Always returns True — problems here are logged, never fatal.
    """
    try:
        # Put the browser on the target profile page first.
        target = f"https://x.com/{account}"
        if "x.com" not in driver.current_url and "twitter.com" not in driver.current_url:
            driver.get(target)
            time.sleep(2)

        print("\n==============================================")
        print(" 로그인 후 계속 진행 안내")
        print("----------------------------------------------")
        print(" 1) 열린 Chrome 창에서 X(Twitter)에 로그인하세요.")
        print(" 2) 프로필/타임라인이 정상 로드되는지 확인하세요.")
        print(" 3) 여기 콘솔에서 Enter를 누르면 스크랩을 시작합니다.")
        print("    (s 입력 후 Enter: 즉시 시작)")
        print("==============================================\n")
        _ = input("로그인 완료 후 Enter(또는 s 입력 후 Enter)를 눌러 계속: ").strip().lower()
        return True
    except Exception as e:
        logging.warning(f"로그인 대기 중 경고: {e}")
        return True

# ===================== 스크랩 =====================
def download_file(session: requests.Session, url: str, dest: Path) -> bool:
    """Stream *url* to *dest* atomically via a ".part" temp file.

    Returns True on success. On any failure the partial download is removed
    (the original left stale "*.part" files behind) and False is returned.
    """
    tmp = dest.with_suffix(dest.suffix + ".part")
    try:
        with session.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(8192):
                    if chunk:
                        f.write(chunk)
        os.replace(tmp, dest)  # atomic publish: dest never holds a partial file
        return True
    except Exception as e:
        logging.warning(f"이미지 다운로드 실패: {url} -> {e}")
        tmp.unlink(missing_ok=True)  # don't leave stale partial downloads
        return False

def find_video_links(card) -> Optional[str]:
    """First anchor href on the tweet card that looks like a status/video link."""
    for anchor in card.find_elements(By.TAG_NAME, "a"):
        href = anchor.get_attribute("href")
        if href and any(token in href for token in ("/video/", "/i/status/", "/status/")):
            return href
    return None

def download_video(tweet_url: str, dest: Path) -> bool:
    """Download a tweet's video via yt-dlp; True on success, False on failure.

    Prefers mp4 video+audio streams and merges to mp4. COOKIES_FILE, when
    set, lets yt-dlp fetch login-gated media.
    NOTE(review): "outtmpl" is the exact dest path — assumes yt-dlp writes
    the merged mp4 there verbatim; confirm if output extensions ever drift.
    """
    try:
        url = normalize_x_url(tweet_url)
        ydl_opts = {
            "quiet": True,
            "no_warnings": True,
            "format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4]/bv*+ba/b",
            "merge_output_format": "mp4",
            "outtmpl": str(dest),
            "retries": 5,
            "fragment_retries": 10,
            "socket_timeout": 20,
            "user_agent": UA,
            "geo_bypass": True,
            "concurrent_fragment_downloads": 3,
        }
        if COOKIES_FILE and Path(COOKIES_FILE).exists():
            ydl_opts["cookiefile"] = COOKIES_FILE
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return True
    except Exception as e:
        logging.warning(f"영상 다운로드 실패: {url} -> {e}")
        return False

def scrape(account: str, out_dir: Path, max_scroll: int, scroll_pause: float,
           headless: bool, target_count: int = 100) -> Dict[str, Any]:
    """Scroll @account's X timeline, collecting tweets and their media.

    Returns {"tweets": {id: {...}}, "csv": Path, "media_dir": Path,
    "account": str}. Side effects: writes tweets.csv and downloads
    images/videos into media_dir (skipping files that already exist).
    """
    out_dir.mkdir(exist_ok=True)
    media_dir = out_dir / "media"; media_dir.mkdir(exist_ok=True)
    csv_path = out_dir / "tweets.csv"

    driver = setup_driver(headless=headless)
    driver.get(f"https://x.com/{account}")
    time.sleep(5)

    # Wait for a manual login (only meaningful with a visible window).
    if WAIT_FOR_LOGIN and not headless:
        if not wait_for_login_and_keypress(driver, account):
            logging.error("로그인 대기 실패"); driver.quit()
            return {"tweets": {}, "csv": csv_path, "media_dir": media_dir, "account": account}

    tweets: Dict[str, Dict[str, Any]] = {}
    last_height = driver.execute_script("return document.body.scrollHeight")
    stalls, stall_limit = 0, 12  # consecutive no-growth scrolls before giving up
    session = build_http_session()

    logging.info(f"@{account} 수집 시작… (목표 {target_count}개)")
    for _ in range(max_scroll):
        cards = driver.find_elements(By.XPATH, "//article[@data-testid='tweet']")
        new_count = 0
        for c in cards:
            try:
                link = c.find_element(By.XPATH, ".//a[contains(@href,'status')]")
                tid = link.get_attribute("href").split("/")[-1]
                if not tid or not tid.isdigit() or tid in tweets:
                    continue
                text = c.text.replace("\n", " ")
                imgs = [force_orig(i.get_attribute("src")) for i in c.find_elements(By.XPATH, ".//img[contains(@src,'twimg.com/media')]")]
                vids = [v.get_attribute("src") for v in c.find_elements(By.XPATH, ".//video") if v.get_attribute("src")]
                orig_video = find_video_links(c)
                tweets[tid] = {"id": tid, "text": text, "images": imgs, "videos": vids, "orig_video": orig_video}
                new_count += 1
            except Exception:
                pass  # stale/partial card — skip it and keep going

        if len(tweets) >= target_count:
            logging.info(f"목표 {target_count}개 달성, 스크랩 종료"); break

        # Zig-zag scroll (bottom, then slightly up) to coax lazy loading.
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(scroll_pause)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight*0.85);")
        time.sleep(scroll_pause)

        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height and new_count == 0:
            stalls += 1
            if stalls >= stall_limit:
                logging.info("더 이상 컨텐츠 로드 없음 → 종료"); break
        else:
            stalls = 0
        last_height = new_height

    driver.quit()
    logging.info(f"트윗 {len(tweets)}개 수집 완료")

    # Write the tweet index as CSV.
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["id", "text", "image_count", "video_count", "orig_video"])
        for t in tweets.values():
            w.writerow([t["id"], t["text"], len(t["images"]), len(t["videos"]), t["orig_video"]])
    logging.info(f"CSV 저장: {csv_path}")

    # Download media (existing files from previous runs are kept as-is).
    logging.info("미디어 다운로드…")
    for tid, t in tweets.items():
        tweet_url = normalize_x_url(t.get("orig_video") or f"https://x.com/{account}/status/{tid}")
        for idx, m in enumerate(t["images"], 1):
            dest = media_dir / f"{tid}_img{idx}.jpg"
            if not dest.exists():
                download_file(session, m, dest)
        if bool(t.get("videos")) or bool(t.get("orig_video")):
            dest = media_dir / f"{tid}_video.mp4"
            if not dest.exists():
                download_video(tweet_url, dest)

    logging.info(f"미디어 저장 완료: {media_dir}")
    return {"tweets": tweets, "csv": csv_path, "media_dir": media_dir, "account": account}

# ===================== Notion (유연 매핑) =====================
def notion_headers() -> Dict[str, str]:
    """Standard headers for Notion REST calls (auth + JSON + API version)."""
    headers = {
        "Authorization": f"Bearer {NOTION_API_KEY}",
        "Content-Type": "application/json",
    }
    if NOTION_VERSION:
        headers["Notion-Version"] = NOTION_VERSION
    return headers

def notion_get_db() -> Dict[str, Any]:
    """Fetch the target database's schema/metadata."""
    url = f"https://api.notion.com/v1/databases/{NOTION_DATABASE_ID}"
    resp = requests.get(url, headers=notion_headers(), timeout=30)
    resp.raise_for_status()
    return resp.json()

def find_title_prop(db: Dict[str, Any]) -> str:
    """Name of the database's title property; raises RuntimeError if absent."""
    titles = [name for name, meta in db.get("properties", {}).items()
              if meta.get("type") == "title"]
    if titles:
        return titles[0]
    raise RuntimeError("이 DB에는 Title 속성이 없습니다. (제목 속성 1개 필요)")

def map_optional_props(db: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Map logical column roles to actual database property names.

    Pass 1 matches by normalized name + type; pass 2 falls back to the first
    property of a compatible type. Unresolved roles stay None.
    """
    props = db.get("properties", {})

    def norm(s):
        return re.sub(r"[\s_]+", "", s.lower())

    # role -> (required property type, accepted normalized names)
    spec = {
        "text":        ("rich_text", ["text", "tweettext", "본문", "내용"]),
        "image_count": ("number", ["imagecount", "images", "imgcount", "이미지개수", "이미지수"]),
        "video_count": ("number", ["videocount", "videos", "vidcount", "동영상개수", "비디오개수"]),
        "tweet_url":   ("url", ["tweeturl", "url", "링크", "원문", "원본url"]),
        "media":       ("files", ["media", "files", "attachments", "첨부", "파일"]),
    }
    found: Dict[str, Optional[str]] = {role: None for role in spec}

    # Pass 1: exact alias match — first property wins per role.
    for prop_name, meta in props.items():
        prop_type = meta.get("type")
        key = norm(prop_name)
        for role, (want_type, aliases) in spec.items():
            if found[role] is None and prop_type == want_type and key in aliases:
                found[role] = prop_name

    # Pass 2: type-based fallback, in the original resolution order.
    def first_of_type(want_type, exclude=None):
        for prop_name, meta in props.items():
            if meta.get("type") == want_type and prop_name != exclude:
                return prop_name
        return None

    if not found["text"]:
        found["text"] = first_of_type("rich_text")
    if not found["tweet_url"]:
        found["tweet_url"] = first_of_type("url")
    if not found["media"]:
        found["media"] = first_of_type("files")
    if not found["image_count"]:
        found["image_count"] = first_of_type("number")
    if not found["video_count"]:
        # second number column, distinct from the image counter
        found["video_count"] = first_of_type("number", exclude=found["image_count"])
    return found

def ensure_media_property(db: Dict[str, Any]) -> Optional[str]:
    """Return an existing files-type property name, else try to create "Media"."""
    for prop_name, meta in db.get("properties", {}).items():
        if meta.get("type") == "files":
            return prop_name
    patch_body = {"properties": {"Media": {"name": "Media", "type": "files", "files": {}}}}
    resp = requests.patch(f"https://api.notion.com/v1/databases/{NOTION_DATABASE_ID}",
                          headers=notion_headers(), json=patch_body, timeout=30)
    if resp.status_code in (200, 202):
        return "Media"
    logging.warning(f"DB files 속성 생성 실패: {resp.status_code} {resp.text}")
    return None

def notion_query_by_title(title_prop: str, value: str) -> Optional[str]:
    """Id of the first page whose title equals *value*, or None."""
    body = {
        "filter": {"property": title_prop, "title": {"equals": value}},
        "page_size": 1,
    }
    resp = requests.post(
        f"https://api.notion.com/v1/databases/{NOTION_DATABASE_ID}/query",
        headers=notion_headers(), json=body, timeout=30,
    )
    resp.raise_for_status()
    matches = resp.json().get("results", [])
    if not matches:
        return None
    return matches[0]["id"]

def notion_create_page(row: Dict[str, Any], title_prop: str, opt: Dict[str, Optional[str]]) -> str:
    """Create a Notion page for one tweet row; returns the new page id.

    The tweet id becomes the title; optional properties are filled only when
    the database has them. A bookmark block linking the tweet is appended.
    """
    props: Dict[str, Any] = {title_prop: {"title": [{"text": {"content": row["id"]}}]}}
    if opt["text"]:
        txt = (row.get("text") or "")[:2000]  # Notion rich_text content cap
        props[opt["text"]] = {"rich_text": [{"text": {"content": txt}}]} if txt else {"rich_text": []}
    if opt["image_count"]:
        props[opt["image_count"]] = {"number": len(row.get("images", []))}
    if opt["video_count"]:
        props[opt["video_count"]] = {"number": len(row.get("videos", []))}
    if opt["tweet_url"]:
        props[opt["tweet_url"]] = {"url": row["tweet_url"]}

    bookmark = {"object": "block", "type": "bookmark", "bookmark": {"url": row["tweet_url"]}}
    payload = {"parent": {"database_id": NOTION_DATABASE_ID},
               "properties": props,
               "children": [bookmark]}
    resp = requests.post("https://api.notion.com/v1/pages", headers=notion_headers(), json=payload, timeout=30)
    if resp.status_code >= 400:
        logging.error(f"페이지 생성 실패: {resp.status_code} {resp.text}\nPayload: {payload}")
    resp.raise_for_status()
    return resp.json()["id"]

def notion_get_page_media_names(page_id: str, media_prop: Optional[str]) -> List[str]:
    """Names already attached to the page's files property (used for dedupe)."""
    if not media_prop:
        return []
    resp = requests.get(f"https://api.notion.com/v1/pages/{page_id}",
                        headers=notion_headers(), timeout=30)
    resp.raise_for_status()
    file_items = resp.json().get("properties", {}).get(media_prop, {}).get("files", [])
    names = []
    for item in file_items:
        name = item.get("name")
        if name:
            names.append(name)
    return names

# --- File Upload API ---
def mime_from_suffix(p: Path) -> str:
    """MIME type for the file extensions this pipeline produces."""
    table = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".mp4": "video/mp4",
    }
    return table.get(p.suffix.lower(), "application/octet-stream")

def notion_create_file_upload(name: str, mime: str) -> Dict[str, Any]:
    """Step 1 of Notion's File Upload API: register an upload slot."""
    body = {"file_name": name, "file_mime_type": mime}
    resp = requests.post("https://api.notion.com/v1/file_uploads",
                         headers=notion_headers(), json=body, timeout=60)
    resp.raise_for_status()
    return resp.json()

def notion_send_file_upload(file_upload_id: str, filepath: Path):
    """Step 2: POST the file bytes (multipart — drop the JSON Content-Type)."""
    headers = {k: v for k, v in notion_headers().items() if k != "Content-Type"}
    with open(filepath, "rb") as fh:
        resp = requests.post(
            f"https://api.notion.com/v1/file_uploads/{file_upload_id}/send",
            headers=headers, files={"file": (filepath.name, fh)}, timeout=600,
        )
    resp.raise_for_status()
    return resp.json()

def as_uploaded_file_obj(file_upload_id: str, name: str) -> Dict[str, Any]:
    """Files-property entry referencing a completed file upload."""
    return {
        "name": name,
        "type": "file_upload",
        "file_upload": {"id": file_upload_id},
    }

def notion_update_media_property(page_id: str, media_prop: str, files: List[Dict[str, Any]]):
    """Overwrite the page's files property with *files* (uploaded-file refs)."""
    payload = {"properties": {media_prop: {"files": files}}}
    resp = requests.patch(f"https://api.notion.com/v1/pages/{page_id}",
                          headers=notion_headers(), json=payload, timeout=60)
    if resp.status_code >= 400:
        logging.error(f"Media 속성 업데이트 실패: {resp.status_code} {resp.text}\nPayload: {payload}")
    resp.raise_for_status()

def notion_append_file_blocks(page_id: str, files: List[Dict[str, Any]]):
    """Attach uploaded files to the page body as file blocks.

    Fallback path used when the database has no files-type property.
    Per the Notion File Upload API, a file block referencing an upload is
    ``{"file": {"type": "file_upload", "file_upload": {"id": ...}}}`` — the
    previous ``"file_upload_id"`` key was not the documented shape.
    """
    children = [
        {"object": "block", "type": "file",
         "file": {"type": "file_upload",
                  "file_upload": {"id": f["file_upload"]["id"]}}}
        for f in files
    ]
    r = requests.patch(f"https://api.notion.com/v1/blocks/{page_id}/children",
                       headers=notion_headers(), json={"children": children}, timeout=60)
    if r.status_code >= 400:
        logging.error(f"파일 블록 첨부 실패: {r.status_code} {r.text}")
    r.raise_for_status()

# ===================== 파이프라인 =====================
def run_pipeline(account: str, out_dir: Path, target_count: int, max_scroll: int, headless: bool):
    """End-to-end run: scrape tweets and media, then mirror them into Notion.

    Per tweet: reuse (or create) a page titled with the tweet id, compress
    media to <=5 MiB, upload via the File Upload API, then attach uploads to
    the files property (or as file blocks when no such property exists).
    Local files are deleted immediately after a successful upload.
    """
    result = scrape(account, out_dir, max_scroll, SCROLL_PAUSE, headless, target_count=target_count)
    tweets = result["tweets"]; media_dir: Path = result["media_dir"]

    if not (NOTION_API_KEY and NOTION_DATABASE_ID):
        logging.error("NOTION_API_KEY/NOTION_DATABASE_ID 필요"); return

    # Adapt to whatever schema the database actually has.
    db = notion_get_db()
    title_prop = find_title_prop(db)
    opt_props  = map_optional_props(db)
    if not opt_props["media"]:
        created = ensure_media_property(db)
        if created: opt_props["media"] = created

    for tid, t in tweets.items():
        row = {
            "id": tid,
            "text": t.get("text",""),
            "images": t.get("images",[]),
            "videos": t.get("videos",[]),
            "tweet_url": normalize_x_url(t.get("orig_video") or f"https://x.com/{account}/status/{tid}"),
        }

        # Reuse an existing page (title == tweet id) to avoid duplicates.
        page_id = notion_query_by_title(title_prop, row["id"])
        if not page_id:
            try:
                page_id = notion_create_page(row, title_prop, opt_props)
            except requests.HTTPError as e:
                logging.error(f"[{tid}] 페이지 생성 실패 → 스킵: {e}")
                continue
        time.sleep(0.25)  # gentle pacing for the Notion API

        # Filenames already attached to the page — skip re-uploading those.
        existing_names = set()
        if opt_props["media"]:
            try:
                existing_names = set(notion_get_page_media_names(page_id, opt_props["media"]))
            except Exception as e:
                logging.warning(f"[{tid}] 기존 Media 조회 실패(무시): {e}")

        uploaded_files: List[Dict[str, Any]] = []

        # Images: compress → upload → delete the local copies.
        for p in sorted(media_dir.glob(f"{tid}_img*.jpg")):
            if not p.exists() or p.name in existing_names: continue
            temp = p.with_suffix(".upload.jpg")
            if not compress_image_to_limit(p, temp, MAX_UPLOAD_BYTES):
                logging.error(f"[{tid}] 이미지 5MiB 이하 압축 실패: {p.name}")
                if temp.exists(): temp.unlink(missing_ok=True)
                continue
            try:
                meta = notion_create_file_upload(p.name, mime_from_suffix(temp))
                notion_send_file_upload(meta["id"], temp)
                uploaded_files.append(as_uploaded_file_obj(meta["id"], p.name))
                temp.unlink(missing_ok=True); p.unlink(missing_ok=True)
                logging.info(f"[{tid}] 이미지 업로드 성공 → {p.name}")
            except Exception as e:
                logging.error(f"[{tid}] 이미지 업로드 실패({p.name}): {e}")
                if temp.exists(): temp.unlink(missing_ok=True)
            time.sleep(0.12)

        # Video: same compress → upload → delete flow.
        v = media_dir / f"{tid}_video.mp4"
        if v.exists() and v.name not in existing_names:
            tempv = v.with_suffix(".upload.mp4")
            if compress_video_to_limit(v, tempv, MAX_UPLOAD_BYTES):
                try:
                    meta = notion_create_file_upload(v.name, mime_from_suffix(tempv))
                    notion_send_file_upload(meta["id"], tempv)
                    uploaded_files.append(as_uploaded_file_obj(meta["id"], v.name))
                    tempv.unlink(missing_ok=True); v.unlink(missing_ok=True)
                    logging.info(f"[{tid}] 동영상 업로드 성공 → {v.name}")
                except Exception as e:
                    logging.error(f"[{tid}] 동영상 업로드 실패({v.name}): {e}")
                    if tempv.exists(): tempv.unlink(missing_ok=True)
            else:
                logging.error(f"[{tid}] 동영상 5MiB 이하 압축 실패: {v.name}")
                if tempv.exists(): tempv.unlink(missing_ok=True)

        # Attach uploads: files property preferred, file blocks as fallback.
        if uploaded_files:
            try:
                if opt_props["media"]:
                    notion_update_media_property(page_id, opt_props["media"], uploaded_files)
                    logging.info(f"[{tid}] Media 속성에 {len(uploaded_files)}개 설정")
                else:
                    notion_append_file_blocks(page_id, uploaded_files)
                    logging.info(f"[{tid}] 파일 블록으로 {len(uploaded_files)}개 첨부")
            except Exception as e:
                logging.error(f"[{tid}] 파일 반영 실패: {e}")
        else:
            logging.info(f"[{tid}] 신규 업로드 없음")

# ===================== 엔트리포인트 =====================
def main():
    """Entry point: scrape the configured account and mirror it into Notion."""
    run_pipeline(ACCOUNT, OUT_DIR, TARGET_COUNT, MAX_SCROLL, HEADLESS)

if __name__ == "__main__":
    main()
# (blog page footer residue: "RELATED ARTICLES / Most Popular / Recent Comments" — not part of the script)