RNN / LSTM — 시계열·텍스트 처리

순서가 있는 데이터를 다루는 RNN의 한계와 이를 극복한 LSTM의 구조를 주가 예측과 텍스트 분류 실습으로 시계열 데이터 처리 흐름 정리

2026.04.10

RNNLSTMtime seriessequential dataNLPtext classificationvanishing gradientdeep learning

0. 시리즈

제목역할
심화 1편CNN — 이미지 분류 완벽 정리이미지 처리
심화 2편⬅️RNN / LSTM — 시계열·텍스트 처리순서 데이터
심화 3편Transformer 구조 완벽 정리현대 AI 핵심
심화 4편BERT / GPT 원리와 활용언어 모델
심화 5편HuggingFace로 LLM Fine-tuningLLM 실전

1. RNN이란?

1-1. CNN(1편)과 무엇이 다른가?

비교 항목CNN (심화 1편)RNN (심화 2편)
처리 대상공간 데이터 (이미지)시간/순서 데이터 (텍스트, 주가)
입력 형태(batch, H, W, C)(batch, timesteps, features)
핵심 아이디어필터로 공간 패턴 추출이전 상태를 기억하며 순서 처리
주요 레이어Conv2D, MaxPooling2DLSTM, GRU

1-2. 순서가 있는 데이터란?

Dense나 CNN은 입력 순서를 고려하지 않습니다. 하지만 현실에는 순서 자체가 의미 인 데이터가 많습니다.

텍스트  : "나는 밥을 먹었다" → 단어 순서가 바뀌면 의미가 달라짐
주가    : 어제 → 오늘 → 내일 의 흐름이 중요
날씨    : 기온 변화 패턴이 중요
음성    : 소리의 시간적 흐름이 중요

1-3. RNN의 동작 원리 — 은닉 상태(Hidden State)

RNN은 이전 시점의 출력(은닉 상태)을 다음 시점의 입력으로 함께 사용 합니다.

일반 신경망:   입력 → [레이어] → 출력
                         (이전을 기억 못 함)

RNN:    x₁ → [레이어] → h₁ ──┐
                               ↓
        x₂ → [레이어] → h₂ ──┐  (h₁을 기억해서 같이 처리)
                               ↓
        x₃ → [레이어] → h₃ → 출력
python# RNN 수식 (한 시점)
h_t = tanh(W_x * x_t + W_h * h_(t-1) + b)
# x_t      : 현재 시점의 입력
# h_(t-1)  : 이전 시점의 은닉 상태 (기억)
# h_t      : 현재 시점의 은닉 상태 (출력)

1-4. RNN의 치명적 한계 — 기울기 소실(Vanishing Gradient)

"오늘 날씨가 맑아서 ... (수십 단어) ... 우산이 필요 없다"
                                              ↑
                              "맑다"는 정보가 여기까지 전달이 안 됨!

역전파 시 기울기가 시간을 거슬러 올라갈수록 점점 작아져 0에 수렴 합니다. 결국 멀리 떨어진 시점의 정보를 학습하지 못합니다. 이 문제를 해결한 것이 LSTM 입니다.


2. LSTM — RNN의 한계를 넘다

2-1. LSTM이란?

LSTM(Long Short-Term Memory) 은 1997년에 제안된 RNN의 개선 구조입니다. "무엇을 기억하고, 무엇을 잊을지"를 게이트(Gate) 로 직접 제어합니다.

RNN  : 은닉 상태(h) 1개 → 단기 기억만 가능
LSTM : 은닉 상태(h) + 셀 상태(C) → 장기 + 단기 기억 모두 가능

2-2. 셀 상태(Cell State) — 장기 기억 통로

 C_(t-1) ──────────────────────────────────→ C_t
              ↑ forget    ↑ input
              (버릴 것)   (추가할 것)

셀 상태는 컨베이어 벨트처럼 정보를 시간 축으로 그대로 흘려보내는 통로 입니다. 게이트들이 정보를 추가하거나 삭제하면서 조절합니다.

2-3. 게이트 3종 세트

python# ① Forget Gate — 이전 기억 중 버릴 것 결정 (0: 완전히 삭제, 1: 완전히 유지)
f_t = sigmoid(W_f · [h_(t-1), x_t] + b_f)

# ② Input Gate — 새로운 정보 중 저장할 것 결정
i_t = sigmoid(W_i · [h_(t-1), x_t] + b_i)  # 얼마나 저장할지
g_t = tanh(W_g · [h_(t-1), x_t] + b_g)     # 어떤 값을 저장할지

