Skip to content

Commit

Permalink
Switch from gRPC to REST API
Browse files Browse the repository at this point in the history
* The project has switched from gRPC to REST API.

  This change isn't compatible with old job configurations. Instead of
  configuring `serv:` for every job, it configures in one place:

  ```yaml
  # Include file with keys for accessing remote jobs and authenticate remote
  # clients. The filename is relative to filename of this configuration file.
  include_keys: "keys.yaml"

  listen:
    # Serve "sink" and "source" jobs for network access.
    - addr: ":8888"
      tls_cert: "/usr/local/etc/ssl/cert.pem"
      tls_key: "/usr/local/etc/ssl/key.pem"
      zfs: true
  ```

  This configuration serves http and https API requests. `tls_cert` and
  `tls_key` are optional and needed for serving https requests.

  `keys.yaml` contains authentication keys of remote clients:

  ```yaml
  # Clients with defined authentication keys have network access to "sink" and
  # "source" jobs. The key name is their client identity name.

  # Authentication token and client_identity for me.
  - name: "a.domain.com"	  # client_identity
    key: "long and secret token"
  ```

* All transports has been replaced by `local` and `http` transports.

  `local` transport configuration looks almost the same:

  ```yaml
  jobs:
    - name: "zroot-to-zdisk"
      type: "push"
      connect:
	type: "local"
	listener_name: "zdisk"
	client_identity: "localhost"
  ```

  with one exception. `listener_name` now is a remote job name actually.

  The new `http` transport replaced all network transports. Its configuration
  look like:

  ```yaml
  jobs:
    - name: "zroot-to-server"
      type: "push"
      connect:
	type: "http"
	server: "https://server:8888"
	listener_name: "zdisk"
	client_identity: "serverkey"

    - name: "server-to-zdisk"
      type: "pull"
      connect:
	type: "http"
	server: "https://server:8888"
	listener_name: "zroot-to-client"
	client_identity: "serverkey"
  ```

  `listener_name` is a job name on the server with type of `sink` or `source`.

  `client_identity` is a key name from `keys.yaml`. That key will be sent to
  the server for authentication and the server must have a key with the same
  `key` content in `keys.yaml`. `name` can be different, because `sink` and
  `source` jobs use key name as `client_identity`.
  • Loading branch information
dsh2dsh committed Nov 5, 2024
1 parent 29ddcf8 commit a1d94c8
Show file tree
Hide file tree
Showing 63 changed files with 2,239 additions and 900 deletions.
79 changes: 79 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,85 @@ Stable version of this project can be easy installed on FreeBSD using
pkg install zrepl-dsh2dsh
```

## Breaking changes!!!

* The project has switched from gRPC to REST API.

This change isn't compatible with old job configurations. Instead of
configuring `serv:` for every job, it configures in one place:

```yaml
# Include file with keys for accessing remote jobs and authenticate remote
# clients. The filename is relative to filename of this configuration file.
include_keys: "keys.yaml"

