Tllsldr (OP)
Newbie
Offline
Activity: 6
Merit: 0
|
 |
February 01, 2025, 12:53:55 AM |
|
I would like to parse the blockchain and generate a list of all non-zero wallets and their balances, and depending on how easy it is, add some extra columns of data like last in, last out, public key (if applicable). A pretty common project. Just like the bitcoin rich list websites that used to let you sort by column and were a bit more flexible than today's sites. I was just wondering if you guys might share how you would do this. Or point me in the right direction. I have a fully synchronized bitcoin core running and an RPC server enabled. This is my attempt in python, just to prove that I have attempted something before coming running for help.  import time import sys import csv import pickle import os import hashlib from datetime import datetime, timedelta, timezone from decimal import Decimal
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException import bech32 # pip install bech32
# -------------------------------
# Configuration
# -------------------------------
# Fill these in from your bitcoin.conf rpcuser/rpcpassword settings.
# NOTE: quoted as placeholder strings — the original bare names would
# raise NameError the moment the module is imported.
RPC_USER = "REDACTED"
RPC_PASS = "REDACTED"
RPC_HOST = "REDACTED"
RPC_PORT = "REDACTED"

# Increase timeout to help with slow/large requests:
RPC_TIMEOUT = 300  # 5 minutes; adjust higher if needed

PROGRESS_INTERVAL = 1000    # Print progress every N blocks
SAVE_INTERVAL = 2000        # Save pickled progress every N blocks (reduced for more frequent saves)
WRITE_CSV_INTERVAL = 10000  # Write partial CSV every N blocks (reduced for more frequent writes)

OUTPUT_CSV = "master_list.csv"
SAVE_FILE = "blockchain_progress.pkl"

# -------------------------------
# Global Data Structures
# -------------------------------
utxo_map = {}          # (txid, vout) -> (address, value_sats, block_time)
addresses_info = {}    # address -> { balance_sats, first_in, last_in, first_out, last_out, pubkeys }
last_processed_block = 0
# -------------------------------
# Utility & Logging
# -------------------------------
def connect_rpc():
    """Build and return a fresh AuthServiceProxy with a generous timeout.

    A new connection per call sidesteps stale-HTTP-connection problems on
    very long-running scans.
    """
    url = f"http://{RPC_USER}:{RPC_PASS}@{RPC_HOST}:{RPC_PORT}"
    return AuthServiceProxy(url, timeout=RPC_TIMEOUT)
def format_time(seconds):
    """Render a duration in seconds as H:MM:SS via timedelta's str()."""
    whole_seconds = int(seconds)
    return str(timedelta(seconds=whole_seconds))
def log(msg):
    """Write one message line immediately (flushed so progress survives piping)."""
    sys.stdout.write(f"{msg}\n")
    sys.stdout.flush()
def debug(msg):
    """Emit a flushed, [DEBUG]-prefixed line; comment the body out to silence."""
    print("[DEBUG] " + str(msg), flush=True)
# -------------------------------
# Save & Load Progress (Atomic)
# -------------------------------
def save_progress(block_height):
    """Atomically pickle (utxo_map, addresses_info, block_height) to SAVE_FILE.

    Writes to a sibling ".tmp" file first and then os.replace()s it over the
    real save file, so an interruption cannot leave a half-written pickle.
    """
    tmp_path = SAVE_FILE + ".tmp"
    try:
        with open(tmp_path, "wb") as fh:
            pickle.dump((utxo_map, addresses_info, block_height), fh)
        os.replace(tmp_path, SAVE_FILE)  # atomic rename on the same filesystem
        log(f"Progress saved at block {block_height}.")
    except Exception as exc:
        log(f"[ERROR] Failed to save progress: {exc}")
def load_progress():
    """Restore utxo_map / addresses_info / last_processed_block from SAVE_FILE.

    A missing, empty, truncated, or otherwise unreadable save file falls back
    to a fresh start from block 0 (the globals keep their initial values).
    """
    global utxo_map, addresses_info, last_processed_block

    file_missing = not os.path.exists(SAVE_FILE)
    if file_missing or os.path.getsize(SAVE_FILE) == 0:
        log("No valid saved progress found (or file is empty). Starting from block 0.")
        return

    try:
        with open(SAVE_FILE, "rb") as fh:
            state = pickle.load(fh)
        utxo_map, addresses_info, last_processed_block = state
        log(f"Resuming from block {last_processed_block}.")
    except EOFError:
        log("Saved progress file is corrupted or truncated (EOFError). Starting from block 0.")
    except Exception as exc:
        log(f"[ERROR] Unexpected error loading progress file: {exc}. Starting from block 0.")