# 셀 상태 업데이트
C_t = f_t * C_(t-1) + i_t * g_t
#      ↑ 버릴 것 지우고   ↑ 새 정보 추가

# ③ Output Gate — 최종적으로 출력할 정보 결정
o_t = sigmoid(W_o · [h_(t-1), x_t] + b_o)
h_t = o_t * tanh(C_t)

2-4. GRU — LSTM의 경량화 버전

GRU(Gated Recurrent Unit) 는 LSTM에서 셀 상태를 없애고 게이트를 2개로 줄인 버전입니다.

비교 항목LSTMGRU
게이트 수3개 (Forget, Input, Output)2개 (Reset, Update)
상태셀 상태 + 은닉 상태은닉 상태만
파라미터 수많음적음 (약 25% 절감)
학습 속도느림빠름
성능대부분 우세데이터 작을 때 유리
python# Keras — 레이어 이름만 바꾸면 됨
from tensorflow.keras.layers import LSTM, GRU

LSTM(64, return_sequences=True)  # LSTM
GRU(64, return_sequences=True)   # GRU (나머지 코드 동일)

# PyTorch — 동일하게 클래스만 교체
nn.LSTM(input_size, hidden_size, ...)  # LSTM
nn.GRU(input_size, hidden_size, ...)   # GRU

3. 시퀀스 데이터 전처리

3-1. 슬라이딩 윈도우 — 시계열 데이터를 X/y로 만드는 방법

원본 데이터: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
window_size = 3 으로 슬라이딩 윈도우 적용:

X (입력)          y (정답)
[10, 20, 30]  →   40
[20, 30, 40]  →   50
[30, 40, 50]  →   60
[40, 50, 60]  →   70
...
pythonimport numpy as np

def create_sequences(data, window_size):
    """
    data        : 1D numpy array (정규화된 시계열 데이터)
    window_size : 몇 개의 과거 시점으로 다음을 예측할지
    반환        : X shape (samples, window_size, 1)
                  y shape (samples,)
    """
    X, y = [], []
    for i in range(len(data) - window_size):
        X.append(data[i : i + window_size])  # 과거 window_size 개
        y.append(data[i + window_size])       # 다음 1개
    X = np.array(X)
    y = np.array(y)
    # LSTM 입력은 3D: (samples, timesteps, features)
    X = X.reshape(X.shape0], X.shape[1], 1)
    return X, y

3-2. 스케일링 (MinMaxScaler)

LSTM은 tanh 활성화 함수를 사용하므로 입력값을 0~1 범위로 정규화 해야 학습이 안정적입니다.

pythonfrom sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler(feature_range=(0, 1))

# 반드시 훈련 데이터로만 fit!
train_scaled = scaler.fit_transform(train_data.reshape(-1, 1))
test_scaled  = scaler.transform(test_data.reshape(-1, 1))

# 예측 후 원래 스케일로 복원
y_pred_original = scaler.inverse_transform(y_pred)

3-3. 3차원 입력 형태 이해

RNN/LSTM의 입력은 반드시 3차원 이어야 합니다.

(samples, timesteps, features)
    ↑          ↑          ↑
 데이터 수  시간 길이   변수 개수

예시:
- (1000, 30, 1)  : 1000개 샘플, 과거 30일, 1개 변수(주가)
- (500, 10, 5)   : 500개 샘플, 과거 10일, 5개 변수(기온/습도/강수량/기압/풍속)

# Dense 입력 (2D) vs LSTM 입력 (3D)
Dense : (1000, 30)       # 2D
LSTM  : (1000, 30, 1)    # 3D — 마지막 차원(features)을 반드시 붙여야 함

4. Keras로 LSTM 구현하기 — 시계열 예측

4-1. 데이터 준비 (항공 승객 수 예측)

pythonimport numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import LSTM, GRU, SimpleRNN, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping

# seaborn 내장 데이터셋 사용 (인터넷 없이 실행 가능)
import seaborn as sns
df = sns.load_dataset("flights")  # 1949~1960년 월별 항공 승객 수

print(df.head(10))
print(f"데이터 크기: {df.shape}")  # (144, 3)
print(f"컬럼: {df.columns.tolist()}")  # ['year', 'month', 'passengers']

