diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 20b7c3f..bfa362b 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -8,6 +8,7 @@ import ( "io" "net" "net/http" + "os" "path" "reflect" "strconv" @@ -396,26 +397,7 @@ func (bl *FxBlockchain) serve(w http.ResponseWriter, r *http.Request) { // Convert to the full request format fullReq := convertMobileRequestToFullRequest(&mobileReq) - - // Re-encode the converted request (marshaling it to JSON) - newBody, err := json.Marshal(fullReq) - if err != nil { - log.Error("failed to marshal converted request: %v", err) - http.Error(w, "", http.StatusInternalServerError) - return - } - - // Create a new HTTP request using the converted body - newRequest, err := http.NewRequest(r.Method, r.URL.String(), bytes.NewReader(newBody)) - if err != nil { - log.Error("failed to create new request: %v", err) - http.Error(w, "", http.StatusInternalServerError) - return - } - - // Copy headers from the original request - newRequest.Header = r.Header - bl.handleAction(http.MethodPost, actionManifestBatchUpload, from, w, r) + bl.handleActionManifestBatchUpload(http.MethodPost, actionManifestBatchUpload, from, w, r, fullReq) }, actionManifestRemove: func(from peer.ID, w http.ResponseWriter, r *http.Request) { bl.handleAction(http.MethodPost, actionManifestRemove, from, w, r) @@ -1243,3 +1225,246 @@ func (bl *FxBlockchain) GetMembers() map[peer.ID]common.MemberStatus { } return copy } + +const creatorPeerIDFilePath = "/internal/.tmp/pool_%d_creator.tmp" + +func (bl *FxBlockchain) getClusterEndpoint(ctx context.Context, poolID int) (string, error) { + // 1. Check for existing creator peer ID + if creatorPeerID, err := loadCreatorPeerID(poolID); err == nil { + return creatorPeerID + ".functionyard.fula.network", nil + } + + // 2. Fetch pool details + poolDetails, err := fetchPoolDetails(ctx, bl, poolID) + if err != nil { + return "", err + } + + // 3. Extract creatorClusterPeerID + creatorClusterPeerID := poolDetails.Creator + + // 4. Fetch user details + userDetails, err := bl.fetchUserDetails(ctx, poolID, creatorClusterPeerID) + if err != nil { + return "", err + } + + // 5. Find the peer_id + peerID := findPeerID(creatorClusterPeerID, userDetails) + + // 6. Save creator peer ID + err = saveCreatorPeerID(poolID, peerID) + if err != nil { + log.Debugf("Error saving creator peer ID to file: %v", err) + } + + return peerID + ".functionyard.fula.network", nil +} +func loadCreatorPeerID(poolID int) (string, error) { + filename := fmt.Sprintf(creatorPeerIDFilePath, poolID) + data, err := os.ReadFile(filename) + if err != nil { + if os.IsNotExist(err) { + return "", nil // File doesn't exist, not an error + } + return "", fmt.Errorf("error reading creator peer ID file: %w", err) + } + return strings.TrimSpace(string(data)), nil +} + +func saveCreatorPeerID(poolID int, peerID string) error { + filename := fmt.Sprintf(creatorPeerIDFilePath, poolID) + return os.WriteFile(filename, []byte(peerID), 0644) // Adjust permissions if needed +} + +func fetchPoolDetails(ctx context.Context, bl *FxBlockchain, poolID int) (*Pool, error) { + req := PoolListRequestWithPoolId{PoolID: poolID} + action := "actionPoolList" + + responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", action, req) + if err != nil { + return nil, fmt.Errorf("blockchain call error: %w, status code: %d", err, statusCode) + } + + if statusCode != http.StatusOK { + var errMsg map[string]interface{} + if jsonErr := json.Unmarshal(responseBody, &errMsg); jsonErr == nil { + return nil, fmt.Errorf("unexpected response status: %d, message: %s, description: %s", + statusCode, errMsg["message"], errMsg["description"]) + } else { + return nil, fmt.Errorf("unexpected response status: %d, body: %s", statusCode, string(responseBody)) + } + } + + var response PoolListResponse + if err := json.Unmarshal(responseBody, &response); err != nil { + return nil, err + } + + for _, pool := range response.Pools { + if pool.PoolID == poolID { + return &pool, nil + } + } + + return nil, fmt.Errorf("pool with ID %d not found", poolID) +} + +func (bl *FxBlockchain) fetchUserDetails(ctx context.Context, poolID int, creatorClusterPeerID string) (*PoolUserListResponse, error) { + req := PoolUserListRequest{ + PoolID: poolID, + RequestPoolID: poolID, + } + action := "actionPoolUserList" + + responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", action, req) + if err != nil { + return nil, fmt.Errorf("blockchain call error: %w, status code: %d", err, statusCode) + } + + if statusCode != http.StatusOK { + var errMsg map[string]interface{} + if jsonErr := json.Unmarshal(responseBody, &errMsg); jsonErr == nil { + return nil, fmt.Errorf("unexpected response status: %d, message: %s, description: %s", + statusCode, errMsg["message"], errMsg["description"]) + } else { + return nil, fmt.Errorf("unexpected response status: %d, body: %s", statusCode, string(responseBody)) + } + } + + var response PoolUserListResponse + if err := json.Unmarshal(responseBody, &response); err != nil { + return nil, err + } + + return &response, nil +} + +func findPeerID(creatorClusterPeerID string, userDetails *PoolUserListResponse) string { + for _, user := range userDetails.Users { + if user.Account == creatorClusterPeerID { + return user.PeerID + } + } + return "" +} + +func (bl *FxBlockchain) handleActionManifestBatchUpload(method string, action string, from peer.ID, w http.ResponseWriter, r *http.Request, req ManifestBatchUploadRequest) { + log := log.With("action", action, "from", from) + res := reflect.New(responseTypes[action]).Interface() + defer r.Body.Close() + //TODO: Ensure it is optimized for long-running calls + ctx, cancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout)) + defer cancel() + response, statusCode, err := bl.callBlockchain(ctx, method, action, req) + if err != nil { + log.Error("failed to call blockchain: %v", err) + w.WriteHeader(statusCode) + // Try to parse the error and format it as JSON + var errMsg map[string]interface{} + if jsonErr := json.Unmarshal(response, &errMsg); jsonErr != nil { + // If the response isn't JSON or can't be parsed, use a generic message + errMsg = map[string]interface{}{ + "message": "An error occurred", + "description": err.Error(), + } + } + json.NewEncoder(w).Encode(errMsg) + return + } + // If status code is not 200, attempt to format the response as JSON + if statusCode != http.StatusOK { + w.WriteHeader(statusCode) + var errMsg map[string]interface{} + if jsonErr := json.Unmarshal(response, &errMsg); jsonErr == nil { + // If it's already a JSON, write it as is + w.Write(response) + } else { + // If it's not JSON, wrap the response in the expected format + errMsg = map[string]interface{}{ + "message": "Error", + "description": string(response), + } + json.NewEncoder(w).Encode(errMsg) + } + return + } else { + // Call ipfs-cluster of the pool with replication request + clusterEndPoint, err := bl.getClusterEndpoint(ctx, req.PoolID) + if err != nil { + w.WriteHeader(http.StatusFailedDependency) + errMsg := map[string]interface{}{ + "message": "Error", + "description": string(err.Error()), + } + json.NewEncoder(w).Encode(errMsg) + return + } + if clusterEndPoint != "" { + // Construct the request for the cluster endpoint + replicationRequest := struct { + PoolID int `json:"pool_id"` + Cids []string `json:"cids"` + }{ + PoolID: req.PoolID, + Cids: req.Cid, + } + + reqBody, err := json.Marshal(replicationRequest) + if err != nil { + w.WriteHeader(http.StatusFailedDependency) + errMsg := map[string]interface{}{ + "message": "Error", + "description": string(err.Error()), + } + json.NewEncoder(w).Encode(errMsg) + return + } + + // Make the HTTP request to the cluster endpoint + clusterURL := fmt.Sprintf("%s/pins", clusterEndPoint) + resp, err := http.Post(clusterURL, "application/json", bytes.NewBuffer(reqBody)) + if err != nil { + w.WriteHeader(http.StatusFailedDependency) + errMsg := map[string]interface{}{ + "message": "Error", + "description": string(err.Error()), + } + json.NewEncoder(w).Encode(errMsg) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + // Replication request was not accepted + w.WriteHeader(resp.StatusCode) // Set the appropriate status code + responseBody, _ := io.ReadAll(resp.Body) + errMsg := map[string]interface{}{ + "message": "Replication Error", + "description": string(responseBody), + } + json.NewEncoder(w).Encode(errMsg) + return + } + + // Replication request was accepted - continue with existing response handling + } else { + w.WriteHeader(http.StatusFailedDependency) + errMsg := map[string]interface{}{ + "message": "Error", + "description": "Wrong cluster endpoint", + } + json.NewEncoder(w).Encode(errMsg) + return + } + } + w.WriteHeader(http.StatusAccepted) + err1 := json.Unmarshal(response, &res) + if err1 != nil { + log.Error("failed to format response: %v", err1) + } + + if err := json.NewEncoder(w).Encode(res); err != nil { + log.Error("failed to write response: %v", err) + } +} diff --git a/blockchain/interface.go b/blockchain/interface.go index 37b2c2d..20952f3 100644 --- a/blockchain/interface.go +++ b/blockchain/interface.go @@ -170,6 +170,10 @@ type PoolRequestsResponse struct { type PoolListRequest struct { } +type PoolListRequestWithPoolId struct { + PoolID int `json:"pool_id"` +} + type PoolUserListRequest struct { PoolID int `json:"pool_id"` RequestPoolID int `json:"request_pool_id"`