Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ibc): implement channel open pipelining for IBC #10402

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions a3p-integration/proposals/n:upgrade-next/initial.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import test from 'ava';
import { getVatDetails } from '@agoric/synthetic-chain';

const vats = {
network: { incarnation: 1 },
ibc: { incarnation: 1 },
network: { incarnation: 2 },
ibc: { incarnation: 2 },
localchain: { incarnation: 1 },
orchestration: { incarnation: 0 },
orchestration: { incarnation: 1 },
transfer: { incarnation: 1 },
walletFactory: { incarnation: 5 },
zoe: { incarnation: 3 },
Expand Down
13 changes: 13 additions & 0 deletions golang/cosmos/app/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,19 @@ func unreleasedUpgradeHandler(app *GaiaApp, targetUpgrade string) func(sdk.Conte
)
}

// Upgrade for pieces of Orch to provide channel open pipelining.
pipelineOrchStep, err := buildProposalStepWithArgs(
"@agoric/builders/scripts/vats/upgrade-orch-core.js",
"defaultProposalBuilder",
map[string]any{
"bundleFilter": []string{"ibc", "network", "orchestration"},
},
)
if err != nil {
return nil, err
}
CoreProposalSteps = append(CoreProposalSteps, pipelineOrchStep)

// Each CoreProposalStep runs sequentially, and can be constructed from
// one or more modules executing in parallel within the step.
CoreProposalSteps = append(CoreProposalSteps,
Expand Down
17 changes: 12 additions & 5 deletions golang/cosmos/x/vibc/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,29 @@ func (k Keeper) GetChannel(ctx sdk.Context, portID, channelID string) (channelty
// ReceiveChanOpenInit wraps the keeper's ChanOpenInit function.
func (k Keeper) ReceiveChanOpenInit(ctx sdk.Context, order channeltypes.Order, connectionHops []string,
portID, rPortID, version string,
) error {
) (string, error) {
capName := host.PortPath(portID)
portCap, ok := k.GetCapability(ctx, capName)
if !ok {
return sdkioerrors.Wrapf(porttypes.ErrInvalidPort, "could not retrieve port capability at: %s", capName)
return "", sdkioerrors.Wrapf(porttypes.ErrInvalidPort, "could not retrieve port capability at: %s", capName)
}
counterparty := channeltypes.Counterparty{
PortId: rPortID,
}
channelID, chanCap, err := k.channelKeeper.ChanOpenInit(ctx, order, connectionHops, portID, portCap, counterparty, version)
if err != nil {
return err
return "", err
}
chanCapName := host.ChannelCapabilityPath(portID, channelID)
err = k.ClaimCapability(ctx, chanCap, chanCapName)
if err != nil {
return err
return "", err
}

k.channelKeeper.WriteOpenInitChannel(ctx, portID, channelID, order, connectionHops, counterparty, version)
return nil

// Return the fresh local channel ID in order to enable the caller to pipeline.
return channelID, nil
}

// ReceiveSendPacket wraps the keeper's SendPacket function.
Expand Down Expand Up @@ -160,6 +162,11 @@ func (k Keeper) WriteAcknowledgement(ctx sdk.Context, chanCap *capability.Capabi
return k.channelKeeper.WriteAcknowledgement(ctx, chanCap, packet, ack)
}

func (k Keeper) ReceiveWriteOpenConfirmChannel(ctx sdk.Context, portID, channelID string) error {
k.channelKeeper.WriteOpenConfirmChannel(ctx, portID, channelID)
return nil
}

// ReceiveWriteOpenTryChannel wraps the keeper's WriteOpenTryChannel function.
func (k Keeper) ReceiveWriteOpenTryChannel(ctx sdk.Context, packet ibcexported.PacketI, order channeltypes.Order, connectionHops []string, version string) error {
portID := packet.GetDestPort()
Expand Down
1 change: 1 addition & 0 deletions golang/cosmos/x/vibc/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ChannelKeeper interface {
portCap *capability.Capability, counterparty channel.Counterparty, version string) (string, *capability.Capability, error)
WriteOpenInitChannel(ctx sdk.Context, portID, channelID string, order channel.Order,
connectionHops []string, counterparty channel.Counterparty, version string)
WriteOpenConfirmChannel(ctx sdk.Context, portID, channelID string)
WriteOpenTryChannel(ctx sdk.Context, portID, channelID string, order channel.Order,
connectionHops []string, counterparty channel.Counterparty, version string)
ChanCloseInit(ctx sdk.Context, portID, channelID string, chanCap *capability.Capability) error
Expand Down
36 changes: 18 additions & 18 deletions golang/cosmos/x/vibc/types/ibc_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@ func (im IBCModule) OnChanOpenInit(
}

type ChannelOpenTryEvent struct {
*vm.ActionHeader `actionType:"IBC_EVENT"`
Event string `json:"event" default:"channelOpenTry"`
Target string `json:"target,omitempty"`
Order string `json:"order"`
ConnectionHops []string `json:"connectionHops"`
PortID string `json:"portID"`
ChannelID string `json:"channelID"`
Counterparty channeltypes.Counterparty `json:"counterparty"`
Version string `json:"version"`
AsyncVersions bool `json:"asyncVersions"`
*vm.ActionHeader `actionType:"IBC_EVENT"`
Event string `json:"event" default:"channelOpenTry"`
Target string `json:"target,omitempty"`
Order string `json:"order"`
ConnectionHops []string `json:"connectionHops"`
PortID string `json:"portID"`
ChannelID string `json:"channelID"`
Counterparty channeltypes.Counterparty `json:"counterparty"`
CounterpartyVersion string `json:"counterpartyVersion"`
AsyncVersions bool `json:"asyncVersions"`
}

func (im IBCModule) OnChanOpenTry(
Expand All @@ -121,13 +121,13 @@ func (im IBCModule) OnChanOpenTry(
counterpartyVersion string,
) (string, error) {
event := ChannelOpenTryEvent{
Order: orderToString(order),
ConnectionHops: connectionHops,
PortID: portID,
ChannelID: channelID,
Counterparty: counterparty,
Version: counterpartyVersion,
AsyncVersions: AsyncVersions,
Order: orderToString(order),
ConnectionHops: connectionHops,
PortID: portID,
ChannelID: channelID,
Counterparty: counterparty,
CounterpartyVersion: counterpartyVersion,
AsyncVersions: AsyncVersions,
}

err := im.impl.PushAction(ctx, event)
Expand All @@ -142,7 +142,7 @@ func (im IBCModule) OnChanOpenTry(

if !event.AsyncVersions {
// We have to supply a synchronous version, so just echo back the one they sent.
return event.Version, nil
return event.CounterpartyVersion, nil
}

// Use an empty version string to indicate that the VM explicitly (possibly
Expand Down
18 changes: 15 additions & 3 deletions golang/cosmos/x/vibc/types/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ var (
type ReceiverImpl interface {
ReceiveSendPacket(ctx sdk.Context, packet exported.PacketI) (uint64, error)
ReceiveWriteAcknowledgement(ctx sdk.Context, packet exported.PacketI, ack exported.Acknowledgement) error
ReceiveChanOpenInit(ctx sdk.Context, order channeltypes.Order, hops []string, sourcePort, destinationPort, version string) error
ReceiveChanOpenInit(ctx sdk.Context, order channeltypes.Order, hops []string, sourcePort, destinationPort, version string) (string, error)
ReceiveWriteOpenConfirmChannel(ctx sdk.Context, sourcePort, sourceChannel string) error
ReceiveWriteOpenTryChannel(ctx sdk.Context, packet exported.PacketI, order channeltypes.Order, connectionHops []string, version string) error
ReceiveChanCloseInit(ctx sdk.Context, sourcePort, sourceChannel string) error
ReceiveBindPort(ctx sdk.Context, sourcePort string) error
Expand Down Expand Up @@ -118,7 +119,7 @@ func (ir Receiver) Receive(cctx context.Context, jsonRequest string) (jsonReply
packet := channeltypes.NewPacket(
msg.Packet.Data, 0,
msg.Packet.SourcePort, msg.Packet.SourceChannel,
msg.Packet.DestinationPort, msg.Packet.DestinationChannel,
"", "",
msg.Packet.TimeoutHeight, timeoutTimestamp,
)
seq, err := impl.ReceiveSendPacket(ctx, packet)
Expand All @@ -143,12 +144,23 @@ func (ir Receiver) Receive(cctx context.Context, jsonRequest string) (jsonReply
err = impl.ReceiveWriteAcknowledgement(ctx, msg.Packet, ack)

case "startChannelOpenInit":
err = impl.ReceiveChanOpenInit(
var channelID string
channelID, err = impl.ReceiveChanOpenInit(
ctx, stringToOrder(msg.Order), msg.Hops,
msg.Packet.SourcePort,
msg.Packet.DestinationPort,
msg.Version,
)
if err == nil {
var bz []byte
bz, err = json.Marshal(channelID)
if err == nil {
jsonReply = string(bz)
}
}

case "confirmOpenExecuted":
err = impl.ReceiveWriteOpenConfirmChannel(ctx, msg.Packet.SourcePort, msg.Packet.SourceChannel)

case "startChannelCloseInit":
err = impl.ReceiveChanCloseInit(ctx, msg.Packet.SourcePort, msg.Packet.SourceChannel)
Expand Down
8 changes: 5 additions & 3 deletions packages/boot/tools/supports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,11 @@ export const makeSwingsetTestKit = async (
case `${BridgeId.DIBC}:IBC_METHOD`:
case `${BridgeId.VTRANSFER}:IBC_METHOD`: {
switch (obj.method) {
case 'startChannelOpenInit':
pushInbound(BridgeId.DIBC, icaMocks.channelOpenAck(obj));
return undefined;
case 'startChannelOpenInit': {
const obj2 = icaMocks.channelOpenAck(obj);
pushInbound(BridgeId.DIBC, obj2);
return obj2.channelID;
}
case 'sendPacket': {
if (protoMsgMockMap[obj.packet.data]) {
return ackLater(obj, protoMsgMockMap[obj.packet.data]);
Expand Down
31 changes: 22 additions & 9 deletions packages/builders/scripts/vats/upgrade-orch-core.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,36 @@
import { makeHelpers } from '@agoric/deploy-script-support';

const bundleSources = {
ibc: '@agoric/vats/src/vat-ibc.js',
network: '@agoric/vats/src/vat-network.js',
localchain: '@agoric/vats/src/vat-localchain.js',
orchestration: '@agoric/orchestration/src/vat-orchestration.js',
transfer: '@agoric/vats/src/vat-transfer.js',
};

/** @type {import('@agoric/deploy-script-support/src/externalTypes.js').CoreEvalBuilder} */
export const defaultProposalBuilder = async ({ publishRef, install }) =>
harden({
export const defaultProposalBuilder = async (
{ publishRef, install },
opts = {},
) => {
/** @type {{ bundleFilter: string[] | undefined}} */
const { bundleFilter } = opts;
const bundleRefs = Object.fromEntries(
Object.entries(bundleSources)
.filter(([name]) => !bundleFilter || bundleFilter.includes(name))
.map(([name, source]) => [name, publishRef(install(source))]),
);
return harden({
sourceSpec: '@agoric/vats/src/proposals/upgrade-orch-core-proposal.js',
getManifestCall: [
'getManifestForUpgradingOrchCore',
{
bundleRefs: {
ibc: publishRef(install('@agoric/vats/src/vat-ibc.js')),
network: publishRef(install('@agoric/vats/src/vat-network.js')),
localchain: publishRef(install('@agoric/vats/src/vat-localchain.js')),
transfer: publishRef(install('@agoric/vats/src/vat-transfer.js')),
},
bundleRefs,
},
],
});
};

/** @type {import('@agoric/deploy-script-support/src/externalTypes.js').DeployScriptFunction} */
export default async (homeP, endowments) => {
const { writeCoreProposal } = await makeHelpers(homeP, endowments);
await writeCoreProposal('upgrade-network', defaultProposalBuilder);
Expand Down
34 changes: 26 additions & 8 deletions packages/network/src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ function throwIfInvalidPortName(specifiedName) {
* @typedef {object} ConnectionOpts
* @property {Endpoint[]} addrs
* @property {Remote<Required<ConnectionHandler>>[]} handlers
* @property {MapStore<number, Connection>} conns
* @property {MapStore<0 | 1, Connection>} conns
* @property {WeakSetStore<Closable>} current
* @property {0|1} l
* @property {0|1} r
Expand Down Expand Up @@ -106,13 +106,19 @@ const prepareHalfConnection = (zone, { watch, allVows, finalizer }) => {
},
{
connection: {
getLocalAddress() {
async getLocalAddress() {
const { addrs, l } = this.state;
return addrs[l];
if (typeof addrs[l] === 'string') {
return addrs[l];
}
return watch(addrs[l], this.facets.memoizeAddressWatcher, l);
},
getRemoteAddress() {
async getRemoteAddress() {
const { addrs, r } = this.state;
return addrs[r];
if (typeof addrs[r] === 'string') {
return addrs[r];
}
return watch(addrs[r], this.facets.memoizeAddressWatcher, r);
},
/** @param {Bytes} packetBytes */
async send(packetBytes) {
Expand Down Expand Up @@ -152,6 +158,18 @@ const prepareHalfConnection = (zone, { watch, allVows, finalizer }) => {
return watch(innerVow, this.facets.rethrowUnlessMissingWatcher);
},
},
memoizeAddressWatcher: {
/**
* @param {Endpoint} addr
* @param {0 | 1} side
*/
onFulfilled(addr, side) {
const addrs = [...this.state.addrs];
addrs[side] = addr;
this.state.addrs = harden(addrs);
return addr;
},
},
openConnectionAckWatcher: {
onFulfilled(ack) {
return toBytes(ack || '');
Expand Down Expand Up @@ -207,7 +225,7 @@ export const crossoverConnection = (
) => {
const detached = zone.detached();

/** @type {MapStore<number, Connection>} */
/** @type {MapStore<0 | 1, Connection>} */
const conns = detached.mapStore('addrToConnections');

/** @type {Remote<Required<ConnectionHandler>>[]} */
Expand All @@ -224,8 +242,8 @@ export const crossoverConnection = (
};

/**
* @param {number} l local side of the connection
* @param {number} r remote side of the connection
* @param {0|1} l local side of the connection
* @param {0|1} r remote side of the connection
*/
const openHalfConnection = (l, r) => {
const lconn = conns.get(l);
Expand Down
7 changes: 5 additions & 2 deletions packages/network/src/shapes.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ export const Shape = /** @type {const} */ harden({
.optional(Shape2.Opts)
.returns(Shape2.Vow$(Shape2.Bytes)),
close: M.callWhen().returns(Shape2.Vow$(M.undefined())),
getLocalAddress: M.call().returns(Shape2.Endpoint),
getRemoteAddress: M.call().returns(Shape2.Endpoint),
getLocalAddress: M.callWhen().returns(Shape2.Vow$(Shape2.Endpoint)),
getRemoteAddress: M.callWhen().returns(Shape2.Vow$(Shape2.Endpoint)),
}),
memoizeAddressWatcher: M.interface('MemoizeAddressWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
openConnectionAckWatcher: M.interface('OpenConnectionAckWatcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
Expand Down
10 changes: 5 additions & 5 deletions packages/network/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export {};

/**
* @import {Passable, RemotableObject} from '@endo/pass-style';
* @import {PromiseVow, Remote} from '@agoric/vow';
* @import {PromiseVow, Remote, Vow} from '@agoric/vow';
*/

/**
Expand Down Expand Up @@ -86,9 +86,9 @@ export {};
* ) => PromiseVow<Bytes>} send
* Send a packet on the connection
* @property {() => PromiseVow<void>} close Close both ends of the connection
* @property {() => Endpoint} getLocalAddress Get the locally bound name of this
* @property {() => PromiseVow<Endpoint>} getLocalAddress Get the locally bound name of this
* connection
* @property {() => Endpoint} getRemoteAddress Get the name of the counterparty
* @property {() => PromiseVow<Endpoint>} getRemoteAddress Get the name of the counterparty
*/
/**
* @typedef {RemotableObject & ConnectionI} Connection
Expand Down Expand Up @@ -123,8 +123,8 @@ export {};
/**
* @typedef {object} AttemptDescription
* @property {Remote<ConnectionHandler>} handler
* @property {Endpoint} [remoteAddress]
* @property {Endpoint} [localAddress]
* @property {Endpoint | Vow<Endpoint>} [remoteAddress]
* @property {Endpoint | Vow<Endpoint>} [localAddress]
*/

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/orchestration/src/utils/address.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export const makeICAChannelAddress = (
encoding,
txType,
});
return `/ibc-hop/${controllerConnectionId}/ibc-port/icahost/${ordering}/${connString}`;
return `/ibc-hop/${controllerConnectionId}/ibc-port/icahost/pipelining/true/${ordering}/${connString}`;
};
harden(makeICAChannelAddress);

Expand All @@ -58,7 +58,7 @@ export const makeICQChannelAddress = (
version = DEFAULT_ICQ_VERSION,
) => {
controllerConnectionId || Fail`controllerConnectionId is required`;
return `/ibc-hop/${controllerConnectionId}/ibc-port/icqhost/unordered/${version}`;
return `/ibc-hop/${controllerConnectionId}/ibc-port/icqhost/pipelining/true/unordered/${version}`;
};
harden(makeICQChannelAddress);

Expand Down
Loading
Loading