Bladeren bron

add:增加clickhouse 分区迁移脚本,准备做clickhouse分区

guoziyun 1 maand geleden
bovenliggende
commit
b4b12389ba
2 gewijzigde bestanden met toevoegingen van 273 en 4 verwijderingen
  1. 269 0
      oms/scripts/migrate-clickhouse-events-partition.sh
  2. 4 4
      oms/services/event-api-service.ts

+ 269 - 0
oms/scripts/migrate-clickhouse-events-partition.sh

@@ -0,0 +1,269 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+DB_NAME="${CLICKHOUSE_DATABASE:-omsdb}"
+SRC_TABLE="${CLICKHOUSE_SRC_TABLE:-events}"
+DST_TABLE="${CLICKHOUSE_DST_TABLE:-events_v2}"
+BACKUP_TABLE="${CLICKHOUSE_BACKUP_TABLE:-events_backup}"
+INGESTOR_PM2_NAME="${INGESTOR_PM2_NAME:-ingestor-service}"
+CURRENT_MONTH="${CURRENT_MONTH:-$(date +%Y%m)}"
+DRY_RUN=1
+SKIP_PM2=0
+FORCE=0
+
+usage() {
+  cat <<'EOF'
+Usage:
+  bash oms/scripts/migrate-clickhouse-events-partition.sh [options]
+
+Options:
+  --execute       Actually run the migration. Without this flag, the script is a dry-run.
+  --skip-pm2      Do not stop/start PM2 ingestor-service during cutover.
+  --force         Allow destination/backup tables to already exist.
+  --current-month Override the current month used for cutover backfill, format YYYYMM.
+  --help          Show this help message.
+
+Environment variables:
+  CLICKHOUSE_HOST            Default: localhost
+  CLICKHOUSE_PORT            Default: 9000
+  CLICKHOUSE_USER            Optional
+  CLICKHOUSE_PASSWORD        Optional
+  CLICKHOUSE_DATABASE        Default: omsdb
+  CLICKHOUSE_SRC_TABLE       Default: events
+  CLICKHOUSE_DST_TABLE       Default: events_v2
+  CLICKHOUSE_BACKUP_TABLE    Default: events_backup
+  INGESTOR_PM2_NAME          Default: ingestor-service
+  CURRENT_MONTH              Default: current system month in YYYYMM
+
+Notes:
+  - The script assumes ClickHouse writes come only from ingestor-service.
+  - event-api can stay online; RabbitMQ will buffer events while ingestor-service is paused.
+  - Review the printed plan first, then rerun with --execute.
+EOF
+}
+
+while [[ $# -gt 0 ]]; do
+  case "$1" in
+    --execute)
+      DRY_RUN=0
+      shift
+      ;;
+    --dry-run)
+      DRY_RUN=1
+      shift
+      ;;
+    --skip-pm2)
+      SKIP_PM2=1
+      shift
+      ;;
+    --force)
+      FORCE=1
+      shift
+      ;;
+    --current-month)
+      CURRENT_MONTH="$2"
+      shift 2
+      ;;
+    --help|-h)
+      usage
+      exit 0
+      ;;
+    *)
+      echo "Unknown argument: $1" >&2
+      usage
+      exit 1
+      ;;
+  esac
+done
+
+if [[ ! "$CURRENT_MONTH" =~ ^[0-9]{6}$ ]]; then
+  echo "CURRENT_MONTH must be in YYYYMM format, got: $CURRENT_MONTH" >&2
+  exit 1
+fi
+
+CLICKHOUSE_HOST="${CLICKHOUSE_HOST:-localhost}"
+CLICKHOUSE_PORT="${CLICKHOUSE_PORT:-9000}"
+CLICKHOUSE_USER="${CLICKHOUSE_USER:-}"
+CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:-}"
+
+CH_ARGS=(
+  --host "$CLICKHOUSE_HOST"
+  --port "$CLICKHOUSE_PORT"
+  --database "$DB_NAME"
+  --multiquery
+)
+
+if [[ -n "$CLICKHOUSE_USER" ]]; then
+  CH_ARGS+=(--user "$CLICKHOUSE_USER")
+fi
+
+if [[ -n "$CLICKHOUSE_PASSWORD" ]]; then
+  CH_ARGS+=(--password "$CLICKHOUSE_PASSWORD")
+fi
+
+require_command() {
+  if ! command -v "$1" >/dev/null 2>&1; then
+    echo "Required command not found: $1" >&2
+    exit 1
+  fi
+}
+
+require_command clickhouse-client
+
+if [[ $SKIP_PM2 -eq 0 ]]; then
+  require_command pm2
+fi
+
+run_sql() {
+  local sql="$1"
+  clickhouse-client "${CH_ARGS[@]}" --query "$sql"
+}
+
+print_sql_block() {
+  local title="$1"
+  local sql="$2"
+  printf '\n[%s]\n%s\n' "$title" "$sql"
+}
+
+run_step() {
+  local title="$1"
+  local command="$2"
+  printf '\n== %s ==\n' "$title"
+  printf '%s\n' "$command"
+  if [[ $DRY_RUN -eq 0 ]]; then
+    eval "$command"
+  fi
+}
+
+source_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
+if [[ "$source_exists" != "1" ]]; then
+  echo "Source table ${DB_NAME}.${SRC_TABLE} does not exist." >&2
+  exit 1
+fi
+
+dst_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
+backup_exists="$(run_sql "EXISTS TABLE ${DB_NAME}.${BACKUP_TABLE} FORMAT TabSeparatedRaw")"
+
+if [[ $FORCE -eq 0 && "$dst_exists" == "1" ]]; then
+  echo "Destination table ${DB_NAME}.${DST_TABLE} already exists. Use --force only if you know it is safe." >&2
+  exit 1
+fi
+
+if [[ $FORCE -eq 0 && "$backup_exists" == "1" ]]; then
+  echo "Backup table ${DB_NAME}.${BACKUP_TABLE} already exists. Use --force only if you know it is safe." >&2
+  exit 1
+fi
+
+month_list="$(run_sql "SELECT DISTINCT toYYYYMM(time) AS ym FROM ${DB_NAME}.${SRC_TABLE} ORDER BY ym FORMAT TSV")"
+
+if [[ -z "$month_list" ]]; then
+  echo "No data found in ${DB_NAME}.${SRC_TABLE}." >&2
+  exit 1
+fi
+
+historical_months=()
+while IFS= read -r ym; do
+  [[ -z "$ym" ]] && continue
+  if [[ "$ym" != "$CURRENT_MONTH" ]]; then
+    historical_months+=("$ym")
+  fi
+done <<< "$month_list"
+
+create_sql=$(cat <<EOF
+CREATE TABLE IF NOT EXISTS ${DB_NAME}.${DST_TABLE}
+(
+  log_id String,
+  uid String,
+  project UInt8,
+  os Nullable(String),
+  version Nullable(String),
+  event String,
+  time DateTime,
+  res Nullable(String),
+  \`from\` Nullable(String),
+  position Nullable(Int32),
+  duration Nullable(UInt32),
+  ad_type Nullable(String),
+  ad_src Nullable(String),
+  revenue Nullable(Float64),
+  cc Nullable(String),
+  raw_json_data String
+)
+ENGINE = MergeTree
+PARTITION BY toYYYYMM(time)
+ORDER BY (uid, time, event)
+PRIMARY KEY (uid, time)
+TTL time + INTERVAL 6 MONTH
+EOF
+)
+
+printf 'Migration Plan\n'
+printf 'Database: %s\n' "$DB_NAME"
+printf 'Source table: %s\n' "$SRC_TABLE"
+printf 'Destination table: %s\n' "$DST_TABLE"
+printf 'Backup table: %s\n' "$BACKUP_TABLE"
+printf 'Current month for cutover: %s\n' "$CURRENT_MONTH"
+printf 'Historical months to backfill: %s\n' "${historical_months[*]:-(none)}"
+printf 'PM2 control: %s\n' "$([[ $SKIP_PM2 -eq 0 ]] && echo enabled || echo skipped)"
+printf 'Mode: %s\n' "$([[ $DRY_RUN -eq 0 ]] && echo execute || echo dry-run)"
+
+print_sql_block "Create destination table" "$create_sql"
+
+for ym in "${historical_months[@]}"; do
+  month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
+  insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
+  print_sql_block "Backfill month ${ym}" "$insert_sql"
+done
+
+current_month_start="${CURRENT_MONTH:0:4}-${CURRENT_MONTH:4:2}-01 00:00:00"
+current_month_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${current_month_start}') AND time < now()"
+print_sql_block "Cutover current month backfill" "$current_month_sql"
+
+rename_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${BACKUP_TABLE}, ${DB_NAME}.${DST_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
+print_sql_block "Atomic rename" "$rename_sql"
+
+rollback_sql="RENAME TABLE ${DB_NAME}.${SRC_TABLE} TO ${DB_NAME}.${DST_TABLE}_bad, ${DB_NAME}.${BACKUP_TABLE} TO ${DB_NAME}.${SRC_TABLE}"
+print_sql_block "Rollback if needed" "$rollback_sql"
+
+if [[ $DRY_RUN -eq 1 ]]; then
+  printf '\nDry-run only. Re-run with --execute to perform the migration.\n'
+  exit 0
+fi
+
+run_step "Create partitioned destination table" "run_sql \"$create_sql\""
+
+for ym in "${historical_months[@]}"; do
+  month_start="${ym:0:4}-${ym:4:2}-01 00:00:00"
+  insert_sql="INSERT INTO ${DB_NAME}.${DST_TABLE} SELECT * FROM ${DB_NAME}.${SRC_TABLE} WHERE time >= toDateTime('${month_start}') AND time < addMonths(toDateTime('${month_start}'), 1)"
+  run_step "Backfill month ${ym}" "run_sql \"$insert_sql\""
+done
+
+if [[ $SKIP_PM2 -eq 0 ]]; then
+  run_step "Stop PM2 ingestor" "pm2 stop ${INGESTOR_PM2_NAME}"
+fi
+
+run_step "Backfill current month ${CURRENT_MONTH}" "run_sql \"$current_month_sql\""
+
+old_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${SRC_TABLE} FORMAT TabSeparatedRaw")"
+new_rows="$(run_sql "SELECT count() FROM ${DB_NAME}.${DST_TABLE} FORMAT TabSeparatedRaw")"
+printf '\nRow count check: source=%s destination=%s\n' "$old_rows" "$new_rows"
+
+if [[ "$old_rows" != "$new_rows" ]]; then
+  echo "Row count mismatch detected. Migration aborted before rename." >&2
+  if [[ $SKIP_PM2 -eq 0 ]]; then
+    pm2 start "$INGESTOR_PM2_NAME"
+  fi
+  exit 1
+fi
+
+run_step "Atomic rename" "run_sql \"$rename_sql\""
+
+if [[ $SKIP_PM2 -eq 0 ]]; then
+  run_step "Start PM2 ingestor" "pm2 start ${INGESTOR_PM2_NAME}"
+fi
+
+verify_sql="SELECT partition, sum(rows) AS rows, formatReadableSize(sum(data_compressed_bytes)) AS compressed_size, count() AS parts FROM system.parts WHERE database = '${DB_NAME}' AND \`table\` = '${SRC_TABLE}' AND active = 1 GROUP BY partition ORDER BY partition"
+run_step "Verify partitions" "run_sql \"$verify_sql\""
+
+printf '\nMigration completed. Keep %s.%s for rollback until you are satisfied.\n' "$DB_NAME" "$BACKUP_TABLE"

+ 4 - 4
oms/services/event-api-service.ts

@@ -16,9 +16,9 @@ const port = process.env.EVENT_PORT ? parseInt(process.env.EVENT_PORT, 10) : 300
 
 // RabbitMQ Configuration
 const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://coloring:coloring123.@localhost:5672";
-const RABBITMQ_EXCHANGE_NAME = process.env.RABBITMQ_EXCHANGE || "event_exchange"; // <-- 从环境变量读取交换机名称
+const RABBITMQ_EXCHANGE_NAME = process.env.RABBITMQ_EXCHANGE || "event-exchange"; // <-- 从环境变量读取交换机名称
 const RABBITMQ_LOG_QUEUE = process.env.RABBITMQ_LOG_QUEUE || "log-event-queue"; // <-- 从环境变量读取日志队列名称
-const RABBITMQ_OMS_QUEUE = process.env.RABBITMQ_OMS_QUEUE || "oms_event_queue"; // <-- 从环境变量读取摄取器队列名称
+const RABBITMQ_OMS_QUEUE = process.env.RABBITMQ_OMS_QUEUE || "oms-event-queue"; // <-- 从环境变量读取摄取器队列名称
 
 let amqpConnection: amqp.ChannelModel;
 let amqpChannel: amqp.Channel;
@@ -83,7 +83,7 @@ if (process.env.NODE_ENV !== "production") {
   app.use(
     morgan("combined", {
       skip: (req, res) => res.statusCode < 400,
-    })
+    }),
   );
 }
 
@@ -116,7 +116,7 @@ const eventHandler = async (req: Request, res: Response) => {
       RABBITMQ_EXCHANGE_NAME,
       "", // Routing key (ignored by fanout exchange)
       Buffer.from(message),
-      { persistent: true } // Mark message as persistent
+      { persistent: true }, // Mark message as persistent
     );
 
     if (published) {