본문 바로가기

Data Science/강화학습

[강화학습] 강화학습을 모델로! Q-learning, Q-Network, DQN

반응형

🔊 해당 포스팅은 딥러닝 파이토치 교과서 서적의 강화학습 챕터 내용과 김성훈 교수님의 강화학습 기초 강의를 기반으로 개인적인 학습 및 정리를 위해 작성되었습니다. 하단에 등장하는 모든 자료들은 필자가 직접 재구성하였음을 알립니다.

 

강화학습의 핵심 Q!


이번 포스팅에서는 저번 포스팅에서 학습한 MDP의 행동-가치 함수 이른바 Q-함수를 활용하는 Q-learning, Q-Network, 그리고 깊은 신경망을 적용한 DQN(Deep Q Network)에 대해서 알아보고, 이를 코드로 구현하는 방법을 소개한다. 해당 포스팅은 반드시 MDP의 행동-가치 함수를 이해해야 하므로 저번 포스팅을 읽고 오거나 해당 포스팅이 너무 내용이 길고 복잡하다면 직관적인 김성훈 교수님의 강의를 보고 오도록 하자.

1. Q-형님이 안내하는 곳으로, Q-learning

가장 먼저 소개할 모델은 Q-learning 이다. 엄밀히 말하면 모델이라기보다는 Q-함수를 사용해서 '어떠한 테이블'의 값을 계속 갱신해나가는 것이다. 여기서 '어떠한 테이블' 'Q-테이블'이라고도 하며 2차원 테이블로 구성되고 행(row)에는 상태를, 열(column)에는 행동을 의미한다. 예를 들어, 상태가 $S_1, S_2, S_3, S_4$ 4가지가 있고, 행동이 $a_1, a_2, a_3$ 라고 한다면 Q-테이블은 아래처럼 구성된다.

 

상태가 4개, 행동이 3개일 때의 Q-table 예시

 

그렇다면 Q-table에 값으로 들어가는 값($v_{ij}$)들은 무엇일까? 바로 특정 상태($S_i$)와 특정 행동($a_j$)이 Q-함수에 들어갔을 때 반환되는 미래 보상의 합인 가치(Q-value) 값이 된다.(보상과 가치의 차이점에 대해서는 직전 포스팅을 참조하자) Q-함수의 구체적인 수식은 다음과 같다.

 

Q-함수의 수식

 

현재 상태 $s$에서 다음 상태 $s'$으로 이동한다고 가정할 때, 상태 $s$에서 $a$라는 행동을 취할 때의 Q-함수값은 상태 $s$에서 $a$라는 행동을 취할 때의 보상 $r$ 과 다음 상태 $s'$에서 가장 보상을 크게 받는 행동 $a'$을 했을 때의 Q-함수값을 더해준다. 그리고 이 값에 할인율(discount factor) 개념인 $\gamma$를 곱해준다. 할인율에 대한 자세한 개념은 직전 포스팅을 참조하자.

 

이렇게 Q-함수를 사용해서 구한 값인 가치는 Q-table에 계속 갱신 시켜주게 되면서 이것이 바로 Q-learning의 동작 과정이다. 또한 Q-learning 과정에서 특정 확률로 Q-table의 안내대로 갈지(exploitation), 아니면 모험적으로 새로운 행동을 취해볼지(exploration)하는 과정이 추가된다. 또한 Stochastic 한 환경을 반영하도록 하기 위해 학습률이라는 learning rate 이라는 값을 Q-함수에 반영해 준다. 이 부분은 아래 구현 코드에서 살펴볼 수 있다.

 

이제 Q-learning을 코드로 구현해보도록 하자. 아래는 FrozenLake-v1 환경에서 Q-learning을 넘파이로 구현한 코드이다.

 

import gym
import numpy as np

env = gym.make("FrozenLake-v1")
n_states = env.observation_space.n
n_actions = env.action_space.n

