Exercise: Peer-to-Peer Networking Implementation

Difficulty: Advanced

Estimated Time: 5-7 hours

Learning Objectives

  • Understand peer-to-peer network architecture and protocols
  • Implement peer discovery and connection management
  • Master gossip protocols for message propagation
  • Design distributed hash tables for decentralized storage
  • Handle network partitions and peer churn
  • Build resilient distributed systems without central coordination

Problem Statement

Traditional client-server architectures have single points of failure and scalability bottlenecks. Peer-to-peer networks distribute functionality across all participants, creating resilient, censorship-resistant, and horizontally scalable systems.

In a P2P network, every node is both a client and server. Nodes must discover each other, maintain connections, route messages, and reach consensus—all without central coordination. This is the architecture powering Bitcoin, BitTorrent, IPFS, and many blockchain networks.

Real-World Scenario:

Consider a file-sharing application like BitTorrent, where files are distributed across thousands of peers:

// Client-Server: single point of failure
type CentralizedDownload struct {
    server string // If the server fails, the download stops
}

func (cd *CentralizedDownload) Download(file string) error {
    return downloadFrom(cd.server, file) // Bottleneck!
}

// P2P: resilient and scalable
type P2PDownload struct {
    peers map[string]*Peer // Download from many peers
    dht   *DHT             // Decentralized file location
}

func (p2p *P2PDownload) Download(file string) error {
    // Find peers that have the file using the DHT
    peers := p2p.dht.FindPeers(hash(file))

    // Download chunks from multiple peers in parallel
    return p2p.downloadFromPeers(file, peers)
}

If 90% of peers fail, the P2P network continues functioning. The client-server approach has 100% downtime if the server fails.

Requirements

Functional Requirements

  1. Peer Discovery

    • Bootstrap from seed nodes
    • Discover new peers through existing connections
    • Maintain routing table of known peers
    • Implement peer exchange protocol
    • Handle NAT traversal and firewall issues
  2. Connection Management

    • Establish TCP connections between peers
    • Handshake protocol with version negotiation
    • Maintain active peer connections
    • Automatic reconnection on failure
    • Connection limits and peer selection
  3. Message Protocol

    • Define binary message format with headers
    • Message types: PING/PONG, PEER_EXCHANGE, DATA, QUERY
    • Message routing and forwarding
    • Request-response pattern with timeouts
    • Message validation and authentication
  4. Gossip Protocol

    • Broadcast messages to network efficiently
    • Prevent message flooding with seen cache
    • Configurable fanout and TTL
    • Probabilistic message propagation
    • Eventual consistency guarantees
  5. Distributed Hash Table

    • Store and retrieve key-value pairs across network
    • Kademlia-style routing
    • k-bucket organization for peer routing
    • Iterative lookup algorithm
    • Node join/leave handling
  6. Data Replication

    • Replicate data across multiple peers
    • Configurable replication factor
    • Handle peer failures gracefully
    • Data consistency guarantees
    • Merkle tree verification (see the sketch below)
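
The last item, Merkle tree verification, lets a peer confirm that a replicated chunk belongs to a file using only the file's root hash. Below is a minimal, illustrative sketch of computing a Merkle root over data chunks; the function name and the duplicate-odd-node rule are choices for this example, not part of the required API:

package main

import (
    "crypto/sha256"
    "fmt"
)

// merkleRoot hashes each chunk, then pairwise-hashes levels until a
// single digest remains. An odd node is paired with itself.
func merkleRoot(chunks [][]byte) [32]byte {
    if len(chunks) == 0 {
        return sha256.Sum256(nil)
    }
    level := make([][32]byte, len(chunks))
    for i, c := range chunks {
        level[i] = sha256.Sum256(c)
    }
    for len(level) > 1 {
        var next [][32]byte
        for i := 0; i < len(level); i += 2 {
            j := i + 1
            if j == len(level) {
                j = i // duplicate the odd node
            }
            var pair [64]byte
            copy(pair[:32], level[i][:])
            copy(pair[32:], level[j][:])
            next = append(next, sha256.Sum256(pair[:]))
        }
        level = next
    }
    return level[0]
}

func main() {
    chunks := [][]byte{[]byte("chunk-0"), []byte("chunk-1"), []byte("chunk-2")}
    fmt.Printf("root: %x\n", merkleRoot(chunks))
}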

Non-Functional Requirements

  • Scalability: Support 100-10,000 peers
  • Resilience: Tolerate 50% peer failure
  • Performance: <100ms latency for local peers, <1s for DHT lookups
  • Security: Authenticated connections, prevent Sybil attacks
  • Efficiency: Minimize bandwidth usage, optimize routing

Background and Theory

Peer-to-peer networking emerged in the late 1990s with Napster, evolved through Gnutella and BitTorrent, and now powers modern blockchain networks.

P2P Network Topologies

Unstructured P2P: Random connections, flooding for queries

  • Examples: Gnutella, early Bitcoin
  • Simple but inefficient search

Structured P2P: Organized overlay network

  • Examples: Kademlia, Chord, Pastry
  • Efficient O(log n) routing

Hybrid: Structured backbone with unstructured edges

  • Examples: Modern Bitcoin, Ethereum
  • Balance efficiency and resilience

Kademlia DHT

Kademlia uses XOR metric for distance between node IDs:

distance(A, B) = A XOR B

Properties:

  • Distance is symmetric: d(A,B) = d(B,A)
  • Distance to self is zero: d(A,A) = 0
  • Triangle inequality: d(A,C) ≤ d(A,B) + d(B,C)
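
As a quick sanity check (illustrative only, not part of the exercise API), the sketch below computes the XOR metric over one-byte IDs and confirms symmetry and the zero property on sample values:

package main

import (
    "bytes"
    "fmt"
)

// xorDistance returns the XOR metric between two equal-length IDs.
func xorDistance(a, b []byte) []byte {
    d := make([]byte, len(a))
    for i := range a {
        d[i] = a[i] ^ b[i]
    }
    return d
}

func main() {
    A := []byte{0b00010110} // 22
    B := []byte{0b00000011} // 3

    dAB := xorDistance(A, B)
    dBA := xorDistance(B, A)
    fmt.Println("symmetric:", bytes.Equal(dAB, dBA))          // true
    fmt.Println("d(A,A) is zero:", xorDistance(A, A)[0] == 0) // true
    fmt.Printf("d(A,B) = %d\n", dAB[0])                       // 21 (0b00010101)
}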

K-Buckets:

  • Each node maintains k-buckets for different distance ranges
  • Bucket i contains peers with distance [2^i, 2^(i+1))
  • k=20 is typical
  • Example: a peer at distance 22 falls in bucket 4, since 2^4 ≤ 22 < 2^5

Node Lookup:

  1. Find k closest nodes to target from local buckets
  2. Query those nodes for their k closest
  3. Recursively query closer nodes
  4. Converges in O(log n) steps

Gossip Protocols

Gossip protocols are also called epidemic protocols because information spreads the way a disease does:

Algorithm:

  1. Node receives message
  2. With probability p, forward to k random neighbors
  3. Track seen messages to prevent loops
  4. Repeat until TTL expires

Properties:

  • Eventually consistent
  • Probabilistically reliable
  • Scales to large networks
  • Resilient to failures
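
To get a feel for how quickly gossip saturates a network, here is a toy simulation (the node count, degree, and fanout are arbitrary choices for illustration): it spreads one message over a random graph and prints coverage per round. With fanout 4, coverage typically grows geometrically until most nodes are reached within a handful of rounds.

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    const (
        nodes  = 1000
        degree = 8 // neighbors per node
        fanout = 4 // forwards per infected node per round
    )

    // Build a random neighbor list for each node.
    neighbors := make([][]int, nodes)
    for i := range neighbors {
        for j := 0; j < degree; j++ {
            neighbors[i] = append(neighbors[i], rand.Intn(nodes))
        }
    }

    infected := map[int]bool{0: true} // node 0 originates the message
    frontier := []int{0}

    for round := 1; len(frontier) > 0; round++ {
        var next []int
        for _, n := range frontier {
            // Forward to a random subset of neighbors (the fanout).
            for j := 0; j < fanout; j++ {
                peer := neighbors[n][rand.Intn(degree)]
                if !infected[peer] {
                    infected[peer] = true
                    next = append(next, peer)
                }
            }
        }
        frontier = next
        fmt.Printf("round %d: %d/%d nodes reached\n", round, len(infected), nodes)
    }
}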

Use Cases

  1. Blockchain: Bitcoin, Ethereum transaction propagation
  2. File Sharing: BitTorrent distributed downloads
  3. Content Delivery: IPFS decentralized storage
  4. Messaging: Secure P2P chat
  5. Computing: Distributed computation
  6. Gaming: Decentralized multiplayer servers
  7. IoT: Device-to-device communication

Implementation Challenges

Challenge 1: NAT Traversal

Most peers are behind NAT/firewalls and cannot accept incoming connections.

Solution Approach:

  • Use STUN servers to discover public IP/port
  • Implement hole punching for direct connections
  • Fall back to relay servers when direct fails
  • Support UPnP for automatic port forwarding
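
The first step, discovering your public address, can be done with a raw STUN Binding Request (RFC 5389). A minimal sketch follows; the server address is one publicly run example, only IPv4 and the XOR-MAPPED-ADDRESS attribute are handled, and hole punching and relays are left out:

package main

import (
    "crypto/rand"
    "encoding/binary"
    "fmt"
    "net"
)

func main() {
    conn, err := net.Dial("udp", "stun.l.google.com:19302")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // 20-byte header: type 0x0001 (Binding Request), length 0,
    // magic cookie 0x2112A442, 12-byte random transaction ID.
    req := make([]byte, 20)
    binary.BigEndian.PutUint16(req[0:2], 0x0001)
    binary.BigEndian.PutUint32(req[4:8], 0x2112A442)
    rand.Read(req[8:20])
    conn.Write(req)

    resp := make([]byte, 1500)
    n, err := conn.Read(resp)
    if err != nil || n < 20 {
        panic("no STUN response")
    }

    // Walk attributes after the header, looking for XOR-MAPPED-ADDRESS.
    for off := 20; off+4 <= n; {
        attrType := binary.BigEndian.Uint16(resp[off : off+2])
        attrLen := int(binary.BigEndian.Uint16(resp[off+2 : off+4]))
        if off+4+attrLen > n {
            break
        }
        v := resp[off+4 : off+4+attrLen]
        if attrType == 0x0020 && attrLen >= 8 && v[1] == 0x01 { // IPv4
            // Port and address are XORed with the magic cookie.
            port := binary.BigEndian.Uint16(v[2:4]) ^ 0x2112
            var ip [4]byte
            binary.BigEndian.PutUint32(ip[:], binary.BigEndian.Uint32(v[4:8])^0x2112A442)
            fmt.Printf("public address: %s:%d\n", net.IP(ip[:]), port)
            return
        }
        off += 4 + (attrLen+3)/4*4 // attributes are padded to 4 bytes
    }
    fmt.Println("no XOR-MAPPED-ADDRESS in response")
}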

Challenge 2: Peer Churn

Nodes constantly join and leave. Connections fail frequently.

Solution Approach:

  • Implement exponential backoff for reconnection
  • Maintain multiple redundant connections
  • Periodic peer discovery to refresh routing table
  • Graceful degradation when peers leave
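
A minimal sketch of the first point, exponential backoff with jitter (the helper name and the one-second base and 30-second ceiling are arbitrary choices):

package main

import (
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// retryWithBackoff retries connect, doubling the delay up to a ceiling
// so failed peers are not hammered with reconnection attempts.
func retryWithBackoff(connect func() error, maxAttempts int) error {
    backoff := time.Second
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err := connect(); err == nil {
            return nil
        }
        // Random jitter prevents many peers from retrying in lockstep.
        jitter := time.Duration(rand.Int63n(int64(backoff / 2)))
        time.Sleep(backoff + jitter)
        if backoff < 30*time.Second {
            backoff *= 2
        }
    }
    return fmt.Errorf("connect failed after %d attempts", maxAttempts)
}

func main() {
    attempts := 0
    err := retryWithBackoff(func() error {
        attempts++
        if attempts < 3 {
            return errors.New("connection refused")
        }
        return nil
    }, 5)
    fmt.Println("result:", err, "after", attempts, "attempts")
}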

Challenge 3: Message Amplification

Naive gossip can cause message storms.

Solution Approach:

  • Track seen message IDs
  • Implement TTL counter
  • Limit fanout
  • Use probabilistic forwarding
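
A seen cache must itself be bounded or it becomes a slow memory leak under sustained traffic. One way to bound it is a time-windowed cache; the sketch below is illustrative (eviction inside Seen is O(n) per call, fine for a toy but something you would amortize in production):

package main

import (
    "fmt"
    "sync"
    "time"
)

// seenCache remembers message IDs for a fixed window so duplicates are
// dropped, while stale entries are evicted to cap memory use.
type seenCache struct {
    mu      sync.Mutex
    entries map[uint64]time.Time
    ttl     time.Duration
}

func newSeenCache(ttl time.Duration) *seenCache {
    return &seenCache{entries: make(map[uint64]time.Time), ttl: ttl}
}

// Seen records id and reports whether it was already present.
func (c *seenCache) Seen(id uint64) bool {
    c.mu.Lock()
    defer c.mu.Unlock()
    now := time.Now()
    if t, ok := c.entries[id]; ok && now.Sub(t) < c.ttl {
        return true
    }
    c.entries[id] = now
    // Opportunistically evict expired entries.
    for k, t := range c.entries {
        if now.Sub(t) >= c.ttl {
            delete(c.entries, k)
        }
    }
    return false
}

func main() {
    c := newSeenCache(30 * time.Second)
    fmt.Println(c.Seen(42)) // false: first sighting
    fmt.Println(c.Seen(42)) // true: duplicate dropped
}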

Challenge 4: Sybil Attacks

Malicious actor creates many fake identities to control network.

Solution Approach:

  • Proof of work for peer ID generation
  • Reputation systems and trust metrics
  • Limit connections from same IP range
  • Require stake or payment for participation
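
A sketch of the first idea, proof-of-work IDs: a peer must find a nonce whose hash has a required number of leading zero bits before its ID is accepted. The 16-bit difficulty here is an arbitrary example; every extra bit doubles the expected work:

package main

import (
    "crypto/rand"
    "crypto/sha256"
    "encoding/binary"
    "fmt"
    "math/bits"
)

// generatePoWID searches for a nonce such that SHA-256(seed || nonce)
// has at least `difficulty` leading zero bits; the qualifying hash
// becomes the peer ID, making bulk identity creation costly.
func generatePoWID(difficulty int) ([]byte, uint64) {
    seed := make([]byte, 32)
    rand.Read(seed)

    buf := make([]byte, 40) // seed || nonce
    copy(buf, seed)
    for nonce := uint64(0); ; nonce++ {
        binary.BigEndian.PutUint64(buf[32:], nonce)
        h := sha256.Sum256(buf)
        if leadingZeroBits(h[:]) >= difficulty {
            return h[:], nonce
        }
    }
}

func leadingZeroBits(b []byte) int {
    n := 0
    for _, x := range b {
        if x == 0 {
            n += 8
            continue
        }
        return n + bits.LeadingZeros8(x)
    }
    return n
}

func main() {
    id, nonce := generatePoWID(16) // ~65k hashes on average
    fmt.Printf("id=%x nonce=%d\n", id[:8], nonce)
}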

Hints

Hint 1: Peer Data Structure

Represent each peer with connection and metadata:

type Peer struct {
    ID        []byte        // Unique peer identifier
    Address   string        // IP:Port
    Conn      net.Conn      // TCP connection
    LastSeen  time.Time     // For timeout detection
    Version   uint32        // Protocol version

    outgoing  chan *Message // Send queue
    incoming  chan *Message // Receive queue

    mu        sync.RWMutex
    connected bool
}

type PeerTable struct {
    peers   map[string]*Peer
    maxConn int
    mu      sync.RWMutex
}

Hint 2: Message Protocol

Define binary message format:

type MessageType uint8

const (
    MsgPing MessageType = iota
    MsgPong
    MsgPeerExchange
    MsgData
    MsgQuery
    MsgQueryResponse
)

type Message struct {
    Type      MessageType
    ID        uint64        // Unique message ID
    Timestamp uint64        // Unix timestamp
    TTL       uint8         // Time-to-live
    Payload   []byte        // Message data
}

func (m *Message) Serialize() []byte {
    buf := new(bytes.Buffer)
    binary.Write(buf, binary.BigEndian, m.Type)
    binary.Write(buf, binary.BigEndian, m.ID)
    binary.Write(buf, binary.BigEndian, m.Timestamp)
    binary.Write(buf, binary.BigEndian, m.TTL)
    binary.Write(buf, binary.BigEndian, uint32(len(m.Payload)))
    buf.Write(m.Payload)
    return buf.Bytes()
}

Hint 3: Gossip Implementation

Implement efficient message propagation:

type GossipManager struct {
    peers    *PeerTable
    seenMsgs *bloomfilter.BloomFilter // Track seen messages
    fanout   int                      // How many peers to forward to
    mu       sync.Mutex
}

func (gm *GossipManager) Broadcast(msg *Message) {
    gm.mu.Lock()
    defer gm.mu.Unlock()

    // Check if already seen
    msgID := fmt.Sprintf("%d", msg.ID)
    if gm.seenMsgs.MayContainString(msgID) {
        return // Already propagated
    }
    gm.seenMsgs.AddString(msgID)

    // Drop expired messages, then decrement the TTL
    if msg.TTL == 0 {
        return // Expired
    }
    msg.TTL--

    // Forward to a random subset of peers
    selectedPeers := gm.peers.RandomPeers(gm.fanout)
    for _, peer := range selectedPeers {
        peer.Send(msg)
    }
}

Hint 4: DHT K-Buckets

Organize peers by XOR distance:

type KBucket struct {
    peers []*Peer
    k     int // Bucket size
    mu    sync.RWMutex
}

type RoutingTable struct {
    selfID  []byte
    buckets [256]*KBucket // One bucket per bit position (initialize before use)
}

func (rt *RoutingTable) distance(peerID []byte) []byte {
    // XOR distance
    dist := make([]byte, len(rt.selfID))
    for i := range dist {
        dist[i] = rt.selfID[i] ^ peerID[i]
    }
    return dist
}

func (rt *RoutingTable) bucketIndex(peerID []byte) int {
    dist := rt.distance(peerID)

    // Index of the first set bit, counting from the most significant bit
    for i := 0; i < len(dist); i++ {
        for bit := 7; bit >= 0; bit-- {
            if dist[i]&(1<<uint(bit)) != 0 {
                return i*8 + (7 - bit)
            }
        }
    }
    return 0
}

func (rt *RoutingTable) AddPeer(peer *Peer) {
    idx := rt.bucketIndex(peer.ID)
    rt.buckets[idx].Add(peer)
}

Hint 5: DHT Lookup

Iterative node lookup algorithm:

func (dht *DHT) Lookup(targetID []byte) []*Peer {
    // Start with the k closest nodes from the routing table
    closest := dht.routingTable.FindClosest(targetID, dht.k)

    queried := make(map[string]bool)
    var found []*Peer

    for len(closest) > 0 {
        // Query the next closest node
        peer := closest[0]
        closest = closest[1:]

        if queried[string(peer.ID)] {
            continue
        }
        queried[string(peer.ID)] = true

        // Ask the peer for its k closest nodes
        peers := dht.queryPeer(peer, targetID)

        // Add newly discovered peers
        for _, p := range peers {
            if !queried[string(p.ID)] {
                closest = append(closest, p)
                found = append(found, p)
            }
        }

        // Sort by distance to target
        sort.Slice(closest, func(i, j int) bool {
            return compareBytes(xorDistance(closest[i].ID, targetID),
                xorDistance(closest[j].ID, targetID)) < 0
        })

        // Keep only the k closest
        if len(closest) > dht.k {
            closest = closest[:dht.k]
        }

        // Converged once a queried peer returns nothing new
        if len(peers) == 0 {
            break
        }
    }

    return found[:min(dht.k, len(found))]
}

Solution

Approach

This implementation provides a complete P2P network with:

  1. Network Layer: TCP connections with handshake protocol
  2. Peer Management: Discovery, connection pooling, health checks
  3. Message Protocol: Binary serialization with routing
  4. Gossip: Epidemic broadcast with anti-flooding
  5. DHT: Kademlia-style distributed hash table
  6. Application: Simple distributed key-value store

The implementation prioritizes clarity and correctness, suitable for educational purposes and as a foundation for production systems.

Implementation

package p2pnet

import (
	"bytes"
	"crypto/rand"
	"crypto/sha256"
	"encoding/binary"
	"encoding/json"
	"errors"
	"io"
	"net"
	"sort"
	"sync"
	"time"
)

// Peer represents a network peer
type Peer struct {
	ID       []byte
	Address  string
	conn     net.Conn
	lastSeen time.Time
	version  uint32

	outgoing chan *Message
	incoming chan *Message

	mu        sync.RWMutex
	connected bool
}

// NewPeer creates a new peer
func NewPeer(address string) *Peer {
	id := generatePeerID(address)
	return &Peer{
		ID:       id,
		Address:  address,
		lastSeen: time.Now(),
		version:  1,
		outgoing: make(chan *Message, 100),
		incoming: make(chan *Message, 100),
	}
}

// Connect establishes a connection to the peer
func (p *Peer) Connect() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	conn, err := net.DialTimeout("tcp", p.Address, 5*time.Second)
	if err != nil {
		return err
	}

	p.conn = conn
	p.connected = true
	return nil
}

// Disconnect closes the connection
func (p *Peer) Disconnect() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.conn != nil {
		p.conn.Close()
		p.conn = nil
	}
	p.connected = false
}

// Send queues a message to the peer without blocking
func (p *Peer) Send(msg *Message) error {
	select {
	case p.outgoing <- msg:
		return nil
	default:
		return errors.New("send queue full")
	}
}

// IsConnected returns connection status
func (p *Peer) IsConnected() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.connected
}

// MessageType defines message types
type MessageType uint8

const (
	MsgPing MessageType = iota
	MsgPong
	MsgPeerExchange
	MsgData
	MsgQuery
	MsgQueryResponse
	MsgHandshake
)

// Message represents a P2P message
type Message struct {
	Type      MessageType
	ID        uint64
	Timestamp uint64
	TTL       uint8
	SenderID  []byte
	Payload   []byte
}

// Serialize converts the message to bytes
func (m *Message) Serialize() ([]byte, error) {
	buf := new(bytes.Buffer)

	binary.Write(buf, binary.BigEndian, m.Type)
	binary.Write(buf, binary.BigEndian, m.ID)
	binary.Write(buf, binary.BigEndian, m.Timestamp)
	binary.Write(buf, binary.BigEndian, m.TTL)

	binary.Write(buf, binary.BigEndian, uint16(len(m.SenderID)))
	buf.Write(m.SenderID)

	binary.Write(buf, binary.BigEndian, uint32(len(m.Payload)))
	buf.Write(m.Payload)

	return buf.Bytes(), nil
}

// DeserializeMessage reads a message from bytes
func DeserializeMessage(data []byte) (*Message, error) {
	buf := bytes.NewReader(data)
	msg := &Message{}

	if err := binary.Read(buf, binary.BigEndian, &msg.Type); err != nil {
		return nil, err
	}
	binary.Read(buf, binary.BigEndian, &msg.ID)
	binary.Read(buf, binary.BigEndian, &msg.Timestamp)
	binary.Read(buf, binary.BigEndian, &msg.TTL)

	var senderIDLen uint16
	binary.Read(buf, binary.BigEndian, &senderIDLen)
	msg.SenderID = make([]byte, senderIDLen)
	if _, err := io.ReadFull(buf, msg.SenderID); err != nil {
		return nil, err
	}

	var payloadLen uint32
	binary.Read(buf, binary.BigEndian, &payloadLen)
	msg.Payload = make([]byte, payloadLen)
	if _, err := io.ReadFull(buf, msg.Payload); err != nil {
		return nil, err
	}

	return msg, nil
}

// PeerTable manages connected peers
type PeerTable struct {
	peers   map[string]*Peer
	maxConn int
	mu      sync.RWMutex
}

// NewPeerTable creates a peer table
func NewPeerTable(maxConn int) *PeerTable {
	return &PeerTable{
		peers:   make(map[string]*Peer),
		maxConn: maxConn,
	}
}

// Add adds a peer, refusing once the connection limit is reached
func (pt *PeerTable) Add(peer *Peer) bool {
	pt.mu.Lock()
	defer pt.mu.Unlock()

	if len(pt.peers) >= pt.maxConn {
		return false
	}

	pt.peers[string(peer.ID)] = peer
	return true
}

// Remove removes a peer
func (pt *PeerTable) Remove(peerID []byte) {
	pt.mu.Lock()
	defer pt.mu.Unlock()
	delete(pt.peers, string(peerID))
}

// Get retrieves a peer
func (pt *PeerTable) Get(peerID []byte) (*Peer, bool) {
	pt.mu.RLock()
	defer pt.mu.RUnlock()
	peer, ok := pt.peers[string(peerID)]
	return peer, ok
}

// All returns all peers
func (pt *PeerTable) All() []*Peer {
	pt.mu.RLock()
	defer pt.mu.RUnlock()

	peers := make([]*Peer, 0, len(pt.peers))
	for _, p := range pt.peers {
		peers = append(peers, p)
	}
	return peers
}

// RandomPeers returns up to n peers. Go's randomized map iteration
// order in All makes the selection effectively random.
func (pt *PeerTable) RandomPeers(n int) []*Peer {
	all := pt.All()
	if n > len(all) {
		n = len(all)
	}
	return all[:n]
}

// Count returns the number of peers
func (pt *PeerTable) Count() int {
	pt.mu.RLock()
	defer pt.mu.RUnlock()
	return len(pt.peers)
}

// Node represents a P2P network node
type Node struct {
	ID       []byte
	Address  string
	peers    *PeerTable
	dht      *DHT
	gossip   *GossipManager
	listener net.Listener

	handlers map[MessageType]MessageHandler

	stopChan chan struct{}
	wg       sync.WaitGroup
}

// MessageHandler processes messages
type MessageHandler func(*Node, *Peer, *Message) error

// NewNode creates a P2P node
func NewNode(address string, maxPeers int) *Node {
	id := generatePeerID(address)

	node := &Node{
		ID:       id,
		Address:  address,
		peers:    NewPeerTable(maxPeers),
		handlers: make(map[MessageType]MessageHandler),
		stopChan: make(chan struct{}),
	}

	node.dht = NewDHT(node, 20)
	node.gossip = NewGossipManager(node, 4)

	// Register default handlers
	node.RegisterHandler(MsgPing, handlePing)
	node.RegisterHandler(MsgPong, handlePong)
	node.RegisterHandler(MsgPeerExchange, handlePeerExchange)

	return node
}

// Start begins node operations
func (n *Node) Start() error {
	listener, err := net.Listen("tcp", n.Address)
	if err != nil {
		return err
	}

	n.listener = listener

	n.wg.Add(2)
	go n.acceptConnections()
	go n.maintainPeers()

	return nil
}

// Stop halts node operations
func (n *Node) Stop() {
	close(n.stopChan)
	if n.listener != nil {
		n.listener.Close()
	}

	// Disconnect all peers
	for _, peer := range n.peers.All() {
		peer.Disconnect()
	}

	n.wg.Wait()
}

// acceptConnections handles incoming peer connections
func (n *Node) acceptConnections() {
	defer n.wg.Done()

	for {
		conn, err := n.listener.Accept()
		if err != nil {
			select {
			case <-n.stopChan:
				return
			default:
				continue
			}
		}

		go n.handleConnection(conn)
	}
}

// handleConnection processes an inbound peer connection
func (n *Node) handleConnection(conn net.Conn) {
	// Perform handshake
	peer, err := n.handshake(conn)
	if err != nil {
		conn.Close()
		return
	}

	peer.conn = conn
	peer.connected = true

	// Add to peer table
	if !n.peers.Add(peer) {
		peer.Disconnect()
		return
	}

	// Handle messages
	n.wg.Add(2)
	go n.handlePeerMessages(peer)
	go n.sendPeerMessages(peer)
}

// handshake exchanges 32-byte node IDs over a new connection
func (n *Node) handshake(conn net.Conn) (*Peer, error) {
	// Send our ID
	if _, err := conn.Write(n.ID); err != nil {
		return nil, err
	}

	// Receive peer ID
	peerID := make([]byte, 32)
	if _, err := io.ReadFull(conn, peerID); err != nil {
		return nil, err
	}

	peer := &Peer{
		ID:       peerID,
		Address:  conn.RemoteAddr().String(),
		lastSeen: time.Now(),
		outgoing: make(chan *Message, 100),
		incoming: make(chan *Message, 100),
	}

	return peer, nil
}

// handlePeerMessages reads length-prefixed messages from a peer
func (n *Node) handlePeerMessages(peer *Peer) {
	defer n.wg.Done()
	defer peer.Disconnect()

	for {
		// Read message length
		var msgLen uint32
		if err := binary.Read(peer.conn, binary.BigEndian, &msgLen); err != nil {
			return
		}

		// Read message data
		data := make([]byte, msgLen)
		if _, err := io.ReadFull(peer.conn, data); err != nil {
			return
		}

		// Deserialize message
		msg, err := DeserializeMessage(data)
		if err != nil {
			continue
		}

		// Process message
		if handler, ok := n.handlers[msg.Type]; ok {
			handler(n, peer, msg)
		}

		peer.lastSeen = time.Now()
	}
}

// sendPeerMessages writes queued messages to a peer
func (n *Node) sendPeerMessages(peer *Peer) {
	defer n.wg.Done()

	for {
		select {
		case msg := <-peer.outgoing:
			data, err := msg.Serialize()
			if err != nil {
				continue
			}

			// Write message length, then the message data
			binary.Write(peer.conn, binary.BigEndian, uint32(len(data)))
			if _, err := peer.conn.Write(data); err != nil {
				return
			}

		case <-n.stopChan:
			return
		}
	}
}

// maintainPeers runs periodic peer maintenance
func (n *Node) maintainPeers() {
	defer n.wg.Done()

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			n.checkPeerHealth()
		case <-n.stopChan:
			return
		}
	}
}

// checkPeerHealth removes stale peers
func (n *Node) checkPeerHealth() {
	timeout := 2 * time.Minute
	now := time.Now()

	for _, peer := range n.peers.All() {
		if now.Sub(peer.lastSeen) > timeout {
			peer.Disconnect()
			n.peers.Remove(peer.ID)
		}
	}
}

// Connect dials a peer by address and performs the handshake
func (n *Node) Connect(address string) error {
	peer := NewPeer(address)

	if err := peer.Connect(); err != nil {
		return err
	}

	// Perform handshake
	if _, err := peer.conn.Write(n.ID); err != nil {
		peer.Disconnect()
		return err
	}

	peerID := make([]byte, 32)
	if _, err := io.ReadFull(peer.conn, peerID); err != nil {
		peer.Disconnect()
		return err
	}

	peer.ID = peerID

	if !n.peers.Add(peer) {
		peer.Disconnect()
		return errors.New("peer table full")
	}

	n.wg.Add(2)
	go n.handlePeerMessages(peer)
	go n.sendPeerMessages(peer)

	return nil
}

// Broadcast gossips a message to the network
func (n *Node) Broadcast(msg *Message) {
	msg.SenderID = n.ID
	n.gossip.Broadcast(msg)
}

// RegisterHandler registers a message handler
func (n *Node) RegisterHandler(msgType MessageType, handler MessageHandler) {
	n.handlers[msgType] = handler
}

// Default message handlers

func handlePing(n *Node, peer *Peer, msg *Message) error {
	pong := &Message{
		Type:      MsgPong,
		ID:        msg.ID,
		Timestamp: uint64(time.Now().Unix()),
		SenderID:  n.ID,
	}
	return peer.Send(pong)
}

func handlePong(n *Node, peer *Peer, msg *Message) error {
	// Update peer last seen
	peer.lastSeen = time.Now()
	return nil
}

func handlePeerExchange(n *Node, peer *Peer, msg *Message) error {
	// Send a list of known peers
	var peerAddrs []string
	for _, p := range n.peers.RandomPeers(10) {
		peerAddrs = append(peerAddrs, p.Address)
	}

	payload, _ := json.Marshal(peerAddrs)
	response := &Message{
		Type:      MsgPeerExchange,
		ID:        generateMessageID(),
		Timestamp: uint64(time.Now().Unix()),
		SenderID:  n.ID,
		Payload:   payload,
	}

	return peer.Send(response)
}

// GossipManager handles message propagation
type GossipManager struct {
	node     *Node
	fanout   int
	seenMsgs map[uint64]bool
	mu       sync.Mutex
}

// NewGossipManager creates a gossip manager
func NewGossipManager(node *Node, fanout int) *GossipManager {
	return &GossipManager{
		node:     node,
		fanout:   fanout,
		seenMsgs: make(map[uint64]bool),
	}
}

// Broadcast propagates a message to the network
func (gm *GossipManager) Broadcast(msg *Message) {
	gm.mu.Lock()
	defer gm.mu.Unlock()

	// Check if already seen
	if gm.seenMsgs[msg.ID] {
		return
	}
	gm.seenMsgs[msg.ID] = true

	// A zero TTL means the sender did not set one; otherwise
	// decrement and stop once it runs out
	if msg.TTL == 0 {
		msg.TTL = 10 // Default TTL
	} else {
		msg.TTL--
		if msg.TTL == 0 {
			return
		}
	}

	// Forward to a random subset of peers
	peers := gm.node.peers.RandomPeers(gm.fanout)
	for _, peer := range peers {
		peer.Send(msg)
	}
}

// DHT implements a distributed hash table (storage here is node-local;
// full iterative lookup is left as a bonus challenge)
type DHT struct {
	node         *Node
	k            int // Bucket size
	routingTable *RoutingTable
	storage      map[string][]byte
	mu           sync.RWMutex
}

// NewDHT creates a DHT
func NewDHT(node *Node, k int) *DHT {
	return &DHT{
		node:         node,
		k:            k,
		routingTable: NewRoutingTable(node.ID, k),
		storage:      make(map[string][]byte),
	}
}

// Store stores a key-value pair
func (dht *DHT) Store(key string, value []byte) error {
	dht.mu.Lock()
	defer dht.mu.Unlock()

	dht.storage[key] = value
	return nil
}

// Get retrieves a value by key
func (dht *DHT) Get(key string) ([]byte, bool) {
	dht.mu.RLock()
	defer dht.mu.RUnlock()

	value, ok := dht.storage[key]
	return value, ok
}

// Lookup finds the nodes closest to a key
func (dht *DHT) Lookup(keyHash []byte) []*Peer {
	return dht.routingTable.FindClosest(keyHash, dht.k)
}

// RoutingTable manages peer routing
type RoutingTable struct {
	selfID  []byte
	buckets map[int][]*Peer
	k       int
	mu      sync.RWMutex
}

// NewRoutingTable creates a routing table
func NewRoutingTable(selfID []byte, k int) *RoutingTable {
	return &RoutingTable{
		selfID:  selfID,
		buckets: make(map[int][]*Peer),
		k:       k,
	}
}

// AddPeer adds a peer to the routing table
func (rt *RoutingTable) AddPeer(peer *Peer) {
	rt.mu.Lock()
	defer rt.mu.Unlock()

	bucketIdx := rt.bucketIndex(peer.ID)

	bucket := rt.buckets[bucketIdx]
	if len(bucket) < rt.k {
		rt.buckets[bucketIdx] = append(bucket, peer)
	}
}

// FindClosest finds the k closest peers to the target
func (rt *RoutingTable) FindClosest(target []byte, k int) []*Peer {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	var allPeers []*Peer
	for _, bucket := range rt.buckets {
		allPeers = append(allPeers, bucket...)
	}

	// Sort by XOR distance to the target
	sort.Slice(allPeers, func(i, j int) bool {
		distI := xorDistance(allPeers[i].ID, target)
		distJ := xorDistance(allPeers[j].ID, target)
		return compareBytes(distI, distJ) < 0
	})

	if len(allPeers) > k {
		return allPeers[:k]
	}
	return allPeers
}

// bucketIndex returns the position of the first set bit of the XOR
// distance, counting from the most significant bit
func (rt *RoutingTable) bucketIndex(peerID []byte) int {
	dist := xorDistance(rt.selfID, peerID)

	for i := 0; i < len(dist); i++ {
		if dist[i] != 0 {
			for bit := 7; bit >= 0; bit-- {
				if dist[i]&(1<<uint(bit)) != 0 {
					return i*8 + (7 - bit)
				}
			}
		}
	}
	return 0
}

// Helper functions

func generatePeerID(seed string) []byte {
	hash := sha256.Sum256([]byte(seed + time.Now().String()))
	return hash[:]
}

func generateMessageID() uint64 {
	b := make([]byte, 8)
	rand.Read(b)
	return binary.BigEndian.Uint64(b)
}

func xorDistance(a, b []byte) []byte {
	dist := make([]byte, len(a))
	for i := range dist {
		dist[i] = a[i] ^ b[i]
	}
	return dist
}

func compareBytes(a, b []byte) int {
	for i := 0; i < len(a) && i < len(b); i++ {
		if a[i] < b[i] {
			return -1
		} else if a[i] > b[i] {
			return 1
		}
	}
	return 0
}

Testing

package p2pnet

import (
	"crypto/rand"
	"fmt"
	"sync"
	"testing"
	"time"
)

func TestNodeCreation(t *testing.T) {
	node := NewNode("localhost:8001", 50)

	if node == nil {
		t.Fatal("Failed to create node")
	}

	if len(node.ID) != 32 {
		t.Error("Node ID should be 32 bytes")
	}
}

func TestNodeStartStop(t *testing.T) {
	node := NewNode("localhost:8002", 50)

	err := node.Start()
	if err != nil {
		t.Fatalf("Failed to start node: %v", err)
	}

	time.Sleep(100 * time.Millisecond)

	node.Stop()
}

func TestPeerConnection(t *testing.T) {
	// Create two nodes
	node1 := NewNode("localhost:8003", 50)
	node2 := NewNode("localhost:8004", 50)

	node1.Start()
	node2.Start()
	defer node1.Stop()
	defer node2.Stop()

	time.Sleep(100 * time.Millisecond)

	// Node1 connects to Node2
	err := node1.Connect("localhost:8004")
	if err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}

	time.Sleep(100 * time.Millisecond)

	if node1.peers.Count() != 1 {
		t.Error("Node1 should have 1 peer")
	}
}

func TestMessageBroadcast(t *testing.T) {
	// Create a 3-node network
	nodes := make([]*Node, 3)
	addresses := []string{
		"localhost:8005",
		"localhost:8006",
		"localhost:8007",
	}

	for i, addr := range addresses {
		nodes[i] = NewNode(addr, 50)
		nodes[i].Start()
		defer nodes[i].Stop()
	}

	time.Sleep(100 * time.Millisecond)

	// Connect nodes in a chain
	nodes[0].Connect(addresses[1])
	nodes[1].Connect(addresses[2])

	time.Sleep(100 * time.Millisecond)

	// Track received messages
	received := make(map[string]bool)
	var mu sync.Mutex

	for i, node := range nodes {
		idx := i
		node.RegisterHandler(MsgData, func(n *Node, p *Peer, m *Message) error {
			mu.Lock()
			defer mu.Unlock()
			received[fmt.Sprintf("node%d", idx)] = true
			return nil
		})
	}

	// Broadcast from node 0
	msg := &Message{
		Type:      MsgData,
		ID:        generateMessageID(),
		Timestamp: uint64(time.Now().Unix()),
		TTL:       10,
		Payload:   []byte("Hello P2P"),
	}

	nodes[0].Broadcast(msg)

	time.Sleep(500 * time.Millisecond)

	// All nodes should receive the message
	mu.Lock()
	defer mu.Unlock()
	if len(received) < 2 {
		t.Errorf("Expected messages at multiple nodes, got %d", len(received))
	}
}

func TestDHTStorage(t *testing.T) {
	node := NewNode("localhost:8008", 50)
	node.Start()
	defer node.Stop()

	// Store and retrieve
	key := "test-key"
	value := []byte("test-value")

	node.dht.Store(key, value)

	retrieved, ok := node.dht.Get(key)
	if !ok {
		t.Error("Failed to retrieve stored value")
	}

	if string(retrieved) != string(value) {
		t.Errorf("Retrieved value mismatch: got %s, want %s",
			retrieved, value)
	}
}

func TestRoutingTable(t *testing.T) {
	selfID := generatePeerID("self")
	rt := NewRoutingTable(selfID, 20)

	// Add peers
	for i := 0; i < 10; i++ {
		peer := NewPeer(fmt.Sprintf("localhost:%d", 9000+i))
		rt.AddPeer(peer)
	}

	// Find closest peers
	target := generatePeerID("target")
	closest := rt.FindClosest(target, 5)

	if len(closest) > 5 {
		t.Errorf("FindClosest returned too many peers: %d", len(closest))
	}
}

func BenchmarkMessageSerialization(b *testing.B) {
	msg := &Message{
		Type:      MsgData,
		ID:        12345,
		Timestamp: uint64(time.Now().Unix()),
		TTL:       10,
		SenderID:  make([]byte, 32),
		Payload:   []byte("test payload"),
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		msg.Serialize()
	}
}

func BenchmarkXORDistance(b *testing.B) {
	// Use names that do not shadow the *testing.B parameter
	x := make([]byte, 32)
	y := make([]byte, 32)
	rand.Read(x)
	rand.Read(y)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		xorDistance(x, y)
	}
}

Usage Example

Note: this example runs outside the p2pnet package, so it assumes two small exported accessors (PeerCount and DHT) that you would add to the solution; see the comments below.

package main

import (
	"fmt"
	"time"

	"p2pnet"
)

func main() {
	// Create a P2P network with 5 nodes
	fmt.Println("=== Creating P2P Network ===")

	nodes := make([]*p2pnet.Node, 5)
	addresses := make([]string, 5)

	for i := 0; i < 5; i++ {
		addr := fmt.Sprintf("localhost:%d", 9000+i)
		addresses[i] = addr

		nodes[i] = p2pnet.NewNode(addr, 50)
		if err := nodes[i].Start(); err != nil {
			fmt.Printf("Failed to start node %d: %v\n", i, err)
			return
		}

		fmt.Printf("Started node %d at %s\n", i, addr)
		defer nodes[i].Stop()
	}

	// Connect nodes in a mesh topology
	fmt.Println("\n=== Connecting Peers ===")
	time.Sleep(500 * time.Millisecond)

	// Node 0 connects to 1, 2
	nodes[0].Connect(addresses[1])
	nodes[0].Connect(addresses[2])

	// Node 1 connects to 3
	nodes[1].Connect(addresses[3])

	// Node 2 connects to 4
	nodes[2].Connect(addresses[4])

	time.Sleep(time.Second)

	// Check peer counts. PeerCount is an assumed exported wrapper,
	// since the peers field is unexported:
	//   func (n *Node) PeerCount() int { return n.peers.Count() }
	for i, node := range nodes {
		fmt.Printf("Node %d has %d peers\n", i, node.PeerCount())
	}

	// Register message handlers
	fmt.Println("\n=== Setting Up Message Handlers ===")

	for i, node := range nodes {
		idx := i
		node.RegisterHandler(p2pnet.MsgData, func(n *p2pnet.Node, p *p2pnet.Peer, m *p2pnet.Message) error {
			fmt.Printf("[Node %d] Received: %s\n", idx, string(m.Payload))
			return nil
		})
	}

	// Broadcast a message from node 0
	fmt.Println("\n=== Broadcasting Message ===")

	msg := &p2pnet.Message{
		Type:      p2pnet.MsgData,
		ID:        1, // any unique value; inside the package, generateMessageID() does this
		Timestamp: uint64(time.Now().Unix()),
		TTL:       10,
		Payload:   []byte("Hello from node 0!"),
	}

	nodes[0].Broadcast(msg)

	time.Sleep(2 * time.Second)

	// Use the DHT to store and retrieve data. DHT() is another assumed
	// exported accessor: func (n *Node) DHT() *DHT { return n.dht }
	fmt.Println("\n=== DHT Storage Test ===")

	nodes[0].DHT().Store("user:1001", []byte(`{"name":"Alice","age":30}`))
	nodes[2].DHT().Store("user:1002", []byte(`{"name":"Bob","age":25}`))

	// Retrieve from different nodes
	if data, ok := nodes[0].DHT().Get("user:1001"); ok {
		fmt.Printf("Retrieved from node 0: %s\n", data)
	}

	if data, ok := nodes[2].DHT().Get("user:1002"); ok {
		fmt.Printf("Retrieved from node 2: %s\n", data)
	}

	// Simulate network activity
	fmt.Println("\n=== Network Running ===")
	time.Sleep(5 * time.Second)

	fmt.Println("\n=== Shutting Down ===")
}

Testing Your Solution

1. Node Lifecycle

  • Start and stop individual nodes
  • Verify listener is active
  • Clean shutdown without resource leaks

2. Peer Connection

  • Connect two nodes successfully
  • Verify handshake protocol
  • Test connection failure handling

3. Message Routing

  • Send message between peers
  • Verify message serialization/deserialization
  • Test message handlers

4. Gossip Protocol

  • Broadcast to network of 10 nodes
  • Verify all nodes eventually receive message
  • Test anti-flooding

5. DHT Operations

  • Store and retrieve key-value pairs
  • Test routing table organization
  • Verify k-closest peer selection

Edge Cases

  • Network partition
  • All peers disconnect simultaneously
  • Message queue overflow
  • Duplicate peer connections
  • Invalid message format
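
The message-queue-overflow case above can be pinned down with a small test against the solution's non-blocking Send, whose outgoing channel is created with capacity 100 in NewPeer:

func TestSendQueueOverflow(t *testing.T) {
    peer := NewPeer("localhost:9999") // never connected; the queue just fills

    msg := &Message{Type: MsgData, ID: 1}

    // The outgoing channel holds 100 messages; the 101st must fail.
    for i := 0; i < 100; i++ {
        if err := peer.Send(msg); err != nil {
            t.Fatalf("send %d failed early: %v", i, err)
        }
    }
    if err := peer.Send(msg); err == nil {
        t.Error("expected error when send queue is full")
    }
}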

Bonus Challenges

  1. NAT Traversal: Implement STUN/TURN for firewall traversal

  2. Encryption: Add TLS for secure peer connections

  3. Authentication: Implement challenge-response peer verification

  4. Bandwidth Optimization: Compress messages, batch small messages

  5. Advanced DHT: Implement full Kademlia with iterative lookup

  6. Sybil Protection: Add proof-of-work for peer IDs

  7. Monitoring Dashboard: Web UI showing network topology
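
As a starting point for challenge 2, the standard library's crypto/tls can wrap the same TCP connections. A minimal sketch follows; cert.pem and key.pem are assumed to exist (self-signed is fine for testing), and InsecureSkipVerify is acceptable only because of that (real peers should verify or pin certificates):

package main

import (
    "crypto/tls"
    "log"
)

func main() {
    // Server side: listen with a certificate (generate cert.pem/key.pem
    // yourself, e.g. with crypto/x509 or the openssl CLI).
    cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
    if err != nil {
        log.Fatal(err)
    }
    ln, err := tls.Listen("tcp", "localhost:9443", &tls.Config{
        Certificates: []tls.Certificate{cert},
    })
    if err != nil {
        log.Fatal(err)
    }
    defer ln.Close()

    go func() {
        conn, err := ln.Accept()
        if err != nil {
            return
        }
        defer conn.Close()
        conn.Write([]byte("hello over TLS"))
    }()

    // Client side: skip verification only because the demo cert is
    // self-signed.
    conn, err := tls.Dial("tcp", "localhost:9443", &tls.Config{
        InsecureSkipVerify: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    buf := make([]byte, 64)
    n, _ := conn.Read(buf)
    log.Printf("received: %s", buf[:n])
}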

Key Takeaways

  • P2P networks eliminate single points of failure through decentralization
  • Peer discovery is bootstrapped from seed nodes then grows organically
  • Gossip protocols achieve eventual consistency with probabilistic delivery
  • DHT enables decentralized key-value storage with O(log n) lookup
  • XOR metric in Kademlia provides efficient routing topology
  • Message flooding must be prevented with seen caches and TTL
  • Peer churn is constant - systems must handle joins/leaves gracefully
  • Blockchains use P2P for transaction and block propagation
  • Scalability comes from horizontal growth - add more peers
  • Security requires defense against Sybil, eclipse, and DDoS attacks

References

  • Kademlia Paper: "Kademlia: A Peer-to-Peer Information System Based on the XOR Metric" by Maymounkov and Mazières
  • BitTorrent Protocol: BEP 0003 specification
  • Bitcoin P2P Protocol: Bitcoin Developer Reference
  • IPFS: InterPlanetary File System white paper
  • Chord DHT: "Chord: A Scalable Peer-to-peer Lookup Service for Internet Applications"
  • Pastry: Microsoft Research distributed routing substrate
  • NAT Traversal: RFC 5389, RFC 5766
  • Gossip Protocols: "Epidemic Algorithms for Replicated Database Maintenance" by Demers et al.