Paulfontrahel (OP)
Newbie
Offline
Activity: 13
Merit: 0
|
 |
March 30, 2025, 06:26:26 PM |
|
I'm sorry, but could you give me a little more detail? I have the public key, but I want to recover the nonce k from the signatures rather than the private key. I am a newbie in this difficult subject, so could you explain it in more detail? At the moment I am using the following code:
import os
import random
import signal
import time
from datetime import datetime
from multiprocessing import Pool, cpu_count

import gmpy2
from coincurve import PublicKey
# Parameters of the secp256k1 curve n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
# Files input_file = "rsz_8.txt" output_file = "k_8.txt"
# Global flag for execution control stop_flag = False
def init_worker():
    """Make the calling process ignore SIGINT (Ctrl+C).

    Used as the ``initializer`` of the worker pool so that an interrupt is
    handled by the parent process only.
    """
    interrupt_signal, ignore_handler = signal.SIGINT, signal.SIG_IGN
    signal.signal(interrupt_signal, ignore_handler)
def parse_signature_file(filename):
    """Read (R, S, Z) signature triples from a text file.

    The file is expected to contain lines of the form ``R = <hex>``,
    ``S = <hex>`` and ``Z = <hex>`` (whitespace around ``=`` is optional);
    every other line is ignored.  The three fields of a record may appear in
    any order.  A record is emitted as soon as all three fields have been
    seen, after which the parser state is cleared so that values never leak
    from one record into the next.

    A record is discarded when

    * one of its fields is repeated before the record is complete (the
      earlier, incomplete record is dropped and a new one is started), or
    * R or S is empty, is not hexadecimal, or is longer than 64 hex digits
      after removing leading zeros, or
    * Z is empty.

    Args:
        filename: Path of the text file to read.

    Returns:
        A list of ``(r, s, z)`` tuples of lowercase hex strings.  ``r`` and
        ``s`` are normalized to exactly 64 hex digits (leading zeros and an
        optional ``0x`` prefix removed, then zero-padded); ``z`` is returned
        as written, lowercased.
    """
    hex_digits = frozenset("0123456789abcdef")

    def normalize(value):
        """Return *value* as 64 lowercase hex digits, or None if unusable."""
        if value.startswith("0x"):
            value = value[2:]
        if not value or not hex_digits.issuperset(value):
            return None
        value = value.lstrip("0")
        if len(value) > 64:
            return None
        return value.zfill(64)

    signatures = []
    record = {}  # field name -> normalized value, or None when invalid
    with open(filename, 'r') as f:
        for raw_line in f:
            key, sep, value = raw_line.partition("=")
            key = key.strip()
            if not sep or key not in ("R", "S", "Z"):
                continue

            if key in record:
                # The same field again means the previous record never
                # completed; drop it instead of mixing two records.
                record = {}

            value = value.strip().lower()
            record[key] = (value or None) if key == "Z" else normalize(value)

            if len(record) == 3:
                if None not in record.values():
                    signatures.append((record["R"], record["S"], record["Z"]))
                record = {}
    return signatures
def validate_signature(k, r_hex, s_hex, z_hex):
    """Check whether nonce candidate *k* reproduces the signature's R value.

    The x-coordinate of the point k*G is computed with coincurve (by treating
    *k* as a secret key and reading the resulting public key) and compared,
    reduced modulo ``n``, with the integer value of *r_hex*.

    Args:
        k: Candidate nonce as an integer.
        r_hex: Signature R value as a hex string.
        s_hex: Signature S value; not used by this check.
        z_hex: Message hash; not used by this check.

    Returns:
        True when ``x(k*G) mod n`` equals R, otherwise False.  Candidates
        outside ``1 .. n-1`` are not valid scalars and yield False.

    Raises:
        ValueError: If *r_hex* is not a valid hexadecimal string.
    """
    r = int(r_hex, 16)

    # Only scalars in 1..n-1 correspond to a curve point; reject the rest up
    # front instead of relying on a catch-all exception handler.
    if not 0 < k < n:
        return False

    try:
        # Generate temporary public key from k (this is R = k*G)
        nonce_point = PublicKey.from_secret(k.to_bytes(32, 'big'))
    except ValueError:
        return False

    # Compressed encoding is 1 prefix byte followed by the 32-byte x-coordinate.
    x_bytes = nonce_point.format()[1:33]
    return int.from_bytes(x_bytes, 'big') % n == r