# Q-table 초기화 -> Q-table에서 행은 상태개수, 열은 행동개수, value는 상태-행동 조합의 가치를 의미
Q_table = np.zeros((n_states, n_actions))
print(Q_table.shape)

#=======
# params
#=======
#number of episode we will run
n_episodes = 10000

#maximum of iteration per episode
max_iter_episode = 100

#initialize the exploration probability to 1
exploration_proba = 1

#exploartion decreasing decay for exponential decreasing
exploration_decreasing_decay = 0.001

# minimum of exploration proba
min_exploration_proba = 0.01

#discounted factor
gamma = 0.99

#learning rate
lr = 0.1

# 각 에피소드마다 얻는 보상값 기록
rewards_per_episode = []

#=================
# train Q-learning
#=================
# episode (ML에서는 epochs)
for e in range(n_episodes):
    current_state, current_prob = env.reset()
    done = False
    
    total_episode_reward = 0
    
    # max_iter_episode (ML에서는 n_iterations by mini-batch)
    for i in range(max_iter_episode):
        # exploration
        if np.random.uniform(0, 1) < exploration_proba:
            action = env.action_space.sample() # 랜덤하게 액션 수행
        # exploitation using bellman-equation
        else:
            action = np.argmax(Q_table[current_state,:]) # Q-table 속 현재 상태에서 할 수 있는 행동들 중 가치가 가장 큰 값의 행동 indice를 반환
        
        # 행동을 수행함으로써 변화되는 [다음 상태, 보상, 종료상태 여부] -> next_state로 갈 때는 (사전에 정의된) 확률들에 기반해서 결정됨 -> 직접 지정..?
        next_state, reward, done, _, _ = env.step(action)

        current_state_value = Q_table[current_state, action]  # 현재 상태에서 행동을 했을 때의 가치
        next_state_value = Q_table[next_state,:].max()        # 다음 상태에서 행동을 했을 때의 가치 
        Q_table[current_state, action] = (1-lr) * current_state_value + lr * (reward + gamma * next_state_value)
        total_episode_reward += reward
        
        # 종료 상태가 되었다면 끝
        if done:
            break
        
        # 다음 상태를 현재 상태로 갱신
        current_state = next_state
    
    exploration_proba = max(min_exploration_proba, np.exp(-exploration_decreasing_decay * e))
    rewards_per_episode.append(total_episode_reward)

 

Q-learning의 동작과정을 시각화한 좋은 영상이 있다. 보면 좀 더 직관적으로 동작 과정을 이해해 볼 수 있다.

2. 각 액션의 Q-value(가치)를 예측해보자, Q-Network

위에서 배운 Q-learning은 FrozenLake에서 잘 동작한다. FrozenLake 환경은 상태와 행동의 종류가 매우 제한적인 개수밖에 없어서 단순한 모델인 Q-learning으로도 문제 해결이 가능하다. 하지만 우리가 마주하는 실제환경은 상태와 행동의 종류가 매우 다양하며, 고전게임인 벽돌 깨기, 미로 찾기만 하더라도 수많은 경우의 수의 상태와 행동이 존재할 것으로 예상할 수 있다.

 

100 by 100 사이즈의 미로를 Q-learning으로 가능할까?

 

위 사진처럼 이러한 환경에서 Q-learning을 학습하려고 한다면 사용할 Q-테이블의 크기 개수가 어마무시해진다. 이는 아무리 좋은 슈퍼컴퓨터가 등장한다고 하여도 빠른 연산이 불가능하다. 

 

이를 해결하기 위해 Q-Network가 등장한다. 그렇다면 Q-Network와 Q-learning의 차이점은 무엇일까? 단지 Q-learning에서의 '함수'가 Q-Network에서는 '신경망'으로 변경된다는 점이다. 두 개의 컨셉츄얼한 차이를 보면 아래와 같다.

 

Q-learning 과 Q-Network의 개념적 차이

 

