Skip to content

Commit

Permalink
Refactored pid code to use Trajectory class
Browse files Browse the repository at this point in the history
  • Loading branch information
danielbinschmid committed Jan 19, 2024
1 parent da88b1c commit 4ddfb85
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 53 deletions.
46 changes: 19 additions & 27 deletions runnables/pid.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
in the X-Y plane, around point (0, -.3).
"""
import os
import sys
sys.path.append("..")


import time
import argparse
from datetime import datetime
import pdb
import math
import random
import numpy as np
import pybullet as p
import matplotlib.pyplot as plt
from trajectories import TrajectoryFactory

from gym_pybullet_drones.utils.enums import DroneModel, Physics
from gym_pybullet_drones.envs.CtrlAviary import CtrlAviary
Expand Down Expand Up @@ -66,14 +64,13 @@ def run(
INIT_RPYS = np.array([[0., 0., 0.]])

#### Initialize a circular trajectory ######################
NUM_WP = 4
TARGET_POS = np.array([
[0,0,1],
[1,0,1],
[1,1,1],
[0,1,1],
], np.float32)
wp = 0
NUM_STEPS = 4
STEP_SIZE = 1 / NUM_STEPS
TARGET_TRAJECTORY = TrajectoryFactory.get_linear_square_trajectory(
square_scale=1,
time_scale=1
)
current_step = 0


#### Create the environment ################################
Expand Down Expand Up @@ -115,10 +112,11 @@ def run(

#### Step the simulation ###################################
obs, reward, terminated, truncated, info = env.step(action)
target_position = TARGET_TRAJECTORY.get_waypoint(current_step).coordinate
#### Compute control for the current way point #############
action, _, _ = ctrl.computeControlFromState(control_timestep=env.CTRL_TIMESTEP,
state=obs[0],
target_pos=np.hstack([TARGET_POS[wp, 0:3]]),
target_pos=np.hstack([target_position]),
# target_pos=INIT_XYZS[j, :] + TARGET_POS[wp_counters[j], :],
target_rpy=INIT_RPYS[0]
)
Expand All @@ -127,19 +125,13 @@ def run(

#### Go to the next way point and loop #####################
position = obs[0][0:3]
distance = np.linalg.norm(TARGET_POS[wp, :3] - position)

distance = np.linalg.norm(target_position - position)
velocity = np.linalg.norm(obs[0][10:13])
if distance < 0.1 and velocity < 0.05:
wp = wp + 1 if wp < (NUM_WP-1) else 0

# #### Log the simulation ####################################
# logger.log(
# drone=1,
# timestamp=i/env.CTRL_FREQ,
# state=obs[0],
# control=np.hstack([TARGET_POS[wp, 0:2], INIT_XYZS[0][2], INIT_RPYS[0], np.zeros(6)])
# # control=np.hstack([INIT_XYZS[j, :]+TARGET_POS[wp_counters[j], :], INIT_RPYS[j, :], np.zeros(6)])
# )
current_step = (current_step + STEP_SIZE) % 1

##### Log the simulation ####################################

#### Printout
env.render()
Expand Down
1 change: 1 addition & 0 deletions trajectories/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .trajectory_factory import TrajectoryFactory
Empty file removed trajectories/test.py
Empty file.
58 changes: 58 additions & 0 deletions trajectories/trajectories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from .waypoint import Waypoint
import numpy as np
import math

class Trajectory:

def __init__(self) -> None:
pass

def get_waypoint(self, time: float) -> Waypoint:
"""
Needs to be implemented by subclass. time \in [0,1]
"""
raise NotImplementedError()

class SquareLinearTrajectory(Trajectory):
"""
Linear interpolation for four corner points of a 2D square.
"""

square_scale: float
time_scale: float
corner_points: np.ndarray

def __init__(self, square_scale: float=1, time_scale: float=1) -> None:
super().__init__()
self.square_scale = square_scale
self.time_scale = time_scale
self.corner_points = np.array([
[0 ,0 ,self.square_scale],
[self.square_scale ,0 ,self.square_scale],
[self.square_scale ,self.square_scale ,self.square_scale],
[0 ,self.square_scale ,self.square_scale],
], np.float32)

def get_waypoint(self, time: float):
assert time >= 0 and time <= 1
time = time * 4

if int(time) == time:
# exactly at corner point
target_pos = self.corner_points[int(time)]
else:
# in-between two points, linear interpolation
cur_corner = math.floor(time)
upcomimg_corner = math.ceil(time)
diff = upcomimg_corner - cur_corner
target_pos = cur_corner + (time - int(time)) * (diff)

target_wp = Waypoint(
coordinate=target_pos,
timestamp=time * self.time_scale
)

return target_wp



33 changes: 7 additions & 26 deletions trajectories/trajectory_factory.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,14 @@
from time import thread_time
import numpy as np

class Waypoint:
"""
3D coordinate with a timestamp.
"""

coordinate: np.ndarray
"""R^3 coordinate"""
timestamp: float
"""Time as float"""

def __init__(self, coordinate: np.ndarray, timestamp: float) -> None:
self.coordinate = coordinate
self.timestamp = timestamp

from .trajectories import Trajectory, SquareLinearTrajectory

class TrajectoryFactory:
"""
Generates waypoints
Wrapper class for instantiating target trajectories.
"""

def __init__(self) -> None:
pass

def get_waypoint(self) -> Waypoint:
waypoint = Waypoint(
coordinate=np.asarray([0,0,0]),
timestamp=0
@classmethod
def get_linear_square_trajectory(cls, square_scale: float, time_scale: float) -> Trajectory:
return SquareLinearTrajectory(
square_scale, time_scale
)
return waypoint


16 changes: 16 additions & 0 deletions trajectories/waypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import numpy as np
from enum import Enum, auto

class Waypoint:
"""
3D coordinate with a timestamp.
"""

coordinate: np.ndarray
"""R^3 coordinate"""
timestamp: float
"""Time as float"""

def __init__(self, coordinate: np.ndarray, timestamp: float) -> None:
self.coordinate = coordinate
self.timestamp = timestamp

0 comments on commit 4ddfb85

Please sign in to comment.