listen:
# Serve "sink" and "source" jobs for network access.
- addr: ":8888"
tls_cert: "/usr/local/etc/ssl/cert.pem"
tls_key: "/usr/local/etc/ssl/key.pem"
zfs: true
```
This configuration serves http and https API requests. `tls_cert` and
`tls_key` are optional and needed for serving https requests.

`keys.yaml` contains authentication keys of remote clients:

```yaml
# Clients with defined authentication keys have network access to "sink" and
# "source" jobs. The key name is their client identity name.
# Authentication token and client_identity for me.
- name: "a.domain.com" # client_identity
key: "long and secret token"
```

* All transports has been replaced by `local` and `http` transports.

`local` transport configuration looks almost the same:

```yaml
jobs:
- name: "zroot-to-zdisk"
type: "push"
connect:
type: "local"
listener_name: "zdisk"
client_identity: "localhost"
```

with one exception. `listener_name` now is a remote job name actually.

The new `http` transport replaced all network transports. Its configuration
look like:

```yaml
jobs:
- name: "zroot-to-server"
type: "push"
connect:
type: "http"
server: "https://server:8888"
listener_name: "zdisk"
client_identity: "serverkey"
- name: "server-to-zdisk"
type: "pull"
connect:
type: "http"
server: "https://server:8888"
listener_name: "zroot-to-client"
client_identity: "serverkey"
```

`listener_name` is a job name on the server with type of `sink` or `source`.

`client_identity` is a key name from `keys.yaml`. That key will be sent to
the server for authentication and the server must have a key with the same
`key` content in `keys.yaml`. `name` can be different, because `sink` and
`source` jobs use key name as `client_identity`.

## Changes from [upstream](https://github.com/zrepl/zrepl):

* Fresh dependencies
Expand Down
5 changes: 5 additions & 0 deletions dist/freebsd/etc/zrepl/keys.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Clients with defined authentication keys have network access to "sink" and
# "source" jobs. The key name is their client identity name.

# - name: "localhost" # client identity name
# key: "long long and secret key"
14 changes: 10 additions & 4 deletions dist/freebsd/etc/zrepl/zrepl.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# zrepl main configuration file. For documentation, refer to
# https://zrepl.github.io/ and https://github.com/dsh2dsh/zrepl
# https://github.com/dsh2dsh/zrepl and https://zrepl.github.io/
#

# Include file with keys for accessing remote jobs and authenticate remote
# clients. The filename is relative to filename of this configuration file.
include_keys: "keys.yaml"

global:
# rpc_timeout: "1m"
zfs_bin: "/sbin/zfs"
Expand All @@ -20,6 +25,10 @@ global:
filename: "/var/log/zrepl.log"

listen:
# Serve "sink" and "source" jobs for network access on port 8001.
# - addr: ":8001"
# zfs: true

# control socket for zrepl client, like `zrepl signal` or `zrepl status`.
- unix: "/var/run/zrepl/control"
unix_mode: 0o660 # write perm for group (optional)
Expand Down Expand Up @@ -65,9 +74,6 @@ jobs:
- name: "zdisk"
type: "sink"
root_fs: "zdisk/zrepl"
serve:
type: "local"
listener_name: "zdisk"
recv:
# execpipe:
# # mbuffer |zfs receive
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/google/uuid v1.6.0
github.com/jinzhu/copier v0.4.0
github.com/juju/ratelimit v1.0.2
github.com/klauspost/compress v1.17.11
github.com/kr/pretty v0.3.1
github.com/montanaflynn/stats v0.7.1
github.com/muesli/reflow v0.3.0
Expand All @@ -42,7 +43,6 @@ require (
github.com/charmbracelet/x/term v0.2.0 // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect
Expand Down
14 changes: 3 additions & 11 deletions internal/client/configcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import (
)

var configcheckArgs struct {
format string
what string
skipCertCheck bool
format string
what string
}

var ConfigcheckCmd = &cli.Subcommand{
Expand All @@ -30,7 +29,6 @@ var ConfigcheckCmd = &cli.Subcommand{
SetupFlags: func(f *pflag.FlagSet) {
f.StringVar(&configcheckArgs.format, "format", "", "dump parsed config object [pretty|yaml|json]")
f.StringVar(&configcheckArgs.what, "what", "all", "what to print [all|config|jobs|logging]")
f.BoolVar(&configcheckArgs.skipCertCheck, "skip-cert-check", false, "skip checking cert files")
},
Run: func(ctx context.Context, subcommand *cli.Subcommand, args []string) error {
formatMap := map[string]func(interface{}){
Expand Down Expand Up @@ -59,14 +57,8 @@ var ConfigcheckCmd = &cli.Subcommand{

var hadErr bool

parseFlags := config.ParseFlagsNone

if configcheckArgs.skipCertCheck {
parseFlags |= config.ParseFlagsNoCertCheck
}

// further: try to build jobs
confJobs, err := job.JobsFromConfig(subcommand.Config(), parseFlags)
confJobs, _, err := job.JobsFromConfig(subcommand.Config())
if err != nil {
err := fmt.Errorf("cannot build jobs from config: %w", err)
if configcheckArgs.what == "jobs" {
Expand Down
63 changes: 42 additions & 21 deletions internal/client/jsonclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ func WithHTTPClient(doer HttpRequestDoer) ClientOption {
}
}

func WithRequestEditorFn(fn RequestEditorFn) ClientOption {
return func(c *Client) error {
c.WithRequestEditorFn(fn)
return nil
}
}

// WithRequestEditorFn allows setting up a callback function, which will be
// called right before sending the request. This can be used to mutate the
// request.
Expand Down Expand Up @@ -165,38 +172,52 @@ func (self *Client) Do(req *http.Request, out any) error {
}
defer resp.Body.Close()

var b bytes.Buffer
if resp.StatusCode != http.StatusOK {
// ignore error, just display what we got
_, _ = io.CopyN(&b, resp.Body, 1024)
return fmt.Errorf("unexpected response from %q: %v (%v)",
req.URL, resp.Status, strings.TrimSpace(b.String()))
if err := checkStatusCode(resp); err != nil {
return fmt.Errorf("unexpected response from %q: %w", req.URL, err)
}
return unmarshalBody(resp.Body, out)
}

if out != nil && out != struct{}{} {
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return fmt.Errorf("marshal response: %w", err)
}
func checkStatusCode(resp *http.Response) error {
if resp.StatusCode == http.StatusOK {
return nil
}

var b bytes.Buffer
// ignore error, just display what we got
_, _ = io.CopyN(&b, resp.Body, 1024)
return fmt.Errorf("%v: %v", resp.Status, strings.TrimSpace(b.String()))
}

func unmarshalBody(r io.Reader, v any) error {
if !canUnmarshal(v) {
return nil
} else if err := json.NewDecoder(r).Decode(&v); err != nil {
return fmt.Errorf("unmarshal response: %w", err)
}
return nil
}

func canUnmarshal(v any) bool { return v != nil && v != struct{}{} }

func (self *Client) Post(ctx context.Context, endpoint string, in, out any,
reqEditors ...RequestEditorFn,
) error {
var body io.Reader
if in != nil && in != struct{}{} {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(in); err != nil {
return fmt.Errorf("marshal request: %w", err)
}
body = &b
}

req, err := self.NewRequest(ctx, http.MethodPost, endpoint, body,
reqEditors...)
req, err := self.postRequest(ctx, endpoint, in, reqEditors...)
if err != nil {
return err
}
return self.Do(req, out)
}

func (self *Client) postRequest(ctx context.Context, endpoint string, in any,
reqEditors ...RequestEditorFn,
) (*http.Request, error) {
var body bytes.Buffer
if canUnmarshal(in) {
if err := json.NewEncoder(&body).Encode(in); err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
}
return self.NewRequest(ctx, http.MethodPost, endpoint, &body, reqEditors...)
}
104 changes: 104 additions & 0 deletions internal/client/jsonclient/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package jsonclient

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"slices"
"strconv"
)

const jsonLenHeader = "X-Zrepl-Json-Length"

func (self *Client) PostStream(ctx context.Context, endpoint string,
in, out any, r io.Reader, reqEditors ...RequestEditorFn,
) error {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(in); err != nil {
return fmt.Errorf("jsonclient: marshaling json payload: %w", err)
}
b.WriteString("\n")

body := io.MultiReader(&b, r)
editors := slices.Concat([]RequestEditorFn{
func(ctx context.Context, req *http.Request) error {
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set(jsonLenHeader, strconv.Itoa(b.Len()))
return nil
},
}, reqEditors)

req, err := self.NewRequest(ctx, http.MethodPost, endpoint, body, editors...)
if err != nil {
return err
}
return self.Do(req, out)
}

func (self *Client) PostResponseStream(ctx context.Context, endpoint string,
in, out any, reqEditors ...RequestEditorFn,
) (io.ReadCloser, error) {
req, err := self.postRequest(ctx, endpoint, in, reqEditors...)
if err != nil {
return nil, err
}

resp, err := self.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("http do: %w", err)
}

if err := parseJsonPayload(resp, out); err != nil {
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("unexpected response from %q: %w", req.URL, err)
}
return resp.Body, nil
}

func parseJsonPayload(resp *http.Response, out any) error {
if err := checkStatusCode(resp); err != nil {
return err
} else if !canUnmarshal(out) {
return nil
}
return ReadJsonPayload(resp.Header, resp.Body, out)
}

func ReadJsonPayload(h http.Header, r io.Reader, out any) error {
lenStr := h.Get(jsonLenHeader)
if lenStr == "" {
return nil
}

jsonLen, err := strconv.Atoi(lenStr)
if err != nil {
return fmt.Errorf(
"jsonclient: parsing json payload length %q: %w", lenStr, err)
} else if jsonLen == 0 {
return nil
}

lr := io.LimitReader(r, int64(jsonLen))
if err = json.NewDecoder(lr).Decode(out); err != nil {
return fmt.Errorf("jsonclient: decoding json payload %w", err)
}
return nil
}

func WriteJsonPayload(h http.Header, w io.Writer, in any) error {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(in); err != nil {
return fmt.Errorf("jsonclient: marshaling json payload: %w", err)
}
b.WriteString("\n")

h.Set(jsonLenHeader, strconv.Itoa(b.Len()))
if _, err := io.Copy(w, &b); err != nil {
return fmt.Errorf("jsonclient: writing json payload: %w", err)
}
return nil
}
Loading

0 comments on commit a1d94c8

Please sign in to comment.