# -------------------------------
# Base58 & Bech32 Helpers
# -------------------------------
ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def b58encode(b: bytes) -> str:
    """Base58-encode *b*, prepending one '1' per leading zero byte."""
    zero_count = len(b) - len(b.lstrip(b"\x00"))
    value = int.from_bytes(b, "big")
    digits = []
    while value:
        value, rem = divmod(value, 58)
        digits.append(ALPHABET[rem])
    return "1" * zero_count + "".join(reversed(digits))


def _base58check(versioned_payload: bytes) -> str:
    """Append the 4-byte double-SHA256 checksum and base58-encode the result."""
    checksum = hashlib.sha256(hashlib.sha256(versioned_payload).digest()).digest()[:4]
    return b58encode(versioned_payload + checksum)


def pubkey_hash_to_p2pkh(hash160_hex):
    """Turn a 20-byte hash160 (hex string) into a mainnet base58 P2PKH address."""
    raw = bytes.fromhex(hash160_hex)
    if len(raw) != 20:
        raise ValueError(f"Expected 20-byte hash160, got {len(raw)} bytes")
    return _base58check(b"\x00" + raw)  # 0x00 = mainnet P2PKH version byte


def pubkey_to_p2pkh(pubkey_hex):
    """Hash a raw public key (hex) with SHA256 then RIPEMD160 and build a P2PKH address."""
    sha = hashlib.sha256(bytes.fromhex(pubkey_hex)).digest()
    ripe = hashlib.new("ripemd160", sha).digest()
    return _base58check(b"\x00" + ripe)
def encode_bech32(witver, witdata_hex, hrp="bc"):
    """Encode a witness program (version + hex program bytes) as bech32 (bc1...)."""
    program = bech32.convertbits(bytes.fromhex(witdata_hex), 8, 5, True)
    return bech32.bech32_encode(hrp, [witver] + program)
# -------------------------------
# ScriptPubKey Decoding
# -------------------------------
def extract_script_address(spk):
    """Derive a single address from a decoded scriptPubKey dict.

    Handles:
      - pubkey                (bare pubkey + OP_CHECKSIG, converted to P2PKH)
      - pubkeyhash            (P2PKH)
      - scripthash            (P2SH)
      - witness_v0_keyhash    (P2WPKH)
      - witness_v0_scripthash (P2WSH)
      - witness_v1_taproot    (Taproot)

    Prefers the node-supplied address when present, otherwise decodes the
    raw script hex. Raises ValueError for unknown or undecodable scripts
    so the caller can skip the output.
    """
    address_type = spk.get("type", "")
    hex_str = spk.get("hex", "")
    asm = spk.get("asm", "")
    addresses_list = spk.get("addresses", [])

    # FIX: Bitcoin Core 22.0+ dropped the "addresses" array from RPC results
    # and reports a singular "address" field instead; without this check every
    # branch below falls through to manual hex parsing on modern nodes.
    node_address = spk.get("address")
    if node_address:
        return node_address

    # 1) pubkey: asm looks like "<pubkey> OP_CHECKSIG"
    if address_type == "pubkey":
        parts = asm.split()
        if len(parts) == 2 and parts[1].lower().endswith("checksig"):
            pubkey_hex = parts[0]
            return pubkey_to_p2pkh(pubkey_hex)
        raise ValueError(f"Can't parse 'pubkey' script. ASM={asm}")

    # 2) pubkeyhash (P2PKH)
    elif address_type == "pubkeyhash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        # Typical P2PKH => 76a914{20-byte}88ac
        if hex_str.startswith("76a914") and hex_str.endswith("88ac") and len(hex_str) >= 52:
            h160 = hex_str[6:-4]
            return pubkey_hash_to_p2pkh(h160)
        raise ValueError(f"P2PKH but no addresses[] array. hex={hex_str}")

    # 3) scripthash (P2SH)
    elif address_type == "scripthash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        # Standard P2SH => a914{20-byte}87
        if hex_str.startswith("a914") and hex_str.endswith("87") and len(hex_str) >= 46:
            h160 = hex_str[4:-2]
            raw = bytes.fromhex(h160)
            if len(raw) != 20:
                raise ValueError("Invalid length for P2SH redeem hash160.")
            prefix_payload = b"\x05" + raw  # 0x05 = mainnet P2SH version byte
            chksum_full = hashlib.sha256(hashlib.sha256(prefix_payload).digest()).digest()
            return b58encode(prefix_payload + chksum_full[:4])
        raise ValueError(f"P2SH but no addresses[] array. hex={hex_str}")

    # 4) witness_v0_keyhash (P2WPKH) => 0014{20-byte}
    elif address_type == "witness_v0_keyhash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        if hex_str.startswith("0014") and len(hex_str) == 44:
            return encode_bech32(0, hex_str[4:])
        raise ValueError(f"P2WPKH fallback decode failed. hex={hex_str}")

    # 5) witness_v0_scripthash (P2WSH) => 0020{32-byte}
    elif address_type == "witness_v0_scripthash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        if hex_str.startswith("0020") and len(hex_str) == 68:
            return encode_bech32(0, hex_str[4:])
        raise ValueError(f"P2WSH fallback decode failed. hex={hex_str}")

    # 6) witness_v1_taproot => OP_1 <32-byte> (5120...)
    elif address_type == "witness_v1_taproot":
        if len(addresses_list) == 1:
            return addresses_list[0]
        if hex_str.startswith("5120") and len(hex_str) == 68:
            return encode_bech32(1, hex_str[4:])
        raise ValueError(f"Taproot fallback decode failed. hex={hex_str}")

    raise ValueError(f"Unrecognized script type: {address_type}, hex={hex_str}")
