Exercise: Peer-to-Peer Networking Implementation
Difficulty - Advanced
Estimated Time - 5-7 hours
Learning Objectives
- Understand peer-to-peer network architecture and protocols
- Implement peer discovery and connection management
- Master gossip protocols for message propagation
- Design distributed hash tables for decentralized storage
- Handle network partitions and peer churn
- Build resilient distributed systems without central coordination
Problem Statement
Traditional client-server architectures have single points of failure and scalability bottlenecks. Peer-to-peer networks distribute functionality across all participants, creating resilient, censorship-resistant, and horizontally scalable systems.
In a P2P network, every node is both a client and server. Nodes must discover each other, maintain connections, route messages, and reach consensus—all without central coordination. This is the architecture powering Bitcoin, BitTorrent, IPFS, and many blockchain networks.
Real-World Scenario:
Consider a file-sharing application like BitTorrent, where files are distributed across thousands of peers:
1// Client-Server: Single point of failure
2type CentralizedDownload struct {
3 server string // If server fails, download stops
4}
5
6func Download(file string) error {
7 return downloadFrom(cd.server, file) // Bottleneck!
8}
9
10// P2P: Resilient and scalable
11type P2PDownload struct {
12 peers map[string]*Peer // Download from many peers
13 dht *DHT // Decentralized file location
14}
15
16func Download(file string) error {
17 // Find peers with file using DHT
18 peers := p2p.dht.FindPeers(hash(file))
19
20 // Download chunks from multiple peers in parallel
21 return p2p.downloadFromPeers(file, peers)
22}
If 90% of peers fail, the P2P network continues functioning. The client-server approach has 100% downtime if the server fails.
Requirements
Functional Requirements
-
Peer Discovery
- Bootstrap from seed nodes
- Discover new peers through existing connections
- Maintain routing table of known peers
- Implement peer exchange protocol
- Handle NAT traversal and firewall issues
-
Connection Management
- Establish TCP connections between peers
- Handshake protocol with version negotiation
- Maintain active peer connections
- Automatic reconnection on failure
- Connection limits and peer selection
-
Message Protocol
- Define binary message format with headers
- Message types: PING/PONG, PEER_EXCHANGE, DATA, QUERY
- Message routing and forwarding
- Request-response pattern with timeouts
- Message validation and authentication
-
Gossip Protocol
- Broadcast messages to network efficiently
- Prevent message flooding with seen cache
- Configurable fanout and TTL
- Probabilistic message propagation
- Eventual consistency guarantees
-
Distributed Hash Table
- Store and retrieve key-value pairs across network
- Kademlia-style routing
- k-bucket organization for peer routing
- Iterative lookup algorithm
- Node join/leave handling
-
Data Replication
- Replicate data across multiple peers
- Configurable replication factor
- Handle peer failures gracefully
- Data consistency guarantees
- Merkle tree verification
Non-Functional Requirements
- Scalability: Support 100-10,000 peers
- Resilience: Tolerate 50% peer failure
- Performance: <100ms latency for local peers, <1s for DHT lookups
- Security: Authenticated connections, prevent Sybil attacks
- Efficiency: Minimize bandwidth usage, optimize routing
Background and Theory
Peer-to-peer networking emerged in the late 1990s with Napster, evolved through Gnutella and BitTorrent, and now powers modern blockchain networks.
P2P Network Topologies
Unstructured P2P: Random connections, flooding for queries
- Examples: Gnutella, early Bitcoin
- Simple but inefficient search)
Structured P2P: Organized overlay network
- Examples: Kademlia, Chord, Pastry
- Efficient O(log n) routing
Hybrid: Structured backbone with unstructured edges
- Examples: Modern Bitcoin, Ethereum
- Balance efficiency and resilience
Kademlia DHT
Kademlia uses XOR metric for distance between node IDs:
distance(A, B) = A XOR B
Properties:
- Distance is symmetric: d(A,B) = d(B,A)
- Distance to self is zero: d(A,A) = 0
- Triangle inequality: d(A,C) ≤ d(A,B) + d(B,C)
K-Buckets:
- Each node maintains k-buckets for different distance ranges
- Bucket i contains peers with distance [2^i, 2^(i+1))
- k=20 is typical
Node Lookup:
- Find k closest nodes to target from local buckets
- Query those nodes for their k closest
- Recursively query closer nodes
- Converges in O(log n) steps
Gossip Protocols
Also called epidemic protocols—information spreads like a disease:
Algorithm:
- Node receives message
- With probability p, forward to k random neighbors
- Track seen messages to prevent loops
- Repeat until TTL expires
Properties:
- Eventually consistent
- Probabilistically reliable
- Scales to large networks
- Resilient to failures
Use Cases
- Blockchain: Bitcoin, Ethereum transaction propagation
- File Sharing: BitTorrent distributed downloads
- Content Delivery: IPFS decentralized storage
- Messaging: Secure P2P chat
- Computing: Distributed computation
- Gaming: Decentralized multiplayer servers
- IoT: Device-to-device communication
Implementation Challenges
Challenge 1: NAT Traversal
Most peers are behind NAT/firewalls and cannot accept incoming connections.
Solution Approach:
- Use STUN servers to discover public IP/port
- Implement hole punching for direct connections
- Fall back to relay servers when direct fails
- Support UPnP for automatic port forwarding
Challenge 2: Peer Churn
Nodes constantly join and leave. Connections fail frequently.
Solution Approach:
- Implement exponential backoff for reconnection
- Maintain multiple redundant connections
- Periodic peer discovery to refresh routing table
- Graceful degradation when peers leave
Challenge 3: Message Amplification
Naive gossip can cause message storms.
Solution Approach:
- Track seen message IDs
- Implement TTL counter
- Limit fanout
- Use probabilistic forwarding
Challenge 4: Sybil Attacks
Malicious actor creates many fake identities to control network.
Solution Approach:
- Proof of work for peer ID generation
- Reputation systems and trust metrics
- Limit connections from same IP range
- Require stake or payment for participation
Hints
Hint 1: Peer Data Structure
Represent each peer with connection and metadata:
1type Peer struct {
2 ID []byte // Unique peer identifier
3 Address string // IP:Port
4 Conn net.Conn // TCP connection
5 LastSeen time.Time // For timeout detection
6 Version uint32 // Protocol version
7
8 outgoing chan *Message // Send queue
9 incoming chan *Message // Receive queue
10
11 mu sync.RWMutex
12 connected bool
13}
14
15type PeerTable struct {
16 peers map[string]*Peer
17 maxConn int
18 mu sync.RWMutex
19}
Hint 2: Message Protocol
Define binary message format:
1type MessageType uint8
2
3const (
4 MsgPing MessageType = iota
5 MsgPong
6 MsgPeerExchange
7 MsgData
8 MsgQuery
9 MsgQueryResponse
10)
11
12type Message struct {
13 Type MessageType
14 ID uint64 // Unique message ID
15 Timestamp uint64 // Unix timestamp
16 TTL uint8 // Time-to-live
17 Payload []byte // Message data
18}
19
20func Serialize() []byte {
21 buf := new(bytes.Buffer)
22 binary.Write(buf, binary.BigEndian, m.Type)
23 binary.Write(buf, binary.BigEndian, m.ID)
24 binary.Write(buf, binary.BigEndian, m.Timestamp)
25 binary.Write(buf, binary.BigEndian, m.TTL)
26 binary.Write(buf, binary.BigEndian, uint32(len(m.Payload)))
27 buf.Write(m.Payload)
28 return buf.Bytes()
29}
Hint 3: Gossip Implementation
Implement efficient message propagation:
1type GossipManager struct {
2 peers *PeerTable
3 seenMsgs *bloomfilter.BloomFilter // Track seen messages
4 fanout int // How many peers to forward to
5 mu sync.Mutex
6}
7
8func Broadcast(msg *Message) {
9 gm.mu.Lock()
10 defer gm.mu.Unlock()
11
12 // Check if already seen
13 msgID := fmt.Sprintf("%d", msg.ID)
14 if gm.seenMsgs.MayContainString(msgID) {
15 return // Already propagated
16 }
17 gm.seenMsgs.AddString(msgID)
18
19 // Decrement TTL
20 if msg.TTL == 0 {
21 return // Expired
22 }
23 msg.TTL--
24
25 // Forward to random subset of peers
26 selectedPeers := gm.peers.RandomPeers(gm.fanout)
27 for _, peer := range selectedPeers {
28 peer.Send(msg)
29 }
30}
Hint 4: DHT K-Buckets
Organize peers by XOR distance:
1type KBucket struct {
2 peers []*Peer
3 k int // Bucket size
4 mu sync.RWMutex
5}
6
7type RoutingTable struct {
8 selfID []byte
9 buckets [256]*KBucket // One bucket per bit position
10}
11
12func distance(peerID []byte) []byte {
13 // XOR distance
14 dist := make([]byte, len(rt.selfID))
15 for i := range dist {
16 dist[i] = rt.selfID[i] ^ peerID[i]
17 }
18 return dist
19}
20
21func bucketIndex(peerID []byte) int {
22 dist := rt.distance(peerID)
23
24 // Find first non-zero bit
25 for i := 0; i < len(dist); i++ {
26 for bit := 7; bit >= 0; bit-- {
27 if)) != 0 {
28 return i*8 +
29 }
30 }
31 }
32 return 0
33}
34
35func AddPeer(peer *Peer) {
36 idx := rt.bucketIndex(peer.ID)
37 rt.buckets[idx].Add(peer)
38}
Hint 5: DHT Lookup
Iterative node lookup algorithm:
1func Lookup(targetID []byte) []*Peer {
2 // Start with k closest nodes from routing table
3 closest := dht.routingTable.FindClosest(targetID, dht.k)
4
5 queried := make(map[string]bool)
6 var found []*Peer
7
8 for len(closest) > 0 {
9 // Query next closest node
10 peer := closest[0]
11 closest = closest[1:]
12
13 if queried[string(peer.ID)] {
14 continue
15 }
16 queried[string(peer.ID)] = true
17
18 // Ask peer for its k closest nodes
19 peers := dht.queryPeer(peer, targetID)
20
21 // Add newly discovered peers
22 for _, p := range peers {
23 if !queried[string(p.ID)] {
24 closest = append(closest, p)
25 found = append(found, p)
26 }
27 }
28
29 // Sort by distance to target
30 sort.Slice(closest, func(i, j int) bool {
31 return xorDistance(closest[i].ID, targetID) <
32 xorDistance(closest[j].ID, targetID)
33 })
34
35 // Keep only k closest
36 if len(closest) > dht.k {
37 closest = closest[:dht.k]
38 }
39
40 // Converged if closest nodes haven't changed
41 if len(peers) == 0 {
42 break
43 }
44 }
45
46 return found[:min(dht.k, len(found))]
47}
Solution
Show Complete Solution
Approach
This implementation provides a complete P2P network with:
- Network Layer: TCP connections with handshake protocol
- Peer Management: Discovery, connection pooling, health checks
- Message Protocol: Binary serialization with routing
- Gossip: Epidemic broadcast with anti-flooding
- DHT: Kademlia-style distributed hash table
- Application: Simple distributed key-value store
The implementation prioritizes clarity and correctness, suitable for educational purposes and as a foundation for production systems.
Implementation
1package p2pnet
2
3import (
4 "bytes"
5 "crypto/rand"
6 "crypto/sha256"
7 "encoding/binary"
8 "encoding/json"
9 "errors"
10 "fmt"
11 "io"
12 "net"
13 "sort"
14 "sync"
15 "time"
16)
17
18// Peer represents a network peer
19type Peer struct {
20 ID []byte
21 Address string
22 conn net.Conn
23 lastSeen time.Time
24 version uint32
25
26 outgoing chan *Message
27 incoming chan *Message
28
29 mu sync.RWMutex
30 connected bool
31}
32
33// NewPeer creates a new peer
34func NewPeer(address string) *Peer {
35 id := generatePeerID(address)
36 return &Peer{
37 ID: id,
38 Address: address,
39 lastSeen: time.Now(),
40 version: 1,
41 outgoing: make(chan *Message, 100),
42 incoming: make(chan *Message, 100),
43 }
44}
45
46// Connect establishes connection to peer
47func Connect() error {
48 p.mu.Lock()
49 defer p.mu.Unlock()
50
51 conn, err := net.DialTimeout("tcp", p.Address, 5*time.Second)
52 if err != nil {
53 return err
54 }
55
56 p.conn = conn
57 p.connected = true
58 return nil
59}
60
61// Disconnect closes connection
62func Disconnect() {
63 p.mu.Lock()
64 defer p.mu.Unlock()
65
66 if p.conn != nil {
67 p.conn.Close()
68 p.conn = nil
69 }
70 p.connected = false
71}
72
73// Send queues a message to peer
74func Send(msg *Message) error {
75 select {
76 case p.outgoing <- msg:
77 return nil
78 default:
79 return errors.New("send queue full")
80 }
81}
82
83// IsConnected returns connection status
84func IsConnected() bool {
85 p.mu.RLock()
86 defer p.mu.RUnlock()
87 return p.connected
88}
89
90// MessageType defines message types
91type MessageType uint8
92
93const (
94 MsgPing MessageType = iota
95 MsgPong
96 MsgPeerExchange
97 MsgData
98 MsgQuery
99 MsgQueryResponse
100 MsgHandshake
101)
102
103// Message represents a P2P message
104type Message struct {
105 Type MessageType
106 ID uint64
107 Timestamp uint64
108 TTL uint8
109 SenderID []byte
110 Payload []byte
111}
112
113// Serialize converts message to bytes
114func Serialize() {
115 buf := new(bytes.Buffer)
116
117 binary.Write(buf, binary.BigEndian, m.Type)
118 binary.Write(buf, binary.BigEndian, m.ID)
119 binary.Write(buf, binary.BigEndian, m.Timestamp)
120 binary.Write(buf, binary.BigEndian, m.TTL)
121
122 binary.Write(buf, binary.BigEndian, uint16(len(m.SenderID)))
123 buf.Write(m.SenderID)
124
125 binary.Write(buf, binary.BigEndian, uint32(len(m.Payload)))
126 buf.Write(m.Payload)
127
128 return buf.Bytes(), nil
129}
130
131// DeserializeMessage reads message from bytes
132func DeserializeMessage(data []byte) {
133 buf := bytes.NewReader(data)
134 msg := &Message{}
135
136 binary.Read(buf, binary.BigEndian, &msg.Type)
137 binary.Read(buf, binary.BigEndian, &msg.ID)
138 binary.Read(buf, binary.BigEndian, &msg.Timestamp)
139 binary.Read(buf, binary.BigEndian, &msg.TTL)
140
141 var senderIDLen uint16
142 binary.Read(buf, binary.BigEndian, &senderIDLen)
143 msg.SenderID = make([]byte, senderIDLen)
144 buf.Read(msg.SenderID)
145
146 var payloadLen uint32
147 binary.Read(buf, binary.BigEndian, &payloadLen)
148 msg.Payload = make([]byte, payloadLen)
149 buf.Read(msg.Payload)
150
151 return msg, nil
152}
153
154// PeerTable manages connected peers
155type PeerTable struct {
156 peers map[string]*Peer
157 maxConn int
158 mu sync.RWMutex
159}
160
161// NewPeerTable creates a peer table
162func NewPeerTable(maxConn int) *PeerTable {
163 return &PeerTable{
164 peers: make(map[string]*Peer),
165 maxConn: maxConn,
166 }
167}
168
169// Add adds a peer
170func Add(peer *Peer) bool {
171 pt.mu.Lock()
172 defer pt.mu.Unlock()
173
174 if len(pt.peers) >= pt.maxConn {
175 return false
176 }
177
178 pt.peers[string(peer.ID)] = peer
179 return true
180}
181
182// Remove removes a peer
183func Remove(peerID []byte) {
184 pt.mu.Lock()
185 defer pt.mu.Unlock()
186 delete(pt.peers, string(peerID))
187}
188
189// Get retrieves a peer
190func Get(peerID []byte) {
191 pt.mu.RLock()
192 defer pt.mu.RUnlock()
193 peer, ok := pt.peers[string(peerID)]
194 return peer, ok
195}
196
197// All returns all peers
198func All() []*Peer {
199 pt.mu.RLock()
200 defer pt.mu.RUnlock()
201
202 peers := make([]*Peer, 0, len(pt.peers))
203 for _, p := range pt.peers {
204 peers = append(peers, p)
205 }
206 return peers
207}
208
209// RandomPeers returns n random peers
210func RandomPeers(n int) []*Peer {
211 all := pt.All()
212 if n > len(all) {
213 n = len(all)
214 }
215
216 // Simple random selection
217 selected := make([]*Peer, 0, n)
218 for i := 0; i < n && i < len(all); i++ {
219 selected = append(selected, all[i])
220 }
221 return selected
222}
223
224// Count returns number of peers
225func Count() int {
226 pt.mu.RLock()
227 defer pt.mu.RUnlock()
228 return len(pt.peers)
229}
230
231// Node represents a P2P network node
232type Node struct {
233 ID []byte
234 Address string
235 peers *PeerTable
236 dht *DHT
237 gossip *GossipManager
238 listener net.Listener
239
240 handlers map[MessageType]MessageHandler
241
242 stopChan chan struct{}
243 wg sync.WaitGroup
244}
245
246// MessageHandler processes messages
247type MessageHandler func(*Node, *Peer, *Message) error
248
249// NewNode creates a P2P node
250func NewNode(address string, maxPeers int) *Node {
251 id := generatePeerID(address)
252
253 node := &Node{
254 ID: id,
255 Address: address,
256 peers: NewPeerTable(maxPeers),
257 handlers: make(map[MessageType]MessageHandler),
258 stopChan: make(chan struct{}),
259 }
260
261 node.dht = NewDHT(node, 20)
262 node.gossip = NewGossipManager(node, 4)
263
264 // Register default handlers
265 node.RegisterHandler(MsgPing, handlePing)
266 node.RegisterHandler(MsgPong, handlePong)
267 node.RegisterHandler(MsgPeerExchange, handlePeerExchange)
268
269 return node
270}
271
272// Start begins node operations
273func Start() error {
274 listener, err := net.Listen("tcp", n.Address)
275 if err != nil {
276 return err
277 }
278
279 n.listener = listener
280
281 n.wg.Add(2)
282 go n.acceptConnections()
283 go n.maintainPeers()
284
285 return nil
286}
287
288// Stop halts node operations
289func Stop() {
290 close(n.stopChan)
291 if n.listener != nil {
292 n.listener.Close()
293 }
294
295 // Disconnect all peers
296 for _, peer := range n.peers.All() {
297 peer.Disconnect()
298 }
299
300 n.wg.Wait()
301}
302
303// acceptConnections handles incoming peer connections
304func acceptConnections() {
305 defer n.wg.Done()
306
307 for {
308 conn, err := n.listener.Accept()
309 if err != nil {
310 select {
311 case <-n.stopChan:
312 return
313 default:
314 continue
315 }
316 }
317
318 go n.handleConnection(conn)
319 }
320}
321
322// handleConnection processes peer connection
323func handleConnection(conn net.Conn) {
324 // Perform handshake
325 peer, err := n.handshake(conn)
326 if err != nil {
327 conn.Close()
328 return
329 }
330
331 peer.conn = conn
332 peer.connected = true
333
334 // Add to peer table
335 if !n.peers.Add(peer) {
336 peer.Disconnect()
337 return
338 }
339
340 // Handle messages
341 n.wg.Add(2)
342 go n.handlePeerMessages(peer)
343 go n.sendPeerMessages(peer)
344}
345
346// handshake performs connection handshake
347func handshake(conn net.Conn) {
348 // Send our ID
349 conn.Write(n.ID)
350
351 // Receive peer ID
352 peerID := make([]byte, 32)
353 _, err := io.ReadFull(conn, peerID)
354 if err != nil {
355 return nil, err
356 }
357
358 peer := &Peer{
359 ID: peerID,
360 Address: conn.RemoteAddr().String(),
361 lastSeen: time.Now(),
362 outgoing: make(chan *Message, 100),
363 incoming: make(chan *Message, 100),
364 }
365
366 return peer, nil
367}
368
369// handlePeerMessages reads messages from peer
370func handlePeerMessages(peer *Peer) {
371 defer n.wg.Done()
372 defer peer.Disconnect()
373
374 for {
375 // Read message length
376 var msgLen uint32
377 err := binary.Read(peer.conn, binary.BigEndian, &msgLen)
378 if err != nil {
379 return
380 }
381
382 // Read message data
383 data := make([]byte, msgLen)
384 _, err = io.ReadFull(peer.conn, data)
385 if err != nil {
386 return
387 }
388
389 // Deserialize message
390 msg, err := DeserializeMessage(data)
391 if err != nil {
392 continue
393 }
394
395 // Process message
396 if handler, ok := n.handlers[msg.Type]; ok {
397 handler(n, peer, msg)
398 }
399
400 peer.lastSeen = time.Now()
401 }
402}
403
404// sendPeerMessages writes queued messages to peer
405func sendPeerMessages(peer *Peer) {
406 defer n.wg.Done()
407
408 for {
409 select {
410 case msg := <-peer.outgoing:
411 data, err := msg.Serialize()
412 if err != nil {
413 continue
414 }
415
416 // Write message length
417 binary.Write(peer.conn, binary.BigEndian, uint32(len(data)))
418
419 // Write message data
420 _, err = peer.conn.Write(data)
421 if err != nil {
422 return
423 }
424
425 case <-n.stopChan:
426 return
427 }
428 }
429}
430
431// maintainPeers periodic peer maintenance
432func maintainPeers() {
433 defer n.wg.Done()
434
435 ticker := time.NewTicker(30 * time.Second)
436 defer ticker.Stop()
437
438 for {
439 select {
440 case <-ticker.C:
441 n.checkPeerHealth()
442 case <-n.stopChan:
443 return
444 }
445 }
446}
447
448// checkPeerHealth removes stale peers
449func checkPeerHealth() {
450 timeout := 2 * time.Minute
451 now := time.Now()
452
453 for _, peer := range n.peers.All() {
454 if now.Sub(peer.lastSeen) > timeout {
455 peer.Disconnect()
456 n.peers.Remove(peer.ID)
457 }
458 }
459}
460
461// Connect connects to a peer
462func Connect(address string) error {
463 peer := NewPeer(address)
464
465 if err := peer.Connect(); err != nil {
466 return err
467 }
468
469 // Perform handshake
470 peer.conn.Write(n.ID)
471
472 peerID := make([]byte, 32)
473 _, err := io.ReadFull(peer.conn, peerID)
474 if err != nil {
475 peer.Disconnect()
476 return err
477 }
478
479 peer.ID = peerID
480
481 if !n.peers.Add(peer) {
482 peer.Disconnect()
483 return errors.New("peer table full")
484 }
485
486 n.wg.Add(2)
487 go n.handlePeerMessages(peer)
488 go n.sendPeerMessages(peer)
489
490 return nil
491}
492
493// Broadcast sends message to all peers
494func Broadcast(msg *Message) {
495 msg.SenderID = n.ID
496 n.gossip.Broadcast(msg)
497}
498
499// RegisterHandler registers a message handler
500func RegisterHandler(msgType MessageType, handler MessageHandler) {
501 n.handlers[msgType] = handler
502}
503
504// Default message handlers
505
506func handlePing(n *Node, peer *Peer, msg *Message) error {
507 pong := &Message{
508 Type: MsgPong,
509 ID: msg.ID,
510 Timestamp: uint64(time.Now().Unix()),
511 SenderID: n.ID,
512 }
513 return peer.Send(pong)
514}
515
516func handlePong(n *Node, peer *Peer, msg *Message) error {
517 // Update peer last seen
518 peer.lastSeen = time.Now()
519 return nil
520}
521
522func handlePeerExchange(n *Node, peer *Peer, msg *Message) error {
523 // Send list of known peers
524 var peerAddrs []string
525 for _, p := range n.peers.RandomPeers(10) {
526 peerAddrs = append(peerAddrs, p.Address)
527 }
528
529 payload, _ := json.Marshal(peerAddrs)
530 response := &Message{
531 Type: MsgPeerExchange,
532 ID: generateMessageID(),
533 Timestamp: uint64(time.Now().Unix()),
534 SenderID: n.ID,
535 Payload: payload,
536 }
537
538 return peer.Send(response)
539}
540
541// GossipManager handles message propagation
542type GossipManager struct {
543 node *Node
544 fanout int
545 seenMsgs map[uint64]bool
546 mu sync.Mutex
547}
548
549// NewGossipManager creates gossip manager
550func NewGossipManager(node *Node, fanout int) *GossipManager {
551 return &GossipManager{
552 node: node,
553 fanout: fanout,
554 seenMsgs: make(map[uint64]bool),
555 }
556}
557
558// Broadcast propagates message to network
559func Broadcast(msg *Message) {
560 gm.mu.Lock()
561 defer gm.mu.Unlock()
562
563 // Check if already seen
564 if gm.seenMsgs[msg.ID] {
565 return
566 }
567 gm.seenMsgs[msg.ID] = true
568
569 // Check TTL
570 if msg.TTL == 0 {
571 msg.TTL = 10 // Default TTL
572 } else {
573 msg.TTL--
574 if msg.TTL == 0 {
575 return
576 }
577 }
578
579 // Forward to random peers
580 peers := gm.node.peers.RandomPeers(gm.fanout)
581 for _, peer := range peers {
582 peer.Send(msg)
583 }
584}
585
586// DHT implements distributed hash table
587type DHT struct {
588 node *Node
589 k int // Bucket size
590 routingTable *RoutingTable
591 storage map[string][]byte
592 mu sync.RWMutex
593}
594
595// NewDHT creates DHT
596func NewDHT(node *Node, k int) *DHT {
597 return &DHT{
598 node: node,
599 k: k,
600 routingTable: NewRoutingTable(node.ID, k),
601 storage: make(map[string][]byte),
602 }
603}
604
605// Store stores key-value pair
606func Store(key string, value []byte) error {
607 dht.mu.Lock()
608 defer dht.mu.Unlock()
609
610 dht.storage[key] = value
611 return nil
612}
613
614// Get retrieves value by key
615func Get(key string) {
616 dht.mu.RLock()
617 defer dht.mu.RUnlock()
618
619 value, ok := dht.storage[key]
620 return value, ok
621}
622
623// Lookup finds nodes closest to key
624func Lookup(keyHash []byte) []*Peer {
625 return dht.routingTable.FindClosest(keyHash, dht.k)
626}
627
628// RoutingTable manages peer routing
629type RoutingTable struct {
630 selfID []byte
631 buckets map[int][]*Peer
632 k int
633 mu sync.RWMutex
634}
635
636// NewRoutingTable creates routing table
637func NewRoutingTable(selfID []byte, k int) *RoutingTable {
638 return &RoutingTable{
639 selfID: selfID,
640 buckets: make(map[int][]*Peer),
641 k: k,
642 }
643}
644
645// AddPeer adds peer to routing table
646func AddPeer(peer *Peer) {
647 rt.mu.Lock()
648 defer rt.mu.Unlock()
649
650 bucketIdx := rt.bucketIndex(peer.ID)
651
652 bucket := rt.buckets[bucketIdx]
653 if len(bucket) < rt.k {
654 rt.buckets[bucketIdx] = append(bucket, peer)
655 }
656}
657
658// FindClosest finds k closest peers to target
659func FindClosest(target []byte, k int) []*Peer {
660 rt.mu.RLock()
661 defer rt.mu.RUnlock()
662
663 var allPeers []*Peer
664 for _, bucket := range rt.buckets {
665 allPeers = append(allPeers, bucket...)
666 }
667
668 // Sort by distance
669 sort.Slice(allPeers, func(i, j int) bool {
670 distI := xorDistance(allPeers[i].ID, target)
671 distJ := xorDistance(allPeers[j].ID, target)
672 return compareBytes(distI, distJ) < 0
673 })
674
675 if len(allPeers) > k {
676 return allPeers[:k]
677 }
678 return allPeers
679}
680
681func bucketIndex(peerID []byte) int {
682 dist := xorDistance(rt.selfID, peerID)
683
684 // Find first set bit
685 for i := 0; i < len(dist); i++ {
686 if dist[i] != 0 {
687 for bit := 7; bit >= 0; bit-- {
688 if)) != 0 {
689 return i*8 +
690 }
691 }
692 }
693 }
694 return 0
695}
696
697// Helper functions
698
699func generatePeerID(seed string) []byte {
700 hash := sha256.Sum256([]byte(seed + time.Now().String()))
701 return hash[:]
702}
703
704func generateMessageID() uint64 {
705 b := make([]byte, 8)
706 rand.Read(b)
707 return binary.BigEndian.Uint64(b)
708}
709
710func xorDistance(a, b []byte) []byte {
711 dist := make([]byte, len(a))
712 for i := range dist {
713 dist[i] = a[i] ^ b[i]
714 }
715 return dist
716}
717
718func compareBytes(a, b []byte) int {
719 for i := 0; i < len(a) && i < len(b); i++ {
720 if a[i] < b[i] {
721 return -1
722 } else if a[i] > b[i] {
723 return 1
724 }
725 }
726 return 0
727}
Testing
1package p2pnet
2
3import (
4 "fmt"
5 "testing"
6 "time"
7)
8
9func TestNodeCreation(t *testing.T) {
10 node := NewNode("localhost:8001", 50)
11
12 if node == nil {
13 t.Fatal("Failed to create node")
14 }
15
16 if len(node.ID) != 32 {
17 t.Error("Node ID should be 32 bytes")
18 }
19}
20
21func TestNodeStartStop(t *testing.T) {
22 node := NewNode("localhost:8002", 50)
23
24 err := node.Start()
25 if err != nil {
26 t.Fatalf("Failed to start node: %v", err)
27 }
28
29 time.Sleep(100 * time.Millisecond)
30
31 node.Stop()
32}
33
34func TestPeerConnection(t *testing.T) {
35 // Create two nodes
36 node1 := NewNode("localhost:8003", 50)
37 node2 := NewNode("localhost:8004", 50)
38
39 node1.Start()
40 node2.Start()
41 defer node1.Stop()
42 defer node2.Stop()
43
44 time.Sleep(100 * time.Millisecond)
45
46 // Node1 connects to Node2
47 err := node1.Connect("localhost:8004")
48 if err != nil {
49 t.Fatalf("Failed to connect: %v", err)
50 }
51
52 time.Sleep(100 * time.Millisecond)
53
54 if node1.peers.Count() != 1 {
55 t.Error("Node1 should have 1 peer")
56 }
57}
58
59func TestMessageBroadcast(t *testing.T) {
60 // Create 3-node network
61 nodes := make([]*Node, 3)
62 addresses := []string{
63 "localhost:8005",
64 "localhost:8006",
65 "localhost:8007",
66 }
67
68 for i, addr := range addresses {
69 nodes[i] = NewNode(addr, 50)
70 nodes[i].Start()
71 defer nodes[i].Stop()
72 }
73
74 time.Sleep(100 * time.Millisecond)
75
76 // Connect nodes in a chain
77 nodes[0].Connect(addresses[1])
78 nodes[1].Connect(addresses[2])
79
80 time.Sleep(100 * time.Millisecond)
81
82 // Track received messages
83 received := make(map[string]bool)
84 var mu sync.Mutex
85
86 for i, node := range nodes {
87 idx := i
88 node.RegisterHandler(MsgData, func(n *Node, p *Peer, m *Message) error {
89 mu.Lock()
90 defer mu.Unlock()
91 received[fmt.Sprintf("node%d", idx)] = true
92 return nil
93 })
94 }
95
96 // Broadcast from node 0
97 msg := &Message{
98 Type: MsgData,
99 ID: generateMessageID(),
100 Timestamp: uint64(time.Now().Unix()),
101 TTL: 10,
102 Payload: []byte("Hello P2P"),
103 }
104
105 nodes[0].Broadcast(msg)
106
107 time.Sleep(500 * time.Millisecond)
108
109 // All nodes should receive message
110 if len(received) < 2 {
111 t.Errorf("Expected messages at multiple nodes, got %d", len(received))
112 }
113}
114
115func TestDHTStorage(t *testing.T) {
116 node := NewNode("localhost:8008", 50)
117 node.Start()
118 defer node.Stop()
119
120 // Store and retrieve
121 key := "test-key"
122 value := []byte("test-value")
123
124 node.dht.Store(key, value)
125
126 retrieved, ok := node.dht.Get(key)
127 if !ok {
128 t.Error("Failed to retrieve stored value")
129 }
130
131 if string(retrieved) != string(value) {
132 t.Errorf("Retrieved value mismatch: got %s, want %s",
133 retrieved, value)
134 }
135}
136
137func TestRoutingTable(t *testing.T) {
138 selfID := generatePeerID("self")
139 rt := NewRoutingTable(selfID, 20)
140
141 // Add peers
142 for i := 0; i < 10; i++ {
143 peer := NewPeer(fmt.Sprintf("localhost:%d", 9000+i))
144 rt.AddPeer(peer)
145 }
146
147 // Find closest peers
148 target := generatePeerID("target")
149 closest := rt.FindClosest(target, 5)
150
151 if len(closest) > 5 {
152 t.Errorf("FindClosest returned too many peers: %d", len(closest))
153 }
154}
155
156func BenchmarkMessageSerialization(b *testing.B) {
157 msg := &Message{
158 Type: MsgData,
159 ID: 12345,
160 Timestamp: uint64(time.Now().Unix()),
161 TTL: 10,
162 SenderID: make([]byte, 32),
163 Payload: []byte("test payload"),
164 }
165
166 b.ResetTimer()
167 for i := 0; i < b.N; i++ {
168 msg.Serialize()
169 }
170}
171
172func BenchmarkXORDistance(b *testing.B) {
173 a := make([]byte, 32)
174 b := make([]byte, 32)
175 rand.Read(a)
176 rand.Read(b)
177
178 b.ResetTimer()
179 for i := 0; i < b.N; i++ {
180 xorDistance(a, b)
181 }
182}
Usage Example
1package main
2
3import (
4 "fmt"
5 "time"
6 "p2pnet"
7)
8
9func main() {
10 // Create P2P network with 5 nodes
11 fmt.Println("=== Creating P2P Network ===")
12
13 nodes := make([]*p2pnet.Node, 5)
14 addresses := make([]string, 5)
15
16 for i := 0; i < 5; i++ {
17 addr := fmt.Sprintf("localhost:%d", 9000+i)
18 addresses[i] = addr
19
20 nodes[i] = p2pnet.NewNode(addr, 50)
21 if err := nodes[i].Start(); err != nil {
22 fmt.Printf("Failed to start node %d: %v\n", i, err)
23 return
24 }
25
26 fmt.Printf("Started node %d at %s\n", i, addr)
27 defer nodes[i].Stop()
28 }
29
30 // Connect nodes in a mesh topology
31 fmt.Println("\n=== Connecting Peers ===")
32 time.Sleep(500 * time.Millisecond)
33
34 // Node 0 connects to 1, 2
35 nodes[0].Connect(addresses[1])
36 nodes[0].Connect(addresses[2])
37
38 // Node 1 connects to 3
39 nodes[1].Connect(addresses[3])
40
41 // Node 2 connects to 4
42 nodes[2].Connect(addresses[4])
43
44 time.Sleep(time.Second)
45
46 // Check peer counts
47 for i, node := range nodes {
48 fmt.Printf("Node %d has %d peers\n", i, node.peers.Count())
49 }
50
51 // Register message handler
52 fmt.Println("\n=== Setting Up Message Handlers ===")
53
54 for i, node := range nodes {
55 idx := i
56 node.RegisterHandler(p2pnet.MsgData, func(n *p2pnet.Node, p *p2pnet.Peer, m *p2pnet.Message) error {
57 fmt.Printf("[Node %d] Received: %s\n", idx, string(m.Payload))
58 return nil
59 })
60 }
61
62 // Broadcast message from node 0
63 fmt.Println("\n=== Broadcasting Message ===")
64
65 msg := &p2pnet.Message{
66 Type: p2pnet.MsgData,
67 ID: p2pnet.generateMessageID(),
68 Timestamp: uint64(time.Now().Unix()),
69 TTL: 10,
70 Payload: []byte("Hello from node 0!"),
71 }
72
73 nodes[0].Broadcast(msg)
74
75 time.Sleep(2 * time.Second)
76
77 // Use DHT to store and retrieve data
78 fmt.Println("\n=== DHT Storage Test ===")
79
80 nodes[0].dht.Store("user:1001", []byte(`{"name":"Alice","age":30}`))
81 nodes[2].dht.Store("user:1002", []byte(`{"name":"Bob","age":25}`))
82
83 // Retrieve from different nodes
84 if data, ok := nodes[0].dht.Get("user:1001"); ok {
85 fmt.Printf("Retrieved from node 0: %s\n", data)
86 }
87
88 if data, ok := nodes[2].dht.Get("user:1002"); ok {
89 fmt.Printf("Retrieved from node 2: %s\n", data)
90 }
91
92 // Simulate network activity
93 fmt.Println("\n=== Network Running ===")
94 time.Sleep(5 * time.Second)
95
96 fmt.Println("\n=== Shutting Down ===")
97}
Testing Your Solution
1. Node Lifecycle
- Start and stop individual nodes
- Verify listener is active
- Clean shutdown without resource leaks
2. Peer Connection
- Connect two nodes successfully
- Verify handshake protocol
- Test connection failure handling
3. Message Routing
- Send message between peers
- Verify message serialization/deserialization
- Test message handlers
4. Gossip Protocol
- Broadcast to network of 10 nodes
- Verify all nodes eventually receive message
- Test anti-flooding
5. DHT Operations
- Store and retrieve key-value pairs
- Test routing table organization
- Verify k-closest peer selection
Edge Cases
- Network partition
- All peers disconnect simultaneously
- Message queue overflow
- Duplicate peer connections
- Invalid message format
Bonus Challenges
-
NAT Traversal: Implement STUN/TURN for firewall traversal
-
Encryption: Add TLS for secure peer connections
-
Authentication: Implement challenge-response peer verification
-
Bandwidth Optimization: Compress messages, batch small messages
-
Advanced DHT: Implement full Kademlia with iterative lookup
-
Sybil Protection: Add proof-of-work for peer IDs
-
Monitoring Dashboard: Web UI showing network topology
Key Takeaways
- P2P networks eliminate single points of failure through decentralization
- Peer discovery is bootstrapped from seed nodes then grows organically
- Gossip protocols achieve eventual consistency with probabilistic delivery
- DHT enables decentralized key-value storage with O(log n) lookup
- XOR metric in Kademlia provides efficient routing topology
- Message flooding must be prevented with seen caches and TTL
- Peer churn is constant - systems must handle joins/leaves gracefully
- Blockchains use P2P for transaction and block propagation
- Scalability comes from horizontal growth - add more peers
- Security requires defense against Sybil, eclipse, and DDoS attacks
References
- Kademlia Paper: "Kademlia: A Peer-to-Peer Information System Based on the XOR Metric" by Maymounkov and Mazières
- BitTorrent Protocol: BEP 0003 specification
- Bitcoin P2P Protocol: Bitcoin Developer Reference
- IPFS: InterPlanetary File System white paper
- Chord DHT: "Chord: A Scalable Peer-to-peer Lookup Service for Internet Applications"
- Pastry: Microsoft Research distributed routing substrate
- NAT Traversal: RFC 5389, RFC 5766
- Gossip Protocols: "Epidemic Algorithms for Replicated Database Maintenance" by Demers et al.