C#で大容量のTxtデータを読み込み、データベースに登録するには?


環境
  • Sqlserver 2016
  • .NET Framework 4.5.2
  • 現在のテストデータは1300万件で、処理時間は3～4分です(1回に読み込む行数とスレッド数を制限して、サーバーのリソースを節約しています。サーバーの負荷が大きくなりすぎると、他のアプリケーションが動かなくなる可能性があります)。SQLServerDBHelper はデータベース操作用のヘルパークラスで、特別な処理はしていません。接続文字列を設定する際は、接続プールを有効にすることを忘れないでください。
    なお、以下のコードでは毎回接続を作成しています。以前は接続を使い回す方法も試しましたが、約130回のうち20回ほどデータベース側で問題が発生し、所要時間も7～8分ほどかかりました。
    設定ファイル:xxx.json(コードでは config\ImportTxt.json を読み込みます)
    
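    [ {
     /* 接続文字列(※掲載コードでは参照されていません) */
     "ConnStr": "",
     /* 取り込むTxtファイルのパス */
     "FilePath": "       ",
     /* 取り込み先のテーブル名 */
     "TableName": "        ",
     /* 取り込み前に実行するSQL */
     "ExecBeforeSql": "",
     /* 取り込み後に実行するSQL */
     "ExecAfterSql": "",
     /* 列マッピング(TxtName: ファイルの列名、DBName: テーブルの列名) */
     "Mapping": [
     {
     "DBName": "XXX",
     "TxtName": "DDD"
     } 
     ],
     /* 解析前にファイル内容から取り除く正規表現のリスト(一括読み込み時のみ適用) */
     "FilterRegex": [],
     /* テーブル定義に対してデータを検証するか(true の場合、Mapping と TableName が必須) */
     "CheckData": false,
     /* 区切り文字 */
     "Separator": "\t",
     /* ヘッダーの行数(複数行の場合、データも同じ行数で1件として扱う) */
     "HeaderRowsNum": 1
     }
    ]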
    コード:ConfigurationManager.AppSettings["frpage"](分割読み込み時に1回で読み込む件数、既定値 50000)と ConfigurationManager.AppSettings["fr"](分割読み込みに切り替えるファイルサイズ、単位 MB、既定値 100)は、各自で設定する必要があります。
    
    // Load every import definition from <app>\config\ImportTxt.json (a JSON array, one element per file/table).
    // A file whose content deserializes to null is treated like an empty array instead of throwing on dt.Count.
    string configPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "config", "ImportTxt.json");
    List<dynamic> dt = JsonConvert.DeserializeObject<List<dynamic>>(File.ReadAllText(configPath)) ?? new List<dynamic>();
    LogUtil.Info("    txt  ,    :" + dt.Count + " ");
    if (dt.Count == 0)
    {
        return;
    }

    // Start one import task per definition, then block the calling thread until all of them have finished.
    // Task.WaitAll rethrows any task failure wrapped in an AggregateException.
    List<Task> li = new List<Task>();
    foreach (dynamic row in dt)
    {
        LogUtil.Info("      :" + JsonConvert.SerializeObject(row));
        li.Add(ProcessRow(row));
    }
    Task.WaitAll(li.ToArray());
    LogUtil.Info("      ");
    
    /// <summary>
    /// Imports one element of the JSON configuration into SQL Server on a thread-pool thread.
    /// The source file is first copied to &lt;app&gt;\tmp. Files larger than the "fr" app setting (MB, default 100)
    /// are streamed in batches of "frpage" records (default 50000); smaller files are loaded completely and
    /// written inside a single transaction.
    /// </summary>
    /// <param name="row">One configuration element (FilePath, TableName, ExecBeforeSql, ExecAfterSql, Mapping,
    /// FilterRegex, CheckData, Separator, HeaderRowsNum).</param>
    public async Task ProcessRow(dynamic row)
    {
        // ConfigureAwait(false): callers block on the returned task with Task.WaitAll; resuming on a captured
        // synchronization context (e.g. the WinForms UI thread) would deadlock.
        await Task.Run(() =>
        {
            bool IsCheck = Convert.ToBoolean(row["CheckData"]);
            string TableName = Convert.ToString(row.TableName);
            string ExecBeforeSql = Convert.ToString(row.ExecBeforeSql);
            string ExecAfterSql = Convert.ToString(row.ExecAfterSql);
            int HeaderRowsNum = Convert.ToInt32(row.HeaderRowsNum);
            string Separator = Convert.ToString(row.Separator);
            // NOTE(review): the configured "ConnStr" is not used; every SQLServerDBHelper below is created
            // with its parameterless constructor.

            // Files above "fr" megabytes are streamed. long avoids int overflow for values >= 2048 MB.
            int frMegabytes;
            if (!int.TryParse(ConfigurationManager.AppSettings["fr"], out frMegabytes))
            {
                frMegabytes = 100;
            }
            long fr = frMegabytes * 1024L * 1024L;

            // Number of records per batch in streaming mode.
            int page;
            if (!int.TryParse(ConfigurationManager.AppSettings["frpage"], out page))
            {
                page = 50000;
            }

            // Key: column name in the text file, value: column name in the database table.
            Dictionary<string, string> dic = new Dictionary<string, string>();
            foreach (var dyn in row.Mapping)
            {
                dic.Add(Convert.ToString(dyn.TxtName), Convert.ToString(dyn.DBName));
            }

            List<string> regex = new List<string>();
            foreach (string item in row["FilterRegex"])
            {
                regex.Add(item);
            }

            // Work on a copy under <app>\tmp.
            string cpath = Convert.ToString(row["FilePath"]);
            string rootPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "tmp");
            if (!Directory.Exists(rootPath))
            {
                Directory.CreateDirectory(rootPath);
            }
            string fpath = Path.Combine(rootPath, Path.GetFileName(cpath));
            File.Copy(cpath, fpath, true);
            LogUtil.Info("          .         ");
            int threadCount = Environment.ProcessorCount * 3;

            if (new FileInfo(fpath).Length > fr)
            {
                ImportLargeFile(fpath, page, threadCount, HeaderRowsNum, Separator, TableName, ExecBeforeSql, ExecAfterSql, dic, IsCheck);
            }
            else
            {
                ImportSmallFile(fpath, cpath, Separator, HeaderRowsNum, regex, IsCheck, dic, TableName, ExecBeforeSql, ExecAfterSql);
            }
            GC.Collect();
        }).ConfigureAwait(false);
    }

    /// <summary>
    /// Streams <paramref name="fpath"/> and hands every batch of <paramref name="page"/> records to ProcessPath.
    /// After every <paramref name="threadCount"/> batches it waits for all outstanding worker threads, which caps the
    /// number of concurrent writers. Stops at the first batch that fails validation.
    /// NOTE(review): FilterRegex is not applied in this mode (it is only used by LoadDataTableFromTxt).
    /// </summary>
    private void ImportLargeFile(string fpath, int page, int threadCount, int HeaderRowsNum, string Separator,
        string TableName, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool IsCheck)
    {
        List<Thread> li_th = new List<Thread>();
        long sumCount = 0;
        int ij = 0;
        LogUtil.Info("  StreamReader   ");

        using (StreamReader sr = new StreamReader(File.OpenRead(fpath)))
        {
            // The fields of all HeaderRowsNum header lines together form the column list, as in LoadDataTableFromTxt.
            DataTable Data = new DataTable();
            for (int h = 0; h < HeaderRowsNum; h++)
            {
                string header = sr.ReadLine();
                if (header == null)
                {
                    break;
                }
                foreach (string scol in header.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries))
                {
                    Data.Columns.Add(scol.Trim(), typeof(string));
                }
            }

            // Read one record ahead so that the final batch is known to be final when it is dispatched
            // (trailing blank lines no longer hide the end of the data).
            string record = ReadNextRecord(sr, HeaderRowsNum, Separator);
            while (record != null)
            {
                // NOTE(review): RemoveEmptyEntries drops empty fields, shifting later values left.
                Data.Rows.Add(record.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries));
                record = ReadNextRecord(sr, HeaderRowsNum, Separator);
                bool last = record == null;
                if (Data.Rows.Count < page && !last)
                {
                    continue;
                }

                sumCount += Data.Rows.Count;
                if (!ProcessPath(Data, last, ref ij, TableName, ExecBeforeSql, ExecAfterSql, dic, IsCheck, li_th))
                {
                    // Validation failed: let in-flight batches finish, but write nothing further.
                    WaitForThreads(li_th);
                    return;
                }

                if ((ij > 0 && ij % threadCount == 0) || last)
                {
                    LogUtil.Info("           : " + sumCount);
                    WaitForThreads(li_th);
                    if (last)
                    {
                        LogUtil.Info("        ");
                    }
                    LogUtil.Info("           ...");
                }
                Data.Clear();
            }
        }
    }

    /// <summary>
    /// Returns the next record, skipping lines that are blank or consist only of separators. A record spans
    /// <paramref name="linesPerRecord"/> physical lines joined with a single space. Returns null at end of stream.
    /// </summary>
    private static string ReadNextRecord(StreamReader sr, int linesPerRecord, string Separator)
    {
        string line;
        while ((line = sr.ReadLine()) != null)
        {
            if (string.IsNullOrWhiteSpace(line) || string.IsNullOrWhiteSpace(line.Replace(Separator, "")))
            {
                continue;
            }
            for (int i = 1; i < linesPerRecord; i++)
            {
                string continuation = sr.ReadLine();
                if (continuation == null)
                {
                    break;
                }
                line += " " + continuation;
            }
            return line;
        }
        return null;
    }

    /// <summary>
    /// Loads a file of at most "fr" MB completely, optionally validates it, and writes it inside one transaction
    /// (rolled back and logged on failure).
    /// </summary>
    private void ImportSmallFile(string fpath, string cpath, string Separator, int HeaderRowsNum, List<string> regex,
        bool IsCheck, Dictionary<string, string> dic, string TableName, string ExecBeforeSql, string ExecAfterSql)
    {
        string error = "";
        using (SQLServerDBHelper sdb = new SQLServerDBHelper())
        {
            sdb.OpenConnection();
            // Pass sdb: CheckData (called when IsCheck is true) needs a helper to read the table definition.
            DataTable Data = LoadDataTableFromTxt(fpath, ref error, Separator, HeaderRowsNum, regex, IsCheck, dic, TableName, sdb);
            if (IsCheck)
            {
                DataRow[] rows = GetErrorRows(Data);
                if (rows.Length > 0)
                {
                    LogUtil.Info($"  {TableName}      : {DescribeRows(rows)}");
                    return;
                }
            }

            LogUtil.Info($"  {TableName}  txt    .     :{Data.Rows.Count} ");
            if (Data.Rows.Count == 0 || !string.IsNullOrWhiteSpace(error))
            {
                if (!string.IsNullOrWhiteSpace(error))
                {
                    LogUtil.Info("      ,  :" + cpath + " \r\n:" + error);
                }
                return;
            }

            sdb.BgeinTransaction();
            try
            {
                WriteTODB(TableName, Data, ExecBeforeSql, ExecAfterSql, dic, sdb: sdb);
                sdb.CommitTransaction();
                LogUtil.Info(TableName + " !!");
            }
            catch (Exception ex)
            {
                LogUtil.Info(TableName + " , :" + ex.Message + " \r\n:" + ex.StackTrace);
                sdb.RollbackTransaction();
            }
        }
    }

    /// <summary>
    /// Validates (optionally) and dispatches one streamed batch.
    /// The first batch is written synchronously with ExecBeforeSql (and ExecAfterSql too, when it is also the last).
    /// Middle batches are written on background threads. The last batch is written synchronously with ExecAfterSql,
    /// after all worker threads have finished.
    /// </summary>
    /// <param name="Data">Reusable batch buffer; it is copied, so the caller may clear it afterwards.</param>
    /// <param name="isLast">True when no further records follow this batch.</param>
    /// <param name="ij">Number of batches dispatched so far; incremented for each accepted batch.</param>
    /// <returns>False when the batch failed validation and nothing was written.</returns>
    private bool ProcessPath(DataTable Data, bool isLast, ref int ij, string TableName, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool IsCheck, List<Thread> li_th)
    {
        PoolModel p = new PoolModel
        {
            TableName = TableName,
            ExecBeforeSql = ExecBeforeSql,
            ExecAfterSql = ExecAfterSql,
            dic = dic,
            Data = Data.Copy()
        };

        if (IsCheck)
        {
            // Validate the private copy: CheckData adds an ErrorMsg column, which must not accumulate on the
            // reused buffer (a second Columns.Add("ErrorMsg") would throw).
            string error;
            using (SQLServerDBHelper sdb = new SQLServerDBHelper())
            {
                error = CheckData(p.Data, TableName, dic, sdb);
            }
            DataRow[] rows = GetErrorRows(p.Data);
            if (rows.Length > 0 || !string.IsNullOrWhiteSpace(error))
            {
                LogUtil.Info($" {TableName} : {DescribeRows(rows)}\r\n: " + error);
                return false;
            }
        }

        ij++;
        if (ij == 1)
        {
            WriteTODB(p.TableName, p.Data, p.ExecBeforeSql, p.ExecAfterSql, p.dic, true, isLast);
            LogUtil.Info(" ");
        }
        else if (!isLast)
        {
            Thread t = new Thread(state => WriteBatchInBackground((PoolModel)state)) { IsBackground = true };
            t.Start(p);
            li_th.Add(t);
        }
        else
        {
            // ExecAfterSql must run after every other batch has been written.
            WaitForThreads(li_th);
            WriteTODB(p.TableName, p.Data, p.ExecBeforeSql, p.ExecAfterSql, p.dic, false, true);
        }
        return true;
    }

    /// <summary>Worker-thread body: writes a middle batch; on failure logs the batch and calls ExitApp.</summary>
    private void WriteBatchInBackground(PoolModel c)
    {
        try
        {
            WriteTODB(c.TableName, c.Data, c.ExecBeforeSql, c.ExecAfterSql, c.dic, false, false);
        }
        catch (ThreadAbortException)
        {
            LogUtil.Error(" .................");
        }
        catch (Exception ex)
        {
            LogUtil.Error(c.TableName + " :" + ex.Message + "\r\n:" + ex.StackTrace + "\r\n: " + JsonConvert.SerializeObject(c.Data));
            ExitApp();
        }
    }

    /// <summary>Blocks until every thread in <paramref name="threads"/> has finished, then clears the list.</summary>
    private static void WaitForThreads(List<Thread> threads)
    {
        foreach (Thread t in threads)
        {
            t.Join();
        }
        threads.Clear();
    }

    /// <summary>
    /// Rows whose ErrorMsg (written by CheckData) is non-empty. CheckData stores "" for valid rows, so an
    /// "IS NOT NULL" filter alone would match every row. Returns an empty array when the column does not exist
    /// (CheckData returned before adding it).
    /// </summary>
    private static DataRow[] GetErrorRows(DataTable data)
    {
        if (data == null || !data.Columns.Contains("ErrorMsg"))
        {
            return new DataRow[0];
        }
        return data.Select("ErrorMsg IS NOT NULL AND ErrorMsg <> ''");
    }

    /// <summary>JSON of the rows' values only (serializing DataRow itself would also serialize its whole Table).</summary>
    private static string DescribeRows(DataRow[] rows)
    {
        return JsonConvert.SerializeObject(rows.Select(r => r.ItemArray).ToList());
    }

    /// <summary>Calls Application.Exit().</summary>
    public void ExitApp()
    {
        Application.Exit();
    }

    /// <summary>
    /// Bulk-copies <paramref name="Data"/> into <paramref name="TableName"/>. It runs
    /// <paramref name="ExecBeforeSql"/> first when <paramref name="first"/> is true, and
    /// <paramref name="ExecAfterSql"/> afterwards when <paramref name="last"/> is true.
    /// When <paramref name="sdb"/> is null, a helper is created and disposed here, even if an exception occurs.
    /// </summary>
    public void WriteTODB(string TableName, DataTable Data, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool first = true, bool last = true, SQLServerDBHelper sdb = null)
    {
        bool ownsHelper = sdb == null;
        if (ownsHelper)
        {
            sdb = new SQLServerDBHelper();
        }
        try
        {
            if (first && !string.IsNullOrWhiteSpace(ExecBeforeSql))
            {
                LogUtil.Info(TableName + " Sql :" + ExecBeforeSql);
                sdb.ExecuteNonQuery(ExecBeforeSql);
            }
            sdb.BulkCopy(Data, TableName, dic);
            if (last && !string.IsNullOrWhiteSpace(ExecAfterSql))
            {
                LogUtil.Info(TableName + " Sql :" + ExecAfterSql);
                sdb.ExecuteNonQuery(ExecAfterSql);
            }
            LogUtil.Info(TableName + " ");
        }
        finally
        {
            if (ownsHelper)
            {
                sdb.Dispose();
            }
        }
    }

    /// <summary>
    /// Validates the mapped columns of <paramref name="dt"/> against the definition of
    /// <paramref name="dbTableName"/>, read from syscolumns/systypes. It adds an "ErrorMsg" column if absent and
    /// stores each row's problems joined with " || " ("" when the row is valid).
    /// Non-null values are trimmed in place. Non-string types are converted with Convert.ChangeType and the
    /// result is written back.
    /// NOTE(review): writing back DateTime values uses DateTime.ToString(), which drops fractional seconds.
    /// BIT accepts only values Boolean.Parse understands (not "0"/"1").
    /// </summary>
    /// <param name="dic">Key: column in <paramref name="dt"/>; value: column in the database table.</param>
    /// <returns>"" when validation ran; otherwise a message explaining why it could not run.</returns>
    public string CheckData(DataTable dt, string dbTableName, Dictionary<string, string> dic, SQLServerDBHelper sdb)
    {
        if (string.IsNullOrWhiteSpace(dbTableName))
        {
            return " !";
        }
        if (dic.Count == 0)
        {
            return " !";
        }

        List<string> errorMsg = new List<string>();
        List<string> Cols = new List<string>();
        foreach (KeyValuePair<string, string> c in dic)
        {
            if (!dt.Columns.Contains(c.Key))
            {
                errorMsg.Add(c.Key);
            }
            Cols.Add(c.Key);
        }
        if (errorMsg.Count > 0)
        {
            return " , ! :" + string.Join(",", errorMsg);
        }

        // Guard against validating the same table twice.
        if (!dt.Columns.Contains("ErrorMsg"))
        {
            dt.Columns.Add(new DataColumn("ErrorMsg", typeof(string)) { DefaultValue = "" });
        }

        string sql = @"-- SqlServer
SELECT syscolumns.name as ColName,systypes.name as DBType,syscolumns.isnullable,
syscolumns.length
FROM syscolumns, systypes
WHERE syscolumns.xusertype = systypes.xusertype
AND syscolumns.id = object_id(@tb) ;
";
        DataSet ds = sdb.GetDataSet(sql, new SqlParameter[] { new SqlParameter("@tb", dbTableName) });
        var dic_Def = ds.Tables[0].AsEnumerable().ToDictionary(
            c => Convert.ToString(c["ColName"]),
            d =>
            {
                // ToUpperInvariant: culture-specific casing (e.g. Turkish) would break the type-name switch.
                string sqlType = Convert.ToString(d["DBType"]).ToUpperInvariant();
                int length = Convert.ToInt32(d["length"]);
                if (sqlType == "TEXT" || sqlType == "NTEXT")
                {
                    // For TEXT/NTEXT, syscolumns.length is the 16-byte pointer size, not a character limit.
                    length = -1;
                }
                else if ((sqlType == "NCHAR" || sqlType == "NVARCHAR") && length > 0)
                {
                    // syscolumns.length is in bytes; Unicode types use two bytes per character.
                    length /= 2;
                }
                return new
                {
                    ColName = Convert.ToString(d["ColName"]),
                    DBType = GetCSharpType(sqlType),
                    SqlType = sqlType,
                    IsNullble = Convert.ToBoolean(d["isnullable"]),
                    Length = length // <= 0 (MAX = -1, TEXT/NTEXT): no length check
                };
            });

        string today = DateTime.Now.ToString("yyyy-MM-dd");
        foreach (DataRow row in dt.Rows)
        {
            errorMsg.Clear();
            foreach (string colName in Cols)
            {
                string dbColName = dic[colName];
                if (!dic_Def.ContainsKey(dbColName))
                {
                    return "Excel :" + colName + " :" + dbColName + " !";
                }
                var info = dic_Def[dbColName];

                // A missing field (the row was shorter than the header) is DBNull. Test it before
                // Convert.ToString turns it into "".
                if (row.IsNull(colName))
                {
                    if (!info.IsNullble)
                    {
                        errorMsg.Add(" " + colName + " !");
                    }
                    continue;
                }

                string value = Convert.ToString(row[colName]).Trim();
                row[colName] = value;
                if (info.DBType == "String")
                {
                    if (info.SqlType == "TIME")
                    {
                        // TIME is validated by parsing the value as a time of day (e.g. 17:12:30.0000).
                        DateTime parsed;
                        if (!DateTime.TryParse(today + " " + value, out parsed))
                        {
                            errorMsg.Add(" " + colName + " :17:30:12");
                        }
                    }
                    else if (info.Length > 0 && value.Length > info.Length)
                    {
                        errorMsg.Add(" " + colName + " :" + info.Length);
                    }
                }
                else
                {
                    Type t = Type.GetType("System." + info.DBType);
                    try
                    {
                        row[colName] = Convert.ChangeType(value, t);
                    }
                    catch (Exception ex) when (ex is FormatException || ex is InvalidCastException || ex is OverflowException)
                    {
                        errorMsg.Add(" " + colName + " " + value + " " + info.SqlType + " .");
                    }
                }
            }
            row["ErrorMsg"] = string.Join(" || ", errorMsg);
        }
        return "";
    }

    /// <summary>
    /// wm 2018 11 28 13:37
    /// Maps an upper-case SQL Server type name to the name of the System type used to validate values.
    /// </summary>
    /// <param name="old">Upper-case SQL Server type name.</param>
    /// <returns>The System type name without the "System." prefix.</returns>
    /// <exception cref="NotSupportedException">The SQL type has no mapping.</exception>
    private string GetCSharpType(string old)
    {
        switch (old)
        {
            case "TINYINT":
                return "Byte";
            case "SMALLINT":
                return "Int16";
            case "INT":
                return "Int32";
            case "BIGINT":
                return "Int64";
            case "DECIMAL":
            case "FLOAT":
            case "NUMERIC":
            case "MONEY":
            case "SMALLMONEY":
                return "Decimal";
            case "BIT":
                return "Boolean";
            case "TEXT":
            case "NTEXT":
            case "CHAR":
            case "NCHAR":
            case "VARCHAR":
            case "NVARCHAR":
            case "TIME":
                return "String";
            case "DATE":
            case "DATETIME":
            case "DATETIME2":
            case "SMALLDATETIME":
                return "DateTime";
            default:
                // Report the unsupported SQL type itself (the original message contained an empty string here).
                throw new NotSupportedException("GetCSharpType " + old + " !");
        }
    }

    /// <summary>State handed to a worker thread: one batch and the settings needed to write it.</summary>
    public class PoolModel
    {
        public string TableName { get; set; }
        public DataTable Data { get; set; }
        public string ExecBeforeSql { get; set; }
        public string ExecAfterSql { get; set; }
        public Dictionary<string, string> dic { get; set; }
    }
    
    /// <summary>
    /// wm 2018 11 28 13:32
    /// Loads a delimited text file completely into a DataTable whose columns are all strings.
    /// The first <paramref name="HeaderRowsNum"/> lines form the header; the fields of all header lines together
    /// become the column list. Each data record then spans <paramref name="HeaderRowsNum"/> lines, joined with a
    /// single space. Blank lines, and lines containing only separators, are skipped.
    /// When <paramref name="isCheck"/> is true, the rows are also validated by CheckData, which adds an
    /// "ErrorMsg" column.
    /// </summary>
    /// <param name="path">Path of the text file.</param>
    /// <param name="error">Receives "" on success, or a message describing why loading or validation stopped.</param>
    /// <param name="Separator">Field separator.</param>
    /// <param name="HeaderRowsNum">Number of header lines (and lines per data record); must be at least 1.</param>
    /// <param name="Regexs">Patterns removed from the whole file text before it is split into lines.</param>
    /// <param name="isCheck">Whether to validate the data; requires <paramref name="map"/> and <paramref name="dbTableName"/>.</param>
    /// <param name="map">Key: column name in the file; value: column name in the database table.</param>
    /// <param name="dbTableName">Database table whose definition is used for validation.</param>
    /// <param name="sdb">Helper used for validation; when null and <paramref name="isCheck"/> is true, one is created and disposed here.</param>
    /// <returns>The loaded rows (with an ErrorMsg column when validated); possibly partial when <paramref name="error"/> is set.</returns>
    public DataTable LoadDataTableFromTxt(string path, ref string error, string Separator, int HeaderRowsNum, List<string> Regexs = null, bool isCheck = false, Dictionary<string, string> map = null, string dbTableName = "", SQLServerDBHelper sdb = null)
    {
        DataTable dt = new DataTable();
        error = "";
        if (isCheck && (map == null || map.Count == 0 || string.IsNullOrWhiteSpace(dbTableName)))
        {
            error = "               ,               .";
            return dt;
        }
        if (HeaderRowsNum < 1)
        {
            // With 0, the original index arithmetic (i += HeaderRowsNum - 1) never advanced and looped forever.
            error = "HeaderRowsNum must be at least 1.";
            return dt;
        }

        string txts = File.ReadAllText(path);
        if (Regexs != null)
        {
            foreach (string pattern in Regexs)
            {
                txts = new Regex(pattern).Replace(txts, "");
            }
        }

        // Accept CRLF, LF and CR line endings, matching StreamReader.ReadLine in the streaming import.
        string[] tts = txts.Split(new string[] { "\r\n", "\n", "\r" }, StringSplitOptions.None);
        string[] separators = new string[] { Separator };

        // Header: count every column name so that duplicates can be reported.
        Dictionary<string, int> col_Rep = new Dictionary<string, int>();
        int headerLines = Math.Min(HeaderRowsNum, tts.Length);
        for (int i = 0; i < headerLines; i++)
        {
            foreach (string col in tts[i].Split(separators, StringSplitOptions.RemoveEmptyEntries))
            {
                string colName = col.Trim();
                if (col_Rep.ContainsKey(colName))
                {
                    col_Rep[colName]++;
                    continue;
                }
                col_Rep.Add(colName, 1);
                dt.Columns.Add(colName, typeof(string));
            }
        }
        bool hasDuplicateColumns = col_Rep.Values.Any(n => n > 1);

        for (int i = headerLines; i < tts.Length; i++)
        {
            string s1 = tts[i];
            if (string.IsNullOrWhiteSpace(s1) || string.IsNullOrWhiteSpace(s1.Replace(Separator, "")))
            {
                continue;
            }
            // Duplicate header names are reported when the first data record is reached.
            if (hasDuplicateColumns)
            {
                error = " :" + string.Join(",", col_Rep.Where(c => c.Value > 1).Select(c => c.Key)) + " ";
                return dt;
            }

            // A record spans HeaderRowsNum physical lines.
            for (int j = 1; j < HeaderRowsNum && (i + j) < tts.Length; j++)
            {
                s1 += " " + tts[i + j];
            }
            // NOTE(review): RemoveEmptyEntries drops empty fields, shifting later values left.
            DataRow dr = dt.NewRow();
            dr.ItemArray = s1.Split(separators, StringSplitOptions.RemoveEmptyEntries);
            dt.Rows.Add(dr);
            i += HeaderRowsNum - 1;
        }

        if (isCheck)
        {
            if (sdb != null)
            {
                error = CheckData(dt, dbTableName, map, sdb);
            }
            else
            {
                using (SQLServerDBHelper ownHelper = new SQLServerDBHelper())
                {
                    error = CheckData(dt, dbTableName, map, ownHelper);
                }
            }
        }
        return dt;
    }
    まとめ
    以上がこの記事の全内容です。本記事の内容が皆さんの学習や仕事の参考になれば幸いです。ありがとうございました。