Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE!!!] [V8] Paranet sync rework (WIP) #3680

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
629 changes: 209 additions & 420 deletions src/commands/paranet/paranet-sync-command.js

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions src/modules/blockchain/blockchain-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,21 @@ class BlockchainModuleManager extends BaseModuleManager {
]);
}

async getParanetKnowledgeAssetsCount(blockchain, paranetId) {
return this.callImplementationFunction(blockchain, 'getParanetKnowledgeAssetsCount', [
async getParanetKnowledgeCollectionCount(blockchain, paranetId) {
return this.callImplementationFunction(blockchain, 'getParanetKnowledgeCollectionCount', [
paranetId,
]);
}

async getParanetKnowledgeAssetsWithPagination(blockchain, paranetId, offset, limit) {
async getParanetKnowledgeCollectionLocatorsWithPagination(
blockchain,
paranetId,
offset,
limit,
) {
return this.callImplementationFunction(
blockchain,
'getParanetKnowledgeAssetsWithPagination',
'getParanetKnowledgeCollectionLocatorsWithPagination',
[paranetId, offset, limit],
);
}
Expand Down Expand Up @@ -226,12 +231,6 @@ class BlockchainModuleManager extends BaseModuleManager {
return this.callImplementationFunction(blockchain, 'getDescription', [paranetId]);
}

async getParanetKnowledgeAssetLocator(blockchain, knowledgeAssetId) {
return this.callImplementationFunction(blockchain, 'getParanetKnowledgeAssetLocator', [
knowledgeAssetId,
]);
}

async paranetExists(blockchain, paranetId) {
return this.callImplementationFunction(blockchain, 'paranetExists', [paranetId]);
}
Expand Down
45 changes: 21 additions & 24 deletions src/modules/blockchain/implementation/web3-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -1036,22 +1036,30 @@ class Web3Service {
return blockTimestamp;
}

async getParanetKnowledgeAssetsCount(paranetId) {
return this.callContractFunction(
this.contracts.ParanetsRegistry,
'getKnowledgeAssetsCount',
[paranetId],
CONTRACTS.PARANETS_REGISTRY,
);
async getParanetKnowledgeCollectionCount(paranetId) {
throw new Error(`Not implemented getParanetKnowledgeCollectionCount(${paranetId})`);
// TODO: implement

// return this.callContractFunction(
// this.contracts.ParanetsRegistry,
// 'getKnowledgeAssetsCount',
// [paranetId],
// CONTRACTS.PARANETS_REGISTRY,
// );
}

async getParanetKnowledgeAssetsWithPagination(paranetId, offset, limit) {
return this.callContractFunction(
this.contracts.ParanetsRegistry,
'getKnowledgeAssetsWithPagination',
[paranetId, offset, limit],
CONTRACTS.PARANETS_REGISTRY,
async getParanetKnowledgeCollectionLocatorsWithPagination(paranetId, offset, limit) {
throw new Error(
`Not implemented getParanetKnowledgeCollectionLocatorsWithPagination(${paranetId}, ${offset}, ${limit}})`,
);
// TODO: implement

// return this.callContractFunction(
// this.contracts.ParanetsRegistry,
// 'getKnowledgeAssetsWithPagination',
// [paranetId, offset, limit],
// CONTRACTS.PARANETS_REGISTRY,
// );
}

async getParanetMetadata(paranetId) {
Expand Down Expand Up @@ -1081,17 +1089,6 @@ class Web3Service {
);
}

async getParanetKnowledgeAssetLocator(knowledgeAssetId) {
const [knowledgeAssetStorageContract, kaTokenId] = await this.callContractFunction(
this.contracts.ParanetKnowledgeAssetsRegistry,
'getKnowledgeAssetLocator',
[knowledgeAssetId],
);
const tokenId = kaTokenId.toNumber();
const knowledgeAssetLocator = { knowledgeAssetStorageContract, tokenId };
return knowledgeAssetLocator;
}

async paranetExists(paranetId) {
return this.callContractFunction(
this.contracts.ParanetsRegistry,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
export async function up({ context: { queryInterface, Sequelize } }) {
await queryInterface.createTable('paranet_kc', {
id: {
type: Sequelize.INTEGER,
primaryKey: true,
autoIncrement: true,
},
blockchain_id: {
type: Sequelize.STRING,
allowNull: false,
},
ual: {
type: Sequelize.STRING,
allowNull: false,
},
paranet_ual: {
type: Sequelize.STRING,
allowNull: false,
},
error_message: {
type: Sequelize.TEXT,
allowNull: true,
},
is_synced: {
type: Sequelize.BOOLEAN,
allowNull: false,
defaultValue: false,
},
retries: {
allowNull: false,
type: Sequelize.INTEGER,
defaultValue: 0,
},
created_at: {
allowNull: false,
type: Sequelize.DATE,
defaultValue: Sequelize.literal('NOW()'),
},
updated_at: {
allowNull: false,
type: Sequelize.DATE,
defaultValue: Sequelize.literal('NOW()'),
},
});
await queryInterface.addConstraint('paranet_kc', {
fields: ['ual', 'paranet_ual'],
type: 'unique',
});
await queryInterface.addIndex(
'paranet_kc',
['paranetUal', 'isSynced', 'retries', 'updatedAt'],
{ name: 'idx_paranet_kc_sync_batch' },
);

const [[{ triggerInsertExists }]] = await queryInterface.sequelize.query(`
SELECT COUNT(*) AS triggerInsertExists
FROM information_schema.triggers
WHERE trigger_schema = DATABASE() AND trigger_name = 'after_insert_paranet_kc';
`);
if (triggerInsertExists === 0) {
await queryInterface.sequelize.query(`
CREATE TRIGGER after_insert_paranet_kc
AFTER INSERT ON paranet_kc
FOR EACH ROW
BEGIN
SET NEW.created_at = NOW();
END;
`);
}

const [[{ triggerUpdateExists }]] = await queryInterface.sequelize.query(`
SELECT COUNT(*) AS triggerUpdateExists
FROM information_schema.triggers
WHERE trigger_schema = DATABASE() AND trigger_name = 'after_update_paranet_kc';
`);
if (triggerUpdateExists === 0) {
await queryInterface.sequelize.query(`
CREATE TRIGGER after_update_paranet_kc
AFTER UPDATE ON paranet_kc
FOR EACH ROW
BEGIN
SET NEW.updated_at = NOW();
END;
`);
}
}

export async function down({ context: { queryInterface } }) {
await queryInterface.removeIndex('paranet_kc', 'idx_paranet_kc_sync_batch');
await queryInterface.dropTable('paranet_kc');
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// NOT USED ANYMORE
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should create migration to drop tables that aren't used anymore?

Copy link
Contributor Author

@aleksaelezovic aleksaelezovic Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what is our plan regarding ot-node migration. I can add it when we decide, sure.


export default (sequelize, DataTypes) => {
const blockchain = sequelize.define(
'missed_paranet_asset',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
export default (sequelize, DataTypes) => {
const paranetKC = sequelize.define(
'paranet_kc',
{
id: {
autoIncrement: true,
primaryKey: true,
type: DataTypes.INTEGER,
},
blockchainId: {
allowNull: false,
type: DataTypes.STRING,
},
ual: {
allowNull: false,
type: DataTypes.STRING,
},
paranetUal: {
allowNull: false,
type: DataTypes.STRING,
},
errorMessage: {
allowNull: true,
type: DataTypes.TEXT,
},
isSynced: {
allowNull: false,
type: DataTypes.BOOLEAN,
defaultValue: false,
},
retries: {
allowNull: false,
type: DataTypes.INTEGER,
defaultValue: 0,
},
createdAt: {
type: DataTypes.DATE,
},
updatedAt: {
type: DataTypes.DATE,
},
},
{
underscored: true,
indexes: [
{
unique: true,
fields: ['ual', 'paranetUal'], // Composite unique constraint on `ual` and `paranetUal`
},
{
fields: ['paranetUal', 'isSynced', 'retries', 'updatedAt'],
},
],
},
);

paranetKC.associate = () => {
// Define associations here if needed
};

return paranetKC;
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PARANET_SYNC_SOURCES } from '../../../../../constants/constants.js';

// NOT USED ANYMORE
export default (sequelize, DataTypes) => {
const blockchain = sequelize.define(
'paranet_synced_asset',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import Sequelize from 'sequelize';

class ParanetKcRepository {
constructor(models) {
this.sequelize = models.sequelize;
this.model = models.paranet_kc;
}

async createParanetKcRecords(paranetUal, blockchainId, uals, options = {}) {
return this.model.bulkCreate(
uals.map((ual) => ({ paranetUal, blockchainId, ual, isSynced: false })),
options,
);
}

async getCount(paranetUal, options = {}) {
return this.model.count({
where: {
paranetUal,
},
...options,
});
}

async getCountSynced(paranetUal, options = {}) {
return this.model.count({
where: {
paranetUal,
isSynced: true,
},
...options,
});
}

async getCountUnsynced(paranetUal, options = {}) {
return this.model.count({
where: {
paranetUal,
isSynced: false,
},
...options,
});
}

async getSyncBatch(paranetUal, maxRetries, delayInMs, limit = null, options = {}) {
const queryOptions = {
where: {
paranetUal,
isSynced: false,
[Sequelize.Op.and]: [
{ retries: { [Sequelize.Op.lt]: maxRetries } },
{
[Sequelize.Op.or]: [
{ retries: 0 },
{
updatedAt: {
[Sequelize.Op.lte]: new Date(Date.now() - delayInMs),
},
},
],
},
],
},
order: [['retries', 'DESC']],
...options,
};

if (limit !== null) {
queryOptions.limit = limit;
}

return this.model.findAll(queryOptions);
}

async incrementRetries(paranetUal, ual, errorMessage = null, options = {}) {
const [affectedRows] = await this.model.increment('retries', {
by: 1,
where: {
ual,
paranetUal,
errorMessage,
},
...options,
});

return affectedRows;
}

async markAsSynced(paranetUal, ual, options = {}) {
const [affectedRows] = await this.model.update(
{ isSynced: true },
{
where: {
ual,
paranetUal,
},
...options,
},
);

return affectedRows;
}
}

export default ParanetKcRepository;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// DEPRECATED
class ParanetSyncedAssetRepository {
constructor(models) {
this.sequelize = models.sequelize;
Expand Down
Loading
Loading