# 승객 수만 추출
data = df["passengers"].values.astype("float32")
print(f"최소: {data.min()}, 최대: {data.max()}")  # 104 ~ 622

# 시각화
plt.figure(figsize=(12, 4))
plt.plot(data, color="steelblue")
plt.title("월별 항공 승객 수 (1949~1960)")
plt.xlabel("월 (0=1949년 1월)")
plt.ylabel("승객 수 (천 명)")
plt.grid(True)
plt.show()
python# 훈련 / 테스트 분리 (80% / 20%)
train_size = int(len(data) * 0.8)
train_data = data[:train_size]   # 0~115
test_data  = data[train_size:]   # 116~143

print(f"훈련 데이터: {len(train_data)}개")  # 115
print(f"테스트 데이터: {len(test_data)}개")  # 29

# 정규화
scaler = MinMaxScaler(feature_range=(0, 1))
train_scaled = scaler.fit_transform(train_data.reshape(-1, 1)).flatten()
test_scaled  = scaler.transform(test_data.reshape(-1, 1)).flatten()
python# 슬라이딩 윈도우 적용
WINDOW_SIZE = 12  # 과거 12개월로 다음 달 예측

X_train, y_train = create_sequences(train_scaled, WINDOW_SIZE)
X_test,  y_test  = create_sequences(test_scaled,  WINDOW_SIZE)

print(f"X_train shape: {X_train.shape}")  # (103, 12, 1)
print(f"y_train shape: {y_train.shape}")  # (103,)
print(f"X_test shape:  {X_test.shape}")   # (17, 12, 1)

4-2. 단층 LSTM 모델

pythondef build_lstm(units=64, dropout=0.2):
    model = Sequential([
        LSTM(units, input_shape=(WINDOW_SIZE, 1), return_sequences=False),
        Dropout(dropout),
        Dense(32, activation="relu"),
        Dense(1)  # 회귀 → 활성화 함수 없음
    ])
    model.compile(optimizer="adam", loss="mse", metrics=["mae"])
    return model

model_lstm = build_lstm(units=64)
model_lstm.summary()
출력 예시:
Layer (type)         Output Shape       Param #
────────────────────────────────────────────────
lstm (LSTM)          (None, 64)         16,896
dropout (Dropout)    (None, 64)         0
dense (Dense)        (None, 32)         2,080
dense_1 (Dense)      (None, 1)          33
────────────────────────────────────────────────
Total params: 19,009

4-3. 다층 LSTM 모델

