Synchronization Architecture

Overview

The synchronization architecture in cpm enables efficient repository distribution across multiple servers using rsync-based incremental transfers, peer-to-peer neighbor synchronization, and intelligent conflict detection. This document details the sync mechanisms, protocols, and strategies employed by cpm.

Synchronization Model

Client-Server Synchronization

Primary model for repository distribution:

              ┌──────────────┐
              │   Client     │
              │   (cpm)      │
              └───────┬──────┘
                      │
        ┌─────────────┴─────────────┐
        │                           │
        ▼                           ▼
┌──────────────┐          ┌──────────────┐
│ Main Server  │          │Backup Server │
│              │          │              │
│ - Repository │          │ - Repository │
│   store      │          │   store      │
│ - git daemon │          │ - git daemon │
└──────────────┘          └──────────────┘

Peer-to-Peer Synchronization

Neighbor-based distribution:

┌──────────────┐      ┌──────────────┐
│  Neighbor A  │◄────►│  Neighbor B  │
│  (cpm)       │      │  (cpm)       │
└──────┬───────┘      └──────┬───────┘
       │                     │
       │  ┌──────────────┐   │
       └─►│  Neighbor C  │◄──┘
          │  (cpm)       │
          └──────────────┘

Sync Manager Architecture

Location: internal/server/sync.go

Core Components

type SyncManager struct {
    serverManager *ServerManager
    repoBasePath  string
}

type SyncResult struct {
    RepoName     string
    TargetServer string
    Success      bool
    Error        error
    Duration     time.Duration
    BytesSync    int64
}
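
A SyncManager is built from an existing ServerManager and the local repository base path. A minimal constructor sketch (the name NewSyncManager is an assumption, not taken from the source):

func NewSyncManager(servers *ServerManager, repoBasePath string) *SyncManager {
    return &SyncManager{
        serverManager: servers,
        repoBasePath:  repoBasePath,
    }
}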

Sync Operations

Push (Client to Server)

┌─────────────────────────────────────────────┐
│  Push Synchronization Flow                 │
│                                             │
│  1. Validate repository exists locally      │
│  2. Retrieve target server configuration    │
│  3. Establish SSH connection                │
│  4. Execute rsync transfer                  │
│     - Compare checksums                     │
│     - Transfer differences                  │
│     - Delete removed files                  │
│  5. Update database sync timestamp          │
│  6. Return sync result                      │
└─────────────────────────────────────────────┘

Implementation:

func (sm *SyncManager) SyncRepoTo(repoName, targetServer string) (*SyncResult, error) {
    startTime := time.Now()

    // Get server configuration
    server, err := sm.serverManager.Get(targetServer)
    if err != nil {
        return nil, err
    }

    // Build paths
    localPath := fmt.Sprintf("%s/%s", sm.repoBasePath, repoName)
    remotePath := fmt.Sprintf("%s@%s:%s/%s",
        server.User, server.Host, sm.repoBasePath, repoName)

    // Execute rsync
    err = sm.executeRsync(localPath, remotePath, server)

    return &SyncResult{
        RepoName:     repoName,
        TargetServer: targetServer,
        Success:      err == nil,
        Error:        err,
        Duration:     time.Since(startTime),
    }, err
}

Pull (Server to Client)

┌─────────────────────────────────────────────┐
│  Pull Synchronization Flow                 │
│                                             │
│  1. Retrieve source server configuration    │
│  2. Check repository exists on server       │
│  3. Establish SSH connection                │
│  4. Execute rsync transfer                  │
│     - Receive file differences              │
│     - Update local repository               │
│     - Preserve git structure                │
│  5. Register/update in database             │
│  6. Return sync result                      │
└─────────────────────────────────────────────┘
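
A pull mirrors the push implementation with the rsync source and destination swapped. A minimal sketch (the method name SyncRepoFrom is illustrative, and the database registration step is omitted here):

func (sm *SyncManager) SyncRepoFrom(repoName, sourceServer string) (*SyncResult, error) {
    startTime := time.Now()

    // Get source server configuration
    server, err := sm.serverManager.Get(sourceServer)
    if err != nil {
        return nil, err
    }

    localPath := fmt.Sprintf("%s/%s", sm.repoBasePath, repoName)
    remotePath := fmt.Sprintf("%s@%s:%s/%s",
        server.User, server.Host, sm.repoBasePath, repoName)

    // The remote repository is the rsync source; the local copy is the destination
    err = sm.executeRsync(remotePath, localPath, server)

    return &SyncResult{
        RepoName:     repoName,
        TargetServer: sourceServer,
        Success:      err == nil,
        Error:        err,
        Duration:     time.Since(startTime),
    }, err
}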

rsync Protocol

Command Structure

rsync -avz --delete -e "ssh [options]" <source> <destination>

Options Breakdown

Option       Purpose            Benefit
-----------  -----------------  -------------------------------------------
-a           Archive mode       Preserves permissions, timestamps, symlinks
-v           Verbose            Detailed transfer output
-z           Compression        Reduces bandwidth usage
--delete     Mirror deletion    Removes files not in source
-e ssh       SSH transport      Secure, encrypted transfer
--progress   Progress display   Real-time transfer status

Incremental Transfer

rsync transfers data incrementally: a quick check skips files that appear unchanged, and a rolling-checksum delta algorithm limits changed files to the blocks that actually differ:

1. List files in source and destination
2. For each file:
   a. Quick check: compare size and modification time (full checksums with -c)
   b. If the file appears unchanged, skip the transfer
   c. If it differs:
      - Break the destination copy into fixed-size blocks
      - Use rolling checksums to find blocks that already match
      - Transfer only the blocks that changed
3. Delete files not in source (if --delete)
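
The block comparison relies on a rolling (weak) checksum that can be updated in constant time as a window slides through the file, so every offset can be tested cheaply against the known block checksums. A simplified sketch of the idea (rsync's actual checksum differs in detail):

// weakMod bounds both halves of the weak checksum to 16 bits
const weakMod = 1 << 16

// weakSum computes the initial checksum of one block.
func weakSum(block []byte) (a, b uint32) {
    for i, c := range block {
        a = (a + uint32(c)) % weakMod
        b = (b + uint32(len(block)-i)*uint32(c)) % weakMod
    }
    return a, b
}

// roll slides the window one byte forward: drop `out`, append `in`.
func roll(a, b uint32, out, in byte, blockLen int) (uint32, uint32) {
    a = (a + weakMod - uint32(out) + uint32(in)) % weakMod
    b = (b + weakMod - (uint32(blockLen)*uint32(out))%weakMod + a) % weakMod
    return a, b
}

Sliding the window across the whole file and looking each weak sum up in a table of the receiver's block checksums is what lets rsync find matching blocks without comparing every byte over the network.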

Benefits:

  • Only transfers differences
  • Reduces bandwidth usage by 80-95% on updates
  • Fast for large repositories with small changes
  • Reliable with built-in error checking

Transfer Performance

func (sm *SyncManager) executeRsync(source, destination string, server *Server) error {
    args := []string{
        "-avz",           // Archive, verbose, compress
        "--delete",       // Mirror deletions
        "--stats",        // Show statistics
        "--timeout=300",  // 5 minute timeout
        "-e",             // SSH command follows
    }

    // Build SSH command used as rsync's transport
    sshCmd := "ssh -o StrictHostKeyChecking=no"
    if server.SSHKeyPath != "" {
        sshCmd += fmt.Sprintf(" -i %s", server.SSHKeyPath)
    }
    if server.Port != 0 && server.Port != 22 {
        sshCmd += fmt.Sprintf(" -p %d", server.Port)
    }

    args = append(args, sshCmd, source, destination)

    cmd := exec.Command("rsync", args...)
    return cmd.Run()
}
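
The SyncResult.BytesSync field is not populated in the snippets above. One way it could be filled is by capturing the --stats output and parsing the transferred byte count; a sketch, assuming the usual "Total transferred file size" line (exact formatting varies between rsync versions):

// parseTransferredBytes extracts the transferred byte count from rsync --stats
// output. Illustrative helper; not part of the documented API.
func parseTransferredBytes(statsOutput string) int64 {
    for _, line := range strings.Split(statsOutput, "\n") {
        line = strings.TrimSpace(line)
        if !strings.HasPrefix(line, "Total transferred file size:") {
            continue
        }
        // e.g. "Total transferred file size: 164,428,800 bytes"
        fields := strings.Fields(strings.TrimPrefix(line, "Total transferred file size:"))
        if len(fields) == 0 {
            continue
        }
        digits := strings.ReplaceAll(fields[0], ",", "")
        if n, err := strconv.ParseInt(digits, 10, 64); err == nil {
            return n
        }
    }
    return 0
}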

Conflict Detection

Pre-Sync Validation

Before synchronizing, cpm checks for potential conflicts:

func (sm *SyncManager) detectConflicts(repoName string, direction string) ([]Conflict, error) {
    var conflicts []Conflict

    // Check for uncommitted changes
    if hasUncommittedChanges(repoName) {
        conflicts = append(conflicts, Conflict{
            Type:        "uncommitted_changes",
            Description: "Repository has uncommitted changes",
            Resolution:  "Commit or stash changes before sync",
        })
    }

    // Check for diverged branches
    if hasDivergedBranches(repoName) {
        conflicts = append(conflicts, Conflict{
            Type:        "diverged_branches",
            Description: "Local and remote have diverged",
            Resolution:  "Merge or rebase before sync",
        })
    }

    return conflicts, nil
}
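
The Conflict type used above is not defined elsewhere in this document; a minimal definition consistent with its usage would be:

type Conflict struct {
    Type        string // machine-readable conflict class, e.g. "uncommitted_changes"
    Description string // human-readable explanation shown to the user
    Resolution  string // suggested next step before retrying the sync
}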

Conflict Resolution Strategies

Conflict Type         Strategy        Command Flag
--------------------  --------------  ------------
Uncommitted changes   Abort or force  --force
Diverged history      Manual merge    N/A (manual)
File permissions      Preserve local  Default
Deletion conflicts    Mirror source   --delete

Neighbor Synchronization

Discovery Protocol

┌────────────────────────────────────────────┐
│  Neighbor Discovery Process                │
│                                            │
│  1. Detect local network (CIDR)            │
│  2. Generate host list from CIDR           │
│  3. Concurrent TCP connection attempts     │
│     - Port 9418 (cpm default)             │
│     - 2 second timeout per host            │
│     - 50 concurrent connections            │
│  4. Filter reachable hosts                 │
│  5. Identify cpm servers                  │
│  6. Optional: auto-register as neighbors   │
└────────────────────────────────────────────┘

Implementation: internal/server/neighbor.go

func (nm *NeighborManager) Discover(network string) ([]DiscoveryResult, error) {
    // Parse CIDR
    ip, ipNet, err := net.ParseCIDR(network)
    if err != nil {
        return nil, err
    }

    // Generate hosts
    hosts := nm.generateHosts(ip, ipNet)

    // Scan concurrently
    results := make([]DiscoveryResult, 0)
    resultsChan := make(chan DiscoveryResult, len(hosts))
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, 50) // Limit concurrency

    for _, host := range hosts {
        wg.Add(1)
        go func(h string) {
            defer wg.Done()
            semaphore <- struct{}{}
            defer func() { <-semaphore }()

            result := nm.scanHost(h, DefaultGitmPort)
            if result.Reachable {
                resultsChan <- result
            }
        }(host)
    }

    // Collect results
    go func() {
        wg.Wait()
        close(resultsChan)
    }()

    for result := range resultsChan {
        results = append(results, result)
    }

    return results, nil
}
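
The scanHost helper referenced above is not shown in this document. A sketch consistent with the discovery parameters (TCP probe on the cpm port with a 2-second timeout), where the DiscoveryResult fields other than Reachable are assumptions:

func (nm *NeighborManager) scanHost(host string, port int) DiscoveryResult {
    addr := fmt.Sprintf("%s:%d", host, port)

    // A plain TCP dial is enough to decide reachability; identifying the
    // peer as a cpm server happens in a later step.
    conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
    if err != nil {
        return DiscoveryResult{Host: host, Port: port, Reachable: false}
    }
    conn.Close()

    return DiscoveryResult{Host: host, Port: port, Reachable: true}
}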

Neighbor Sync Protocol

Direct peer-to-peer synchronization:

Neighbor A                    Neighbor B
    │                             │
    │ 1. Ping (connectivity test) │
    │────────────────────────────►│
    │◄────────────────────────────│
    │                             │
    │ 2. Sync request (repo name) │
    │────────────────────────────►│
    │                             │
    │ 3. rsync transfer           │
    │◄───────────────────────────►│
    │                             │
    │ 4. Verify completion        │
    │◄────────────────────────────│
    │                             │
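
Because neighbors are registered like any other server, the transfer step can reuse the push path once the peer answers a connectivity check. A sketch of the sequence above (names other than SyncRepoTo and DefaultGitmPort are illustrative):

func (nm *NeighborManager) SyncToNeighbor(sm *SyncManager, repoName, neighbor string) error {
    // 1. Connectivity test against the cpm port
    addr := fmt.Sprintf("%s:%d", neighbor, DefaultGitmPort)
    conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
    if err != nil {
        return fmt.Errorf("neighbor %s unreachable: %w", neighbor, err)
    }
    conn.Close()

    // 2-3. The neighbor is registered like any other server, so the rsync
    // push path handles the sync request and transfer.
    result, err := sm.SyncRepoTo(repoName, neighbor)
    if err != nil {
        return err
    }

    // 4. Report completion
    log.Printf("neighbor sync complete: repo=%s, peer=%s, duration=%s",
        repoName, neighbor, result.Duration)
    return nil
}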

Sync Strategies

Full Sync

Complete repository transfer (initial sync):

rsync -avz --delete \
      -e "ssh -i key" \
      /local/repo.git/ \
      user@host:/remote/repo.git/

Incremental Sync

Transfer only changes since last sync:

# rsync's quick check already skips unchanged files; --update additionally
# skips files that are newer on the destination
rsync -avz --update \
      -e "ssh -i key" \
      /local/repo.git/ \
      user@host:/remote/repo.git/

Selective Sync

Sync specific branches or refs:

rsync -avz \
      --include='refs/' \
      --include='refs/heads/' \
      --include='refs/heads/main' \
      --include='refs/heads/develop' \
      --exclude='refs/**' \
      -e "ssh -i key" \
      /local/repo.git/ \
      user@host:/remote/repo.git/

The parent directories (refs/ and refs/heads/) must be included explicitly; otherwise the exclude rule prunes them and rsync never descends to the listed branches.

Progress Tracking

Real-time Progress

func (sm *SyncManager) SyncWithProgress(
    repoName, targetServer string,
    progressFn func(line string),
) (*SyncResult, error) {
    startTime := time.Now()

    // Assemble the same rsync arguments used by executeRsync, plus --progress
    // so rsync reports per-file status on stdout. (buildRsyncArgs is an
    // illustrative helper, not shown in this document.)
    args := sm.buildRsyncArgs(repoName, targetServer)
    args = append(args, "--progress")

    cmd := exec.Command("rsync", args...)

    stdout, err := cmd.StdoutPipe()
    if err != nil {
        return nil, err
    }

    if err := cmd.Start(); err != nil {
        return nil, err
    }

    // Stream each output line to the caller as rsync produces it
    scanner := bufio.NewScanner(stdout)
    for scanner.Scan() {
        if progressFn != nil {
            progressFn(scanner.Text())
        }
    }

    err = cmd.Wait()
    return &SyncResult{
        RepoName:     repoName,
        TargetServer: targetServer,
        Success:      err == nil,
        Error:        err,
        Duration:     time.Since(startTime),
    }, err
}
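
A typical caller simply forwards each progress line to the terminal; for example (repository and server names are illustrative):

result, err := sm.SyncWithProgress("webapp", "main-server", func(line string) {
    fmt.Println(line) // print rsync's progress output as it arrives
})
if err != nil {
    log.Fatalf("sync failed: %v", err)
}
fmt.Printf("synced %s in %s\n", result.RepoName, result.Duration)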

Progress Display

Syncing repository 'webapp' to main server...

  0% [                    ] 0 B/s   ETA: --:--
 15% [###                 ] 12.5 MB/s ETA: 00:12
 47% [#########           ] 15.2 MB/s ETA: 00:06
 73% [##############      ] 16.8 MB/s ETA: 00:03
100% [####################] 15.9 MB/s

Files transferred: 892
Total size: 156.8 MB
Transfer time: 18.4s
Average speed: 8.5 MB/s

Sync Verification

Post-Sync Validation

func (sm *SyncManager) VerifySync(repoName, targetServer string) (bool, error) {
    server, err := sm.serverManager.Get(targetServer)
    if err != nil {
        return false, err
    }

    localPath := fmt.Sprintf("%s/%s", sm.repoBasePath, repoName)
    remotePath := fmt.Sprintf("%s@%s:%s/%s",
        server.User, server.Host, sm.repoBasePath, repoName)

    // Use rsync dry-run to check differences without transferring anything
    // (the same -e SSH command as executeRsync would be appended in practice)
    args := []string{
        "-avz",
        "--dry-run",
        "--delete",
        localPath,
        remotePath,
    }

    cmd := exec.Command("rsync", args...)
    output, err := cmd.CombinedOutput()
    if err != nil {
        return false, err
    }

    // Count output lines that describe file changes; zero means in sync
    lines := strings.Split(string(output), "\n")
    fileChanges := 0
    for _, line := range lines {
        if isFileLine(line) {
            fileChanges++
        }
    }

    return fileChanges == 0, nil
}

Performance Optimization

Bandwidth Optimization

  1. Compression: -z flag for on-the-fly compression
  2. Incremental: Only transfer differences
  3. Block-level: Transfer changed blocks within files
  4. Parallel: Multiple concurrent syncs for independent repos (see the sketch below)
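
A sketch of the parallel case, bounding concurrent rsync processes with a semaphore in the same style as the discovery scanner (the limit of 4 is an arbitrary example):

func syncAll(sm *SyncManager, repos []string, target string) []*SyncResult {
    results := make([]*SyncResult, len(repos))
    sem := make(chan struct{}, 4) // at most 4 concurrent rsync processes
    var wg sync.WaitGroup

    for i, repo := range repos {
        wg.Add(1)
        go func(i int, repo string) {
            defer wg.Done()
            sem <- struct{}{}
            defer func() { <-sem }()

            // Each repo writes to its own slot, so no extra locking is needed
            res, _ := sm.SyncRepoTo(repo, target)
            results[i] = res
        }(i, repo)
    }

    wg.Wait()
    return results
}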

Network Optimization

# Limit bandwidth
rsync --bwlimit=10000 ...  # 10 MB/s limit

# Adjust compression level
rsync -z --compress-level=6 ...  # Balance speed/compression

# Skip the delta algorithm on fast local networks
rsync -W ...  # whole-file transfer avoids per-block checksum overhead

Error Recovery

func syncWithRetry(sm *SyncManager, repo, server string, maxRetries int) error {
    var lastErr error

    for i := 0; i < maxRetries; i++ {
        _, err := sm.SyncRepoTo(repo, server)
        if err == nil {
            return nil
        }

        lastErr = err
        log.Printf("Sync attempt %d failed: %v", i+1, err)

        // Exponential backoff
        time.Sleep(time.Duration(1<<i) * time.Second)
    }

    return fmt.Errorf("sync failed after %d retries: %w", maxRetries, lastErr)
}

Best Practices

Sync Frequency

  • Critical repos: Every 15-30 minutes
  • Active repos: Every 1-2 hours
  • Stable repos: Daily
  • Archive repos: Weekly
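
These intervals could be driven by a simple ticker per repository; a sketch (the scheduling function and example wiring are illustrative, the intervals mirror the list above):

// schedule runs syncWithRetry for one repository at a fixed interval.
func schedule(sm *SyncManager, repo, target string, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for range ticker.C {
        if err := syncWithRetry(sm, repo, target, 3); err != nil {
            log.Printf("scheduled sync of %s failed: %v", repo, err)
        }
    }
}

// Example wiring (inside some startup function):
//   go schedule(sm, "webapp", "main-server", 15*time.Minute)     // critical
//   go schedule(sm, "legacy-api", "main-server", 24*time.Hour)   // stable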

Sync Order

  1. Push to main server first
  2. Push to backup servers second
  3. Sync between neighbors last
  4. Verify each sync before proceeding
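
A sketch of this ordering using the SyncManager methods shown earlier (the server names are illustrative):

func syncInOrder(sm *SyncManager, repo string) error {
    // Main server first, backups second, neighbors last
    targets := []string{"main-server", "backup-server", "neighbor-a"}

    for _, target := range targets {
        if _, err := sm.SyncRepoTo(repo, target); err != nil {
            return fmt.Errorf("sync to %s failed: %w", target, err)
        }

        // Verify each sync before proceeding to the next target
        ok, err := sm.VerifySync(repo, target)
        if err != nil {
            return fmt.Errorf("verification against %s errored: %w", target, err)
        }
        if !ok {
            return fmt.Errorf("repository %s is not in sync with %s", repo, target)
        }
    }
    return nil
}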

Error Handling

  • Always validate repositories before sync
  • Check disk space on target
  • Monitor transfer times
  • Log all sync operations
  • Implement retry with backoff
  • Alert on persistent failures

Security

  • Use SSH keys exclusively
  • Verify host keys
  • Limit sync permissions
  • Audit sync operations
  • Encrypt sensitive repositories

Monitoring and Logging

Sync Metrics

type SyncMetrics struct {
    TotalSyncs      int64
    SuccessfulSyncs int64
    FailedSyncs     int64
    TotalBytes      int64
    AverageSpeed    float64
    LastSyncTime    time.Time
}
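
A sketch of how these counters might be updated from each SyncResult (the Record method is illustrative, not part of the documented API):

func (m *SyncMetrics) Record(res *SyncResult) {
    m.TotalSyncs++
    m.LastSyncTime = time.Now()

    if !res.Success {
        m.FailedSyncs++
        return
    }
    m.SuccessfulSyncs++
    m.TotalBytes += res.BytesSync

    if secs := res.Duration.Seconds(); secs > 0 && res.BytesSync > 0 {
        speed := float64(res.BytesSync) / secs // bytes per second for this sync
        // Incremental running mean over successful syncs
        m.AverageSpeed += (speed - m.AverageSpeed) / float64(m.SuccessfulSyncs)
    }
}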

Log Format

2024-01-15 10:30:00 [INFO] Sync started: repo=webapp, target=main-server
2024-01-15 10:30:18 [INFO] Sync completed: repo=webapp, duration=18.4s, size=156.8MB, speed=8.5MB/s
2024-01-15 10:30:18 [INFO] Files transferred: 892, bytes: 164,428,800

See Also