Source code for codpy.KQLearning

import sys
import os

import codpy.core
import codpy.permutation

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
import random

import numpy as np
import pandas as pd
from scipy import optimize
from scipy.special import softmax

import codpy
import codpy.core as core
from codpy.core import get_matrix
import codpy.selection as selection
from codpy.data_processing import hot_encoder
from codpy.kernel import Kernel, KernelClassifier, get_tensor_probas
from codpy.lalg import LAlg
from codpy.algs import Alg
from codpy.sampling import get_normals
from codpy.utils import gather, cartesian_sum, cartesian_outer_product, fit_to_cov
import codpy.conditioning
from codpy.permutation import map_invertion
from codpy.clustering import MiniBatchkmeans, BalancedClustering
import codpy.optimization as optimization

class ReplayBuffer(object):
    def __init__(self, capacity=None, memory=None, **kwargs):
        if memory is None:
            self.memory = None
        else:
            self.memory = list(memory)
        if capacity is None:
            self.capacity = sys.maxsize
        else:
            self.capacity = capacity
        self.games_list = []
    def is_full(self):
        return len(self) >= self.capacity

    def update(self, capacity=None, worst_game = False,**kwargs):
        if capacity is None:
            capacity = self.capacity
        if len(self) < capacity:
            return True
        returned = True
        if worst_game == True:
            while len(self) > capacity:
                scores = np.array([self.games_list[i][3].sum() - self.games_list[i][3].var() for i in range(len(self.games_list))])
                min_indices = scores.argmin()
                if min_indices == len(self.games_list)-1 :
                    returned = False
                self.remove_games(min_indices)

            return returned

        else:
            def helper(i):
                self.memory[i] = self.memory[i][
                    max(self.memory[i].shape[0] - capacity, 0) :
                ]

            [helper(i) for i in range(len(self.memory))]
        return True

    def empty(self, N=None, *args):
        if N is None:
            self.memory = None
            self.games_list = []
        else:

            def helper(i):
                self.memory[i] = self.memory[i][
                    max(self.memory[i].shape[0] - self.capacity, N) :
                ]

            [helper(i) for i in range(len(self.memory))]

    def push(self, *sarsd, capacity=None, **kwargs):
        self.games_list.append(sarsd)

        if self.memory is None:
            self.memory = sarsd
        else:

            def helper(i):
                if len(sarsd[i]) == 0:
                    return self.memory[i]
                return np.concatenate([self.memory[i], sarsd[i]], axis=0)

            self.memory = [helper(i) for i in range(len(sarsd)) ]
        return self.update(capacity,**kwargs)

    def games(self):
        return self.games_list
    def remove_games(self,indices):
        if isinstance(indices,list):
            [self.remove_games(i) for i in indices]
        else:
            del self.games_list[indices]
            def helper(i):
                self.memory[i] = np.concatenate([self.games_list[j][i] for j in range(len(self.games_list))], axis=0)
            [helper(i) for i in range(len(self.memory)) ]                    
        return self

    def last_game(self):
        return self.games()[-1]

    def last(self):
        return self.last_game()

    def sample(self, batch_size):
        if len(self) == 0:
            return None
        indices = list(range(len(self)))
        if batch_size > len(self):
            self.last_indices = indices
        else:
            self.last_indices = random.sample(indices, batch_size)
        return self[self.last_indices]

    def __len__(self):
        if self.memory is None:
            return 0
        return len(self.memory[0])

    def __getitem__(self, indices):
        if self.memory is None:
            return None

        def helper(i):
            out = self.memory[i]
            return out[indices]

        return [helper(i) for i in range(len(self.memory))]


class GamesClustering:
    def __init__(self, kernel):
        self.k = kernel

    def get_labels(self):
        return self(self.k.get_x())

    def __call__(self, z, **kwargs):
        labels = None
        for i in self.k.kernels.keys():
            k = self.k.kernels[i]
            labelsk = core.get_matrix(k.dnm(z, k.get_x()).min(1))
            if labels is None:
                labels = labelsk
            else:
                labels = np.concatenate([labels, labelsk], axis=1)
        return core.get_matrix(labels.argmin(1), dtype=int)