pythondef build_deep_lstm(units=64, dropout=0.2):
    model = Sequential([
        # return_sequences=True → 다음 LSTM 레이어에 시퀀스 전달
        LSTM(units, input_shape=(WINDOW_SIZE, 1), return_sequences=True),
        Dropout(dropout),
        LSTM(units // 2, return_sequences=False),
        Dropout(dropout),
        Dense(32, activation="relu"),
        Dense(1)
    ])
    model.compile(optimizer="adam", loss="mse", metrics=["mae"])
    return model

model_deep = build_deep_lstm(units=64)

💡 return_sequences — LSTM 레이어를 여러 개 쌓을 때 중간 레이어는 반드시 return_sequences=True로 설정해야 합니다. 마지막 LSTM만 False(기본값)로 설정합니다.

return_sequences=True  → 모든 timestep의 출력 반환 (다음 LSTM에 전달)
                          출력 shape: (batch, timesteps, units)

return_sequences=False → 마지막 timestep의 출력만 반환
                          출력 shape: (batch, units)

4-4. 학습

pythonearly_stop = EarlyStopping(
    monitor="val_loss",
    patience=15,
    restore_best_weights=True,
    verbose=1
)

history = model_deep.fit(
    X_train, y_train,
    epochs=200,
    batch_size=16,
    validation_split=0.1,
    callbacks=[early_stop],
    verbose=1
)

# 학습 곡선
plt.figure(figsize=(10, 4))
plt.plot(history.history["loss"],     label="훈련 손실")
plt.plot(history.history["val_loss"], label="검증 손실", linestyle="--")
plt.title("LSTM 학습 손실 변화")
plt.xlabel("Epoch")
plt.ylabel("MSE Loss")
plt.legend()
plt.grid(True)
plt.show()

4-5. 예측 및 시각화

python# 예측
y_pred_scaled = model_deep.predict(X_test)

# 역정규화 (원래 승객 수로 복원)
y_pred = scaler.inverse_transform(y_pred_scaled).flatten()
y_true = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()

# 성능 평가
from sklearn.metrics import mean_absolute_error, mean_squared_error

mae  = mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(mean_squared_error(y_true, y_pred))
print(f"MAE:  {mae:.2f} (천 명)")
print(f"RMSE: {rmse:.2f} (천 명)")

# 전체 데이터 위에 예측 결과 시각화
plt.figure(figsize=(14, 5))
plt.plot(range(len(data)), data, label="실제 승객 수", color="steelblue")
plt.plot(
    range(train_size + WINDOW_SIZE, train_size + WINDOW_SIZE + len(y_pred)),
    y_pred,
    label="LSTM 예측", color="red", linestyle="--", linewidth=2
)
plt.axvline(x=train_size, color="gray", linestyle=":", label="훈련/테스트 경계")
plt.title("항공 승객 수 예측 — Keras LSTM")
plt.xlabel("월")
plt.ylabel("승객 수 (천 명)")
plt.legend()
plt.grid(True)
plt.show()

4-6. RNN vs LSTM vs GRU 성능 비교

pythondef build_rnn(units=64):
    model = Sequential([
        SimpleRNN(units, input_shape=(WINDOW_SIZE, 1)),
        Dropout(0.2),
        Dense(1)
    ])
    model.compile(optimizer="adam", loss="mse")
    return model

def build_gru(units=64):
    model = Sequential([
        GRU(units, input_shape=(WINDOW_SIZE, 1)),
        Dropout(0.2),
        Dense(32, activation="relu"),
        Dense(1)
    ])
    model.compile(optimizer="adam", loss="mse")
    return model

results = {}

for name, model in [("SimpleRNN", build_rnn()),
                    ("LSTM",      build_deep_lstm()),
                    ("GRU",       build_gru())]:
    model.fit(X_train, y_train, epochs=100, batch_size=16,
              validation_split=0.1, verbose=0,
              callbacks=[EarlyStopping(patience=10, restore_best_weights=True)])
    pred = scaler.inverse_transform(model.predict(X_test)).flatten()
    rmse = np.sqrt(mean_squared_error(y_true, pred))
    results[name] = rmse
    print(f"{name:10s} — RMSE: {rmse:.2f}")

# 시각화
plt.figure(figsize=(8, 4))
plt.bar(results.keys(), results.values(),
        color=["skyblue", "coral", "lightgreen"], edgecolor="black")
plt.title("RNN vs LSTM vs GRU 성능 비교 (RMSE)")
plt.ylabel("RMSE (낮을수록 좋음)")
plt.grid(axis="y")
for i, (name, val) in enumerate(results.items()):
    plt.text(i, val + 0.5, f"{val:.2f}", ha="center", fontsize=11)
plt.show()

5. PyTorch로 LSTM 구현하기 — 시계열 예측

5-1. Dataset / DataLoader 구성

pythonimport torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"사용 장치: {device}")

class TimeSeriesDataset(Dataset):
    """시계열 슬라이딩 윈도우 Dataset"""
    def __init__(self, data, window_size):
        self.X = []
        self.y = []
        for i in range(len(data) - window_size):
            self.X.append(data[i : i + window_size])
            self.y.append(data[i + window_size])
        # Tensor 변환 (float32)
        self.X = torch.tensor(self.X, dtype=torch.float32).unsqueeze(-1)  # (N, W, 1)
        self.y = torch.tensor(self.y, dtype=torch.float32)                 # (N,)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


# 훈련 / 테스트 분리는 위에서 사용한 train_scaled, test_scaled 그대로 사용
train_dataset = TimeSeriesDataset(train_scaled, WINDOW_SIZE)
test_dataset  = TimeSeriesDataset(test_scaled,  WINDOW_SIZE)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader  = DataLoader(test_dataset,  batch_size=16, shuffle=False)

print(f"훈련 샘플 수: {len(train_dataset)}")  # 103
print(f"테스트 샘플 수: {len(test_dataset)}")  # 17

# 입력 shape 확인
X_sample, y_sample = next(iter(train_loader))
print(f"X batch shape: {X_sample.shape}")  # (16, 12, 1)
print(f"y batch shape: {y_sample.shape}")  # (16,)

5-2. LSTM 모델 클래스 설계