def kangaroo_algorithm(start, end, signatures, pubkey=None):
    """Search ``[start, end]`` for a nonce matching one of *signatures*.

    Two integer sequences are advanced with jumps drawn from a table of 16
    random powers of two: a "tame" position ``xT`` (phase 1) and a "wild"
    position ``xW`` (phase 2).  Candidate nonces are derived from the
    difference of the two positions plus the step counters, and every
    candidate that lies inside ``[start, end]`` is tested against each
    signature with ``validate_signature``.

    NOTE(review): both walks operate on plain integers; the only curve
    operation is the one inside ``validate_signature``.  Confirm that this is
    the intended search strategy, since no collision between curve points is
    ever detected here.

    Args:
        start: Lower bound (inclusive) of the nonce range.
        end: Upper bound (inclusive) of the nonce range.
        signatures: Iterable of ``(r, s, z)`` triples.
        pubkey: Not used by this function.

    Returns:
        The first candidate for which ``validate_signature`` returns True, or
        None once ``stop_flag`` is observed to be true.  Phase 2 has no other
        exit condition.
    """
    global stop_flag
    N = 1 << 18  # Number of points (phase-1 iterations)
    # Jump table: 16 random powers of two between 2**0 and 2**18.
    jumps = [1 << random.randint(0, 18) for _ in range(16)]
    last_print_time = time.time()

    def f(x):
        # Next position: the jump is selected by the position modulo table size.
        return x + jumps[x % len(jumps)]

    # Kangaroo initialization
    xT, aT = end, 0
    # NOTE(review): this is half the interval width, not the midpoint
    # start + (end - start) // 2 — confirm which one is intended.
    xW, aW = (end - start) // 2, 0

    # Main loop with pauses to reduce load
    for iteration in range(N):
        if stop_flag:
            return None
        xT = f(xT) % n
        aT += 1
        # xW and aW are still at their initial values during this phase.
        k_candidate = (xT - xW + aW) % n
        if start <= k_candidate <= end:
            for r, s, z in signatures:
                if validate_signature(k_candidate, r, s, z):
                    return k_candidate
        # Progress output every 5 seconds
        current_time = time.time()
        if current_time - last_print_time > 5:
            progress = (iteration / N) * 100
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Progress: {progress:.2f}% | Checked: {iteration}/{N} iterations | Current candidate: {hex(k_candidate)}")
            last_print_time = current_time
        # Pause to reduce load (10ms every 1000 iterations)
        if iteration % 1000 == 0:
            time.sleep(0.01)

    # Continue search with pauses
    # xT and aT stay fixed at their phase-1 end values from here on.
    while not stop_flag:
        xW = f(xW) % n
        aW += 1
        # Try the 20 offsets -10 .. 9 around the derived value.
        for d in range(-10, 10):
            k_candidate = (xW - xT + aT + d) % n
            if start <= k_candidate <= end:
                for r, s, z in signatures:
                    if validate_signature(k_candidate, r, s, z):
                        return k_candidate
        # Progress output every 5 seconds
        current_time = time.time()
        if current_time - last_print_time > 5:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Continuing search | Checked: {aW} additional iterations | Current candidate: {hex(k_candidate)}")
            last_print_time = current_time
        # Pause to reduce load
        if aW % 1000 == 0:
            time.sleep(0.02)
    return None
def worker(args):
    """Search one chunk of the nonce range; runs inside a pool process.

    Args:
        args: A ``(chunk_start, chunk_end, signatures, pubkey)`` tuple, packed
            into one argument so the function can be used with
            ``Pool.imap_unordered``.

    Returns:
        A list of ``(hex(k), r, s, z)`` tuples, one for every signature that
        the found nonce ``k`` validates against.  The list is empty when no
        nonce was found or the search was interrupted.
    """
    try:
        chunk_start, chunk_end, signatures, pubkey = args
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting search in range {hex(chunk_start)} - {hex(chunk_end)}")

        k = kangaroo_algorithm(chunk_start, chunk_end, signatures, pubkey)
        if k is not None:
            # Report every signature that shares this nonce, not just the
            # one that ended the search.
            k_hex = hex(k)
            return [
                (k_hex, r, s, z)
                for r, s, z in signatures
                if validate_signature(k, r, s, z)
            ]
    except KeyboardInterrupt:
        # An interrupt simply ends this worker with no result.
        pass
    return []
def main():
    """Load the signatures, split the nonce range and search it in parallel.

    Steps:
      1. Truncate ``output_file`` and read the signatures from ``input_file``.
      2. Split the range ``[n - 2**40, n - 1]`` into one chunk per worker,
         using half of the available CPU cores (at least one).
      3. Run ``worker`` on every chunk in a process pool and append each
         reported nonce to ``output_file``.

    The elapsed time is always printed on exit.  It is measured from the
    start of this function, so ``main`` can also be called when the module is
    imported rather than run as a script.

    NOTE(review): ``stop_flag`` is assigned here in the parent process only;
    whether the pool processes ever observe that assignment is not
    established by this file — confirm.  Leaving the ``with Pool(...)`` block
    is what actually ends the workers.
    """
    global stop_flag
    started_at = time.time()
    try:
        # Start with an empty results file.
        with open(output_file, 'w'):
            pass

        signatures = parse_signature_file(input_file)
        if not signatures:
            print("No signatures to process!")
            return
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Loaded {len(signatures)} signatures")

        pubkey = None

        # Define search range
        start_k = n - 2**40
        end_k = n - 1
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Searching in range {hex(start_k)} - {hex(end_k)}")

        # Use only part of cores to reduce load
        total_cores = cpu_count()
        num_cores = max(1, total_cores // 2)  # Use half of the cores
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Using {num_cores} of {total_cores} CPU cores")

        # Equal-sized chunks; the last one absorbs the division remainder.
        chunk_size = (end_k - start_k) // num_cores
        chunks = [
            (
                start_k + i * chunk_size,
                start_k + (i + 1) * chunk_size - 1 if i < num_cores - 1 else end_k,
                signatures,
                pubkey,
            )
            for i in range(num_cores)
        ]

        # Start search with process count limitation
        with Pool(num_cores, initializer=init_worker) as pool:
            for res in pool.imap_unordered(worker, chunks):
                for k, r, s, z in res:
                    with open(output_file, 'a') as f:
                        f.write(f"k: {k}\nR: {r}\nS: {s}\nZ: {z}\n\n")
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] Found k: {k}")
                # Check stop flag
                if stop_flag:
                    break
    except KeyboardInterrupt:
        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Received interrupt signal, stopping...")
        stop_flag = True
    finally:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Execution time: {time.time() - started_at:.2f} sec")
if __name__ == "__main__":
    # Record the wall-clock start time in a module-level variable, then run
    # the search.
    start_time = time.time()
    main()
|