Q-learning은 Q-Function을 사용해서 가치 즉, Q-Value 값을 결정했다면, Q-Network는 신경망 모델을 사용해서 Q-Value 값을 근사(Approximation)한다. 다시 말해서 Q-Network는 Q-Function을 Approximation 한다는 것.

 

Q-Network를 사용함에 따라 신경말 모델을 사용하면서 약간의 융통성을 얻었다. 바로 입, 출력을 자유롭게 바꿀 수 있다는 점이다. 우리는 위 Q-Network 구조에서 입력을 보다 간단하게 바꾸어주도록 하자. 위 그림 상으로는 Q-Network도 상태, 액션 2가지를 입력받아 하나의 Q-Value 값을 출력으로 내뱉도록 했지만, 입력을 상태 1개로만 받아서 출력을 각 액션의 Q-Value를 내뱉도록 바꾸어보자. 바뀐 Q-Network의 구조는 아래와 같다.

 

입력, 출력 구조가 변경된 Q-Network

 

바뀐 Q-Network 구조에서는 상태만을 입력받아서 해당 상태에서 취할 수 있는 모든 액션들의 각 가치(Q-value)를 구하고, 그 Q-value들 중에서 가장 값이 큰 액션을 선택한다. 가장 큰 Q-value를 갖는 액션을 선택한다는 것은 곧 현재 상태에서 해당 액션을 선택했을 때 미래에 발생할 보상의 합이 가장 큰 즉, 최종 목표 지점에 도달할 가능성이 큰 액션을 의미한다.

 

이제 신경망 모델의 입, 출력을 알아보았으니 다음으로는 Q-Network가 학습되는 방향의 지침이 되는 손실함수에 대해 알아보도록 하자. 손실함수를 계산하기 위해서는 우리는 '예측값'과 '정답값'이 필요하다. 하나씩 알아보자. 먼저 '예측값'은 현재 상태($s$)를 Q-Network에 입력시켰을 때 나오는 액션들의 Q-value($\hat{Q}$)들이다. 그리고 '정답값'은 현재 상태($s$)를 Q-Network에 입력시켰을 때 나오는 액션들의 최적의 Q-value($\dot{Q}$)들이다. 

 

Q-Network의 손실함수 계산

 

여기서 특이한 점이 있다. 타겟 값인 $\dot{Q}$도 Q-Network라는 모델의 출력값이라니? 위 그림으로만 이해하자면 예측값인 $\hat{$}$ 와 정답인 $\dot{Q}$ 가 동일한 값이라고 생각할 수 있다. 하지만 단계 하나가 빠져있다. 그 '단계'란 $\dot{Q}$를 '진짜 정답으로 만들어주는 과정'이 필요하다. 이 과정을 이해하기 위해서 우리는 Q-Network에서 정답을 어떤 수식으로 정의하는지부터 살펴보자.

 

Q-Network의 정답 $y$ 정의

 

위 수식을 이햐하기 위해 annotation에 대해 알아보자. $j$는 타임스텝을 의미한다(현재, 다음, 다다음,...) $r$은 보상을 의미하며 $r_j$는 $j$번째 상태일 때, $j+1$번째 상태로 이동했을 때 받게 되는 보상을 의미한다. $\gamma$는 MDP에서 배운 할인율(discount factor)을 의미한다. $\phi$는 상태를 의미하며 $a$는 행동을 의미한다. 마지막으로 $\theta$는 Q-Network의 Weight를, $Q()$는 Q-Network를 의미한다.

 

위 annotation 정의를 기반으로 정답 $y$를 정의하면 다음($j+1$) 상태가 종료 상태(만약 게임이라면 게임 종료 상태)라면 보상 $r$만 주어지고, 종료 상태가 아니라면 보상 $r$에다가 특정 값(빨간색 네모칸)을 더해준다. 중요한 부분이 이 빨간색 네모칸이다.

 