pythonclass LSTMModel(nn.Module):
    def __init__(self, input_size=1, hidden_size=64,
                 num_layers=2, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers  = num_layers

        self.lstm = nn.LSTM(
            input_size  = input_size,
            hidden_size = hidden_size,
            num_layers  = num_layers,    # 다층 LSTM
            batch_first = True,          # (batch, seq, feature) 순서
            dropout     = dropout if num_layers > 1 else 0.0
        )
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size, 32)
        self.fc2 = nn.Linear(32, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        # 은닉 상태 초기화 (0으로)
        batch_size = x.size(0)
        h0 = torch.zeros(self.num_layers, batch_size,
                         self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size,
                         self.hidden_size).to(x.device)

        # LSTM 순전파
        out, (hn, cn) = self.lstm(x, (h0, c0))
        # out shape: (batch, seq_len, hidden_size)
        # hn  shape: (num_layers, batch, hidden_size)

        # 마지막 timestep의 출력만 사용
        out = out[:, -1, :]             # (batch, hidden_size)
        out = self.dropout(out)
        out = self.relu(self.fc1(out))  # (batch, 32)
        out = self.fc2(out)             # (batch, 1)
        return out.squeeze(-1)          # (batch,)


class GRUModel(nn.Module):
    """GRU 버전 — LSTM과 구조 동일, nn.GRU만 다름"""
    def __init__(self, input_size=1, hidden_size=64,
                 num_layers=2, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers  = num_layers

        self.gru = nn.GRU(
            input_size  = input_size,
            hidden_size = hidden_size,
            num_layers  = num_layers,
            batch_first = True,
            dropout     = dropout if num_layers > 1 else 0.0
        )
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size, 32)
        self.fc2 = nn.Linear(32, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        batch_size = x.size(0)
        h0 = torch.zeros(self.num_layers, batch_size,
                         self.hidden_size).to(x.device)
        out, hn = self.gru(x, h0)   # GRU는 셀 상태(c0) 없음
        out = out[:, -1, :]
        out = self.dropout(out)
        out = self.relu(self.fc1(out))
        out = self.fc2(out)
        return out.squeeze(-1)

💡 batch_first=True — PyTorch LSTM의 기본 입력은 (seq, batch, feature) 순서입니다. batch_first=True로 설정하면 (batch, seq, feature) 순서로 바꿀 수 있어 직관적입니다.

5-3. 학습 루프

pythondef train_model(model, train_loader, test_loader, epochs=200, lr=0.001):
    model = model.to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="min", factor=0.5, patience=10, verbose=False
    )

    train_losses = []
    best_val_loss = float("inf")
    best_weights  = None
    patience_cnt  = 0
    PATIENCE = 20  # EarlyStopping 기준

    for epoch in range(epochs):
        # ── 학습 ─────────────────────────────────
        model.train()
        running_loss = 0.0
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            optimizer.zero_grad()
            output = model(X_batch)
            loss   = criterion(output, y_batch)
            loss.backward()
            # 기울기 폭발 방지 — 기울기 norm 상한 설정
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            running_loss += loss.item()

        avg_loss = running_loss / len(train_loader)
        train_losses.append(avg_loss)

        # ── 검증 ─────────────────────────────────
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device)
                output = model(X_batch)
                val_loss += criterion(output, y_batch).item()
        val_loss /= len(test_loader)

        scheduler.step(val_loss)

        # EarlyStopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_weights  = {k: v.clone() for k, v in model.state_dict().items()}
            patience_cnt  = 0
        else:
            patience_cnt += 1
            if patience_cnt >= PATIENCE:
                print(f"  Early stopping at epoch {epoch+1}")
                break

        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch+1:3d} | Train Loss: {avg_loss:.6f} "
                  f"| Val Loss: {val_loss:.6f}")

    # 최고 가중치 복원
    model.load_state_dict(best_weights)
    return model, train_losses


model_pt = LSTMModel(input_size=1, hidden_size=64, num_layers=2, dropout=0.2)
print(f"파라미터 수: {sum(p.numel() for p in model_pt.parameters()):,}")

model_pt, losses = train_model(model_pt, train_loader, test_loader, epochs=200)

5-4. 예측 결과 역정규화 및 시각화

pythonmodel_pt.eval()
all_preds  = []
all_labels = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(device)
        preds = model_pt(X_batch).cpu().numpy()
        all_preds.extend(preds)
        all_labels.extend(y_batch.numpy())

all_preds  = np.array(all_preds).reshape(-1, 1)
all_labels = np.array(all_labels).reshape(-1, 1)

