Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions queue-manager/core/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import { Status, SYNC_POLLING_INTERVAL } from './types';
export type ManagerContext = object;
export type QueueName = string;
export type QueueID = string;

export type BlockedReason = Record<string, unknown>;
export type BlockedTask = {
queue_id: string;
task_id: string;
action: string;
reason: Record<string, unknown>;
reason: BlockedReason;
storage: {
get: () => QueueStorage;
set: (data: QueueStorage) => QueueStorage;
Expand All @@ -34,11 +36,13 @@ export interface ExecuterActions<
schedule: (actionName: V) => void;
setStorage: SetStorage<T>;
getStorage: () => T;
block: (reason: Record<string, unknown>) => void;
block: (reason: BlockedReason) => void;
unblock: () => void;
context: C;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type WhenTaskBlockedEvent = any;
export interface QueueDef<
T extends QueueStorage = QueueStorage,
V extends string = string,
Expand All @@ -51,7 +55,7 @@ export interface QueueDef<
events?: Partial<QueueEventHandlers>;
run: V[];
whenTaskBlocked?: (
event: any,
event: WhenTaskBlockedEvent,
params: {
queue_id: string;
queue: Queue;
Expand Down Expand Up @@ -82,6 +86,8 @@ interface ManagerOptions {
isPaused?: boolean;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type _Storage = any;
export interface QueueInfo {
name: QueueName;
createdAt: number;
Expand All @@ -90,8 +96,8 @@ export interface QueueInfo {
actions: {
run: () => void;
cancel: () => void;
setStorage: SetStorage<any>;
getStorage: () => any;
setStorage: SetStorage<_Storage>;
getStorage: () => _Storage;
};
}

Expand Down Expand Up @@ -240,6 +246,7 @@ class Manager {
this.execute();
return queue_id;
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
throw new Error((e as any)?.message);
}
}
Expand Down Expand Up @@ -643,7 +650,7 @@ class Manager {
}

private getContext() {
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/ban-ts-comment
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore
return this.context?.current || {};
}
Expand Down
3 changes: 2 additions & 1 deletion queue-manager/rango-preset/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
"dependencies": {
"@rango-dev/logging-core": "^0.9.1-next.0",
"rango-types": "^0.1.88",
"uuid": "^9.0.0"
"uuid": "^9.0.0",
"ts-results": "^3.3.0"
},
"publishConfig": {
"access": "public"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import type { SwapActionTypes, SwapQueueContext, SwapStorage } from '../types';
import type { ExecuterActions } from '@rango-dev/queue-manager-core';
import type {
SwapActionTypes,
SwapQueueContext,
SwapStorage,
} from '../../types';
import type {
BlockedReason,
ExecuterActions,
} from '@rango-dev/queue-manager-core';

import { PendingSwapNetworkStatus } from 'rango-types';
import { Err, Ok, type Result } from 'ts-results';

import {
ERROR_MESSAGE_DEPENDS_ON_OTHER_QUEUES,
ERROR_MESSAGE_WAIT_FOR_CHANGE_NETWORK,
ERROR_MESSAGE_WAIT_FOR_WALLET_DESCRIPTION,
ERROR_MESSAGE_WAIT_FOR_WALLET_DESCRIPTION_WRONG_WALLET,
} from '../constants';
} from '../../constants';
import {
claimQueue,
getCurrentStep,
Expand All @@ -18,44 +26,72 @@ import {
isRequiredWalletConnected,
isWalletNull,
resetNetworkStatus,
signTransaction,
updateNetworkStatus,
} from '../helpers';
import { getCurrentNamespaceOf } from '../shared';
import { BlockReason } from '../types';
} from '../../helpers';
import { getCurrentNamespaceOf } from '../../shared';
import { BlockReason } from '../../types';

import { isClaimedByCurrentQueue } from './utils';

/**
* Excecute a created transaction.
*
* This function implemented the parallel mode by `claim` mechanism which means
* All the queues the meet certain situation (like multiple evm transaction) will go through
* a `claim` mechanims that decides which queue should be run and it blocks other ones.
*
* A queue will be go to sign process, if the wallet and network is matched.
* Check for network & address be matched and queue to not be blocked and update the swap accordingly.
*/
export async function executeTransaction(
export async function checkEnvironmentBeforeExecuteTransaction(
actions: ExecuterActions<SwapStorage, SwapActionTypes, SwapQueueContext>
): Promise<void> {
): Promise<Result<true, BlockedReason>> {
const { getStorage, context } = actions;
const { meta, wallets, providers } = context;
const { claimedBy } = claimQueue();

const isClaimed = context.claimedBy === context._queue?.id;
const requestBlock: typeof actions.block = (blockedFor) => {
actions.block(blockedFor);
if (isClaimed && actions.context.resetClaimedBy) {
actions.context.resetClaimedBy();
}
};

const swap = getStorage().swapDetails;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion

const currentStep = getCurrentStep(swap)!;

// Resetting network status, so we will set it again during the running of this task.
resetNetworkStatus(actions);

/* Make sure wallet is connected and also the connected wallet is matched with tx by checking address. */
const addressCheckResult = ensureRequriedWalletIsConnected(actions);
if (addressCheckResult.err) {
return addressCheckResult;
}

/* Wallet should be on correct network */
const networkResult = await ensureWalletIsOnCorrectNetwork(actions);
if (networkResult.err) {
return networkResult;
}

// Update network to mark it as network changed successfully.
updateNetworkStatus(actions, {
message: '',
details: 'Wallet network changed successfully',
status: PendingSwapNetworkStatus.NetworkChanged,
});

/*
*For avoiding conflict by making too many requests to wallet, we need to make sure
*We only run one request at a time (In parallel mode).
*/
const needsToBlockQueue = isNeedBlockQueueForParallel(currentStep);
const isClaimed = isClaimedByCurrentQueue(context);
if (needsToBlockQueue && !isClaimed) {
const blockedFor = {
reason: BlockReason.DEPENDS_ON_OTHER_QUEUES,
description: ERROR_MESSAGE_DEPENDS_ON_OTHER_QUEUES,
details: {},
};
return new Err(blockedFor);
}

return new Ok(true);
}

function ensureRequriedWalletIsConnected(
actions: ExecuterActions<SwapStorage, SwapActionTypes, SwapQueueContext>
): Result<true, BlockedReason> {
const { getStorage, context } = actions;
const { wallets } = context;
const swap = getStorage().swapDetails;

const isWrongAddress = !isRequiredWalletConnected(swap, context.state).ok;
if (isWrongAddress) {
const { type, address } = getRequiredWallet(swap);
Expand All @@ -71,29 +107,40 @@ export async function executeTransaction(
reason: BlockReason.WAIT_FOR_CONNECT_WALLET,
description,
};
requestBlock(blockedFor);
return;
return new Err(blockedFor);
}

/* Wallet should be on correct network */
return new Ok(true);
}

async function ensureWalletIsOnCorrectNetwork(
actions: ExecuterActions<SwapStorage, SwapActionTypes, SwapQueueContext>
): Promise<Result<true, BlockedReason>> {
const { getStorage, context } = actions;
const { meta, wallets, providers } = context;
const swap = getStorage().swapDetails;
const currentStep = getCurrentStep(swap)!;

const networkMatched = await isNetworkMatchedForTransaction(
swap,
currentStep,
wallets,
meta,
providers
);

const { claimedBy } = claimQueue();
const claimerId = claimedBy();
const isClaimedByAnyQueue = !!claimerId && !isClaimed;
const isClaimedByAnyQueue = !!claimerId && !isClaimedByCurrentQueue(context);

if (isClaimedByAnyQueue && !networkMatched) {
const details = ERROR_MESSAGE_DEPENDS_ON_OTHER_QUEUES;

const blockedFor = {
reason: BlockReason.DEPENDS_ON_OTHER_QUEUES,
details: details,
};
requestBlock(blockedFor);
return;
return new Err(blockedFor);
} else if (!networkMatched) {
const fromNamespace = getCurrentNamespaceOf(swap, currentStep);
const details = ERROR_MESSAGE_WAIT_FOR_CHANGE_NETWORK(
Expand All @@ -104,32 +151,8 @@ export async function executeTransaction(
reason: BlockReason.WAIT_FOR_NETWORK_CHANGE,
details: details,
};
requestBlock(blockedFor);
return;
}
// Update network to mark it as network changed successfully.
updateNetworkStatus(actions, {
message: '',
details: 'Wallet network changed successfully',
status: PendingSwapNetworkStatus.NetworkChanged,
});

/*
*For avoiding conflict by making too many requests to wallet, we need to make sure
*We only run one request at a time (In parallel mode).
*/
const needsToBlockQueue = isNeedBlockQueueForParallel(currentStep);

if (needsToBlockQueue && !isClaimed) {
const blockedFor = {
reason: BlockReason.DEPENDS_ON_OTHER_QUEUES,
description: ERROR_MESSAGE_DEPENDS_ON_OTHER_QUEUES,
details: {},
};
requestBlock(blockedFor);
return;
return new Err(blockedFor);
}

// All the conditions are met. We can safely send the tx to wallet for sign.
await signTransaction(actions);
return new Ok(true);
}
Loading