빨간색 네모칸 수식은 다음 상태인 $\phi_{j+1}$ 와 다음 행동인 $a'$을 ($\theta$를 가중치로 갖는) Q-Network($Q()$)에 입력으로 넣었을 때 출력되는 Q-value들 중 가장 큰 값을 의미한다. 그리고 그것에 할인율(discount factor)인 $\gamma$ 값을 곱해준다. 다시 말해, 이 빨간색 수식 칸이 우리가 직전 포스팅 MDP에서 배웠던 행동-가치 함수를 근거로 해서 '정답'으로 만들어주는 부분이다.

 

마지막으로, Q-Network의 학습과정을 순서도로 정리해보면 다음과 같아진다. 

 

Q-Network의 입/출력 및 학습 과정

 

이제 위 학습 과정을 파이토치, 넘파이로 활용해서 구현한 코드는 아래와 같다.

 

import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import SGD


# 모델 정의
class Network(nn.Module):
    def __init__(self, n_states, n_actions):
        super(Network, self).__init__()
        
        self.n_states = n_states
        self.n_actions = n_actions
        
        self.linear = nn.Linear(in_features=self.n_states, out_features=self.n_actions)
        
    def forward(self, x):
        x = torch.Tensor([x])
        x = F.one_hot(x.to(torch.int64), num_classes=self.n_states)
        x = x.to(torch.float32)
        y = self.linear(x)
        return y

# setting environment
env = gym.make("FrozenLake-v1")

# params 
n_states = env.observation_space.n
n_actions = env.action_space.n
num_episodes = 1
exploration_prob = 0.5
gamma = 0.99
lr = 0.1


# model, criterion, optimizer
network = Network(n_states, n_actions)
criterion = nn.MSELoss()
optimizer = SGD(params=network.parameters(), lr=lr)

# train
for i in range(num_episodes):
    done = False
    all_reward = 0
    
    # init state
    current_state, current_prob = env.reset()
    
    while not done:
        # init gradients in Q-network
        optimizer.zero_grad()
        
        # create label step-1: from Q-network using `current_state`
        Qs = network(current_state)
        
        # exploration & exploitation
        if torch.rand(1) < exploration_prob:
            action = env.action_space.sample()
        else:
            action = torch.argmax(Qs).item()
        
        # step to next-state(define user-defined function)
        next_state, reward, done, _, _ = env.step(action)
        
        if done:
            Qs[0, action] = reward
        else:
            # create label step-2: from Q-network using `next_state`
            Qs_next = network(next_state)
            Qs[0, action] = reward + gamma * torch.max(Qs_next).item()
            
            # create prediction: from Q-network using `current_state`
            Qs_pred = network(current_state)
            
            # Loss based on label and prediction
            loss = criterion(Qs, Qs_pred)
            loss.backward()

            # update params
            optimizer.step()
        
        # update next_state to current_state
        current_state = next_state
        all_reward += reward
        
        print("Loss:", loss.item(), "Reward:", reward, "done:", done)

3. DQN(Deep Q Network)

직전에 구현한 Q-Network는 몇 가지 문제점이 있어 복잡한 현실 문제를 잘 해결하지 못한다. 신경망 모델을 사용해서 액션의 Q-value들을 근사 시키며 학습한다는 점은 좋았지만, 부족한 점들이 있다. 이 점들을 해결해 나가는 그 유명한 DQN에 대해서 알아보도록 하자.

 

DQN은 Q-Network의 부족한 점들을 3가지 특징으로 해결해나간다. 하나씩 살펴보자.

3-1. 미꾸라지 한 마리가 물을 흐린다 : Data Correlation 문제

"미꾸라지 한 마리가 물을 흐린다." 갑자기 웬 속담이 등장했다. Q-Network의 문제점 중 하나인 Data Correlation 문제를 필자가 직접 비유를 든 것이다. 왜 이런 비유를 든 것인지 알아보자. 

 

