From 3eff0e0c503306f39d595c17930e8b2fac401db2 Mon Sep 17 00:00:00 2001 From: Ved Patel Date: Mon, 28 Jul 2025 20:03:07 -0700 Subject: [PATCH 1/2] feat(examples): add listen logsubscribe for cpswap migrations --- .../migrations_cpswap/listen_logsubscribe.py | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 learning_examples_py/migrations_cpswap/listen_logsubscribe.py diff --git a/learning_examples_py/migrations_cpswap/listen_logsubscribe.py b/learning_examples_py/migrations_cpswap/listen_logsubscribe.py new file mode 100644 index 0000000..fe29003 --- /dev/null +++ b/learning_examples_py/migrations_cpswap/listen_logsubscribe.py @@ -0,0 +1,131 @@ +""" +Listens for Raydium Launchpad program logs. Get the transaction details using the signature. +Parse the token mint from the MigrateToCpswap instruction. + +Note: Since we are listening to the Launchpad logs, it will consume a lot of credits/compute units. +This scripts finds the all tokens that are migrated to the CPSwap program, not limited to only bonkfun. + +""" + + +import json +import asyncio +import os + +from dotenv import load_dotenv +from solana.rpc.async_api import AsyncClient +from solders.pubkey import Pubkey +from solana.rpc.async_api import AsyncClient +from solders.signature import Signature +import websockets + +load_dotenv() + +RPC_ENDPOINT = os.environ.get("SOLANA_NODE_RPC_ENDPOINT") +WSS_ENDPOINT = os.environ.get("SOLANA_NODE_WSS_ENDPOINT") +RAYDIUM_LAUNCHPAD_PROGRAM_ID = Pubkey.from_string("LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj") + + +def is_transaction_successful(logs): + for log in logs: + if "AnchorError thrown" in log or "Error" in log: + print(f"[ERROR] Transaction failed: {log}") + return False + return True + + +async def process_transaction(signature: str): + client = AsyncClient(RPC_ENDPOINT) + signature = Signature.from_string(signature) + + try: + resp = await client.get_transaction( + signature, + encoding="jsonParsed", + commitment="confirmed", + max_supported_transaction_version=0, + ) + except Exception as e: + print(f"[ERROR] Failed to get transaction data time: {e}") + return + + # retrying if the node is not fully synced + if not resp.value: + await asyncio.sleep(5) + resp = await client.get_transaction( + signature, + encoding="jsonParsed", + commitment="confirmed", + max_supported_transaction_version=0, + ) + + if not resp.value: + print(f"[ERROR] Transaction not found: {signature}") + return + + instructions = resp.value.transaction.transaction.message.instructions + + for instruction in instructions: + if instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and instruction.data == "PotQtwz6wf1": + if len(instruction.accounts) == 38: + token_mint = instruction.accounts[1] + print(f"[INFO] Token migrated to cpswap: {token_mint}") + # TODO : use the idl parser and get the more details for the pool and token + break + + +async def listen_for_migrations(): + while True: + try: + print("\n[INFO] Connecting to WebSocket ...") + async with websockets.connect(WSS_ENDPOINT) as websocket: + subscription_message = json.dumps( + { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + {"mentions": [str(RAYDIUM_LAUNCHPAD_PROGRAM_ID)]}, + {"commitment": "confirmed"}, + ], + } + ) + await websocket.send(subscription_message) + print( + f"[INFO] Listening for migration instructions from program: {RAYDIUM_LAUNCHPAD_PROGRAM_ID}" + ) + + response = await websocket.recv() + print(f"[INFO] Subscription response: {response}") + + while True: + try: + response = await asyncio.wait_for(websocket.recv(), timeout=60) + data = json.loads(response) + log_data = data["params"]["result"]["value"] + logs = log_data.get("logs", []) + signature = log_data.get("signature", "unknown") + + is_migrated = any( + "Program log: Instruction: MigrateToCpswap" == log + for log in logs + ) + if not is_migrated: + continue + + asyncio.create_task(process_transaction(signature)) + + except TimeoutError: + print("[INFO] No new messages received, continuing...") + except Exception as e: + print(f"[ERROR] Error receiving message: {e}") + break + + except Exception as e: + print(f"[ERROR] Connection error: {e}") + print("[INFO] Retrying in 5 seconds...") + await asyncio.sleep(5) + + +if __name__ == "__main__": + asyncio.run(listen_for_migrations()) \ No newline at end of file From 49ecf44057c313ab668d8a65c282b29fd3624f5c Mon Sep 17 00:00:00 2001 From: Ved Patel Date: Mon, 28 Jul 2025 21:27:17 -0700 Subject: [PATCH 2/2] fix(examples)- remove duplicate import --- learning_examples_py/migrations_cpswap/listen_logsubscribe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/learning_examples_py/migrations_cpswap/listen_logsubscribe.py b/learning_examples_py/migrations_cpswap/listen_logsubscribe.py index fe29003..45ab446 100644 --- a/learning_examples_py/migrations_cpswap/listen_logsubscribe.py +++ b/learning_examples_py/migrations_cpswap/listen_logsubscribe.py @@ -15,7 +15,6 @@ from dotenv import load_dotenv from solana.rpc.async_api import AsyncClient from solders.pubkey import Pubkey -from solana.rpc.async_api import AsyncClient from solders.signature import Signature import websockets