Bitcoin Forum
April 03, 2026, 01:07:35 AM *
News: Latest Bitcoin Core release: 30.2 [Torrent]
Author Topic: How would you make a list of non zero wallets & balances? with python.  (Read 171 times)
Tllsldr (OP)
Newbie
*
Offline Offline

Activity: 6
Merit: 0


View Profile
February 01, 2025, 12:53:55 AM
 #1

I would like to parse the blockchain and generate a list of all non-zero wallets and their balances, and, depending on how easy it is, add some extra columns of data like last in, last out, and public key (if applicable). It's a pretty common project, just like the Bitcoin rich-list websites that used to let you sort by column and were a bit more flexible than today's sites.

I was just wondering if you guys might share how you would do this. Or point me in the right direction.

I have a fully synchronized bitcoin core running and an RPC server enabled.

This is my attempt in Python, just to prove that I have attempted something before coming running for help :)

Code:
import time
import sys
import csv
import pickle
import os
import hashlib
from datetime import datetime, timedelta, timezone
from decimal import Decimal

from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
import bech32  # pip install bech32

# -------------------------------
# Configuration
# -------------------------------
RPC_USER = "REDACTED"
RPC_PASS = "REDACTED"
RPC_HOST = "REDACTED"
RPC_PORT = "REDACTED"

# Increase timeout to help with slow/large requests:
RPC_TIMEOUT = 300  # 5 minutes; adjust higher if needed

PROGRESS_INTERVAL = 1000        # Print progress every N blocks
SAVE_INTERVAL = 2000            # Save pickled progress every N blocks (reduced for more frequent saves)
WRITE_CSV_INTERVAL = 10000      # Write partial CSV every N blocks (reduced for more frequent writes)

OUTPUT_CSV = "master_list.csv"
SAVE_FILE = "blockchain_progress.pkl"

# -------------------------------
# Global Data Structures
# -------------------------------
utxo_map = {}  # (txid, vout) -> (address, value_sats, block_time)
addresses_info = {}  # address -> { balance_sats, first_in, last_in, first_out, last_out, pubkeys }
last_processed_block = 0

# -------------------------------
# Utility & Logging
# -------------------------------
def connect_rpc():
    """
    Create a fresh AuthServiceProxy connection with a long timeout.
    """
    return AuthServiceProxy(
        f"http://{RPC_USER}:{RPC_PASS}@{RPC_HOST}:{RPC_PORT}",
        timeout=RPC_TIMEOUT
    )

def format_time(seconds):
    return str(timedelta(seconds=int(seconds)))

def log(msg):
    print(msg, flush=True)

def debug(msg):
    # Comment out for less verbose logs if desired
    print(f"[DEBUG] {msg}", flush=True)

# -------------------------------
# Save & Load Progress (Atomic)
# -------------------------------
def save_progress(block_height):
    """
    Perform an atomic save by writing to a temporary file first, then renaming.
    This helps avoid corrupting the pickle file if the script is interrupted.
    """
    temp_file = SAVE_FILE + ".tmp"
    try:
        with open(temp_file, "wb") as f:
            pickle.dump((utxo_map, addresses_info, block_height), f)
        os.replace(temp_file, SAVE_FILE)  # Atomic rename
        log(f"Progress saved at block {block_height}.")
    except Exception as e:
        log(f"[ERROR] Failed to save progress: {e}")

def load_progress():
    """
    Safely load the existing pickle file if it exists and is non-empty.
    If it's missing or corrupted, start from block 0.
    """
    global utxo_map, addresses_info, last_processed_block

    if not os.path.exists(SAVE_FILE) or os.path.getsize(SAVE_FILE) == 0:
        log("No valid saved progress found (or file is empty). Starting from block 0.")
        return

    try:
        with open(SAVE_FILE, "rb") as f:
            loaded_utxo, loaded_addrinfo, loaded_height = pickle.load(f)
            utxo_map = loaded_utxo
            addresses_info = loaded_addrinfo
            last_processed_block = loaded_height
        log(f"Resuming from block {last_processed_block}.")
    except EOFError:
        log("Saved progress file is corrupted or truncated (EOFError). Starting from block 0.")
    except Exception as e:
        log(f"[ERROR] Unexpected error loading progress file: {e}. Starting from block 0.")

# -------------------------------
# Base58 & Bech32 Helpers
# -------------------------------
ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

def b58encode(b: bytes) -> str:
    """Minimal base58check encode for addresses."""
    n = int.from_bytes(b, 'big')
    leading_zeros = len(b) - len(b.lstrip(b'\x00'))
    result = []
    while n > 0:
        n, r = divmod(n, 58)
        result.append(ALPHABET[r])
    result.reverse()
    return '1' * leading_zeros + ''.join(result)

def pubkey_hash_to_p2pkh(hash160_hex):
    """Given a 20-byte hash160 hex string, produce a base58 P2PKH address."""
    raw = bytes.fromhex(hash160_hex)
    if len(raw) != 20:
        raise ValueError(f"Expected 20-byte hash160, got {len(raw)} bytes")
    prefix_payload = b"\x00" + raw  # 0x00 for mainnet P2PKH
    chksum_full = hashlib.sha256(hashlib.sha256(prefix_payload).digest()).digest()
    return b58encode(prefix_payload + chksum_full[:4])

def pubkey_to_p2pkh(pubkey_hex):
    """Convert raw pubkey hex to base58 P2PKH address."""
    pubkey_bytes = bytes.fromhex(pubkey_hex)
    sha = hashlib.sha256(pubkey_bytes).digest()
    ripe = hashlib.new('ripemd160', sha).digest()
    prefix_payload = b"\x00" + ripe
    chksum_full = hashlib.sha256(hashlib.sha256(prefix_payload).digest()).digest()
    return b58encode(prefix_payload + chksum_full[:4])

def encode_bech32(witver, witdata_hex, hrp="bc"):
    """Convert witness version & data to bech32 (e.g. bc1...).

    NOTE: witness v1 (Taproot) outputs actually require the bech32m checksum
    (BIP 350); the plain bech32 encoding used here yields invalid bc1p addresses.
    """
    witdata = bytes.fromhex(witdata_hex)
    converted = bech32.convertbits(witdata, 8, 5, True)
    return bech32.bech32_encode(hrp, [witver] + converted)

# -------------------------------
# ScriptPubKey Decoding
# -------------------------------
def extract_script_address(spk):
    """
    Attempt to handle:
      - pubkey
      - pubkeyhash (P2PKH)
      - scripthash (P2SH)
      - witness_v0_keyhash (P2WPKH)
      - witness_v0_scripthash (P2WSH)
      - witness_v1_taproot (Taproot)
    If unknown, raise ValueError; skip in caller.
    """
    address_type = spk.get("type", "")
    hex_str = spk.get("hex", "")
    asm = spk.get("asm", "")
    # Bitcoin Core 22.0+ reports a single "address" field; older versions used "addresses"
    addresses_list = spk.get("addresses", [])
    if not addresses_list and "address" in spk:
        addresses_list = [spk["address"]]

    # 1) pubkey
    if address_type == "pubkey":
        parts = asm.split()
        if len(parts) == 2 and parts[1].lower().endswith("checksig"):
            pubkey_hex = parts[0]
            return pubkey_to_p2pkh(pubkey_hex)
        else:
            raise ValueError(f"Can't parse 'pubkey' script. ASM={asm}")

    # 2) pubkeyhash (P2PKH)
    elif address_type == "pubkeyhash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        else:
            # Typical P2PKH => 76a914{20-byte}88ac
            if hex_str.startswith("76a914") and hex_str.endswith("88ac") and len(hex_str) >= 52:
                h160 = hex_str[6:-4]
                return pubkey_hash_to_p2pkh(h160)
            else:
                raise ValueError(f"P2PKH but no addresses[] array. hex={hex_str}")

    # 3) scripthash (P2SH)
    elif address_type == "scripthash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        else:
            # Standard P2SH => a914{20-byte}87
            if hex_str.startswith("a914") and hex_str.endswith("87") and len(hex_str) >= 46:
                h160 = hex_str[4:-2]
                raw = bytes.fromhex(h160)
                if len(raw) != 20:
                    raise ValueError("Invalid length for P2SH redeem hash160.")
                prefix_payload = b"\x05" + raw
                chksum_full = hashlib.sha256(hashlib.sha256(prefix_payload).digest()).digest()
                return b58encode(prefix_payload + chksum_full[:4])
            else:
                raise ValueError(f"P2SH but no addresses[] array. hex={hex_str}")

    # 4) witness_v0_keyhash (P2WPKH)
    elif address_type == "witness_v0_keyhash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        else:
            # Typically hex: 0014{20-byte}
            if hex_str.startswith("0014") and len(hex_str) == 44:
                witdata = hex_str[4:]
                return encode_bech32(0, witdata)
            else:
                raise ValueError(f"P2WPKH fallback decode failed. hex={hex_str}")

    # 5) witness_v0_scripthash (P2WSH)
    elif address_type == "witness_v0_scripthash":
        if len(addresses_list) == 1:
            return addresses_list[0]
        else:
            # Typically 0020{32-byte}
            if hex_str.startswith("0020") and len(hex_str) == 68:
                witdata = hex_str[4:]
                return encode_bech32(0, witdata)
            else:
                raise ValueError(f"P2WSH fallback decode failed. hex={hex_str}")

    # 6) witness_v1_taproot
    elif address_type == "witness_v1_taproot":
        if len(addresses_list) == 1:
            return addresses_list[0]
        else:
            # Typically OP_1 32 bytes => 5120...
            if hex_str.startswith("5120") and len(hex_str) == 68:
                witdata = hex_str[4:]
                return encode_bech32(1, witdata)
            else:
                raise ValueError(f"Taproot fallback decode failed. hex={hex_str}")

    else:
        raise ValueError(f"Unrecognized script type: {address_type}, hex={hex_str}")

# -------------------------------
# Pubkey Extraction from vin
# -------------------------------
def extract_pubkeys_from_vin(txin):
    """
    For standard P2PKH => scriptSig: <signature> <pubkey>
    For segwit P2WPKH => txinwitness: [signature, pubkey]
    Returns a list of discovered pubkeys (hex).
    """
    pubkeys = []
    scriptSig = txin.get("scriptSig", {})
    asm = scriptSig.get("asm", "")
    parts = asm.split()

    # Legacy P2PKH
    if len(parts) == 2 and (parts[1].startswith("02") or parts[1].startswith("03") or parts[1].startswith("04")):
        pubkeys.append(parts[1])

    # Segwit P2WPKH
    witness = txin.get("txinwitness", [])
    if len(witness) == 2 and (
        witness[1].startswith("02") or
        witness[1].startswith("03") or
        witness[1].startswith("04")
    ):
        pubkeys.append(witness[1])

    return pubkeys

# -------------------------------
# Main Parsing Logic
# -------------------------------
def parse_blockchain():
    global last_processed_block

    # 1) Try to connect & get chain tip with a few retries
    best_height = None
    for attempt in range(5):
        try:
            rpc_test = connect_rpc()
            best_height = rpc_test.getblockcount()
            break
        except Exception as e:
            log(f"[ERROR] Attempt {attempt+1}/5 to get blockcount failed: {e}")
            time.sleep(10)

    if best_height is None:
        log("Could not get blockcount after 5 attempts. Exiting.")
        return

    log(f"Chain best height: {best_height}")
    start_time = time.time()
    start_block = last_processed_block  # remember the resume point for rate/ETA math

    # 2) Loop through blocks from last_processed_block+1 to best_height
    for height in range(last_processed_block + 1, best_height + 1):

        # Keep retrying the same block in case of transient errors
        while True:
            try:
                # Reconnect on each retry to avoid stale HTTP connections
                rpc = connect_rpc()

                # Show progress
                if height % PROGRESS_INTERVAL == 0:
                    elapsed = time.time() - start_time
                    # Base the rate on blocks processed this session, not the absolute
                    # height, so the ETA stays meaningful after resuming from a save.
                    done = height - start_block
                    remaining = best_height - height
                    rate = done / elapsed if elapsed > 0 else 0
                    eta = remaining / rate if rate else 999999
                    pct = (height / best_height) * 100
                    log(f"Block {height}/{best_height} ({pct:.2f}%) "
                        f"Elapsed: {format_time(elapsed)}, ETA: {format_time(eta)}")

                # Fetch block data
                block_hash = rpc.getblockhash(height)
                block_data = rpc.getblock(block_hash, 2)
                block_time = datetime.fromtimestamp(block_data["time"], timezone.utc)

                # Process TXs
                for tx in block_data["tx"]:
                    txid = tx["txid"]

                    # A) Process inputs => spending
                    for vin in tx.get("vin", []):
                        if "coinbase" in vin:
                            continue  # skip coinbase
                        prev_txid = vin.get("txid")
                        prev_vout = vin.get("vout")
                        if (prev_txid, prev_vout) in utxo_map:
                            (addr, value_sats, _prev_time) = utxo_map.pop((prev_txid, prev_vout))

                            if addr not in addresses_info:
                                addresses_info[addr] = {
                                    "balance_sats": 0,
                                    "first_in": None,
                                    "last_in": None,
                                    "first_out": None,
                                    "last_out": None,
                                    "pubkeys": set()
                                }

                            addresses_info[addr]["balance_sats"] -= value_sats

                            # Update first_out & last_out
                            if addresses_info[addr]["first_out"] is None or block_time < addresses_info[addr]["first_out"]:
                                addresses_info[addr]["first_out"] = block_time
                            if addresses_info[addr]["last_out"] is None or block_time > addresses_info[addr]["last_out"]:
                                addresses_info[addr]["last_out"] = block_time

                            # Extract pubkeys from this vin
                            found_pks = extract_pubkeys_from_vin(vin)
                            for pk in found_pks:
                                addresses_info[addr]["pubkeys"].add(pk)

                    # B) Process outputs => new UTXOs
                    for i, vout in enumerate(tx.get("vout", [])):
                        value_btc = vout["value"]
                        value_sats = int(Decimal(str(value_btc)) * 100_000_000)
                        spk = vout.get("scriptPubKey", {})

                        try:
                            address = extract_script_address(spk)
                        except ValueError:
                            # can't decode => skip
                            continue

                        utxo_map[(txid, i)] = (address, value_sats, block_time)

                        if address not in addresses_info:
                            addresses_info[address] = {
                                "balance_sats": 0,
                                "first_in": None,
                                "last_in": None,
                                "first_out": None,
                                "last_out": None,
                                "pubkeys": set()
                            }

                        addresses_info[address]["balance_sats"] += value_sats

                        if addresses_info[address]["first_in"] is None or block_time < addresses_info[address]["first_in"]:
                            addresses_info[address]["first_in"] = block_time
                        if addresses_info[address]["last_in"] is None or block_time > addresses_info[address]["last_in"]:
                            addresses_info[address]["last_in"] = block_time

                # If we finish processing without error, mark this block done
                last_processed_block = height

                # Periodic save
                if height % SAVE_INTERVAL == 0:
                    save_progress(height)

                # Periodic partial CSV
                if height % WRITE_CSV_INTERVAL == 0:
                    write_partial_csv()

                # Break out of the while True retry loop
                break

            except KeyboardInterrupt:
                log("KeyboardInterrupt received; exiting immediately.")
                sys.exit(1)
            except Exception as e:
                # Log the error, sleep, then retry the same block
                log(f"[UNEXPECTED ERROR] Block {height}: {e}. Retrying in 10s...")
                time.sleep(10)

def write_partial_csv():
    """Write a partial CSV of non-zero addresses."""
    partial_csv_name = f"partial_{OUTPUT_CSV}"
    log(f"Writing partial CSV: {partial_csv_name}")

    nonzero_count = 0
    with open(partial_csv_name, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["address", "balance_btc", "first_in", "last_in", "first_out", "last_out", "pubkeys"])
        for addr, info in addresses_info.items():
            if info["balance_sats"] != 0:
                nonzero_count += 1
                w.writerow([
                    addr,
                    info["balance_sats"] / 1e8,
                    info["first_in"],
                    info["last_in"],
                    info["first_out"],
                    info["last_out"],
                    ";".join(info["pubkeys"])
                ])

    log(f"Partial CSV written with {nonzero_count} non-zero addresses.")

def main():
    load_progress()
    parse_blockchain()

    # Final Save
    save_progress(last_processed_block)

    # Build final CSV with non-zero addresses
    log("Building final CSV of non-zero addresses...")
    nonzero_records = []
    for addr, info in addresses_info.items():
        if info["balance_sats"] != 0:
            nonzero_records.append((addr, info))

    # Sort by descending balance
    nonzero_records.sort(key=lambda x: x[1]["balance_sats"], reverse=True)
    log(f"Found {len(nonzero_records)} addresses with non-zero balance.")

    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["address", "balance_btc", "first_in", "last_in", "first_out", "last_out", "pubkeys"])
        for addr, d in nonzero_records:
            w.writerow([
                addr,
                d["balance_sats"] / 1e8,
                d["first_in"],
                d["last_in"],
                d["first_out"],
                d["last_out"],
                ";".join(d["pubkeys"])
            ])
    log(f"Done! Final CSV written to {OUTPUT_CSV}.")

if __name__ == "__main__":
    main()
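A quick way to sanity-check the base58 helpers above is to run them against a well-known test vector: the hash160 from the genesis coinbase pubkey should encode to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa. A minimal standalone sketch (it re-implements the same helpers so it runs on its own; `hash160_to_p2pkh` mirrors `pubkey_hash_to_p2pkh` from the script):

```python
import hashlib

ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

def b58encode(b: bytes) -> str:
    """Minimal base58 encode (same logic as in the script above)."""
    n = int.from_bytes(b, "big")
    leading_zeros = len(b) - len(b.lstrip(b"\x00"))
    result = []
    while n > 0:
        n, r = divmod(n, 58)
        result.append(ALPHABET[r])
    result.reverse()
    return "1" * leading_zeros + "".join(result)

def hash160_to_p2pkh(hash160_hex: str) -> str:
    """Base58check-encode a 20-byte hash160 as a mainnet P2PKH address."""
    payload = b"\x00" + bytes.fromhex(hash160_hex)  # 0x00 = mainnet P2PKH version
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    return b58encode(payload + checksum)

# hash160 of the genesis coinbase pubkey -- a widely published test vector
addr = hash160_to_p2pkh("62e907b15cbf27d5425399ebf6f0fb50ebb88f18")
print(addr)  # 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa
```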

pooya87
Legendary
*
Offline Offline

Activity: 4102
Merit: 12279



View Profile
February 01, 2025, 05:01:09 AM
Merited by LoyceV (12), vapourminer (10), d5000 (3), hosemary (2), nc50lc (1), DdmrDdmr (1), apogio (1)
 #2

Quote
list of all non zero wallets and their balances,
Not wallets, but addresses.

Quote
I have a fully synchronized bitcoin core running and an RPC server enabled.
I don't think RPC is the most efficient way of doing that. If the node is synced, then you should try to manually read the files from disk and parse them instead.

Quote
Code:
            # Typical P2PKH => 76a914{20-byte}88ac
            if hex_str.startswith("76a914") and hex_str.endswith("88ac") and len(hex_str) >= 52:
You used >= in a couple of places like this, which is wrong. The length of standard output scripts is fixed; P2PKH, for example, is 25 bytes (50 hex characters). If the length is anything else, then the script is non-standard, there is no address equivalent for it, and it may even be unspendable.
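To illustrate the fixed-length point, a strict matcher could look something like this (a sketch; the helper name is mine, and anything that doesn't match exactly is treated as non-standard and skipped):

```python
import re

# Standard P2PKH scriptPubKey: OP_DUP OP_HASH160 <20 bytes> OP_EQUALVERIFY OP_CHECKSIG
# => exactly 25 bytes / 50 hex characters, no more, no less.
P2PKH_RE = re.compile(r"^76a914([0-9a-f]{40})88ac$")

def p2pkh_hash160(hex_str: str):
    """Return the hash160 hex for a standard P2PKH script, or None if non-standard."""
    m = P2PKH_RE.match(hex_str.lower())
    return m.group(1) if m else None

print(p2pkh_hash160("76a914" + "00" * 20 + "88ac"))  # the 40-char hash160
print(p2pkh_hash160("76a914" + "00" * 21 + "88ac"))  # None (wrong length)
```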

Quote
Code:
def parse_blockchain():
            best_height = rpc_test.getblockcount()
                block_hash = rpc.getblockhash(height)
                block_data = rpc.getblock(block_hash, 2)
                block_time = datetime.fromtimestamp(block_data["time"], timezone.utc)

                # Process TXs
                for tx in block_data["tx"]:
                    txid = tx["txid"]
If I understood your code correctly, you are fetching each block and then processing it to build a database. Your full node has already done that: it maintains a database of UTXOs called the chainstate. Check the Bitcoin Core source to see how this database is stored. As I said before, you should read and parse that database if you want the current balance of each address, instead of processing all blocks yourself from the start, fetching them over RPC one by one.


... or you can skip all that and get the same result you are looking for from here: https://bitcointalk.org/index.php?topic=5254914.0
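One concrete detail about the chainstate route: the values in that LevelDB are XORed with a per-database obfuscation key (stored, if memory serves, under the key b"\x0e\x00obfuscate_key" with a one-byte length prefix on the stored value). This is only a hedged sketch of the de-obfuscation step, not a full parser; the actual LevelDB access would need a library such as plyvel, and decoding each UTXO record (varint-compressed height, amount, and script) is additional work:

```python
def deobfuscate(value: bytes, obf_key: bytes) -> bytes:
    """XOR a chainstate value with the repeating obfuscation key."""
    if not obf_key:
        return value  # older databases have no obfuscation key
    return bytes(b ^ obf_key[i % len(obf_key)] for i, b in enumerate(value))

# Round-trip check with a made-up key/value (illustrative only):
key = bytes.fromhex("deadbeef")
plain = b"example chainstate value"
assert deobfuscate(deobfuscate(plain, key), key) == plain
```

With the node shut down you would open the chainstate directory read-only, fetch the obfuscation key, and apply this to every UTXO record before decoding it.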

ABCbits
Legendary
*
Offline Offline

Activity: 3570
Merit: 9887



View Profile
February 01, 2025, 08:58:58 AM
Merited by vapourminer (4), pooya87 (4), hosemary (2), apogio (1)
 #3

Quote
I would like to parse the blockchain and generate a list of all non zero wallets and their balances, and depending on how easy it is, add some extra columns of data like last in, last out, public key (if applicable). A pretty common project. Just like the bitcoin rich list websites that used to let you sort by column and were a bit more flexible than todays sites.

I was just wondering if you guys might share how you would do this. Or point me in the right direction.

I have a fully synchronized bitcoin core running and an RPC server enabled.
--snip--

If you need real-time updates, how about running one of the Electrum server implementations? All of them have an address index. You just need to either access their database directly or communicate with the server using the Electrum protocol[1].

[1] https://electrumx-spesmilo.readthedocs.io/en/latest/protocol.html
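For reference, Electrum protocol methods such as blockchain.scripthash.get_balance identify outputs by a "scripthash": the SHA-256 of the raw scriptPubKey, byte-reversed before hex-encoding. A small sketch (the function name is mine):

```python
import hashlib

def electrum_scripthash(script_pubkey_hex: str) -> str:
    """SHA-256 of the raw scriptPubKey, byte-reversed, hex-encoded (Electrum protocol)."""
    digest = hashlib.sha256(bytes.fromhex(script_pubkey_hex)).digest()
    return digest[::-1].hex()

# Example: scripthash of a P2PKH script with an all-zero hash160 (illustrative input)
spk = "76a914" + "00" * 20 + "88ac"
print(electrum_scripthash(spk))  # 64 hex chars, usable in blockchain.scripthash.* calls
```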

Tllsldr (OP)
Newbie
*
Offline Offline

Activity: 6
Merit: 0


View Profile
February 02, 2025, 08:43:54 PM
 #4

Quote
... or you can skip all that and get the same result you are looking for from here: https://bitcointalk.org/index.php?topic=5254914.0

Thank you, that topic seems to be what I'm looking for. A bit disappointing that I couldn't figure out how to do it myself, though.
LoyceV
Legendary
*
Offline Offline

Activity: 4004
Merit: 21520


Thick-Skinned Gang Leader and Golden Feather 2021


View Profile WWW
March 01, 2025, 10:35:06 AM
 #5

A bit disappointing I couldn't figure out how to do it myself though.
I've seen several options to export the UTXO list, but never had to use them myself. Bitcoin-utxo-dump for instance (use at your own risk, do your own research).
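If you go the UTXO-dump route, turning a per-UTXO CSV into a per-address balance list is a short post-processing step. A sketch, assuming a dump with address and amount (satoshi) columns; check your tool's options for its actual field list:

```python
import csv
import io
from collections import defaultdict

def address_balances(csv_text: str):
    """Aggregate per-UTXO rows (address, amount in sats) into per-address totals."""
    totals = defaultdict(int)
    for row in csv.DictReader(io.StringIO(csv_text)):
        if row["address"]:                    # some outputs have no address form
            totals[row["address"]] += int(row["amount"])
    # Sort by descending balance, rich-list style
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

# Tiny illustrative dump (fabricated addresses/amounts):
sample = "address,amount\naddr1,5000\naddr2,12000\naddr1,2500\n"
print(address_balances(sample))  # [('addr2', 12000), ('addr1', 7500)]
```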