강화학습 분야에 국한되지 않고 모든 머신러닝 분야에 있어서는 다음과 같은 공통점이 있다. 바로 "데이터를 통해서 모델이 점진적으로 학습해 나가면서 모델의 정확도가 높아진다"는 점이다. 

 

출처 :&nbsp;https://www.deepwizai.com/projects/how-to-invent-your-own-new-ml-algorithm-a-novel-ensembling-technique

 

강화학습도 위처럼 데이터를 통해서 학습하게 된다. 강화학습에서 '데이터'라고 한다면 계속적으로 시행착오를 거치면서 반복해서 수행하는 것이다. 아래는 대표적인 강화학습의 CartPole 환경이다.

 

출처 :&nbsp;https://inspaceai.github.io/2019/05/30/CartPole_RL_Comparison/

 

그런데 강화학습은 다른 머신러닝 분야와 약간 차이점이 존재한다. 일반적인 머신러닝 분야(분류, 회귀 문제 같은)에서는 다양성을 포함하고 있는 데이터가 미리 한 번에 주어진다. 그렇기 때문에 모델이 학습할 때는 이 다양성이 포함되어 있는 데이터를 동시에 골고루 학습시킬 수 있다. 예를 하나 들어보자. 

 

우리는 Image Classification을 해결하려고 CNN 모델을 학습시키려고 한다. 이 모델은 개, 고양이, 기린, 하마, 사자 총 5가지의 클래스가 레이블링 되어 있다. 주어진 이미지 데이터는 100만 장이고, 하나의 클래스당 20만 장의 이미지 데이터가 주어졌다. CNN 모델을 학습시키는데, 한 번의 배치 사이즈는 100장을 하려고 한다. 이때, 하나의 미니 배치 데이터 100장에는 5가지의 클래스가 골고루 섞여 있도록 하는 것이 좋다. 그래야 CNN 모델이 각 클래스로 분류할 수 있도록 특징을 잘 학습할 수 있을 것이다. 만약 배치 데이터 100장이 모두 고양이 이미지로만 구성되어 있다고 해보자. 이렇게 되면 당연히 모델의 가중치는 고양이 특성만을 추출하도록 bias 되어 학습될 것임을 추측할 수 있다.

 

반면에 강화학습은 약간 다르다. 위에서 살펴본 Image Classification은 다양한 데이터가 미리 한 번에 주어지도록 여러 클래스의 데이터가 골고루 속하도록 샘플링 할 수 있지만, 강화학습은 목표(미로탈출이라고 한다면 탈출 지점에 도착)를 달성하기 까지 초반에는 매번 목표에 도달하지 못하는 터무니없는 동작만 수행할 것이다. 예를 들어, 벽돌 깨기에 강화학습을 활용한다고 해보자. 벽돌 깨기를 시작한 초반, 강화학습의 주체인 Agent는 매번 벽돌을 몇 번 깨 보지도 못하고 계속 낭떠러지도 떨어질 것이다. 그런데 이것이 바로 문제가 된다. 강화학습에서 학습 초반 단계에서는 Agent가 당연히 목표에 도달하지 못하는 터무니없는 액션만 계속 수행하게 될 것이고, 결국 이렇게 반복되는 '터무니 없는 액션'에만 맞춰서 강화학습 모델의 가중치가 업데이트 되게 된다. 이것이 바로 Data Correlation 문제로 인해 Q-Network가 학습이 잘 되지 못하는 이유이다. 

 

이러한 문제를 해결하기 위해서 Capture and Replay 또는 Experience and Replay 라는 기법을 사용한다. 이름이 어렵게 보이지만 해결책은 간단하다. 모델에 "다양한 데이터"를 주입해 주는 것! 해당 기법을 구현하는 단계는 다음과 같다.

 

