Reinforcement Learning - Introducing Goal Oriented Intelligence

with deeplizard.

Deep Q-Network Image Processing and Environment Management - Reinforcement Learning Code Project

July 21, 2019


Managing the Environment - Reinforcement Learning Code Project

Welcome back to this series on reinforcement learning! In this episode, we’ll be continuing to develop the code project we’ve been working on to build a deep Q-network to master the cart and pole problem. We'll see how to manage the environment and process images that will be passed to our deep Q-network as input.

Update to Agent

Before getting to the new code, I have one quick update to share for the Agent class we developed last time. Agent will now require a device, as shown below.

class Agent():
    def __init__(self, strategy, num_actions, device):
        self.current_step = 0
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device

device will be the device that we tell PyTorch to use for tensor calculations, i.e. a CPU or GPU. We’ll see exactly how to set this later in our main program, but for now, this explanation is all we need to know.

device gets initialized with whatever device we passed in, and we then use it on the tensor we return in our select_action() function, as shown below. We do this simply by calling to() on the tensor and then passing in the device.

if rate > random.random():
    action = random.randrange(self.num_actions)
    return torch.tensor([action]).to(self.device) # explore
else:
    with torch.no_grad():
        return policy_net(state).argmax(dim=1).to(self.device) # exploit

Alright, that’s it for the update.

Environment Manager

Now that we have created the classes for all of our crucial objects, like our DQN, ReplayMemory, and Agent, we're going to move on to creating a class that we'll call CartPoleEnvManager.

This class will manage our cart and pole environment. It will wrap several of gym’s environment capabilities, and it will also give us some added functionality, like image preprocessing, for the environment images that will be given to our network as input.

To create a CartPoleEnvManager object, we just require a device be passed to the constructor. Just like the explanation we gave for the Agent class, device will be the device that we’re telling PyTorch to use for tensor calculations.

class CartPoleEnvManager():
    def __init__(self, device):
        self.device = device
        self.env = gym.make('CartPole-v0').unwrapped
        self.env.reset()
        self.current_screen = None
        self.done = False

Within the class constructor, we first initialize the class’s device attribute with the device that was passed in, and we initialize the env attribute to be gym’s cart and pole environment. Calling unwrapped gives us access to behind-the-scenes dynamics of the environment that we wouldn’t have access to otherwise.

Since we must reset the environment to get an initial observation of it, we do this right after initializing env. We then set the current_screen attribute equal to None. current_screen will track the current screen of the environment at any given time, and when it’s set to None, that indicates that we’re at the start of an episode and have not yet rendered the screen of the initial observation. We’ll see more about this soon.

We then set the done attribute equal to False. done will track whether or not any taken action has ended an episode.

Wrapped functions

We now have a few wrapper functions that simply wrap the gym function of the same name. Specifically, we have reset(), close(), and render(), which reset, close, and render the environment using gym's reset(), close(), and render() functions.

def reset(self):
    self.env.reset()
    self.current_screen = None

def close(self):
    self.env.close()

def render(self, mode='human'):
    return self.env.render(mode)

As a reminder from when we covered these gym functions during the Frozen Lake project, we call reset() on the gym environment when we want the environment to be reset to a starting state. reset() returns an initial observation from the environment.

We call close() to close the environment when we’re finished with it, and we call render() on the environment to render the current state to the screen. We can also get a numpy array version of the rendered screen from this function as well.

The only one of our wrapper functions that does anything outside of calling gym’s function with the same name is reset(). You can see that we’re also setting the current_screen to None here.

When we reset the environment, we’re typically going to be at the end of an episode, and therefore, we want to set the current_screen back to None since this indicates that we’re at the start of an episode and have not yet rendered the screen of the initial observation.

We’re wrapping these functions in this way so that later, in our main program, we’ll only have to deal with a CartPoleEnvManager, and not both this manager and an environment.

Instead, we’re encapsulating the environment functionality within our environment manager, so that our manager can completely manage the environment using these functions, as well as new functions that will be introduced in just a moment that an original gym environment wouldn’t have access to. This gives our main program a clean and consistent interface for interacting with the environment.

Number of actions available to agent

Moving on to our next function, num_actions_available() returns the number of actions available to an agent in the environment. In our cart and pole environment, at any given time, an agent will only have two actions available: move left or move right.

def num_actions_available(self):
    return self.env.action_space.n

Taking an action in the environment

Next, take_action() is a function that requires an action to be passed in. Using this action, we call step() on the environment, which will execute the given action taken by the agent in the environment.

def take_action(self, action):        
    _, reward, self.done, _ = self.env.step(action.item())
    return torch.tensor([reward], device=self.device)

As you may recall from previous episodes where we used gym to set up our Frozen Lake environment, step() returns a tuple containing the environment observation, reward, whether or not the episode ended, and diagnostic info, all of which resulted from the agent executing that particular action.

For our purposes, we only care about the reward and whether or not the episode ended from taking the given action, so we set the reward variable accordingly, and also update the class’s done attribute with the boolean value of whether or not the episode ended by taking the given step.

Notice that we’re calling item() on the action we’re passing to step(). This is because the action that will be passed to this function in our main program will be a tensor. We’ll be consistently working with tensors throughout the main program. item() just returns the value of this tensor as a standard Python number, which is what step() expects.
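As a quick illustration of this conversion, here is what item() does to a single-element action tensor:

```python
import torch

action = torch.tensor([1])   # an action tensor like the one our agent returns
value = action.item()        # extract the value as a plain Python number
print(value)                 # 1
print(type(value))           # <class 'int'>
```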

Our take_action() function then returns the reward wrapped in this PyTorch tensor. We’re processing the reward in this way, by wrapping it in a tensor, to put it in the format that will be needed later on in our main program.

So, we have a tensor coming into the function, and a tensor coming out of it. This is how we keep the data type consistent in our main program. Here, we can see where the device comes into play, as we’re setting the device of this tensor to be the device that was passed in to the CartPoleEnvManager.

Starting an episode

Next, we have this function just_starting() that returns True when the current_screen is None and returns False otherwise.

def just_starting(self):
    return self.current_screen is None

Remember, current_screen is set to None in the class constructor and also gets set to None when the environment is reset after ending an episode. So, if current_screen is None, that means we are at the starting state of an episode and haven’t yet rendered an initial observation from the environment.

Getting the state of the environment

Next we define the function get_state(). The point of this function is to return the current state of the environment in the form of a processed image of the screen. Remember, a deep Q-network takes states of the environment as input, and we previously mentioned that for our environment, states would be represented using screenshot-like images.

Actually, note that we will represent a single state in the environment as the difference between the current screen and the previous screen. This will allow the agent to take the velocity of the pole into account from one single image. So, a single state will be represented as a processed image of the difference between two consecutive screens. We’ll see in a moment what type of processing is being done.

def get_state(self):
    if self.just_starting() or self.done:
        self.current_screen = self.get_processed_screen()
        black_screen = torch.zeros_like(self.current_screen)
        return black_screen
    else:
        s1 = self.current_screen
        s2 = self.get_processed_screen()
        self.current_screen = s2
        return s2 - s1

We have two conditions we’re checking for in this function.

We check first to see if we are just starting or if we’re done with the episode. Remember, if we’re just starting, then the initial screen has not yet been rendered from the initial observation in the environment. If done == True, then that means the last action taken by the agent ended the episode.

We said that states would be represented as the difference between the last two screens. Well, when we’re at the start of a new episode, there is no last screen to compare to the current screen. So, we’re going to represent our starting state with a fully black screen. The fact we’re doing this will make more sense once we see some visual examples of states in a few minutes.

When we’re in the next state that occurs after an agent has taken an action that ended the episode, we’ll also represent this state with a fully black screen as well.

We do this by first calling get_processed_screen(), which returns the processed screen from the environment and assigns this result to the current screen. We then create a fully black screen of the same shape as the current_screen using torch.zeros_like(). We’ll explore the get_processed_screen() function more in a moment.

If we’re not just starting an episode, and we’re not ending it either, then we’re somewhere in the middle of an episode.

In this case we’ll take the difference between the current screen and the last screen and return this result.

In the code above, s1 stands for screen1 and is set to the current_screen. s2 stands for screen2 and is set to the result of a new call to get_processed_screen(). We then update our current_screen to the value of s2. So now, s2 is our current_screen, and s1 is our previous screen, so we return the difference of these two screens to represent a single state.

Get processed screen dimensions

Next we have these two simple functions get_screen_height() and get_screen_width().

def get_screen_height(self):
    screen = self.get_processed_screen()
    return screen.shape[2]
def get_screen_width(self):
    screen = self.get_processed_screen()
    return screen.shape[3]

These functions return the height and width of a processed screen by first getting a processed screen from the get_processed_screen() function (which we’re about to cover), and then indexing into the shape of the screen with a 2 to get the height, or with a 3 to get the width.
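To see why indices 2 and 3 give us the height and width, recall that a processed screen is a BCHW tensor, i.e. (batch, channels, height, width). A small sketch (using an example 40 x 90 screen):

```python
import torch

# A processed screen is a BCHW tensor: (batch, channels, height, width)
screen = torch.zeros(1, 3, 40, 90)
print(screen.shape[2])  # 40 (height)
print(screen.shape[3])  # 90 (width)
```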

Processing the screen image

Now, we’ll move on to the get_processed_screen() function we’ve been referencing.

def get_processed_screen(self):
    screen = self.render('rgb_array').transpose((2, 0, 1)) # PyTorch expects CHW
    screen = self.crop_screen(screen)
    return self.transform_screen_data(screen)

This function first renders the environment as an RGB array using the render() function and then transposes this array into the order of channels by height by width, which is what our PyTorch DQN will expect.
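To make the transpose step concrete, here is a small sketch with a dummy array (the 400 x 600 size is just an example; the actual render size depends on your system):

```python
import numpy as np

# gym's render('rgb_array') returns a (height, width, channels) array
hwc = np.zeros((400, 600, 3), dtype=np.uint8)

# reorder the axes to (channels, height, width) for PyTorch
chw = hwc.transpose((2, 0, 1))
print(chw.shape)  # (3, 400, 600)
```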

This result is then cropped by passing it to the crop_screen() function, which we’ll cover next. We then pass the cropped screen to the function transform_screen_data(), again, which we’ll cover in a moment, which just does some final data conversion and rescaling to the cropped image.

This transposed, cropped, and transformed version of the original screen returned by gym is what is returned by get_processed_screen().

Crop screen image

The crop_screen() function accepts a screen and will return a cropped version of it. We first get the height of the screen that was passed in, and then we strip off the top and bottom of the screen.

We’ll see an example of a screen both before and after it’s been processed in a moment, and there you’ll see how there is a lot of plain white space at the top and bottom of the cart and pole environment, so we’re removing this empty space here.

def crop_screen(self, screen):
    screen_height = screen.shape[1]
    # Strip off top and bottom
    top = int(screen_height * 0.4)
    bottom = int(screen_height * 0.8)
    screen = screen[:, top:bottom, :]
    return screen

We set top equal to the value that corresponds to 40% of the screen_height. Similarly, we set bottom equal to the value that corresponds to 80% of the screen_height.

With these top and bottom values, we then take a slice of the screen starting from the top value down to the bottom value so that we’ve essentially stripped off the top 40% of the original screen and the bottom 20%.
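Here is the crop arithmetic sketched out on a dummy CHW array, assuming an example screen height of 400 pixels (the actual height depends on the renderer):

```python
import numpy as np

screen = np.zeros((3, 400, 600))  # example (channels, height, width) screen
screen_height = screen.shape[1]

top = int(screen_height * 0.4)     # 160 — top 40% stripped off
bottom = int(screen_height * 0.8)  # 320 — bottom 20% stripped off

cropped = screen[:, top:bottom, :]
print(cropped.shape)  # (3, 160, 600) — only the middle 40% of the rows remain
```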

Convert and rescale screen image data

Our last image processing function is transform_screen_data(). This function accepts a screen.

def transform_screen_data(self, screen):
    # Convert to float, rescale, convert to tensor
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)

    # Use torchvision package to compose image transforms
    resize = T.Compose([
        T.ToPILImage(),
        T.Resize((40, 90)),
        T.ToTensor()
    ])

    return resize(screen).unsqueeze(0).to(self.device) # add a batch dimension (BCHW)

We first pass this screen to the numpy ascontiguousarray() function, which returns a contiguous array of the same shape and content as screen, meaning that all the values of this array will be stored sequentially next to each other in memory.

We’re also converting the individual pixel values into type float32 and rescaling all the values by dividing them each by 255. This is a common rescaling process that occurs during image processing for neural network input.

We then convert this array to a PyTorch tensor.
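A small sketch of this convert-and-rescale step on a few example pixel values:

```python
import numpy as np
import torch

pixels = np.array([[0, 128, 255]], dtype=np.uint8)  # example raw 8-bit pixel values

# convert to float32 in a contiguous array, then rescale into [0, 1]
scaled = np.ascontiguousarray(pixels, dtype=np.float32) / 255

tensor = torch.from_numpy(scaled)  # now a float tensor with values in [0, 1]
print(tensor.dtype)  # torch.float32
```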

We then use torchvision’s Compose class to chain together several image transformations. We’ll call this compose resize. So, when a tensor is passed to resize, it will first be converted to a PIL image, then it will be resized to a 40 x 90 image. The PIL image is then transformed to a tensor.

So, we pass our screen from above to resize and then add an extra batch dimension to the tensor by calling unsqueeze(). This result is then what is returned by the transform_screen_data() function.
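The effect of unsqueeze() on the shape looks like this (using the 40 x 90 size from the resize transform):

```python
import torch

screen = torch.zeros(3, 40, 90)  # a CHW tensor, like the output of the resize transform
batched = screen.unsqueeze(0)    # insert a batch dimension at position 0
print(batched.shape)             # torch.Size([1, 3, 40, 90]) — BCHW, ready for the network
```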

Example screens

Alright, we’re now finished with our CartPoleEnvManager class. Let’s now take a look visually at the results of all the image processing that we went over from this class.

Non-processed screen

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
em = CartPoleEnvManager(device)
em.reset()
screen = em.render('rgb_array')

plt.imshow(screen)
plt.title('Non-processed screen example')

This first screen is an example of what a non-processed screen looks like from the environment. We’re getting this screen just by setting up an instance of CartPoleEnvManager, calling reset() to get an initial observation, and then rendering the screen.

Processed screen

screen = em.get_processed_screen()

plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Processed screen example')

After we do all the processing to this image simply from calling get_processed_screen(), we now get this processed image. There is actually some further processing we could do, like stripping some of the empty space from the right and left sides, but we’ll save that for a later episode as an experiment to try to improve performance.

Starting state

screen = em.get_state()
plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Starting state example')

Now, remember how a state from the environment is created from the difference between two processed images? And also how the starting state will always be a fully black screen based on our earlier discussion? Well, here we can see that example by calling get_state() on our environment manager.

Non-starting state

for i in range(5):
    em.take_action(torch.tensor([1])) # take a few arbitrary actions

screen = em.get_state()

plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Non starting state example')

If we want to see what a state that is not a starting state looks like, we can take some actions in the environment and call get_state() again to get this result.

Since we’re taking the difference between the current screen and the previous screen, most pixel values will become zero. The only ones that are anything but zero are the kind of highlight that we’re seeing here, which gives us an idea of where our cart and pole were in the previous screen, and where they have moved to now.

Ending state

em.done = True
screen = em.get_state()

plt.imshow(screen.squeeze(0).permute(1, 2, 0).cpu(), interpolation='none')
plt.title('Ending state example')

Lastly, if we want to see the state of the environment after an episode has ended, we specify done = True and call em.get_state() again, and we can see the fully black screen that we’d expect.

So, these states are exactly what will be passed to our DQN as input during training.

Utility functions

Now, we’re going to move on to a couple of quick utility functions we’ll have available to us during training so that we can plot our performance on a chart.

We’re creating this function called plot() that accepts values and a moving average period. This plot will plot the duration of each episode, as well as the 100 episode moving average.

To solve cart and pole, the average reward must be greater than or equal to 195 over 100 consecutive episodes. Recall that our agent gets a reward of +1 for each step it takes that doesn’t end the episode. So, the duration of an episode measured in timesteps is exactly equivalent to the reward for that episode.

def plot(values, moving_avg_period):
    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(values)
    plt.plot(get_moving_average(moving_avg_period, values))
    if is_ipython: display.clear_output(wait=True)

Using matplotlib’s pyplot module, we set up the figure, give it a title, name the axes, and give it the values to plot, which in our case will be episode durations.

We’ll also want to plot the 100 episode moving average, so we do so by calling the function get_moving_average(), which accepts the moving_average_period and the values it will calculate the moving average from.

def get_moving_average(period, values):
    values = torch.tensor(values, dtype=torch.float)
    if len(values) >= period:
        moving_avg = values.unfold(dimension=0, size=period, step=1) \
            .mean(dim=1).flatten(start_dim=0)
        moving_avg = torch.cat((torch.zeros(period - 1), moving_avg))
        return moving_avg.numpy()
    else:
        moving_avg = torch.zeros(len(values))
        return moving_avg.numpy()

Within get_moving_average(), we first transform the values to a PyTorch tensor and then check to see if the length of the values is greater than or equal to the period. We do this because we can’t calculate a moving average of a data set when the data set is not at least as large as the period we want to calculate the moving average for.

For example, if we want to calculate the 100-period moving average of episode durations, then if we’ve only played 90 episodes, a 100-period moving average can’t be calculated.

If this condition is met, then we calculate the moving average by first calling unfold() on the tensor, which returns a tensor that contains all slices with a size equal to the period that was passed in (in our case, that is going to be 100). It does this on the zeroth dimension of the original values tensor.

This gives us a new tensor containing all slices of size 100 across the original value tensor. We then take the mean of each of these slices and flatten the tensor so that now moving_avg is equal to a tensor containing all 100-period averages from the values that were passed in.

We then concatenate this resulting tensor to a tensor of zeros with a size equal to period-1. This is to show that the moving average for the first period-1 values is zero given the explanation we just gave a moment ago. So, if our period is 100, then the first 99 values of the moving_avg tensor will be 0, and then each value afterwards will be the actual calculated 100-period moving average.
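To see the unfold, mean, and concatenation steps in miniature, here is the same logic applied to five values with a period of 3 instead of 100:

```python
import torch

values = torch.tensor([1., 2., 3., 4., 5.])
period = 3

# unfold yields every length-3 slice along dim 0: [1,2,3], [2,3,4], [3,4,5]
windows = values.unfold(dimension=0, size=period, step=1)

# average each slice, then prepend period-1 zeros for the positions
# where no moving average can be calculated yet
avgs = windows.mean(dim=1).flatten(start_dim=0)
moving_avg = torch.cat((torch.zeros(period - 1), avgs))
print(moving_avg)  # tensor([0., 0., 2., 3., 4.])
```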

We then convert the moving_avg tensor to a numpy array and return this result.

Now, if our initial condition was not met, so that the length of the values array that was initially passed in was not at least the period size, then we just return a numpy array of all zeros with a length equal to the values array that was passed in.

Example plot

To show an example of this plot() function, we’ll pass in a numpy array that contains 300 random values between 0 and 1, and we’ll specify 100 as our moving average period.

plot(np.random.rand(300), 100)

The actual values are plotted in blue, and the orange line is the 100-period moving average across these values.

We can see that the 100-period moving average is 0 for the first 99 values, and then we get the first calculated moving average at the 100th value. This represents the average of the first 100 values in the array. If we skip over to the moving average at value 200, then the orange line at this point represents the average of the second 100 values, from value 100 to 200.

When we train our network, we’ll be using this plot to show our performance over time.

Wrapping up

Next time we’ll be picking up with developing our main program! We’ll take all of these classes and functions we’ve developed over the last couple episodes and see how they all come together in our main program to train our DQN.

Until then, please like this video to let us know you’re learning, and take the corresponding quiz to test your own understanding! Don’t forget about the deeplizard hivemind for exclusive perks and rewards. See ya in the next one!
