Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drive failure tests #69

Merged
merged 5 commits into from
Dec 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions drivers/node/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,23 @@ type ErrFailedToRunCommand struct {
func (e *ErrFailedToRunCommand) Error() string {
return fmt.Sprintf("Failed to run command on: %v. Cause: %v", e.Addr, e.Cause)
}

// ErrFailedToYankDrive error type when we fail to simulate drive failure
type ErrFailedToYankDrive struct {
Node Node
Cause string
}

func (e *ErrFailedToYankDrive) Error() string {
return fmt.Sprintf("Failed to yank a drive on: %v. Cause: %v", e.Node.Name, e.Cause)
}

// ErrFailedToRecoverDrive error type when we fail to simulate drive failure
type ErrFailedToRecoverDrive struct {
Node Node
Cause string
}

func (e *ErrFailedToRecoverDrive) Error() string {
return fmt.Sprintf("Failed to recover a drive on: %v. Cause: %v", e.Node.Name, e.Cause)
}
21 changes: 21 additions & 0 deletions drivers/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ type Driver interface {

// TestConnection tests connection to given node. returns nil if driver can connect to given node
TestConnection(node Node, options ConnectionOpts) error

// YankDrive simulates a failure on the provided drive on the given node.
// It returns the UUID of the drive which can be used to recover it back
YankDrive(node Node, driveNameToFail string, options ConnectionOpts) (string, error)

// RecoverDrive recovers the given drive from failure on the given node.
RecoverDrive(node Node, driveNameToRecover string, driveUUID string, options ConnectionOpts) error
}

// Register registers the given node driver
Expand Down Expand Up @@ -158,6 +165,20 @@ func (d *notSupportedDriver) Systemctl(node Node, service string, options System
}
}

func (d *notSupportedDriver) YankDrive(node Node, driveToFail string, options ConnectionOpts) (string, error) {
return "", &errors.ErrNotSupported{
Type: "Function",
Operation: "YankDrive()",
}
}

func (d *notSupportedDriver) RecoverDrive(node Node, driveToRecover string, driveID string, options ConnectionOpts) error {
return &errors.ErrNotSupported{
Type: "Function",
Operation: "RecoverDrive()",
}
}

