Skip to content

Commit

Permalink
Added replication request to cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Feb 27, 2024
1 parent edc3b1d commit 5400e58
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 20 deletions.
265 changes: 245 additions & 20 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net"
"net/http"
"os"
"path"
"reflect"
"strconv"
Expand Down Expand Up @@ -396,26 +397,7 @@ func (bl *FxBlockchain) serve(w http.ResponseWriter, r *http.Request) {

// Convert to the full request format
fullReq := convertMobileRequestToFullRequest(&mobileReq)

// Re-encode the converted request (marshaling it to JSON)
newBody, err := json.Marshal(fullReq)
if err != nil {
log.Error("failed to marshal converted request: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}

// Create a new HTTP request using the converted body
newRequest, err := http.NewRequest(r.Method, r.URL.String(), bytes.NewReader(newBody))
if err != nil {
log.Error("failed to create new request: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}

// Copy headers from the original request
newRequest.Header = r.Header
bl.handleAction(http.MethodPost, actionManifestBatchUpload, from, w, r)
bl.handleActionManifestBatchUpload(http.MethodPost, actionManifestBatchUpload, from, w, r, fullReq)
},
actionManifestRemove: func(from peer.ID, w http.ResponseWriter, r *http.Request) {
bl.handleAction(http.MethodPost, actionManifestRemove, from, w, r)
Expand Down Expand Up @@ -1243,3 +1225,246 @@ func (bl *FxBlockchain) GetMembers() map[peer.ID]common.MemberStatus {
}
return copy
}

const creatorPeerIDFilePath = "/internal/.tmp/pool_%d_creator.tmp"

func (bl *FxBlockchain) getClusterEndpoint(ctx context.Context, poolID int) (string, error) {
// 1. Check for existing creator peer ID
if creatorPeerID, err := loadCreatorPeerID(poolID); err == nil {
return creatorPeerID + ".functionyard.fula.network", nil
}

// 2. Fetch pool details
poolDetails, err := fetchPoolDetails(ctx, bl, poolID)
if err != nil {
return "", err
}

// 3. Extract creatorClusterPeerID
creatorClusterPeerID := poolDetails.Creator

// 4. Fetch user details
userDetails, err := bl.fetchUserDetails(ctx, poolID, creatorClusterPeerID)
if err != nil {
return "", err
}

// 5. Find the peer_id
peerID := findPeerID(creatorClusterPeerID, userDetails)

// 6. Save creator peer ID
err = saveCreatorPeerID(poolID, peerID)
if err != nil {
log.Debugf("Error saving creator peer ID to file: %v", err)
}

return peerID + ".functionyard.fula.network", nil
}
func loadCreatorPeerID(poolID int) (string, error) {
filename := fmt.Sprintf(creatorPeerIDFilePath, poolID)
data, err := os.ReadFile(filename)
if err != nil {
if os.IsNotExist(err) {
return "", nil // File doesn't exist, not an error
}
return "", fmt.Errorf("error reading creator peer ID file: %w", err)
}
return strings.TrimSpace(string(data)), nil
}

func saveCreatorPeerID(poolID int, peerID string) error {
filename := fmt.Sprintf(creatorPeerIDFilePath, poolID)
return os.WriteFile(filename, []byte(peerID), 0644) // Adjust permissions if needed
}

func fetchPoolDetails(ctx context.Context, bl *FxBlockchain, poolID int) (*Pool, error) {
req := PoolListRequestWithPoolId{PoolID: poolID}
action := "actionPoolList"

responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", action, req)
if err != nil {
return nil, fmt.Errorf("blockchain call error: %w, status code: %d", err, statusCode)
}

if statusCode != http.StatusOK {
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(responseBody, &errMsg); jsonErr == nil {
return nil, fmt.Errorf("unexpected response status: %d, message: %s, description: %s",
statusCode, errMsg["message"], errMsg["description"])
} else {
return nil, fmt.Errorf("unexpected response status: %d, body: %s", statusCode, string(responseBody))
}
}

var response PoolListResponse
if err := json.Unmarshal(responseBody, &response); err != nil {
return nil, err
}

for _, pool := range response.Pools {
if pool.PoolID == poolID {
return &pool, nil
}
}

return nil, fmt.Errorf("pool with ID %d not found", poolID)
}

func (bl *FxBlockchain) fetchUserDetails(ctx context.Context, poolID int, creatorClusterPeerID string) (*PoolUserListResponse, error) {
req := PoolUserListRequest{
PoolID: poolID,
RequestPoolID: poolID,
}
action := "actionPoolUserList"

responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", action, req)
if err != nil {
return nil, fmt.Errorf("blockchain call error: %w, status code: %d", err, statusCode)
}

if statusCode != http.StatusOK {
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(responseBody, &errMsg); jsonErr == nil {
return nil, fmt.Errorf("unexpected response status: %d, message: %s, description: %s",
statusCode, errMsg["message"], errMsg["description"])
} else {
return nil, fmt.Errorf("unexpected response status: %d, body: %s", statusCode, string(responseBody))
}
}

var response PoolUserListResponse
if err := json.Unmarshal(responseBody, &response); err != nil {
return nil, err
}

return &response, nil
}

func findPeerID(creatorClusterPeerID string, userDetails *PoolUserListResponse) string {
for _, user := range userDetails.Users {
if user.Account == creatorClusterPeerID {
return user.PeerID
}
}
return ""
}

func (bl *FxBlockchain) handleActionManifestBatchUpload(method string, action string, from peer.ID, w http.ResponseWriter, r *http.Request, req ManifestBatchUploadRequest) {
log := log.With("action", action, "from", from)
res := reflect.New(responseTypes[action]).Interface()
defer r.Body.Close()
//TODO: Ensure it is optimized for long-running calls
ctx, cancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout))
defer cancel()
response, statusCode, err := bl.callBlockchain(ctx, method, action, req)
if err != nil {
log.Error("failed to call blockchain: %v", err)
w.WriteHeader(statusCode)
// Try to parse the error and format it as JSON
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(response, &errMsg); jsonErr != nil {
// If the response isn't JSON or can't be parsed, use a generic message
errMsg = map[string]interface{}{
"message": "An error occurred",
"description": err.Error(),
}
}
json.NewEncoder(w).Encode(errMsg)
return
}
// If status code is not 200, attempt to format the response as JSON
if statusCode != http.StatusOK {
w.WriteHeader(statusCode)
var errMsg map[string]interface{}
if jsonErr := json.Unmarshal(response, &errMsg); jsonErr == nil {
// If it's already a JSON, write it as is
w.Write(response)
} else {
// If it's not JSON, wrap the response in the expected format
errMsg = map[string]interface{}{
"message": "Error",
"description": string(response),
}
json.NewEncoder(w).Encode(errMsg)
}
return
} else {
// Call ipfs-cluster of the pool with replication request
clusterEndPoint, err := bl.getClusterEndpoint(ctx, req.PoolID)
if err != nil {
w.WriteHeader(http.StatusFailedDependency)
errMsg := map[string]interface{}{
"message": "Error",
"description": string(err.Error()),
}
json.NewEncoder(w).Encode(errMsg)
return
}
if clusterEndPoint != "" {
// Construct the request for the cluster endpoint
replicationRequest := struct {
PoolID int `json:"pool_id"`
Cids []string `json:"cids"`
}{
PoolID: req.PoolID,
Cids: req.Cid,
}

reqBody, err := json.Marshal(replicationRequest)
if err != nil {
w.WriteHeader(http.StatusFailedDependency)
errMsg := map[string]interface{}{
"message": "Error",
"description": string(err.Error()),
}
json.NewEncoder(w).Encode(errMsg)
return
}

// Make the HTTP request to the cluster endpoint
clusterURL := fmt.Sprintf("%s/pins", clusterEndPoint)
resp, err := http.Post(clusterURL, "application/json", bytes.NewBuffer(reqBody))
if err != nil {
w.WriteHeader(http.StatusFailedDependency)
errMsg := map[string]interface{}{
"message": "Error",
"description": string(err.Error()),
}
json.NewEncoder(w).Encode(errMsg)
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusAccepted {
// Replication request was not accepted
w.WriteHeader(resp.StatusCode) // Set the appropriate status code
responseBody, _ := io.ReadAll(resp.Body)
errMsg := map[string]interface{}{
"message": "Replication Error",
"description": string(responseBody),
}
json.NewEncoder(w).Encode(errMsg)
return
}

// Replication request was accepted - continue with existing response handling
} else {
w.WriteHeader(http.StatusFailedDependency)
errMsg := map[string]interface{}{
"message": "Error",
"description": "Wrong cluster endpoint",
}
json.NewEncoder(w).Encode(errMsg)
return
}
}
w.WriteHeader(http.StatusAccepted)
err1 := json.Unmarshal(response, &res)
if err1 != nil {
log.Error("failed to format response: %v", err1)
}

if err := json.NewEncoder(w).Encode(res); err != nil {
log.Error("failed to write response: %v", err)
}
}
4 changes: 4 additions & 0 deletions blockchain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ type PoolRequestsResponse struct {
type PoolListRequest struct {
}

type PoolListRequestWithPoolId struct {
PoolID int `json:"pool_id"`
}

type PoolUserListRequest struct {
PoolID int `json:"pool_id"`
RequestPoolID int `json:"request_pool_id"`
Expand Down

0 comments on commit 5400e58

Please sign in to comment.