첫 번째로, 입력으로 들어오는 Batch 데이터(상태, 행동, 보상)를 잠깐 buffer라는 메모리에다가 저장을 해놓는다. 단, buffer에 저장만 해놓고 해당 Batch 데이터로 모델의 가중치를 업데이트(학습)하는 과정을 수행하지 않는 것이 핵심이다.

 

두 번째로, 첫 번째 동작으로 인해 buffer에 담겨있는 다양한 데이터(상태, 행동, 보상)가 있을 것이다. 여기에서 모든 데이터를 사용하지 않고 랜덤 샘플링을 하여 보다 또 다른 Batch 데이터로 재구성한다.(이 때, 재구성된 Batch 데이터는 사이즈가 보다 작을 것임을 예상할 수 있다) 이렇게 되면 재구성된 Batch 데이터에는 다양한 상태, 행동, 보상들이 들어있을 것이다.

 

마지막으로, 재구성된 Batch 데이터로 모델을 이제 학습시켜 가중치를 업데이트한다. 이렇게 되면 모델은 다양한 데이터를 학습하게 되어 Data Correlation 문제를 방지할 수 있다.

3-2. 예측값과 정답을 만드는 모델을 분리시키자 : Seperated Network

Q-Network는 모델의 예측값과 정답 모두 하나의 모델로부터 생성시키는 것을 Q-Network 목차에서 살펴보았다. 그런데 좀 이상하지 않은가? 애초에 머신러닝의 핵심은 모델의 예측값이 정답을 따라감으로써 모델이 학습되는 것인데, 현재 구조로만 본다면 모델의 예측값이 정답을 애초에 따라갈 수 없는 구조이다. 

 

예를 들어서, 1번째 횟수에서 예측값과 정답을 Q-Network(모델)로부터 생성시켰다. 그리고 손실함수 계산 및 가중치 업데이트를 진행했다. 그럼으로써 Q-Network의 가중치가 업데이트되었다. 그런 다음 2번째 횟수에서 예측값과 정답을 생성하려고 하니 정답도 이미 가중치가 변경된 Q-Network로부터 생성되는 것이다. 핵심은 '정답도 가중치가 변경된 Q-Network로부터 생성된다는 점'이다.

 

따라서 이러한 문제를 해결하기 위해 예측값과 정답을 만드는 신경망 모델을 분리시켜 각각 개별로 만들어야 한다. 좀 더 구체적으로, 예측값을 만드는 모델 1개, 정답을 만드는 모델 1개를 동일한 구조로 생성한 후 주기적으로 예측값을 만드는 모델의 가중치를 정답을 만드는 모델의 가중치로 복사, 붙여 넣기 한다. 과정을 도식화하면 아래와 같다. 주기는 4회로 설정했을 때의 GIF 그림이다.

 

4회마다 예측값 네트워크의 가중치를 정답 네트워크의 가중치로 복사/붙여넣기 한다

3-3. 신경망을 더 깊게 : Go Deep!

마지막의 해결법은 매우 간단하다. 신경망을 더 깊게 만들어 모델의 복잡도를 늘리는 것이다. 실제 논문에서는 컨볼루션 레이어를 사용한 것으로 알고 있다. 

 

이렇게까지 해서 Q-Network의 단점을 해결하는 특징 3가지가 반영된 DQN에 대해서 알아보았다. 마지막으로 DQN을 구현한 파이토치 코드는 아래와 같다. CartPole 환경에서 수행하였고 코드는 딥러닝 파이토치 교과서 서적을 참고하였다.

 

import gym
import math
import random
import pyglet
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity  # 한 번 buffer에 담아놓을 데이터 max limit
        self.memory = []          # buffer
        self.position = 0         # buffer 내에 데이터가 위치할 indices
        
    def push(self, *args):
        if len(self.memory) < self.capacity: 
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity  # +1 하는 이유는 최초에 buffer에 None이 들어갔기 때문
    
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size) # buffer에서 batch-size 만큼 데이터 추출

    def __len__(self):
        return len(self.meemory)
    

