Combinatorial Games. Episode 3: Connect-N (Tic-Tac-Toe, Gomoku) OpenAI Gym GUI Env

OpenAI GYM GUI Env Based on Pygame with Enhanced Minimax Strategy

In the last episode, we completed the minimax strategy for Connect-N games, including Tic-Tac-Toe and Gomoku. In this episode, we implement its GUI environment based on the Pygame library for human vs. human, AI vs. AI, or human vs. AI play, which is essential for AlphaGo Zero style self-play reinforcement learning. The environment is further wrapped as an OpenAI Gym environment, since Gym is the standard in game reinforcement learning. All code in this series is in the ConnectNGym github repo.

Connect-N Pygame Implementation

Pygame Tic-Tac-Toe Human Play

Python has several well-known cross-platform GUI libraries such as Tkinter and PyQt. They are mainly targeted at general desktop GUI programming, with large API families and steep learning curves. In contrast, Pygame is tailored specifically for small desktop game development, so we adopt it.

Pygame 101

Pygame, like virtually all GUI frameworks, is based on a single-threaded, event-driven model. Here is the simplest desktop Pygame application, showing a window: the while True loop keeps retrieving events dispatched by the OS to the window. In the example we handle only the quit event (the user clicking the close button) to exit the whole process. In addition, the clock variable controls the FPS, which we won't elaborate on.

import sys
import pygame

pygame.init()
display = pygame.display.set_mode((800, 600))
clock = pygame.time.Clock()

while True:
	# Drain all events dispatched by the OS to the window.
	for event in pygame.event.get():
		if event.type == pygame.QUIT:
			sys.exit(0)
	# Redraw the window and cap the frame rate once per loop iteration.
	pygame.display.update()
	clock.tick(1)

PyGameBoard Class

The PyGameBoard class encapsulates GUI interaction and rendering logic. In the last episode we coded the ConnectNGame class; PyGameBoard is instantiated with a pre-initialized ConnectNGame instance. It handles GUI mouse events to determine the next valid move and then updates its internal state, which is just the ConnectNGame instance passed in. Concretely, the instance method next_user_input(self) loops until a valid action is chosen by the current player.

PyGameBoard Class Diagram
class PyGameBoard:

	def __init__(self, connectNGame: ConnectNGame):
		self.connectNGame = connectNGame
		pygame.init()

	def next_user_input(self) -> Tuple[int, int]:
		self.action = None
		while not self.action:
			self.check_event()
			self._render()
			self.clock.tick(60)
		return self.action
  
	def move(self, r: int, c: int) -> int:
		return self.connectNGame.move(r, c)

if __name__ == '__main__':
	connectNGame = ConnectNGame()
	pygameBoard = PyGameBoard(connectNGame)
	while not pygameBoard.isGameOver():
		pos = pygameBoard.next_user_input()
		pygameBoard.move(*pos)

	pygame.quit()

Following Pygame 101, the method check_event() handles events dispatched by the OS, and only mouse events from the player are consumed. The method _handle_user_input() converts a mouse event into row and column indices, validates the move, and returns it as a Tuple[int, int]. For instance, (0, 0) is the upper left corner position.

def check_event(self):
	for e in pygame.event.get():
		if e.type == pygame.QUIT:
			pygame.quit()
			sys.exit(0)
		elif e.type == pygame.MOUSEBUTTONDOWN:
			self._handle_user_input(e)
    
def _handle_user_input(self, e: Event) -> Tuple[int, int]:
	origin_x = self.start_x - self.edge_size
	origin_y = self.start_y - self.edge_size
	size = (self.board_size - 1) * self.grid_size + self.edge_size * 2
	pos = e.pos
	if origin_x <= pos[0] <= origin_x + size and origin_y <= pos[1] <= origin_y + size:
		if not self.connectNGame.gameOver:
			x = pos[0] - origin_x
			y = pos[1] - origin_y
			r = int(y // self.grid_size)
			c = int(x // self.grid_size)
			valid = self.connectNGame.checkAction(r, c)
			if valid:
				self.action = (r, c)
				return self.action

Integrated into OpenAI Gym

OpenAI Gym specifies how an Agent interacts with an Env. An Env is defined by gym.Env, and the major task of creating a new game environment is to subclass it and override the reset, step and render methods. Let's see what our ConnectNGym looks like.

class ConnectNGym(gym.Env):

	def reset(self) -> ConnectNGame:
		"""Resets the state of the environment and returns an initial observation.

		Returns:
			observation (object): the initial observation.
		"""
		raise NotImplementedError


	def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
		"""Run one timestep of the environment's dynamics. When end of
		episode is reached, you are responsible for calling `reset()`
		to reset this environment's state.

		Accepts an action and returns a tuple (observation, reward, done, info).

		Args:
			action (object): an action provided by the agent

		Returns:
			observation (object): agent's observation of the current environment
			reward (float) : amount of reward returned after previous action
			done (bool): whether the episode has ended, in which case further step() calls will return undefined results
			info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
		"""
		raise NotImplementedError



	def render(self, mode='human'):
		"""
		Renders the environment.

		The set of supported modes varies per environment. (And some
		environments do not support rendering at all.) By convention,
		if mode is:

		- human: render to the current display or terminal and
			return nothing. Usually for human consumption.
		- rgb_array: Return an numpy.ndarray with shape (x, y, 3),
			representing RGB values for an x-by-y pixel image, suitable
			for turning into a video.
		- ansi: Return a string (str) or StringIO.StringIO containing a
			terminal-style text representation. The text can include newlines
			and ANSI escape sequences (e.g. for colors).

		Note:
		Make sure that your class's metadata 'render.modes' key includes
		the list of supported modes. It's recommended to call super()
		in implementations to use the functionality of this method.

		Args:
			mode (str): the mode to render with
		"""
		raise NotImplementedError

Method reset()

def reset(self) -> ConnectNGame

Resets the environment's internal state and returns the corresponding initial observation for the agent. ConnectNGym holds an instance of ConnectNGame as its internal state, and because board games are fully observable, the state observed by the agent is exactly the game's internal state. So reset() returns a deepcopy of the ConnectNGame instance.
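
Because reset() hands back a deepcopy, mutating the returned observation cannot corrupt the environment's internal game. A minimal sketch, assuming the ConnectNGym implementation shown later in this post and the ConnectNGame API used elsewhere:

env = ConnectNGym(PyGameBoard(connectNGame=ConnectNGame(board_size=3, N=3)))
obs = env.reset()          # a deepcopy of the internal ConnectNGame
obs.move(0, 0)             # mutate the copy ...
# ... the env's own state is untouched
assert env.pygameBoard.connectNGame.getStatus() == ((0, 0, 0), (0, 0, 0), (0, 0, 0))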

Method step()

def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]

Once the agent selects an action and hands it back to the env, the env executes the action via step(), changes its internal state, and returns the following four items:

  1. The new state observed by the agent
  2. The reward associated with the action
  3. Environment terminated or not
  4. Other auxiliary information

step() is the core API of gym.Env. Below we illustrate a sequence of game state transitions together with its inputs and outputs.

Initial State:

State ((0, 0, 0), (0, 0, 0), (0, 0, 0))

Agent A selects action = (0, 0). ConnectNGym.step() executes the action and returns

status = ((1, 0, 0), (0, 0, 0), (0, 0, 0)), reward = 0, game_end = False

State ((1, 0, 0), (0, 0, 0), (0, 0, 0))

Agent B selects action = (1, 1). ConnectNGym.step() executes the action and returns

status = ((1, 0, 0), (0, -1, 0), (0, 0, 0)), reward = 0, game_end = False

State ((1, 0, 0), (0, -1, 0), (0, 0, 0))

Repeating this process, the game may end in the following terminal state after 5 moves.

Terminal State ((1, 1, 1), (-1, -1, 0), (0, 0, 0))

The last step() returns

status = ((1, 1, 1), (-1, -1, 0), (0, 0, 0)), reward = 1, game_end = True
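
The same sequence as code, as a minimal sketch (the actions are hard-coded for illustration and assume the ConnectNGym implementation shown below):

env = ConnectNGym(PyGameBoard(connectNGame=ConnectNGame(board_size=3, N=3)))
obs = env.reset()

# Agents A and B alternate: A places 1, B places -1.
for action in [(0, 0), (1, 1), (0, 1), (1, 0), (0, 2)]:
	obs, reward, done, info = env.step(action)
	print(obs.getStatus(), reward, done)
	if done:
		break
# Last line printed: ((1, 1, 1), (-1, -1, 0), (0, 0, 0)) 1 True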

Method render()

def render(self, mode='human')

Renders the env. The mode parameter distinguishes rendering for a human viewer from rendering for an agent; in our implementation the isGUI constructor flag chooses between the Pygame window and plain text output.

ConnectNGym Implementation

class ConnectNGym(gym.Env):

	def __init__(self, pygameBoard: PyGameBoard, isGUI=True, displaySec=2):
		self.pygameBoard = pygameBoard
		self.isGUI = isGUI
		self.displaySec = displaySec
		self.action_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
		self.observation_space = spaces.Discrete(pygameBoard.board_size * pygameBoard.board_size)
		self.seed()
		self.reset()

	def reset(self) -> ConnectNGame:
		self.pygameBoard.connectNGame.reset()
		return copy.deepcopy(self.pygameBoard.connectNGame)

	def step(self, action: Tuple[int, int]) -> Tuple[ConnectNGame, int, bool, None]:
		# assert self.action_space.contains(action)

		r, c = action
		reward = REWARD_NONE
		result = self.pygameBoard.move(r, c)
		if self.pygameBoard.isGameOver():
			reward = result

		return copy.deepcopy(self.pygameBoard.connectNGame), reward, result is not None, None

	def render(self, mode='human'):
		if not self.isGUI:
			self.pygameBoard.connectNGame.drawText()
			time.sleep(self.displaySec)
		else:
			self.pygameBoard.display(sec=self.displaySec)

	def get_available_actions(self) -> List[Tuple[int, int]]:
		return self.pygameBoard.getAvailablePositions()

Connect-N Enhanced Minimax Strategy

The following animation shows two minimax AI players playing Tic-Tac-Toe (k=3, m=n=3). We know from the previous episode that Tic-Tac-Toe is solved as a draw: when both players follow the optimal strategy, the first player is forced into a tie by the second, which matches the result shown in the animation.

Minimax AI Self-Play

Game State Rotation Enhancement

In the last episode we confirmed that Tic-Tac-Toe has 5478 total states. The number grows exponentially as k, m and n increase. For instance, for k=3, m=n=4 the total number of states is 6035992, whereas for k=4, m=n=4 it is 9722011. We can improve the minimax DP strategy by pruning game states that are rotations of an already solved state: once a game state is solved, we cache not only that state but also the three states derived from it by rotation, since they share the same result.

For example, the game state below has the same result as the three rotated ones.

Game State 1
The Other 3 Rotated States
	def similarStatus(self, status: Tuple[Tuple[int, ...]]) -> List[Tuple[Tuple[int, ...]]]:
		ret = []
		rotatedS = status
		for _ in range(4):
			rotatedS = self.rotate(rotatedS)
			ret.append(rotatedS)
		return ret

	def rotate(self, status: Tuple[Tuple[int, ...]]) -> Tuple[Tuple[int, ...]]:
		N = len(status)
		board = [[ConnectNGame.AVAILABLE] * N for _ in range(N)]

		for r in range(N):
			for c in range(N):
				board[c][N - 1 - r] = status[r][c]

		return tuple([tuple(board[i]) for i in range(N)])
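
A sketch of how the rotated states could be written into the cache once a state is solved. dpMap follows the PlannedMinimaxStrategy class below; cache_result is a hypothetical helper, not code from the repo, and it stores only the result (the best move is not rotated along with the board here):

	def cache_result(self, status: Tuple[Tuple[int, ...]], result: int) -> None:
		# Store the solved result under the state and all of its rotations, so a
		# future lookup on any rotated variant is answered from the cache.
		for s in self.similarStatus(status):
			self.dpMap[s] = result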

Minimax Strategy Precomputation

In the previous version of the minimax DP strategy, we searched for the best game result given a game state, and used pruning to stop early once the best possible result was reached. However, the AI agent still had to call minimax for each new game state it encountered, which is inefficient because the same states get solved over and over during the top-down recursion. An obvious improvement is to compute all game states up front and cache them all. Later, for each state encountered, we only need to aggregate the cached results of all possible next moves from that state. The code that aggregates possible moves is listed below.

class PlannedMinimaxStrategy(Strategy):
	def __init__(self, game: ConnectNGame):
		super().__init__()
		self.game = copy.deepcopy(game)
		self.dpMap = {}  # game_status => result, move
		self.result = self.minimax(game.getStatus())


	def action(self, game: ConnectNGame) -> Tuple[int, Tuple[int, int]]:
		game = copy.deepcopy(game)

		player = game.currentPlayer
		bestResult = player * -1  # assume opponent win as worst result
		bestMove = None
		for move in game.getAvailablePositions():
			game.move(*move)
			status = game.getStatus()
			game.undo()

			result = self.dpMap[status]

			if player == ConnectNGame.PLAYER_A:
				bestResult = max(bestResult, result)
			else:
				bestResult = min(bestResult, result)
			# update bestMove if any improvement
			bestMove = move if bestResult == result else bestMove
			print(f'move {move} => {result}')

		return bestResult, bestMove
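
The minimax() method called in __init__ is what fills dpMap, and it is not shown above. Below is a minimal sketch of how such a precomputation could look, reusing similarStatus() from the rotation enhancement; the details (in particular move() returning None while the game is still ongoing) are assumptions based on how the game class is used elsewhere in this post:

	def minimax(self, status: Tuple[Tuple[int, ...]]) -> int:
		# Answer from the cache if this state (or a rotation of it) was solved before.
		if status in self.dpMap:
			return self.dpMap[status]

		game = self.game
		player = game.currentPlayer
		bestResult = player * -1  # worst case: assume the opponent wins
		for move in game.getAvailablePositions():
			result = game.move(*move)
			if result is None:  # game not finished, recurse into the child state
				result = self.minimax(game.getStatus())
			game.undo()
			if player == ConnectNGame.PLAYER_A:
				bestResult = max(bestResult, result)
			else:
				bestResult = min(bestResult, result)

		# Cache this state together with its rotations (see similarStatus above).
		for s in self.similarStatus(status):
			self.dpMap[s] = bestResult
		return bestResult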

Agent Class and Playing Logic

We also build an Agent class hierarchy, allowing the AI player and the human player to share common code.

BaseAgent is the root class; its default act() method makes a random choice among all available actions.

class BaseAgent(object):
	def __init__(self):
		pass

	def act(self, game: PyGameBoard, available_actions):
		return random.choice(available_actions)

AIAgent delegates its act() behavior to its strategy's action() method.

class AIAgent(BaseAgent):
	def __init__(self, strategy: Strategy):
		self.strategy = strategy

	def act(self, game: PyGameBoard, available_actions):
		result, move = self.strategy.action(game.connectNGame)
		assert move in available_actions
		return move

HumanAgent delegates act() to PyGameBoard's next_user_input().

class HumanAgent(BaseAgent):
	def __init__(self):
		pass

	def act(self, game: PyGameBoard, available_actions):
		return game.next_user_input()

Agent Class Diagram

Below are code snippets showing how Agent, ConnectNGym and PyGameBoard are wired together to play the game.

def play_ai_vs_ai(env: ConnectNGym):
	plannedMinimaxAgent = AIAgent(PlannedMinimaxStrategy(env.pygameBoard.connectNGame))
	play(env, plannedMinimaxAgent, plannedMinimaxAgent)


def play(env: ConnectNGym, agent1: BaseAgent, agent2: BaseAgent):
	agents = [agent1, agent2]

	while True:
		env.reset()
		done = False
		agent_id = -1
		while not done:
			agent_id = (agent_id + 1) % 2
			available_actions = env.get_available_actions()
			agent = agents[agent_id]
			action = agent.act(env.pygameBoard, available_actions)
			_, reward, done, info = env.step(action)
			env.render()

			if done:
				print(f'result={reward}')
				time.sleep(3)
				break


if __name__ == '__main__':
	pygameBoard = PyGameBoard(connectNGame=ConnectNGame(board_size=3, N=3))
	env = ConnectNGym(pygameBoard)
	env.render()

	play_ai_vs_ai(env)
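
The intro also mentioned human vs. AI play. A minimal sketch of how that could be wired up with the classes above (play_human_vs_ai is a hypothetical helper, not code from the repo):

def play_human_vs_ai(env: ConnectNGym):
	# Human moves come from the Pygame window; the AI replays the precomputed minimax results.
	human_agent = HumanAgent()
	ai_agent = AIAgent(PlannedMinimaxStrategy(env.pygameBoard.connectNGame))
	play(env, human_agent, ai_agent)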

Class Diagram Overview

Welcome to subscribe to the MyEncyclopedia WeChat account.
All Rights Reserved. Contact me for commercial use. For non-commercial use, please include a link to this post.