func (d *notSupportedDriver) TestConnection(node Node, options ConnectionOpts) error {
return &errors.ErrNotSupported{
Type: "Function",
Expand Down
60 changes: 60 additions & 0 deletions drivers/node/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"os"
"strconv"
"strings"
"time"

"github.com/portworx/sched-ops/task"
Expand Down Expand Up @@ -181,6 +182,65 @@ func (s *ssh) ShutdownNode(n node.Node, options node.ShutdownNodeOpts) error {
return nil
}

func (s *ssh) YankDrive(n node.Node, driveNameToFail string, options node.ConnectionOpts) (string, error) {
// Currently only works for iSCSI drives
// TODO: Make it generic (Add support dev mapper devices)
addr, err := s.getAddrToConnect(n, options)
if err != nil {
return "", &node.ErrFailedToYankDrive{
Node: n,
Cause: fmt.Sprintf("failed to get node address due to: %v", err),
}
}

// Get the HBA number for the drive which would be then used to recover the drive
hbaCmd := "lsscsi | grep -n " + driveNameToFail + "| awk -F\":\" '{print $2}'" + "| awk -F\"[\" '{print $2}'"
driveID, err := s.doCmd(addr, hbaCmd, false)
if err != nil {
return "", &node.ErrFailedToYankDrive{
Node: n,
Cause: fmt.Sprintf("unable to find HBA attribute of the drive %v due to: %v", driveNameToFail, err),
}
}

driveID = strings.TrimRight(driveID, "\n")
driveNameToFail = strings.Trim(driveNameToFail, "/")
devices := strings.Split(driveNameToFail, "/")

// Disable the block device, so that it returns IO errors
yankCommand := "echo 1 > /sys/block/" + devices[len(devices)-1] + "/device/delete"

_, err = s.doCmd(addr, yankCommand, false)
if err != nil {
return "", &node.ErrFailedToYankDrive{
Node: n,
Cause: fmt.Sprintf("failed to yank drive %v due to: %v", driveNameToFail, err),
}
}
return driveID, nil
}

func (s *ssh) RecoverDrive(n node.Node, driveNameToRecover string, driveUUIDToRecover string, options node.ConnectionOpts) error {
addr, err := s.getAddrToConnect(n, options)
if err != nil {
return &node.ErrFailedToRecoverDrive{
Node: n,
Cause: fmt.Sprintf("failed to get node address due to: %v", err),
}
}

// Enable the drive by rescaning
recoverCmd := "echo \" - - -\" > /sys/class/scsi_host/host" + driveUUIDToRecover + "/scan"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this have a '/' after host and before driverUUID

Copy link
Member Author

@adityadani adityadani Nov 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No.
It is no a subdirectory under host, but it appends it to host to build the correct directory.

_, err = s.doCmd(addr, recoverCmd, false)
if err != nil {
return &node.ErrFailedToRecoverDrive{
Node: n,
Cause: fmt.Sprintf("Unable to rescan the drive (%v): %v", driveNameToRecover, err),
}
}
return nil
}

func (s *ssh) FindFiles(path string, n node.Node, options node.FindOpts) (string, error) {
addr, err := s.getAddrToConnect(n, options.ConnectionOpts)
if err != nil {
Expand Down
53 changes: 45 additions & 8 deletions drivers/scheduler/k8s/specs/mysql/px-mysql-app.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
apiVersion: v1
kind: Service
metadata:
name: mysql
labels:
app: mysql
spec:
ports:
- port: 3306
selector:
app: mysql
---
apiVersion: apps/v1beta2
kind: Deployment
metadata:
name: mysql
labels:
app: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
rollingUpdate:
maxSurge: 1
Expand All @@ -17,7 +34,6 @@ spec:
metadata:
labels:
app: mysql
version: "1"
spec:
containers:
- image: mysql:5.6
Expand All @@ -30,17 +46,38 @@ spec:
volumeMounts:
- name: mysql-persistent-storage
mountPath: /var/lib/mysql
volumes:
- name: mysql-persistent-storage
persistentVolumeClaim:
claimName: mysql-data
---
apiVersion: apps/v1beta2
kind: Deployment
metadata:
name: mysqlslap
labels:
app: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
replicas: 1
template:
metadata:
labels:
app: mysql
spec:
containers:
- name: mysqlslap
image: adityadani/mysqlslap
imagePullPolicy: "Always"
env:
- name: MYSQL_ROOT_PASSWORD
value: password
- name: MYSQL_SERVICE_HOST
value: "127.0.0.1"
- name: MYSQL_SERVICE_PORT
value: "3306"
volumes:
- name: mysql-persistent-storage
persistentVolumeClaim:
claimName: mysql-data
value: "3306"
2 changes: 1 addition & 1 deletion drivers/scheduler/k8s/specs/mysql/px-mysql-storage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
name: px-mysql-sc
provisioner: kubernetes.io/portworx-volume
parameters:
repl: "2"
repl: "3"
---
##### Portworx persistent volume claim
kind: PersistentVolumeClaim
Expand Down
16 changes: 14 additions & 2 deletions drivers/volume/portworx/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func (e *ErrFailedToDeleteVolume) Error() string {
return fmt.Sprintf("Failed to delete volume: %v due to err: %v", e.ID, e.Cause)
}

// ErrFailedToWaitForPx error type for failing to wait for px to be up on a node
// ErrFailedToWaitForPx error type for failing to wait for PX to be up on a node
type ErrFailedToWaitForPx struct {
// Node is the node on which px was waited upon
// Node is the node on which PX was waited upon
Node node.Node
// Cause is the underlying cause of the error
Cause string
Expand All @@ -61,3 +61,15 @@ type ErrFailedToUpgradeVolumeDriver struct {
func (e *ErrFailedToUpgradeVolumeDriver) Error() string {
return fmt.Sprintf("Failed to upgrade volume driver to version: %v due to err: %v", e.Version, e.Cause)
}

// ErrFailedToRecoverDriver error type for failing to recover PX on a node
type ErrFailedToRecoverDriver struct {
// Node is the node on which PX failed to recover on
Node node.Node
// Cause is the underlying cause of the error
Cause string
}

func (e *ErrFailedToRecoverDriver) Error() string {
return fmt.Sprintf("Failed to wait for px to be up on: %v due to err: %v", e.Node.Name, e.Cause)
}
94 changes: 93 additions & 1 deletion drivers/volume/portworx/portworx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/api/client"
clusterclient "github.com/libopenstorage/openstorage/api/client/cluster"
volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
"github.com/libopenstorage/openstorage/api/spec"
Expand All @@ -27,6 +28,9 @@ const (
pxdClientSchedUserAgent = "pxd-sched"
pxdRestPort = 9001
pxVersionLabel = "PX Version"
maintenanceOpRetries = 3
enterMaintenancePath = "/entermaintenance"
exitMaintenancePath = "/exitmaintenance"
)

type portworx struct {
Expand Down Expand Up @@ -152,6 +156,78 @@ func (d *portworx) CleanupVolume(name string) error {
return nil
}

func (d *portworx) GetStorageDevices(n node.Node) ([]string, error) {
const (
storageInfoKey = "STORAGE-INFO"
resourcesKey = "Resources"
pathKey = "path"
)
pxNode, err := d.getClusterManager().Inspect(n.VolDriverNodeID)
if err != nil {
return nil, err
}

storageInfo, ok := pxNode.NodeData[storageInfoKey]
if !ok {
return nil, fmt.Errorf("Unable to find storage info for node: %v", n.Name)
}
storageInfoMap := storageInfo.(map[string]interface{})

resourcesMapIntf, ok := storageInfoMap[resourcesKey]
if !ok || resourcesMapIntf == nil {
return nil, fmt.Errorf("Unable to find resource info for node: %v", n.Name)
}
resourcesMap := resourcesMapIntf.(map[string]interface{})

devPaths := []string{}
for _, v := range resourcesMap {
resource := v.(map[string]interface{})
path, _ := resource[pathKey]
if path == "" {
continue
}
devPaths = append(devPaths, path.(string))
}
return devPaths, nil
}

func (d *portworx) RecoverDriver(n node.Node) error {

t := func() (interface{}, error) {
return nil, d.maintenanceOp(n, enterMaintenancePath)
}

if _, err := task.DoRetryWithTimeout(t, 1*time.Minute, 10*time.Second); err != nil {
return err
}
t = func() (interface{}, error) {
apiNode, err := d.getClusterManager().Inspect(n.Name)
if err != nil {
return nil, err
}
if apiNode.Status == api.Status_STATUS_MAINTENANCE {
return nil, nil
}
return nil, fmt.Errorf("Node %v is not in Maintenance mode", n.Name)
}

if _, err := task.DoRetryWithTimeout(t, 1*time.Minute, 10*time.Second); err != nil {
return &ErrFailedToRecoverDriver{
Node: n,
Cause: err.Error(),
}
}
t = func() (interface{}, error) {
return nil, d.maintenanceOp(n, exitMaintenancePath)
}

if _, err := task.DoRetryWithTimeout(t, 1*time.Minute, 10*time.Second); err != nil {
return err
}

return nil
}

func (d *portworx) ValidateCreateVolume(name string, params map[string]string) error {
t := func() (interface{}, error) {
vols, err := d.getVolDriver().Inspect([]string{name})
Expand Down Expand Up @@ -476,7 +552,7 @@ func (d *portworx) setDriver() error {
}

func (d *portworx) testAndSetEndpoint(endpoint string) error {
pxEndpoint := fmt.Sprintf("http://%s:%d", endpoint, pxdRestPort)
pxEndpoint := d.constructURL(endpoint)
cClient, err := clusterclient.NewClusterClient(pxEndpoint, "v1")
if err != nil {
return err
Expand Down Expand Up @@ -533,6 +609,22 @@ func (d *portworx) getClusterManager() cluster.Cluster {
d.setDriver()
}
return d.clusterManager

}

func (d *portworx) maintenanceOp(n node.Node, op string) error {
url := d.constructURL(n.Addresses[0])
c, err := client.NewClient(url, "", "")
if err != nil {
return err
}
req := c.Get().Resource(op)
resp := req.Do()
return resp.Error()
}

func (d *portworx) constructURL(ip string) string {
return fmt.Sprintf("http://%s:%d", ip, pxdRestPort)
}

func init() {
Expand Down
8 changes: 8 additions & 0 deletions drivers/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ type Driver interface {

// RandomizeVolumeName randomizes the volume name from the given name
RandomizeVolumeName(name string) string

// RecoverDriver will recover a volume driver from a failure/storage down state.
// This could be used by a volume driver to recover itself from any underlying storage
// failure.
RecoverDriver(n node.Node) error

// GetStorageDevices returns the list of storage devices used by the given node.
GetStorageDevices(n node.Node) ([]string, error)
}

var (
Expand Down
Loading