# -------------------------------
# Pubkey Extraction from vin
# -------------------------------
def extract_pubkeys_from_vin(txin):
    """Collect hex public keys revealed by a spending input.

    Recognizes legacy P2PKH scriptSigs ("<signature> <pubkey>") and P2WPKH
    witnesses ([signature, pubkey]); returns a list of pubkey hex strings.
    """
    def _looks_like_pubkey(candidate):
        # Compressed keys start with 02/03, uncompressed with 04.
        return candidate.startswith(("02", "03", "04"))

    found = []

    # Legacy P2PKH: scriptSig asm is exactly "<sig> <pubkey>"
    sig_parts = txin.get("scriptSig", {}).get("asm", "").split()
    if len(sig_parts) == 2 and _looks_like_pubkey(sig_parts[1]):
        found.append(sig_parts[1])

    # Segwit P2WPKH: witness stack is exactly [signature, pubkey]
    witness_stack = txin.get("txinwitness", [])
    if len(witness_stack) == 2 and _looks_like_pubkey(witness_stack[1]):
        found.append(witness_stack[1])

    return found
# -------------------------------
# Main Parsing Logic
# -------------------------------
def parse_blockchain():
    """Scan blocks last_processed_block+1 .. chain tip over RPC.

    Updates the module-global utxo_map and addresses_info in place, saving
    pickled progress and partial CSVs at the configured intervals. Retries
    each block indefinitely on RPC errors; exits only on KeyboardInterrupt
    or after 5 failed attempts to read the chain tip.
    """
    global last_processed_block

    # 1) Try to connect & get chain tip with a few retries
    best_height = None
    for attempt in range(5):
        try:
            rpc_test = connect_rpc()
            best_height = rpc_test.getblockcount()
            break
        except Exception as e:
            log(f"[ERROR] Attempt {attempt+1}/5 to get blockcount failed: {e}")
            time.sleep(10)

    if best_height is None:
        log("Could not get blockcount after 5 attempts. Exiting.")
        return

    log(f"Chain best height: {best_height}")
    start_time = time.time()

    # 2) Loop through blocks from last_processed_block+1 to best_height
    for height in range(last_processed_block + 1, best_height + 1):

        # Keep retrying the same block in case of transient errors.
        # NOTE(review): if an RPC error hits partway through a block, the
        # retry re-runs the whole block; output credits already applied are
        # applied again (inputs are safe because they pop from utxo_map) —
        # confirm whether this double-counting can occur in practice.
        while True:
            try:
                # Reconnect on each retry to avoid stale HTTP connections
                rpc = connect_rpc()

                # Show progress
                if height % PROGRESS_INTERVAL == 0:
                    elapsed = time.time() - start_time
                    done = height
                    remaining = best_height - done
                    rate = done / elapsed if elapsed > 0 else 0
                    eta = remaining / rate if rate else 999999  # sentinel ETA when rate is 0
                    pct = (done / best_height) * 100
                    log(f"Block {height}/{best_height} ({pct:.2f}%) "
                        f"Elapsed: {format_time(elapsed)}, ETA: {format_time(eta)}")

                # Fetch block data (verbosity 2 => fully decoded transactions)
                block_hash = rpc.getblockhash(height)
                block_data = rpc.getblock(block_hash, 2)
                block_time = datetime.fromtimestamp(block_data["time"], timezone.utc)

                # Process TXs
                for tx in block_data["tx"]:
                    txid = tx["txid"]

                    # A) Process inputs => spending
                    for vin in tx.get("vin", []):
                        if "coinbase" in vin:
                            continue  # skip coinbase
                        prev_txid = vin.get("txid")
                        prev_vout = vin.get("vout")
                        # Only outputs we could decode earlier are tracked;
                        # spends of unknown outputs are silently ignored.
                        if (prev_txid, prev_vout) in utxo_map:
                            (addr, value_sats, _prev_time) = utxo_map.pop((prev_txid, prev_vout))

                            if addr not in addresses_info:
                                addresses_info[addr] = {
                                    "balance_sats": 0,
                                    "first_in": None,
                                    "last_in": None,
                                    "first_out": None,
                                    "last_out": None,
                                    "pubkeys": set()
                                }

                            addresses_info[addr]["balance_sats"] -= value_sats

                            # Update first_out & last_out
                            if addresses_info[addr]["first_out"] is None or block_time < addresses_info[addr]["first_out"]:
                                addresses_info[addr]["first_out"] = block_time
                            if addresses_info[addr]["last_out"] is None or block_time > addresses_info[addr]["last_out"]:
                                addresses_info[addr]["last_out"] = block_time

                            # Extract pubkeys from this vin
                            found_pks = extract_pubkeys_from_vin(vin)
                            for pk in found_pks:
                                addresses_info[addr]["pubkeys"].add(pk)

                    # B) Process outputs => new UTXOs
                    for i, vout in enumerate(tx.get("vout", [])):
                        value_btc = vout["value"]
                        # BTC -> satoshis via Decimal to avoid float rounding
                        value_sats = int(Decimal(str(value_btc)) * 100_000_000)
                        spk = vout.get("scriptPubKey", {})

                        try:
                            address = extract_script_address(spk)
                        except ValueError:
                            # can't decode => skip
                            continue

                        utxo_map[(txid, i)] = (address, value_sats, block_time)

                        if address not in addresses_info:
                            addresses_info[address] = {
                                "balance_sats": 0,
                                "first_in": None,
                                "last_in": None,
                                "first_out": None,
                                "last_out": None,
                                "pubkeys": set()
                            }

                        addresses_info[address]["balance_sats"] += value_sats

                        if addresses_info[address]["first_in"] is None or block_time < addresses_info[address]["first_in"]:
                            addresses_info[address]["first_in"] = block_time
                        if addresses_info[address]["last_in"] is None or block_time > addresses_info[address]["last_in"]:
                            addresses_info[address]["last_in"] = block_time

                # If we finish processing without error, mark this block done
                last_processed_block = height

                # Periodic save
                if height % SAVE_INTERVAL == 0:
                    save_progress(height)

                # Periodic partial CSV
                if height % WRITE_CSV_INTERVAL == 0:
                    write_partial_csv()

                # Break out of the while True retry loop
                break

            except KeyboardInterrupt:
                log("KeyboardInterrupt received; exiting immediately.")
                sys.exit(1)
            except Exception as e:
                # Log the error, sleep, then retry the same block
                log(f"[UNEXPECTED ERROR] Block {height}: {e}. Retrying in 10s...")
                time.sleep(10)