# 역정규화
y_pred_pt = scaler.inverse_transform(all_preds).flatten()
y_true_pt = scaler.inverse_transform(all_labels).flatten()

rmse_pt = np.sqrt(mean_squared_error(y_true_pt, y_pred_pt))
mae_pt  = mean_absolute_error(y_true_pt, y_pred_pt)
print(f"PyTorch LSTM — RMSE: {rmse_pt:.2f}, MAE: {mae_pt:.2f}")

# 시각화
plt.figure(figsize=(14, 5))
plt.plot(range(len(data)), data, label="실제 승객 수", color="steelblue")
plt.plot(
    range(train_size + WINDOW_SIZE, train_size + WINDOW_SIZE + len(y_pred_pt)),
    y_pred_pt,
    label="PyTorch LSTM 예측", color="red", linestyle="--", linewidth=2
)
plt.axvline(x=train_size, color="gray", linestyle=":", label="훈련/테스트 경계")
plt.title("항공 승객 수 예측 — PyTorch LSTM")
plt.xlabel("월")
plt.ylabel("승객 수 (천 명)")
plt.legend()
plt.grid(True)
plt.show()

6. Keras로 LSTM 구현하기 — 텍스트 분류 (감성 분석)

6-1. IMDB 영화 리뷰 데이터셋

pythonfrom tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, Bidirectional

# 상위 10,000개 단어만 사용
VOCAB_SIZE  = 10000
MAX_LEN     = 200   # 최대 200 단어로 패딩

(X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=VOCAB_SIZE)

print(f"훈련 샘플 수: {len(X_train)}")   # 25,000
print(f"테스트 샘플 수: {len(X_test)}")  # 25,000
print(f"클래스: 0=부정, 1=긍정")

# 샘플 길이 분포 확인
lengths = [len(x) for x in X_train]
print(f"리뷰 길이 — 최소: {min(lengths)}, 최대: {max(lengths)}, "
      f"평균: {np.mean(lengths):.0f}")

# 단어 인덱스로 원문 확인 (선택 사항)
word_index = imdb.get_word_index()
idx_to_word = {v+3: k for k, v in word_index.items()}
idx_to_word.update({0: "<PAD>", 1: "<START>", 2: "<UNK>", 3: "<UNUSED>"})
sample_review = " ".join(idx_to_word.get(i, "?") for i in X_train[0][:30])
print(f"리뷰 샘플: {sample_review}...")
print(f"정답: {'긍정' if y_train[0] == 1 else '부정'}")

6-2. 패딩(Padding) — 길이 통일

python# 모든 리뷰를 MAX_LEN으로 길이 통일
# 짧은 리뷰 → 앞에 0을 채움 (pre-padding)
# 긴 리뷰   → 앞을 잘라냄 (pre-truncating)
X_train_pad = pad_sequences(X_train, maxlen=MAX_LEN, padding="pre", truncating="pre")
X_test_pad  = pad_sequences(X_test,  maxlen=MAX_LEN, padding="pre", truncating="pre")

print(f"X_train_pad shape: {X_train_pad.shape}")  # (25000, 200)
print(f"패딩 전 리뷰 길이: {len(X_train[0])}")
print(f"패딩 후 리뷰 길이: {len(X_train_pad[0])}")  # 200
패딩 예시:
원본  : [14, 22, 16, 43, 530]
패딩  : [0, 0, 0, ..., 14, 22, 16, 43, 530]
         ↑ 앞에 0으로 채움          ↑ 원본

6-3. Embedding Layer란?

단어 인덱스 → 의미 있는 실수 벡터로 변환

"고양이" → 14 → [0.23, -0.17, 0.88, ..., 0.41]  (128차원 벡터)
"강아지" → 38 → [0.19, -0.14, 0.91, ..., 0.39]  (128차원 벡터)
                  ↑ 의미가 비슷한 단어 → 벡터가 유사함

Embedding(vocab_size, embed_dim)
= vocab_size개의 단어를 각각 embed_dim 차원 벡터로 표현
= 학습을 통해 최적의 단어 표현을 찾아냄

6-4. 모델 설계