[docs] class GamesKernel(Kernel): """A specific type of kernel for deterministic policies, handling clustering """ def __init__( self, latent_distribution=None, max_size=None, next_states=None, **kwargs ): if kwargs.get("latent_shape", None) is None: self.latent_distribution = None else: shape = kwargs["latent_shape"] self.latent_distribution = get_normals(shape[0], shape[1]) self.max_size = max_size self.map_ = None params = kwargs.get("HJBModel", kwargs) super().__init__(**kwargs) self.clustering = self.set_clustering() self.update_kernels = False self.next_states = next_states self.kernels = {} # self.gamma = gamma def default_clustering_functor(self) -> callable: return lambda: GamesClustering(self) def __call__(self, z, **kwargs): if len(self.kernels) == 0: return super().__call__(z, **kwargs) if len(self.kernels) == 1: return self.kernels[0](z, **kwargs) z = core.get_matrix(z) dim = self.kernels[0].get_fx().shape[1] knm = np.zeros([z.shape[0], dim]) mapped_indices = self.clustering(z) mapped_indices = map_invertion(mapped_indices) for key in mapped_indices.keys(): indices = list(mapped_indices[key]) knm[indices] += self.kernels[key](z[indices]) return knm class helper: xs, fxs, ds, vals, rs, ns = None, None, None, None, None, None def __init__(self, k): self.ref_kernel = k def __call__(self, k): d = k.knm(z, k.get_x()) # vals = k(z) arg_x = d.argmax(1) x, fx, d, r, n = ( k.get_x()[arg_x], k.get_fx()[arg_x], gather(d, arg_x), k.games[3][arg_x], k.games[2][arg_x], ) if self.xs is None: self.xs, self.fxs, self.ds, self.rs, self.ns = x, fx, d, r, n # self.vals=vals else: self.xs, self.fxs, self.ds, self.rs, self.ns = ( np.concatenate([self.xs, x], axis=0), np.concatenate([self.fxs, fx], axis=0), np.concatenate([self.ds, d], axis=0), np.concatenate([self.rs, r], axis=0), np.concatenate([self.ns, n], axis=0), ) # self.vals=np.concatenate([self.vals,vals], axis = 1) helper_ = helper(self) [helper_(k) for k in self.kernels.values()] # knm = helper_.vals.max(1)[:,None] # return knm # knm = codpy.conditioning.ConditionerKernel(x=helper_.xs,y=helper_.fxs).get_transition_kernel()(z, reg=1e-9) # knm = Kernel(x=helper_.xs)(z) kernel_ptr = Kernel(x=helper_.xs).get_kernel() knm = codpy.core.KerOp.projection( x=helper_.xs, y=helper_.xs, z=z, # fx=np.concatenate([helper_.fxs, helper_.rs], axis=1), fx=helper_.fxs, reg=1e-4, order=2, kernel_ptr=kernel_ptr, ) # knm = self.gamma*knm[:,[0]] +knm[:,[1]] return knm def add_kernel(self, k, max_kernel=1e30, **kwargs) -> None: if self.get_x() is None: self.set(x=k.get_x(), fx=k.get_fx()) else: if self.get_x().shape[0] < 1000000: x, fx = self.get_x(), self.get_fx() x = np.concatenate([x, k.get_x()], axis=0) fx = np.concatenate([fx, k.get_fx()], axis=0) self.set(x=x, fx=fx) else: self.add(y=k.get_x(), fy=k.get_fx(), **kwargs) self.kernels[len(self.kernels) % max_kernel] = k return self
[docs] class GamesKernelClassifier(GamesKernel): """A specific type of kernel for stochastic policies. Outputs probabilities """ def __init__(self, **kwargs): super().__init__(**kwargs) def __call__(self, z, **kwargs): knm = super().__call__(z, **kwargs) if len(self.kernels) == 1: return knm return softmax(knm, axis=1) def set_fx( self, fx: np.ndarray, set_polynomial_regressor: bool = True, clip=Alg.probas_projection, **kwargs, ) -> None: fx_ = fx if fx is not None: if clip is not None: fx_ = clip(fx, axis=1) debug = np.where(fx_ < 1e-9, 1e-9, fx) fx_ = np.log(debug) super().set_fx(fx_, set_polynomial_regressor=set_polynomial_regressor, **kwargs)
[docs] def rl_hot_encoder(actions, actions_dim): """Hot encodes actions over actions_dim. :param actions: :class:`numpy.ndarray`. :param actions_dim: :class:`int` The dimension of the actions. :return: :class:`pandas.DataFrame` """ out = hot_encoder(pd.DataFrame(np.float64(actions)), cat_cols_include=[0]) if out.shape[1] != actions_dim: for i in range(actions_dim): col_name = "0_" + str(i) + ".0" if len(selection.get_starting_cols(out, [col_name])) == 0: out[col_name] = 0.0 pass out = out.reindex(sorted(out.columns), axis=1) return out.values
def Verhulst(probs, advantages): # d/dt pi(t) = pi(t)(1-pi(t)) A(t) (Verhulst) # pi(t^{n+1}) = 1/ ( 1.+ (1 / pi(t^{n} - 1) \exp(-A(t^n)(t^{n+1}-t^{n})) ) # advantages = advantages-core.get_matrix(advantages.mean(1)) out = np.log(probs) + advantages out = softmax(out, axis=1) return out
[docs] class KAgent: """ Basic KAgent. Has most of the usefull methods for other Reinforcement Learning algorithms. """
[docs] def __init__( self, actions_dim, state_dim, gamma=0.99, kernel_type=GamesKernel, **kwargs ): """Initializes the KAgent with the given parameters. Every agent has an actor and critic kernel. Some classes might not use both. :param actions_dim: :class:`int` The action dimension of the environment. :param state_dim: :class:`int` The state dimension of the environment. :param gamma: :class:`float` Discount factor. :param kernel_type: :class:`codpy.kernel.Kernel` Type of kernel to be used as actor and critic. :type kwargs: :class:`dict` """ self.kernel_type = kernel_type self.actions_dim = actions_dim self.state_dim = state_dim self.gamma = gamma self.all_actions_ = None params = kwargs.get("KActor", {}) self.actor = self.kernel_type(gamma=gamma, **params) self.target = self.kernel_type(gamma=gamma, **params) params = kwargs.get("KCritic", {}) self.critic = self.kernel_type(gamma=gamma, **params) params = kwargs.get("KNextStates", {}) self.next_states = self.kernel_type(gamma=gamma, **params) params = kwargs.get("Rewards", {}) self.rewards = self.kernel_type(gamma=gamma, **params) self.replay_buffer = ReplayBuffer(gamma=gamma, **kwargs) self.eps_threshold = kwargs.get("eps_threshold", 0.0)
def get_expectation_kernel(self, games, **kwargs): return get_expectation_kernel(games, **kwargs) def get_conditioned_kernel(self, games, expectation_kernel, **kwargs): return get_conditioned_kernel( games, expectation_kernel=expectation_kernel, **kwargs ) states, actions, next_states, rewards, returns, dones = games states_actions = self.all_states_actions(states) expected_states = expectation_kernel(states_actions) # noise = next_states-expected_states params = kwargs.get("HJBModel", kwargs) out = codpy.conditioning.ConditionerKernel( x=np.concatenate([expected_states, actions], axis=1), y=next_states, expectation_kernel=expectation_kernel, **params, ) return out
[docs] def compute_returns(self, states, actions, next_states, rewards, dones, **kwargs): """ Computes $G_t = R_t + \gamma G_{t+1}$ for the given history. :param states: :class:`np.ndarray` The states of the game, in reverse order. :param actions: :param next_states: :param rewards: :param dones: :return: :class:`np.ndarray` """ returns, next_return = [], 0.0 # for t in reversed(range(len(rewards))): # we already reversed time for t in range(len(rewards)): next_return = rewards[t] + self.gamma * next_return returns.append(next_return) return np.array(returns)
def bellman_error( self, games, value_function, policy=None ): states, actions, next_states, rewards, returns, dones = games states_actions = np.concatenate([states, actions], axis=1) next_states_actions = self.all_states_actions(next_states) error = self.gamma * value_function(next_states_actions).reshape( [states_actions.shape[0], self.actions_dim] ) if policy is None: error = core.get_matrix(error.max(1)) else: error = core.get_matrix((error * policy).sum(1)) error = value_function(states_actions) - rewards - error return error def optimal_bellman_error(self,games,value_function): return self.bellman_error(games,value_function)
[docs] def bellman_optimal_action( self, games, q_value_function ): """Computes the optimal actions for the given $Q(s,a)$ function, with $$Q^*(s,a) = R(s,a) + \gamma \max_{a'} Q^{\pi}(s',a')$$. :param games: :class:`tuple` SARSD in reverse order. :param q_value_function: :class:`codpy.kernel.Kernel` The Q-value function. :return: :class:`np.ndarray` The optimal actions one hot encoded. """ states, actions, next_states, rewards, returns, dones = games states_actions = np.concatenate([states, actions], axis=1) next_states_actions = self.all_states_actions(next_states) error = self.gamma * q_value_function(next_states_actions).reshape(actions.shape) + rewards error -= q_value_function(states_actions) actions = error.argmax(1)[:,None] return rl_hot_encoder(actions,self.actions_dim)
[docs] def update_probabilities( self, advantages, games, last_policy, dt=None, kernel=None, clip=None,**kwargs ): """Updates the policies for advantage-based algorithms. The advantage either is $\\nabla_{y} Q^\pi_k(\cdot)$ for Policy Gradient methods, or $R(s,a) + \gamma V^{\pi}(s') - V^{\pi}(s)$ for ActorCritic. It does normalize the advantages and then computes the new policy as an interpolation between the last policy and the new one. :param advantages: :class:`np.ndarray` :param games: :class:`tuple` SARSD in reverse order. :param last_policy: :class:`np.ndarray` The last policy. :return: :class:`codpy.kernel.Kernel` The new policy. """ ##this function assumes that advantages[i,j]=KCritic([states[i],j]) states, actions, next_states, rewards, returns, dones = games advantages -= advantages.mean(axis=1)[:, None] if dt is None: dt = advantages # dt = np.where(np.fabs(dt) < 1e-8,0.,.1/dt) # dt /= (dt * dt).mean(axis=1)[:,None] + 1e-9 dt /= ((dt*dt).mean(axis=1)[:, None] + 1e-9) if clip is not None: dt = clip*dt/ (np.fabs(dt).max(axis=1)[:, None] + 1e-9) # if dt < 1e-9: # dt = 0.0 # else: # dt = advantages / dt # dt -= dt.mean(axis=1)[:,None] interpolated_policy = Verhulst(last_policy, dt) params = kwargs.get("KActor", {}) # if kernel is None: return GamesKernelClassifier( x=states, y=states, fx=interpolated_policy, clip=None, **params )
# else: # kernel.update(z=states,fz=interpolated_policy) # return kernel def get_state_action_value_function(self, games, policy=None, max_y=None,kernel=None,**kwargs): states, actions, next_states, rewards, returns, dones = games if policy is None: policy = actions if max_y is None: max_y = sys.maxsize # for i in range(states.shape[0] // max_y + 1): # step= states.shape[0]/max_y # indices = [int(i*step) for i in range(max_y)] # indices = np.random.choice(states.shape[0], size=max_y, replace=False) # indices = list(range(i*max_y,min((i+1)*max_y,states.shape[0]))) # states_, actions_, next_states_, rewards_, returns_, dones_ = states[indices], actions[indices], next_states[indices], rewards[indices], returns[indices], dones[indices] states_, actions_, next_states_, rewards_, returns_, dones_ = states, actions, next_states, rewards, returns, dones policy_ = policy states_actions = np.concatenate([states_, actions_], axis=1) next_states_actions = self.all_states_actions(next_states_) value_function = self.kernel_type(**kwargs.get("KActor", {})) if kernel is None: value_function.set(x=states_actions, y=states_actions,fx=returns_) else: value_function.copy(kernel) knm = value_function.knm(x=states_actions, y=value_function.get_y()) projection_op = value_function.knm(x=next_states_actions, y=value_function.get_y()) def helper(i): if dones_[i] == True: return [i * self.actions_dim + j for j in range(self.actions_dim)] modif = [ item for i in range(dones_.shape[0]) if dones_[i] == True for item in helper(i) ] projection_op[modif] = 0.0 projection_op = projection_op.reshape( [ states_actions.shape[0], projection_op.shape[0] // states_actions.shape[0], value_function.get_y().shape[0] ] ) sum_policy = np.einsum("...ji,...j", projection_op, policy_) mat = knm - sum_policy * self.gamma thetas = LAlg.lstsq(mat, rewards) value_function.set_theta(thetas) value_function.games=(states_, actions_, next_states_, rewards_, returns_, dones_) # thetas = LAlg.prod(projection_op, rewards) # kernel.add_kernel(value_function) # check # error = self.bellman_error(states,actions,next_states, rewards,policy, value_function) return value_function
[docs] def get_derivatives_policy_state_action_value_function( self, games, policy, output_value_function=False, **kwargs ): """ Solve for $$\\nabla_{y} \\theta^\pi = \gamma \Big(K(Z,Z) - \gamma \sum_a \pi^a(S) K(W,Z) \Big)^{-1} \sum_a\Big(Q^{\pi}_k(W) \pi^a(\delta_b(a)-\pi^b)\Big)$$ Where: - $Z$ are the state actions - $W$ are the next state actions - $K(Z,Z)$ is the Gram matrix of the training points - $\gamma \sum_a \pi^a(S) K(W,Z)$ is the weighted projection operator onto the next state-actions - $Q^{\pi}_k(W)$ is the critic evaluated at the next state-actions - $\pi^a(\delta_b(a)-\pi^b)$ is an adjustment based on probability differences :param games: :class:`tuple` SARSD in reverse order. :param policy: :class:`np.ndarray` The policy to be used for weigthing the next state-actions. :return: :class:`codpy.kernel.Kernel` The derivative estimator of $Q(s,a)$ """ # return an estimator of \nabla_\pi V(\pi) where # - \pi is a policy, that is a probability distribution over actions # - V(\pi) is the value function of the policy \pi # - V(\pi) = E_{s,a} [ Q(s,a) | \pi ] states, actions, next_states, rewards, returns, dones = games states_actions = np.concatenate([states, actions], axis=1) next_states_actions = self.all_states_actions(next_states) value_function = Kernel() value_function.set(x=states_actions) knm = value_function.knm(x=states_actions, y=states_actions) projection_op = value_function.knm( x=next_states_actions, y=states_actions ).reshape([states_actions.shape[0], self.actions_dim, states_actions.shape[0]]) projection_op[[bool(d) for d in dones]] = 0.0 sum_policy = np.einsum("...ji,...j", projection_op, policy) projection_op = LAlg.lstsq(knm - sum_policy * self.gamma) thetas = LAlg.prod(projection_op, rewards) value_function.set_theta(thetas) ##end next_states_actions_values = value_function(next_states_actions).reshape( [states_actions.shape[0], self.actions_dim] ) coeffs = get_tensor_probas(policy) second_member = np.einsum("...i,...ij", next_states_actions_values, coeffs) derivative_estimator = Kernel() derivative_estimator.set_x(states_actions) derivative_estimator.set_theta(LAlg.prod(projection_op, second_member)) if output_value_function: return derivative_estimator, value_function return derivative_estimator
def get_state_value_function(self, games, policy): states, actions, next_states, rewards, returns, dones = games value_function = Kernel(x=states) operator_inv = value_function.knm( x=states, y=states ) - self.gamma * value_function.knm(x=next_states, y=states) operator = LAlg.lstsq(operator_inv) second_member = core.get_matrix((policy * rewards).sum(1)) value_function.set_theta(LAlg.prod(operator, second_member)) # def check(): # test = value_function(states)-self.gamma*value_function(next_states)-second_member # assert(np.abs(test).max() < 1e-4) # check() return value_function def format(self, sarsd, max_training_game_size=None, **kwargs): states, actions, next_states, rewards, dones = [ core.get_matrix(e) for e in sarsd ] actions = rl_hot_encoder(actions, self.actions_dim) returns = self.compute_returns( states, actions, next_states, rewards, dones, **kwargs ) dones = core.get_matrix(dones, dtype=bool) if max_training_game_size is not None: states, actions, next_states, rewards, returns, dones = ( states[-max_training_game_size:], actions[-max_training_game_size:], next_states[-max_training_game_size:], rewards[-max_training_game_size:], returns[-max_training_game_size:], dones[-max_training_game_size:], ) return states, actions, next_states, rewards, returns, dones def get_derivatives_policy_state_value_function( self, games, policy, output_value_function=False ): ##begin get_state_value_function code states, actions, next_states, rewards, returns, dones = games derivative_estimator = Kernel(x=states) operator = derivative_estimator.knm( x=states, y=states ) - self.gamma * derivative_estimator.knm(x=next_states, y=states) operator = LAlg.lstsq(operator) ##end @np.vectorize def fun(i, j, k): return rewards[i, j] * policy[i, j] * (float(j == k) - policy[i, k]) coeffs = np.fromfunction( fun, shape=[rewards.shape[0], self.actions_dim, self.actions_dim], dtype=int, ) coeffs = coeffs.sum(axis=1) # this is \nabla_ln(pi) sum_a R(s,a) \pi^a(s) = \nabla_ln(pi) E(R,\pi) # def check_derivative(policy): # #Let E(R,\pi) = sum_a R(s,a) \pi^a(s) the rewards expectation # #test if ( E(R,\pi) - E(R,\pi^\epsilon) )/epsilon = \nabla_ln(pi) E(R,\pi) . (log \pi^\epsilon - log \pi) # old_policy = policy.copy() # inc = np.random.normal(size=policy.shape) * 1e-4 # policy = softmax(np.log(policy) + inc,axis=1) # inc = np.log(policy)-np.log(old_policy) # test = ( (rewards_matrix*policy).sum(1)-(rewards_matrix*old_policy).sum(1) ) / 1e-4 # check_ = test - (coeffs*inc).sum(1)/ 1e-4 # assert(np.abs(check_).max() < 1e-4) # pass # check_derivative(policy) derivative_estimator.set_theta(LAlg.prod(operator, coeffs)) # def check(): # #check the derivated Bellman relation \nabla_ln \pi ( V(S_T) - gamma V(S_{T+1} - \E(R) ) =0) # test = derivative_estimator(states)-self.gamma*derivative_estimator(next_states)-coeffs # assert(np.abs(test).max() < 1e-4) # check() if output_value_function: value_function = Kernel(x=states) second_member = (policy * rewards).reshape(policy.shape).sum(1) value_function.set_theta(LAlg.prod(operator, second_member)) return derivative_estimator, value_function return derivative_estimator def get_greedy_policy(self, states, q_values): shape_ = [states.shape[0], self.actions_dim] states_actions = self.all_states_actions(states) greedy_policy = q_values(states_actions).reshape(shape_).argmax(1) out = np.zeros(shape_) fill(out, np.ones([shape_[0]]), greedy_policy) return out def get_all_actions(self): if self.all_actions_ is None: self.all_actions_ = core.get_matrix(range(0, self.actions_dim)) self.all_actions_ = hot_encoder( pd.DataFrame(self.all_actions_), cat_cols_include=[0] ).values return self.all_actions_ def all_states_actions(self, states, all_actions=None): if all_actions == None: all_actions = self.get_all_actions() def helper(i, j): out = np.concatenate([states[[i]], all_actions[[j]]], axis=1) return out test = np.concatenate( [ helper(i, j) for i in range(states.shape[0]) for j in range(all_actions.shape[0]) ], axis=0, ) return test def optimal_bellman_solver( self, thetas, next_states_projection, knm, rewards, maxiter=5, reg=1e-9, tol=1e-6, verbose=False, **kwargs, ): theta = thetas.copy() shape = [next_states_projection.shape[0] // self.actions_dim, self.actions_dim] def bellman_error(theta, full_output=False): error = ( LAlg.prod(next_states_projection, theta).reshape(shape) * self.gamma + rewards ) max_indices = error.argmax(1) error = gather(error, max_indices) error -= LAlg.prod(knm, theta) if full_output == True: return np.fabs(error).mean(), max_indices return np.fabs(error).mean() error, max_indices = bellman_error(theta, full_output=True) count = 0 if error < tol: return thetas, error, max_indices while count < maxiter and error > tol: indices = [ self.actions_dim * i + max_indices[i] for i in range(len(max_indices)) ] max_projection = next_states_projection[indices] next_theta = LAlg.lstsq(knm - max_projection * self.gamma, rewards, reg) def f(x): interpolated_thetas = theta * x + next_theta * (1.0 - x) out = bellman_error(interpolated_thetas) return out # return next_theta,f(0.) xmin, fval, iter, funcalls = optimize.brent( f, brack=(0.0, 1.0), maxiter=maxiter, full_output=True ) if fval >= error: break theta = theta * xmin + next_theta * (1.0 - xmin) error, max_indices = bellman_error(theta, full_output=True) count = count + 1 max_indices = LAlg.prod(next_states_projection, theta).reshape(shape).argmax(1) indices = [ self.actions_dim * i + max_indices[i] for i in range(len(max_indices)) ] if verbose: print("Computed global error Bellman mean: ", fval, " iter: ", count) return theta, fval, indices
[docs] def optimal_states_values_function( self, games, kernel=None, full_output=False, **kwargs ): """ Find a kernel regressor solving the Bellman equation $$Q(s,a) = r + \gamma \max_{a'} Q(s',a')$$ The algorithm computes $Q^n(s,a)$ iteratively : 1. Solve $\\theta^{\pi}_{n+1/2} = \Big( K(Z, Z) - \gamma \sum_a \pi_{n+1/2}^a(S) K(W^a,Z)\Big)^{-1} R$ 2. Refines the parameters $\\theta_{n+1}^{\pi} = \lambda \\theta^{\pi}_{n+1/2} + (1 - \lambda) \\theta_{n}^{\pi}.$ Where: - Z is the concatenation of the states and actions - $K(Z,Z)$ is the gram matrix of current state actions pairs - $K(W^a,Z)$ is the gram matrix of the next states and actions - $\pi_{n+1/2}^a(S) = \delta_{\\arg \max q^n(S,a) }(S)$ is the max of the next Q-values, with $q^n$ the current Q-values. - $R$ is the rewards function The function then assures a limit condition on the Q-values by setting the last Q-values equal to the rewards. :param games: :class:`tuple` SARSD in reverse order. :param kernel: :class:`codpy.kernel.Kernel` Kernel to be used. If None, a kernel fit on the returns is used. :return: :class:`codpy.kernel.Kernel` The kernel with the optimal Q-values. """ states, actions, next_states, rewards, returns, dones = games states_actions = np.concatenate([states, actions], axis=1) if kernel is None or not kernel.is_valid(): kernel = self.kernel_type(x=states_actions, fx=returns, **kwargs) else: states_actions = np.concatenate([kernel.get_x(), states_actions], axis=0) rewards = np.concatenate([kernel.games[3], rewards], axis=0) next_states = np.concatenate([kernel.games[2], next_states], axis=0) next_states_actions = self.all_states_actions(next_states) _projection_ = kernel.knm(x=next_states_actions, y=kernel.get_x()) def helper(i): if dones[i] == True: return [i * self.actions_dim + j for j in range(self.actions_dim)] modif = [ item for i in range(dones.shape[0]) if dones[i] == True for item in helper(i) ] _projection_[modif] = 0.0 _knm_ = kernel.knm(x=states_actions, y=kernel.get_x()) thetas, bellman_error, indices = self.optimal_bellman_solver( thetas=kernel.get_theta(), next_states_projection=_projection_, knm=_knm_, rewards=rewards, games=games, **kwargs, ) kernel.set_theta(thetas) kernel.bellman_error = bellman_error if full_output: return kernel, bellman_error, indices return kernel
[docs] class KActorCritic(KAgent): """KActorCritic Kernel algorithm. It is policy-based and uses a :class:`GamesKernelClassifier` as the actor. """ def __call__(self, state, **kwargs): # return 1 if self.actor.is_valid(): action_probs = self.actor(core.get_matrix(state).T) # if len(self.actor.kernels) > 1: # action_probs = softmax(action_probs, axis=1) # action = action_probs.argmax() action_probs = action_probs.squeeze() action = np.random.choice(len(action_probs), p=action_probs) return action else: return np.random.randint(0, self.actions_dim)
[docs] def get_advantages(self, games, policy, **kwargs): """Compute the advantage function $$A^{\pi^a}(s) = R(s,a) + \gamma V^{\pi}(s') - V^{\pi}(s), \quad s'=S(s,a).$$ Where : - $R(s,a)$ is the rewards function - $V^{\pi}(s)$ is the value function - $S(s,a)$ is the next state function. """ states, actions, next_states, rewards, returns, dones = games value_function = self.get_state_action_value_function( games, policy, max_y=None,**kwargs ) advantages = ( value_function(self.all_states_actions(next_states)).reshape(actions.shape) * self.gamma + rewards ) advantages -= value_function(np.concatenate([states, actions], axis=1)) advantages -= core.get_matrix((advantages).mean(1)) advantages[dones.flatten()] = 0.0 return advantages,value_function
def train(self, game, max_training_game_size=None, **kwargs): params = kwargs.get("KCritic", {}) states, actions, next_states, rewards, returns, dones = self.format( game, max_training_game_size=max_training_game_size, **kwargs ) if not hasattr(self,"scores"): self.scores = [rewards.sum()] else: self.scores.append(rewards.sum()) self.replay_buffer.push( states, actions, next_states, rewards, returns, dones, **kwargs ) states, actions, next_states, rewards, returns, dones = ( self.replay_buffer.memory ) dones[0] = True games = [states, actions, next_states, rewards, returns, dones] if self.actor.is_valid(): last_policy = self.actor(states) else: last_policy = np.full( [states.shape[0], self.actions_dim], 1.0 / self.actions_dim ) last_policy = np.where(last_policy < 1e-9, 1e-9,last_policy) last_policy = np.where(last_policy > 1.-1e-9,1.- 1e-9,last_policy) advantages, value_function = self.get_advantages(games, policy=last_policy, **kwargs) # update probabilities kernel = self.update_probabilities( advantages, games, last_policy=last_policy, kernel= value_function, **kwargs ) self.actor = kernel
[docs] class KQLearning(KActorCritic): """Implements KQLearning algorithm. Uses clustering by default in the :func:`train` method. """ def __call__(self, state, **kwargs): self.eps_threshold *= 0.999 if np.random.random() > self.eps_threshold and self.critic.is_valid() == True: z = self.all_states_actions(core.get_matrix(state).T) q_values = self.critic(z) q_values += np.random.random(q_values.shape) * 1e-9 return np.argmax(q_values) return np.random.randint(0, self.actions_dim) def train( self, game, max_training_game_size=None, format=True, tol=1e-2, **kwargs, ): if format: states, actions, next_states, rewards, returns, dones = self.format( game, max_training_game_size=max_training_game_size, **kwargs ) else: states, actions, next_states, rewards, returns, dones = game if self.critic.is_valid(): returns = self.critic(np.concatenate([states, actions], axis=1)) games = states, actions, next_states, rewards, returns, dones self.replay_buffer.push( states, actions, next_states, rewards, returns, dones, capacity=sys.maxsize ) if ( len(self.replay_buffer) <= self.replay_buffer.capacity ): # and self.critic.update_kernels == False: games = self.replay_buffer.memory kernel = self.optimal_states_values_function(games, verbose=True, **kwargs) kernel.games = games if len(self.critic.kernels) == 0: self.critic.add_kernel(kernel) else: self.critic.kernels[len(self.critic.kernels) - 1] = kernel return else: kernel = self.critic.kernels[len(self.critic.kernels) - 1] kernel.set( x=kernel.get_x()[: self.replay_buffer.capacity // 2], fx=kernel.get_fx()[: self.replay_buffer.capacity // 2], ) kernel.games = [ elt[: self.replay_buffer.capacity // 2] for elt in self.replay_buffer.memory ] games = self.replay_buffer.memory games = [ elt[self.replay_buffer.capacity // 2 :] for elt in self.replay_buffer.memory ] kernel = self.optimal_states_values_function(games, verbose=True, **kwargs) kernel.games = games self.critic.add_kernel(kernel) self.replay_buffer.memory = games delete_kernels = [] for i, k in self.critic.kernels.items(): error = self.critic.kernels[i].bellman_error if error > tol and not hasattr(self.critic.kernels[i], "flag_kill_me"): kernel = self.optimal_states_values_function( k.games, kernel=k, verbose=True, **kwargs ) kernel.games = self.critic.kernels[i].games if error <= self.critic.kernels[i].bellman_error: # delete_kernels.append(i) kernel.flag_kill_me = "please" else: self.critic.kernels[i] = kernel if ( len(delete_kernels) > 0 and len(self.critic.kernels) - len(delete_kernels) > 1 ): new_kernels = {}
# count = 0 # for i in range(len(self.critic.kernels)): # if i not in delete_kernels: # new_kernels[count] = self.critic.kernels[i] # count = count+1 # self.critic.kernels = new_kernels
[docs] class PolicyGradient(KActorCritic): """ PolicyGradient Kernel algorithm. It is policy-based and uses a :class:`GamesKernelClassifier` as the actor. """ def __init__(self, **kwargs): super().__init__(**kwargs) params = kwargs.get("KCritic", {}) self.actor = GamesKernelClassifier(**params) def __call__(self, state, **kwargs): if self.actor.get_x() is not None and self.actor.get_x().shape[0] > 1: action_probs = self.actor(core.get_matrix(state).T) action_probs = action_probs.squeeze() action = np.random.choice(len(action_probs), p=action_probs) # action = action_probs.argmax() return action else: return np.random.randint(0, self.actions_dim)
[docs] def get_advantages(self, games, policy, **kwargs): """Compute $$A^{\pi}(s) = \\nabla_{y} Q^\pi_k(\cdot) = K(\cdot, Z) \\nabla_{y} \\theta^\pi.$$ :param games: :class:`tuple` SARSD in reverse order. :param policy: :class:`np.ndarray` The policy to be used for weigthing the next state-actions. :param kwargs: :class:`dict` :return: :class:`tuple` The advantages of the policy along with a kernel estimator for new advantages on state-action pairs. """ # advantage taken as A = \nabla_\pi \pi Q^{pi}(S_T,A_T), so that the overall gradient policy can be written as # d/di \pi(t) = d/d\pi Q^{pi}(S_T,A_T) # Thus formally d/dt Q^{pi}(S_T,A_T) = < \nabla_\pi \pi Q^{pi}(S_T,A_T), d/dt \pi> = | \nabla_\pi \pi Q^{pi}(S_T,A_T)|^2 states, actions, next_states, rewards, returns, dones = games derivative_estimator = self.get_derivatives_policy_state_action_value_function( games, policy, **kwargs ) states_actions = np.concatenate([states, actions], axis=1) derivative_estimations = derivative_estimator(states_actions) return derivative_estimations, derivative_estimator
[docs] class KController(KAgent): """ Implements the KController algorithm. The specificities of this algorithm is that it uses a heuristic controller to be tuned. """ def __init__(self, state_dim, actions_dim, controller, **kwargs): self.controller = controller self.x, self.y = None, None self.expectation_estimator = None self.label = kwargs.get("label", None) super().__init__(state_dim=state_dim, actions_dim=actions_dim, **kwargs)
[docs] def __call__(self, z, **kwargs): """ The internal tuned heuristic controller directly outputs the action. :param z: :class:`np.ndarray the state :return: :class:`int` """ return self.controller(z, **kwargs)
def get_reward(self, game, **kwargs): states, actions, next_states, rewards, dones = game return get_matrix(rewards).mean() def get_expectation_estimator(self, x, y, **kwargs): class explore_kernel(Kernel): def distance(self, z, **kwargs): out = get_matrix(self.dnm(x=self.get_x(), y=z).min(axis=0)) return out params = kwargs.get("KController", {}) self.expectation_kernel = explore_kernel(x=x, fx=y, **params) return self.expectation_kernel
[docs] def get_function(self, **kwargs): """Defines the function to be optimized ${L}(R_{k,\lambda_e},\\theta)$. """ self.expectation_estimator = self.get_expectation_estimator( self.x, self.y, **kwargs ) # self.min_expectation_estimator = self.expectation_estimator(self.x).flatten() # self.min_expectation_estimator.sort() # self.min_expectation_estimator = self.min_expectation_estimator[int(self.x.shape[0] * 0.9)] def function(x): expectation = self.expectation_estimator( x ) # - self.min_expectation_estimator distance = self.expectation_estimator.distance(x) return expectation # + distance return function # to cope with exploration
[docs] def train(self, game, **kwargs): """ Solves for $$\\theta_{n+1} = \\arg \max_{\\theta \in \Theta_n} \mathcal{L}(R_{k,\lambda_e},\\theta), \quad \Theta_{n} = \\bar{\\theta_e} \cup \Theta_{N,n}$$ Where $\Theta_{N,n}$ is a screening around the last $\\theta_n$ and is defined as follow: $$\Theta_{N,n} = (\\theta_n+\\alpha^n \Theta_N) \cap \Theta$$ with $\\alpha^n$ is a contracting factor and ${L}(R_{k,\lambda_e},\\theta)$ is an optimization function which can be defined and tuned based on your needs at :func:`get_function`. :param game: :class:`tuple` SARSD in reverse order. """ states, actions, next_states, rewards, dones = self.format(game, **kwargs) self.replay_buffer.push(states, actions, next_states, rewards, dones, **kwargs) reward = get_matrix(rewards) last_theta = get_matrix(self.controller.get_thetas()).T if self.x is None: self.x = get_matrix(last_theta) self.y = get_matrix(reward) else: if ( self.expectation_estimator is None or self.expectation_estimator.distance(last_theta) > 1e-9 ): self.x = np.concatenate([self.x, last_theta]) self.y = np.concatenate([self.y, reward]) if self.x.shape[0] > 2: function = self.get_function(**kwargs) last_vals = function(self.x) last_val = last_vals.max() last_val_max_min = last_val - last_vals.min() max_val, new_theta = optimization.continuous_optimizer( function, self.controller.get_distribution(), include=self.x, **kwargs, ) new_theta = get_matrix(new_theta).T if last_val < max_val: # debug to see if parameters are updated tot = 1 pass self.controller.set_thetas(new_theta) else: self.controller.set_thetas(self.controller.get_distribution()(1))
def get_conditioned_kernel( games, expectation_kernel, base_class=codpy.conditioning.PiKernel, # base_class=codpy.conditioning.ConditionerKernel, # base_class=codpy.conditioning.NadarayaWatsonKernel, **kwargs, ): class ConditionerKernel(base_class): def __init__(self, **kwargs): super().__init__(**kwargs) states, actions, next_states, rewards, returns, dones = games expected_states = expectation_kernel(np.concatenate([states, actions], axis=1)) # permutation = Kernel(x=states).map(next_states, distance="norm22").permutation # states = states[permutation] # expected_states = expected_states[permutation] params = kwargs.get("HJBModel", kwargs) out = ConditionerKernel( x=np.concatenate([expected_states, actions], axis=1), y=next_states, expectation_kernel=expectation_kernel, **params, ) # out = ConditionerKernel(x=states_actions,y=next_states,expectation_kernel=expectation_kernel,**params) return out def get_expectation_kernel(games, **kwargs): class expectation_kernel(Kernel): def __init__(self, **kwargs): super().__init__(**kwargs) self.state_dim = kwargs.get("state_dim", None) def __call__(self, z, **kwargs): out = super().__call__(z) + z[:, : self.state_dim] return out states, actions, next_states, rewards, returns, dones = games states_actions = np.concatenate([states, actions], axis=1) params = kwargs.get("HJBModel", kwargs) out = expectation_kernel(x=states_actions, fx=next_states - states, **params) noise = next_states - out(states_actions) out.set_fx(out.get_fx() + noise.mean(axis=0)) return out
[docs] class KQLearningHJB(KQLearning): """Implements the Hamilton-Jacobi-Bellman Q-learning algorithm. """
[docs] def optimal_states_values_function( self, games, kernel=None, full_output=False, maxiter=5, reorder=False, **kwargs ): """ Solve the Bellman equation $$Q^{\pi}(s_t,a_t) = R(s_t,a_t) + \gamma \int \left[ \sum_{a \in \mathcal{A}} \pi^a(s_t) Q^{\pi}(s',a)\\right] d \mathbb{P}_S(s',s_t,a_t).$$ Numerically, we effectively solve for the set of parameters $\\theta$ of the kernel $K$ such that: $$\\theta = \Big( K(Z, Z) - \gamma \sum_{a} \pi^a(S)\Gamma(P^a) K(P, Z)\Big)^{-1} R, \quad P = \{ S+F_k(S,a), a \}$$ Where: - $K(Z,Z)$ is the kernel matrix of the states and actions. - $P$ is the set of the predicted next state actions possibilities. - $\Gamma(P^a)$ is the transition probability matrix. - $K(P,Z)$ is the kernel matrix of the predicted next state actions and the states and actions. """ states, actions, next_states, rewards, returns, dones = games states_actions = np.concatenate([states, actions], axis=1) if kernel is None or not kernel.is_valid(): kernel = self.kernel_type(x=states_actions, fx=returns, **kwargs) expectation_kernel_ = self.get_expectation_kernel(games, **kwargs) conditioned_kernel_ = self.get_conditioned_kernel( games=games, expectation_kernel=expectation_kernel_, **kwargs ) states_actions = np.concatenate([states, actions], axis=1) next_expected_states_actions = expectation_kernel_(states_actions) next_expected_all_states_actions = self.all_states_actions( next_expected_states_actions ) _projection_ = kernel.knm(x=next_expected_all_states_actions, y=kernel.get_x()) _knm_ = kernel.knm(x=states_actions, y=kernel.get_x()) def helper(i): if dones[i] == True: return [i * self.actions_dim + j for j in range(self.actions_dim)] modif = [ item for i in range(dones.shape[0]) if dones[i] == True for item in helper(i) ] _projection_[modif] = 0.0 thetas, bellman_error, indices = self.optimal_bellman_solver( thetas=kernel.get_theta(), next_states_projection=_projection_, knm=_knm_, rewards=rewards, maxiter=10, games=games, **kwargs, ) thetas = conditioned_kernel_.get_transition( y=next_expected_all_states_actions[indices, : self.state_dim], x=np.concatenate([next_expected_states_actions, actions], axis=1), fx=thetas, ) kernel.set_theta(thetas) # fx = conditioned_kernel_.get_transition( # y = next_expected_all_states_actions[indices,:self.state_dim], # x = np.concatenate([next_expected_states_actions, actions], axis=1), # fx = kernel.get_fx() # ) # kernel.set_fx(fx) kernel.bellman_error = bellman_error if full_output: return kernel, bellman_error, indices return kernel
class KActorCriticHJB(KActorCritic): def train(self, game, max_training_game_size=None, **kwargs): params = kwargs.get("KCritic", {}) states, actions, next_states, rewards, returns, dones = self.format( game, max_training_game_size=max_training_game_size, **kwargs ) self.replay_buffer.push( states, actions, next_states, rewards, returns, dones, **kwargs ) states, actions, next_states, rewards, returns, dones = ( self.replay_buffer.memory.copy() ) games = [states, actions, next_states, rewards, returns, dones] if self.actor.get_x() is not None and self.actor.get_x().shape[0] > 1: last_policy = self.actor(states) else: last_policy = np.full( [states.shape[0], self.actions_dim], 1.0 / self.actions_dim ) advantages = self.get_advantages(games, policy=last_policy, **kwargs) # update probabilities kernel = self.update_probabilities( advantages, games, last_policy=last_policy, **kwargs ) self.actor = kernel def get_state_action_value_function( self, games, policy, verbose=False, kernel=None, **kwargs ): states, actions, next_states, rewards, returns, dones = games states_actions = np.concatenate([states, actions], axis=1) # next_states_actions = self.all_states_actions(next_states) if kernel is None: kernel = self.kernel_type(x=states_actions, fx=returns, **kwargs) expectation_kernel_ = self.get_expectation_kernel(games, **kwargs) conditioned_kernel_ = self.get_conditioned_kernel( games, expectation_kernel=expectation_kernel_, **kwargs ) next_expected_states = expectation_kernel_(states_actions) next_expected_states_actions = self.all_states_actions(next_expected_states) next_expected_states = np.concatenate([next_expected_states, actions], axis=1) _projection_ = kernel.knm(x=next_expected_states_actions, y=kernel.get_x()) _knm_ = kernel.knm(x=states_actions, y=kernel.get_x()) _projection_ = conditioned_kernel_.get_transition_kernel()( next_expected_states_actions[:, : self.state_dim], next_expected_states_actions, _projection_, ) _projection_ = _projection_.reshape( [ states_actions.shape[0], _projection_.shape[0] // states_actions.shape[0], states_actions.shape[0], ] ) sum_policy = np.einsum("...ji,...j", _projection_, policy) thetas = LAlg.lstsq(_knm_ - sum_policy * self.gamma, rewards) kernel.set_theta(thetas) # check # error = self.bellman_error(states,actions,next_states, rewards,policy, value_function) return kernel