def write_partial_csv():
    """Dump the current non-zero addresses to partial_<OUTPUT_CSV>, unsorted."""
    partial_csv_name = f"partial_{OUTPUT_CSV}"
    log(f"Writing partial CSV: {partial_csv_name}")

    header = ["address", "balance_btc", "first_in", "last_in", "first_out", "last_out", "pubkeys"]
    nonzero_count = 0
    with open(partial_csv_name, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for addr, info in addresses_info.items():
            if info["balance_sats"] == 0:
                continue
            nonzero_count += 1
            writer.writerow([
                addr,
                info["balance_sats"] / 1e8,
                info["first_in"],
                info["last_in"],
                info["first_out"],
                info["last_out"],
                ";".join(info["pubkeys"]),
            ])

    log(f"Partial CSV written with {nonzero_count} non-zero addresses.")
def main():
    """Entry point: resume saved state, scan the chain, then emit the final CSV."""
    load_progress()
    parse_blockchain()

    # Final Save
    save_progress(last_processed_block)

    # Build final CSV with non-zero addresses
    log("Building final CSV of non-zero addresses...")
    nonzero_records = [
        (addr, info)
        for addr, info in addresses_info.items()
        if info["balance_sats"] != 0
    ]

    # Sort by descending balance
    nonzero_records.sort(key=lambda rec: rec[1]["balance_sats"], reverse=True)
    log(f"Found {len(nonzero_records)} addresses with non-zero balance.")

    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["address", "balance_btc", "first_in", "last_in", "first_out", "last_out", "pubkeys"])
        for addr, d in nonzero_records:
            writer.writerow([
                addr,
                d["balance_sats"] / 1e8,
                d["first_in"],
                d["last_in"],
                d["first_out"],
                d["last_out"],
                ";".join(d["pubkeys"]),
            ])
    log(f"Done! Final CSV written to {OUTPUT_CSV}.")


if __name__ == "__main__":
    main()
|