python# 단방향 LSTM
def build_text_lstm():
    model = Sequential([
        # (batch, 200) → (batch, 200, 128) : 각 단어를 128차원 벡터로
        Embedding(input_dim=VOCAB_SIZE, output_dim=128, input_length=MAX_LEN),
        LSTM(64, return_sequences=True),
        Dropout(0.3),
        LSTM(32),
        Dropout(0.3),
        Dense(32, activation="relu"),
        Dense(1, activation="sigmoid")  # 이진 분류 → sigmoid
    ])
    model.compile(optimizer="adam",
                  loss="binary_crossentropy",
                  metrics=["accuracy"])
    return model


# 양방향 LSTM (Bidirectional) — 앞→뒤 + 뒤→앞 동시 처리
def build_bidirectional_lstm():
    model = Sequential([
        Embedding(input_dim=VOCAB_SIZE, output_dim=128, input_length=MAX_LEN),
        # Bidirectional: hidden_size가 2배 (앞방향 + 뒷방향)
        Bidirectional(LSTM(64, return_sequences=True)),
        Dropout(0.3),
        Bidirectional(LSTM(32)),
        Dropout(0.3),
        Dense(32, activation="relu"),
        Dense(1, activation="sigmoid")
    ])
    model.compile(optimizer="adam",
                  loss="binary_crossentropy",
                  metrics=["accuracy"])
    return model

model_text = build_bidirectional_lstm()
model_text.summary()
Bidirectional(LSTM(64)) → 출력 shape: (batch, 200, 128)
                                               ↑
                         앞방향 64 + 뒷방향 64 = 128

6-5. 학습 및 평가

pythonearly_stop = EarlyStopping(monitor="val_accuracy", patience=3,
                           restore_best_weights=True, verbose=1)

history = model_text.fit(
    X_train_pad, y_train,
    epochs=10,
    batch_size=128,
    validation_split=0.2,
    callbacks=[early_stop],
    verbose=1
)

# 평가
test_loss, test_acc = model_text.evaluate(X_test_pad, y_test, verbose=0)
print(f"테스트 정확도: {test_acc:.4f}")  # 예: 0.8720

# 학습 곡선
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 4))
ax1.plot(history.history["accuracy"],     label="훈련 정확도")
ax1.plot(history.history["val_accuracy"], label="검증 정확도", linestyle="--")
ax1.set_title("정확도 변화")
ax1.legend(); ax1.grid(True)

ax2.plot(history.history["loss"],     label="훈련 손실")
ax2.plot(history.history["val_loss"], label="검증 손실", linestyle="--")
ax2.set_title("손실 변화")
ax2.legend(); ax2.grid(True)
plt.tight_layout()
plt.show()

# 직접 예측해보기
def predict_sentiment(model, review_idx_list):
    padded = pad_sequences([review_idx_list], maxlen=MAX_LEN,
                           padding="pre", truncating="pre")
    prob = model.predict(padded, verbose=0)[0][0]
    label = "긍정 😊" if prob >= 0.5 else "부정 😞"
    print(f"예측: {label} (확률: {prob:.4f})")

# 테스트 샘플로 확인
predict_sentiment(model_text, X_test[0])
print(f"실제 정답: {'긍정' if y_test[0] == 1 else '부정'}")

7. PyTorch로 LSTM 구현하기 — 텍스트 분류

7-1. 데이터 준비

pythonimport torch
from torch.utils.data import TensorDataset, DataLoader

# Keras에서 불러온 X_train_pad, X_test_pad 그대로 활용
X_tr = torch.tensor(X_train_pad, dtype=torch.long)
y_tr = torch.tensor(y_train,     dtype=torch.float32)
X_te = torch.tensor(X_test_pad,  dtype=torch.long)
y_te = torch.tensor(y_test,      dtype=torch.float32)

train_ds = TensorDataset(X_tr, y_tr)
test_ds  = TensorDataset(X_te, y_te)

train_loader_text = DataLoader(train_ds, batch_size=128, shuffle=True)
test_loader_text  = DataLoader(test_ds,  batch_size=128, shuffle=False)

print(f"X_tr shape: {X_tr.shape}")  # (25000, 200)
print(f"y_tr shape: {y_tr.shape}")  # (25000,)

7-2. 모델 클래스 설계