class DQN(nn.Module):
    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(num_features=16)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(num_features=32)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(num_features=32)
        
        def conv2d_size_out(size, kernel_size=5, stride=2):
            return (size - (kernel_size-1)-1) // stride + 1
        
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        in_features = convw * convh * 32
        
        self.head = nn.Linear(in_features=in_features, out_features=outputs)
    
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = x.view(x.size(0), -1)  # flatten all dims excluding batch-size dimension
        y = self.head(x)
        return y
    


resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.BICUBIC),
                    T.ToTensor()])

# Cart의 우치ㅣ정보 가져오기
def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)

def get_screen():
    screen = env.render(mode='rgb_array').transpose((2, 0, 1)) # torch에게 맞는 (C, H, W)로 transpose
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height*0.4):int(screen_height*0.8)]
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    screen = screen[:, :, slice_range]
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    return resize(screen).unsqueeze(0).to(device)


BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

screen_height, screen_width = 800, 1200
n_actions = env.action_space.n

policy_net = DQN(screen_height, screen_width, n_actions)
target_net = DQN(screen_height, screen_width, n_actions)
target_net.load_state_dict(policy_net.state_dict())  # policy_net의 초기 params -> target_net으로 sync
target_net.eval() # target_net의 params freeze

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)

steps_done = 0


# 다음에 수행할 action을 선택 based exploitation or exploration
def select_action(state):
    global steps_done
    
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1 * steps_done / EPS_DECAY)
    stpes_done += 1
    
    # exploitation or exploration
    if sample > eps_threshold:
        with torch.no_grad():
            action = policy_net(state).max(dim=1).indices.view(1, 1) # value는 Q-value, indices는 action을 의미
            return action
    else:
        action = torch.Tensor([[random.randrange(n_actions)]], device=device, dtype=torch.Long)
        return action

    
def optimizer_model():
    if len(memory) < BATCH_SIZE: # buffer에 최소 BATCH_SIZE만큼의 데이터가 있어야 함
        return
    
    transitions = memory.sample(BATCH_SIZE) # buffer에서 데이터 랜덤 샘플링
    batch = Transition(*zip(*transitions))
    non_final_mask = torch.Tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool) # 
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
    
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    
    # 현재 state들을 policy Q-Network에 입력하여 각 action에 대한 Q-value 예측값 반환
    state_action_values = policy_net(state_batch).gather(1, action_batch) # 각 액션 indices에 해당하는 Q-value들 추출
    Q_pred = state_action_valuees
    
    # 다음 state들을 target Q-Network에 입력하여 각 action에 대한 Q-value '정답' 반환
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    Q_label = next_state_values * GAMMA + reward_batch
    
    # init gradients
    optimizer.zero_grad()
    
    # loss basd on prediction and target
    loss = F.smooth_l1_loss(Q_pred, Q_label)
    loss.backward()
    
    # gradient clipping between -1 and 1
    for p in policy_net.parameters():
        p.grad.data.clamp_(-1, 1)
    
    # update params
    optimizer.step()
    
    
# endpoint
from itertools import count

num_episodes = 50

for i_episode in range(num_episodes):
    env.reset()
    # last_screen = get_screen()
    # current_screen = get_screen()
    
    # 초기 상태
    state = current_screen - last_screen
    
    # train
    for t in count(): # infinite-loop
        # 현재 state --입력--> policy Q-Network --출력--> Q-value가 가장큰 action
        action = select_action(state)
        _, reward, done _ = env.step(action.item())
        reward = torch.Tensor([reward], device=device)
        
        last_screen = current_screen
        current_screen = get_screen()
        
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None  # when terminal-state
        
        memory.push(state, action, next_state, reward)
        state = next_state
        
        optimizer_model()
        if done:
            episode_durations.append(t+1)
            break
    
    # cp from weight of policy-net to weight of target-net periodically
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())
반응형