pythonclass TextLSTM(nn.Module):
    def __init__(self, vocab_size=10000, embed_dim=128,
                 hidden_size=64, num_layers=2, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings = vocab_size,
            embedding_dim  = embed_dim,
            padding_idx    = 0          # 패딩 토큰(0)은 학습에서 제외
        )
        self.lstm = nn.LSTM(
            input_size  = embed_dim,
            hidden_size = hidden_size,
            num_layers  = num_layers,
            batch_first = True,
            dropout     = dropout if num_layers > 1 else 0.0,
            bidirectional = True        # 양방향 LSTM
        )
        # bidirectional=True → hidden_size * 2
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size * 2, 32)
        self.fc2 = nn.Linear(32, 1)
        self.relu    = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x: (batch, seq_len)
        embedded = self.embedding(x)            # (batch, seq_len, embed_dim)

        out, (hn, cn) = self.lstm(embedded)
        # out: (batch, seq_len, hidden*2)

        # 마지막 timestep
        out = out[:, -1, :]                     # (batch, hidden*2)
        out = self.dropout(out)
        out = self.relu(self.fc1(out))          # (batch, 32)
        out = self.sigmoid(self.fc2(out))       # (batch, 1)
        return out.squeeze(-1)                  # (batch,)


model_text_pt = TextLSTM(vocab_size=VOCAB_SIZE).to(device)
total = sum(p.numel() for p in model_text_pt.parameters())
print(f"파라미터 수: {total:,}")

7-3. 학습 루프

pythoncriterion_text = nn.BCELoss()   # 이진 분류 → Binary Cross Entropy
optimizer_text = torch.optim.Adam(model_text_pt.parameters(), lr=0.001)

best_acc_text = 0.0
best_weights_text = None

for epoch in range(10):
    # ── 학습 ─────────────────────────────────
    model_text_pt.train()
    running_loss = 0.0
    for X_batch, y_batch in train_loader_text:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        optimizer_text.zero_grad()
        output = model_text_pt(X_batch)
        loss   = criterion_text(output, y_batch)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model_text_pt.parameters(), 1.0)
        optimizer_text.step()
        running_loss += loss.item()

    # ── 검증 ─────────────────────────────────
    model_text_pt.eval()
    correct = 0
    total_cnt = 0
    with torch.no_grad():
        for X_batch, y_batch in test_loader_text:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            preds = (model_text_pt(X_batch) >= 0.5).float()
            correct += (preds == y_batch).sum().item()
            total_cnt += y_batch.size(0)

    acc = correct / total_cnt
    avg_loss = running_loss / len(train_loader_text)

    if acc > best_acc_text:
        best_acc_text = acc
        best_weights_text = {k: v.clone() for k, v in
                             model_text_pt.state_dict().items()}

    print(f"Epoch {epoch+1:2d}/10 | Loss: {avg_loss:.4f} | "
          f"Test Acc: {acc:.4f}")

model_text_pt.load_state_dict(best_weights_text)
print(f"\n최고 테스트 정확도: {best_acc_text:.4f}")

8. Keras vs PyTorch 핵심 코드 비교

항목KerasPyTorch
LSTM 레이어LSTM(64, return_sequences=True)nn.LSTM(input, 64, batch_first=True)
입력 채널 명시❌ 자동 추론input_size 직접 지정
다층 LSTM레이어 여러 개 쌓기num_layers=2
양방향Bidirectional(LSTM(...))bidirectional=True
EmbeddingEmbedding(vocab, dim)nn.Embedding(vocab, dim, padding_idx=0)
은닉 상태 초기화자동h0, c0 직접 생성
기울기 클리핑clipnorm=1.0 (compile 옵션)clip_grad_norm_(model.parameters(), 1.0)
학습 루프model.fit(...)직접 작성
이진 분류 손실binary_crossentropynn.BCELoss()

⚠️ PyTorch nn.LSTM의 출력 out(batch, seq_len, hidden_size * directions) 형태입니다. 양방향 (bidirectional=True)이면 hidden_size * 2가 됩니다. 다음 Linear 레이어의 in_features를 반드시 맞춰야 합니다.


9. 마무리

9-1. 오늘 배운 것 한눈에 정리

개념핵심 내용
RNN이전 은닉 상태를 현재 입력과 함께 처리, 기울기 소실 문제
LSTMForget·Input·Output 게이트로 장기 기억 해결
GRULSTM 경량화, 게이트 2개, 파라미터 적음
슬라이딩 윈도우시계열 → (samples, timesteps, features) 3D 변환
return_sequences다음 LSTM에 전달 시 True, 마지막 레이어만 False
Embedding정수 토큰 → 의미 있는 실수 벡터 변환
Bidirectional앞뒤 방향 동시 처리, hidden_size 2배
clip_grad_norm기울기 폭발 방지, RNN 계열 필수